Slackbot
03/13/2023, 10:00 AM
Tim Castillo
03/13/2023, 1:25 PM
dagster-airbyte check and dagster-airbyte apply?
Arman S
03/13/2023, 1:47 PM
UserWarning: Error loading repository location with_dagster.py:ValueError: Airbyte connections are not in sync with provided configuration, diff:
I run it like this: dagit -f myfile.py
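To make the "not in sync ... diff" wording concrete, here is a minimal, library-free sketch of a check/apply reconciliation step. It is not dagster-airbyte's implementation; check, apply, desired, and actual are hypothetical names, and the dictionaries only stand in for what is declared in Python versus what exists in the Airbyte instance.

```python
# Library-free sketch of a check/apply reconciliation step.
# All names are hypothetical; this is not dagster-airbyte's internal code.

def check(desired: dict, actual: dict) -> list:
    """Return diff lines: '+' missing remotely, '-' extra remotely, '~' changed."""
    diff = []
    for name in sorted(desired.keys() - actual.keys()):
        diff.append(f"+ {name}")
    for name in sorted(actual.keys() - desired.keys()):
        diff.append(f"- {name}")
    for name in sorted(desired.keys() & actual.keys()):
        if desired[name] != actual[name]:
            diff.append(f"~ {name}")
    return diff


def apply(desired: dict, actual: dict) -> dict:
    """Return the remote state after making it match the declared state."""
    return dict(desired)


desired = {"cereals-csv": {"format": "csv"}}  # declared in Python code
actual = {}                                   # nothing created remotely yet

print(check(desired, actual))    # non-empty diff: the source is only in code
actual = apply(desired, actual)
print(check(desired, actual))    # empty diff once the states match
```

Under this reading, a "+" line in the diff names something that is declared in code but has not yet been created on the Airbyte side.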
Arman S
03/13/2023, 1:48 PM
Tim Castillo
03/13/2023, 1:50 PM
dagster-airbyte check and dagster-airbyte apply? The error message suggests that Dagster hasn't made the Airbyte connectors yet, i.e.:
https://docs.dagster.io/guides/dagster/airbyte-ingestion-as-code#step-4-validate-changes
https://docs.dagster.io/guides/dagster/airbyte-ingestion-as-code#step-5-apply-changes
Arman S
03/13/2023, 2:28 PM
Arman S
03/14/2023, 9:41 AM
1. dagster project scaffold --name airbyte_project
2. Added the code below to the assets.py file within airbyte_project
3. Ran the check you mentioned above: dagster-airbyte check --module airbyte_project.assets:reconciler
The above check failed with the following error message:
AttributeError: module 'airbyte_project.assets' has no attribute 'reconciler'
Also, if I run dagster dev at the root directory, my code fails with this error:
Error loading repository location airbyte_project:ValueError: Airbyte connections are not in sync with provided configuration, diff:
+ cereals-csv:
I am using the example you provided, just not sure how to run it:
from dagster import (
    ScheduleDefinition,
    define_asset_job,
    AssetSelection,
    repository,
    build_asset_reconciliation_sensor,
)
from dagster_airbyte import (
    AirbyteManagedElementReconciler,
    load_assets_from_connections,
)
from dagster_airbyte.managed.generated.sources import FileSource
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode
from dagster_airbyte import airbyte_resource

airbyte_instance = airbyte_resource.configured({"host": "localhost", "port": "8000"})

cereals_csv_source = FileSource(
    name="cereals-csv",
    url="https://docs.dagster.io/assets/cereal.csv",
    format="csv",
    provider=FileSource.HTTPSPublicWeb(),
    dataset_name="cereals",
)

local_json_destination = LocalJsonDestination(
    name="local-json", destination_path="/local/cereals_out.json"
)

cereals_connection = AirbyteConnection(
    name="download-cereals",
    source=cereals_csv_source,
    destination=local_json_destination,
    stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
)

airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[cereals_connection],
)
# airbyte_reconciler.apply()

# load airbyte connection from above pythonic definitions
airbyte_assets = load_assets_from_connections(
    airbyte=airbyte_instance,
    connections=[cereals_connection],
    key_prefix=["cereals"],
)

update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

my_job = define_asset_job(
    "my_job", AssetSelection.groups("download-cereals").downstream()
)

my_job_schedule = ScheduleDefinition(
    name="my_job_schedule", job=my_job, cron_schedule="*/30 * * * *"
)


@repository()
def assets_modern_data_stack():
    return [airbyte_assets, my_job, my_job_schedule]
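
On the AttributeError from step 3: a target written as module:attribute is usually resolved by importing the module and looking up the name after the colon. The sketch below imitates that with the standard library only. It assumes the CLI behaves this way; resolve_target and fake_assets are hypothetical names, not part of dagster-airbyte.

```python
import importlib
import sys
import types


def resolve_target(target: str):
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, _, attr_name = target.partition(":")
    module = importlib.import_module(module_name)
    # Raises AttributeError if the module defines no such top-level name.
    return getattr(module, attr_name)


# Stand-in for airbyte_project/assets.py, registered so import_module finds it.
fake_assets = types.ModuleType("fake_assets")
fake_assets.airbyte_reconciler = object()
sys.modules["fake_assets"] = fake_assets

try:
    resolve_target("fake_assets:reconciler")
except AttributeError as err:
    print(err)  # same kind of error as in step 3

found = resolve_target("fake_assets:airbyte_reconciler")
print(found is fake_assets.airbyte_reconciler)
```

If that assumption holds, the code above defines the reconciler under the name airbyte_reconciler, so the matching target would be airbyte_project.assets:airbyte_reconciler rather than airbyte_project.assets:reconciler.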
Tim Castillo
03/14/2023, 1:54 PM
Arman S
03/14/2023, 2:33 PM
When I run dagster dev, I get the following error:
2023-03-14 14:30:56 +0000 - dagster.daemon.SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
UserWarning: Error loading repository location airbyte_project:dagster._core.errors.DagsterInvariantViolationError: No repositories, jobs, pipelines, graphs, asset groups, or asset definitions found in "airbyte_project".
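One possible reading of this error, assuming the loader only inspects the top-level attributes of the module it is pointed at (here the airbyte_project package, not airbyte_project.assets): a definition that lives in a submodule is not found unless the package's __init__.py imports it by name. The sketch below imitates that with plain module objects; FakeRepository and find_loadable are hypothetical stand-ins, not Dagster APIs.

```python
import types


class FakeRepository:
    """Hypothetical stand-in for a loadable definition such as a @repository."""


def find_loadable(module):
    """Return names of top-level attributes that are loadable definitions."""
    return [
        name
        for name, value in vars(module).items()
        if isinstance(value, FakeRepository)
    ]


# Stand-ins for airbyte_project/__init__.py and airbyte_project/assets.py.
package = types.ModuleType("airbyte_project")
assets = types.ModuleType("airbyte_project.assets")
assets.assets_modern_data_stack = FakeRepository()
package.assets = assets  # like `from . import assets` in __init__.py

# The definition exists, but only inside the submodule, so nothing is found.
print(find_loadable(package))

# Like `from .assets import assets_modern_data_stack` in __init__.py.
package.assets_modern_data_stack = assets.assets_modern_data_stack
print(find_loadable(package))
```

If the real loader works this way, re-exporting the repository from the package's __init__.py, or pointing the tool at the assets module directly, would be the two things to try.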
Though I have added @repository in assets
Arman S
03/14/2023, 2:49 PM