#ask-community

Slackbot

03/13/2023, 10:00 AM
This message was deleted.

Tim Castillo

03/13/2023, 1:25 PM
Hi! Thanks for reaching out. Are you getting any error messages that we could help with? What other steps have you taken, e.g. running dagster-airbyte check and dagster-airbyte apply?

Arman S

03/13/2023, 1:47 PM
@Tim Castillo Thanks for your reply. I get the following error:
UserWarning: Error loading repository location with_dagster.py:ValueError: Airbyte connections are not in sync with provided configuration, diff:
I run it like this:
dagit -f myfile.py
I just want to understand how to define a source and a destination if they don't already exist, and then run it.

Tim Castillo

03/13/2023, 1:50 PM
Have you run dagster-airbyte check and dagster-airbyte apply? The error message suggests that Dagster hasn't created the Airbyte connectors yet. See https://docs.dagster.io/guides/dagster/airbyte-ingestion-as-code#step-4-validate-changes and https://docs.dagster.io/guides/dagster/airbyte-ingestion-as-code#step-5-apply-changes

Arman S

03/13/2023, 2:28 PM
@Tim Castillo dagster-airbyte check requires a module. I was hoping to run this as a standalone script for now, just to test, before turning it into a project (which I guess I need to create with scaffold).
So I went and did the following:
1. Created a new project with dagster project scaffold --name airbyte_project
2. Added the code below to the assets.py file within airbyte_project
3. Ran the check you mentioned above:
dagster-airbyte check --module airbyte_project.assets:reconciler
The check failed with the following error message:
AttributeError: module 'airbyte_project.assets' has no attribute 'reconciler'
Also, if I run dagster dev at the root directory, my code fails with this error:
Error loading repository location airbyte_project:ValueError: Airbyte connections are not in sync with provided configuration, diff:
+ cereals-csv:
I am using the example you provided; I'm just not sure how to run it.
from dagster import (
    ScheduleDefinition,
    define_asset_job,
    AssetSelection,
    repository,
    build_asset_reconciliation_sensor,
)
from dagster_airbyte import (
    AirbyteManagedElementReconciler,
    load_assets_from_connections,
)
from dagster_airbyte.managed.generated.sources import FileSource
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode
from dagster_airbyte import airbyte_resource


airbyte_instance = airbyte_resource.configured({"host": "localhost", "port": "8000"})

cereals_csv_source = FileSource(
    name="cereals-csv",
    url="https://docs.dagster.io/assets/cereal.csv",
    format="csv",
    provider=FileSource.HTTPSPublicWeb(),
    dataset_name="cereals",
)

local_json_destination = LocalJsonDestination(
    name="local-json", destination_path="/local/cereals_out.json"
)

cereals_connection = AirbyteConnection(
    name="download-cereals",
    source=cereals_csv_source,
    destination=local_json_destination,
    stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
)


airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[cereals_connection],
)


# airbyte_reconciler.apply()


# load airbyte connection from above pythonic definitions
airbyte_assets = load_assets_from_connections(
    airbyte=airbyte_instance,
    connections=[cereals_connection],
    key_prefix=["cereals"],
)

update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

my_job = define_asset_job(
    "my_job", AssetSelection.groups("download-cereals").downstream()
)

my_job_schedule = ScheduleDefinition(
    name="my_job_schedule", job=my_job, cron_schedule="*/30 * * * *"
)


@repository()
def assets_modern_data_stack():
    return [airbyte_assets, my_job, my_job_schedule]

Tim Castillo

03/14/2023, 1:54 PM
In your code, the reconciler is called “airbyte_reconciler” and not just “reconciler”, so you'll have to update your calls to check and apply accordingly.
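To illustrate why the name matters, here is a minimal sketch of how a module:attribute target might be resolved. This is not dagster-airbyte's actual loader; it assumes the part after the colon is looked up as an attribute on the module. The assets module below is a hypothetical stand-in for airbyte_project/assets.py, and resolve_target is a made-up helper for illustration only.

```python
import types

# Hypothetical stand-in for airbyte_project/assets.py: it defines only
# `airbyte_reconciler`, mirroring the code posted above.
assets = types.ModuleType("airbyte_project.assets")
assets.airbyte_reconciler = object()  # placeholder for the real reconciler


def resolve_target(module: types.ModuleType, target: str):
    """Split a 'module:attribute' string and look the attribute up on the module."""
    _, _, attr_name = target.partition(":")
    return getattr(module, attr_name)


# The name that actually exists in the module resolves fine.
found = resolve_target(assets, "airbyte_project.assets:airbyte_reconciler")

# The name used in the failing command does not exist, so the lookup raises.
try:
    resolve_target(assets, "airbyte_project.assets:reconciler")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
```

Under that assumption, pointing the same commands at airbyte_project.assets:airbyte_reconciler should avoid the AttributeError.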

Arman S

03/14/2023, 2:33 PM
Thanks @Tim Castillo, I updated that and applied the connector. However, when I run this with dagster dev, I get the following error:
2023-03-14 14:30:56 +0000 - dagster.daemon.SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
UserWarning: Error loading repository location airbyte_project:dagster._core.errors.DagsterInvariantViolationError: No repositories, jobs, pipelines, graphs, asset groups, or asset definitions found in "airbyte_project".
This happens even though I have added @repository in assets.
The source, destination, and connection get created fine in Airbyte. I just cannot seem to get Dagster to run this workflow.