Brent Shulman
02/14/2024, 10:46 PM

Scout
02/14/2024, 10:46 PM
`dagster` and `dagster-airbyte` installed in your environment.
```shell
pip install dagster dagster-airbyte
```
2. Define your Airbyte syncs: Use the `build_airbyte_assets` function to define assets for each of your Airbyte syncs. You will need the Airbyte connection ID for each sync.
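```python
from dagster_airbyte import build_airbyte_assets

# Replace the placeholder IDs with your real Airbyte connection IDs.
# All three syncs write tables with the same names, so give each call its own
# asset_key_prefix; otherwise the asset keys "table1" and "table2" would collide.
airbyte_assets_sync1 = build_airbyte_assets(
    connection_id="connection_id_for_sync1",
    destination_tables=["table1", "table2"],
    asset_key_prefix=["sync1"],
)
airbyte_assets_sync2 = build_airbyte_assets(
    connection_id="connection_id_for_sync2",
    destination_tables=["table1", "table2"],
    asset_key_prefix=["sync2"],
)
airbyte_assets_sync3 = build_airbyte_assets(
    connection_id="connection_id_for_sync3",
    destination_tables=["table1", "table2"],
    asset_key_prefix=["sync3"],
)
```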
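All three calls above write tables named `table1` and `table2`. The plain-Python model below (it does not import Dagster) illustrates why each call needs a distinct `asset_key_prefix`, a parameter `build_airbyte_assets` accepts. It is a simplified sketch that assumes each destination table becomes an asset key of the form `[*asset_key_prefix, table_name]`; the helper names are hypothetical.

```python
from collections import Counter

# Simplified model (assumption): an Airbyte table's asset key is prefix + [table].
def asset_key_paths(prefix: list[str], tables: list[str]) -> list[tuple[str, ...]]:
    return [(*prefix, table) for table in tables]

def duplicate_keys(paths: list[tuple[str, ...]]) -> list[tuple[str, ...]]:
    """Return every key path that appears more than once, sorted."""
    counts = Counter(paths)
    return sorted(path for path, n in counts.items() if n > 1)

tables = ["table1", "table2"]
syncs = ["sync1", "sync2", "sync3"]

# Without a prefix, all three syncs produce the same two keys.
unprefixed = [p for _ in syncs for p in asset_key_paths([], tables)]
# With one prefix per sync, every key is unique.
prefixed = [p for s in syncs for p in asset_key_paths([s], tables)]

print(duplicate_keys(unprefixed))  # [('table1',), ('table2',)]
print(duplicate_keys(prefixed))    # []
```

Dagster requires asset keys to be unique within a code location, so the unprefixed variant would be rejected once all three syncs are loaded together.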
3. Define partitioned assets: Create a partitioned downstream asset with one partition per Airbyte sync. `PartitionsDefinition` is an abstract base class, so use a concrete subclass such as `StaticPartitionsDefinition`, which takes a fixed list of partition keys.
```python
from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset

def get_partition_key_for_sync(sync_id: str) -> str:
    # Your logic to determine the partition key based on the sync_id.
    # Placeholder: use the sync ID itself as the partition key.
    return sync_id

# PartitionsDefinition is abstract; StaticPartitionsDefinition takes a fixed list of keys.
partitions_def = StaticPartitionsDefinition(
    [get_partition_key_for_sync(sync_id) for sync_id in ["sync1", "sync2", "sync3"]]
)

@asset(partitions_def=partitions_def)
def downstream_asset(context: AssetExecutionContext) -> None:
    sync_id = context.partition_key
    # Your logic to process the data from the Airbyte sync based on the partition key
    context.log.info(f"Processing data for partition {sync_id}")
```
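The placeholder `get_partition_key_for_sync` above still needs real logic. One possible sketch, assuming you identify each sync by its Airbyte connection ID and keep an explicit, hypothetical mapping (`CONNECTION_TO_PARTITION`) from connection IDs to readable partition keys. Unknown IDs fail loudly, and the keys are checked for uniqueness before they would be handed to a partitions definition.

```python
# Hypothetical mapping from Airbyte connection ID to partition key.
# Replace the placeholder IDs with your real Airbyte connection IDs.
CONNECTION_TO_PARTITION = {
    "connection_id_for_sync1": "sync1",
    "connection_id_for_sync2": "sync2",
    "connection_id_for_sync3": "sync3",
}

def get_partition_key_for_sync(sync_id: str) -> str:
    """Return the partition key for an Airbyte connection ID."""
    try:
        return CONNECTION_TO_PARTITION[sync_id]
    except KeyError:
        raise ValueError(f"No partition key configured for connection {sync_id!r}") from None

partition_keys = [get_partition_key_for_sync(c) for c in CONNECTION_TO_PARTITION]

# Each partition key should identify exactly one sync.
if len(set(partition_keys)) != len(partition_keys):
    raise ValueError("Partition keys must be unique")

print(partition_keys)  # ['sync1', 'sync2', 'sync3']
```

Keeping the mapping explicit means adding a fourth sync is a one-line change, and a typo in a connection ID surfaces as an error instead of a silently empty partition.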
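4. Connect the assets: Dagster assets have no `set_upstream` method. Instead, declare the dependencies on the downstream asset itself with the `deps` argument of `@asset`, and use the partition key inside the asset body to select the tables produced by the matching Airbyte sync. Because the Airbyte assets are not partitioned, every partition of `downstream_asset` lists all of the Airbyte tables as upstream; the partition key is what routes each partition to the correct sync.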
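```python
from dagster import asset

# Dagster assets have no set_upstream(); dependencies are declared on the asset itself.
# build_airbyte_assets returns multi-assets, so depend on their individual asset keys.
airbyte_keys = [key for defs in (airbyte_assets_sync1, airbyte_assets_sync2, airbyte_assets_sync3) for d in defs for key in d.keys]

@asset(partitions_def=partitions_def, deps=airbyte_keys)  # replaces the step 3 decorator
def downstream_asset(context):
    sync_id = context.partition_key  # read only the tables produced by this sync
```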
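To make the routing step concrete, here is a plain-Python sketch (no Dagster import) of a hypothetical helper that turns a partition key into the asset key paths that partition should read. It assumes each sync's tables are keyed `[sync_id, table_name]`, for example by giving each `build_airbyte_assets` call a distinct `asset_key_prefix`; `SYNC_TABLES` and `upstream_paths_for_partition` are illustrative names.

```python
# Hypothetical table layout per sync; adjust to your actual destination tables.
SYNC_TABLES = {
    "sync1": ["table1", "table2"],
    "sync2": ["table1", "table2"],
    "sync3": ["table1", "table2"],
}

def upstream_paths_for_partition(partition_key: str) -> list[list[str]]:
    """Asset key paths (assumed [sync_id, table]) that one downstream partition reads."""
    if partition_key not in SYNC_TABLES:
        raise KeyError(f"Unknown partition key: {partition_key!r}")
    return [[partition_key, table] for table in SYNC_TABLES[partition_key]]

print(upstream_paths_for_partition("sync2"))  # [['sync2', 'table1'], ['sync2', 'table2']]
```

Inside `downstream_asset` you could call this with `context.partition_key` and wrap each path in `AssetKey(path)` to locate the corresponding table, so each partition touches only its own sync's data even though it declares all Airbyte tables as dependencies.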
5. Materialize the assets: When you run your Dagster job, you can specify which

Scout
02/14/2024, 10:48 PM
`"connection_id_for_sync1"` with your actual Airbyte connection IDs, and implement the logic for `get_partition_key_for_sync` and the processing logic inside `downstream_asset` according to your specific use case. Please note that the code examples provided are based on the documentation and may need to be adapted to fit your exact scenario. If you encounter any issues or have further questions, feel free to ask for additional guidance.

Brent Shulman
02/14/2024, 10:51 PM