# ask-ai
b
I have 3 Airbyte syncs. Each contains the same data. I want to process each of those with the same downstream asset code, but using one partition per Airbyte sync. How do I do this?
s
To achieve this, you can use partitioned assets in Dagster. Partitioned assets represent a collection of objects that are all computed the same way but from different data, which matches your case: the same data structure coming from three different Airbyte syncs. Here's a step-by-step guide based on the documentation:

1. Install the necessary packages: make sure you have `dagster` and `dagster-airbyte` installed in your environment.

   ```shell
   pip install dagster dagster-airbyte
   ```
2. Define your Airbyte syncs: use the `build_airbyte_assets` function to define assets for each of your Airbyte syncs. You will need the Airbyte connection ID for each sync. Give each sync its own `asset_key_prefix` so the asset keys don't collide, since all three syncs produce the same table names.

   ```python
   from dagster_airbyte import build_airbyte_assets

   airbyte_assets_sync1 = build_airbyte_assets(
       connection_id="connection_id_for_sync1",
       destination_tables=["table1", "table2"],
       asset_key_prefix=["sync1"],  # distinguishes this sync's tables
   )

   airbyte_assets_sync2 = build_airbyte_assets(
       connection_id="connection_id_for_sync2",
       destination_tables=["table1", "table2"],
       asset_key_prefix=["sync2"],
   )

   airbyte_assets_sync3 = build_airbyte_assets(
       connection_id="connection_id_for_sync3",
       destination_tables=["table1", "table2"],
       asset_key_prefix=["sync3"],
   )
   ```
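   To actually materialize these assets, Dagster also needs an Airbyte resource that can reach your Airbyte instance. A minimal sketch, assuming a locally hosted deployment (the host, port, and credentials below are placeholders to replace with your own):

   ```python
   from dagster import Definitions, EnvVar
   from dagster_airbyte import AirbyteResource

   # Placeholder connection details for a local Airbyte instance
   airbyte_instance = AirbyteResource(
       host="localhost",
       port="8000",
       username="airbyte",
       password=EnvVar("AIRBYTE_PASSWORD"),
   )

   defs = Definitions(
       assets=[*airbyte_assets_sync1, *airbyte_assets_sync2, *airbyte_assets_sync3],
       resources={"airbyte": airbyte_instance},
   )
   ```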
3. Define the partitioned downstream asset: create a single asset whose partitions correspond to the Airbyte syncs. Use a `StaticPartitionsDefinition` (the concrete `PartitionsDefinition` for a fixed list of keys) to represent the syncs as partitions, and read `context.partition_key` inside the asset body to tell them apart.

   ```python
   from dagster import StaticPartitionsDefinition, asset

   def get_partition_key_for_sync(sync_id):
       # Your logic to derive a partition key from the sync ID;
       # here the sync ID itself is used as the key
       return sync_id

   partitions_def = StaticPartitionsDefinition(
       [get_partition_key_for_sync(sync_id) for sync_id in ["sync1", "sync2", "sync3"]]
   )

   @asset(partitions_def=partitions_def)
   def downstream_asset(context):
       sync_id = context.partition_key
       # Your logic to process the data from the Airbyte sync
       # identified by this partition key
       ...
   ```
4. Connect the assets: dependencies in Dagster are declared on the asset definition itself (there is no `set_upstream` method on assets). Pass the Airbyte tables as `deps` and use the partition key to select which sync's table the body actually reads. This extends the definition from step 3:

   ```python
   from dagster import AssetKey, asset

   @asset(
       partitions_def=partitions_def,
       deps=[
           AssetKey(["sync1", "table1"]),
           AssetKey(["sync2", "table1"]),
           AssetKey(["sync3", "table1"]),
       ],
   )
   def downstream_asset(context):
       sync_id = context.partition_key
       # Load the table written by the Airbyte sync that matches
       # this partition, e.g. from your warehouse
       ...
   ```

   Because the Airbyte assets are unpartitioned, every partition of `downstream_asset` nominally depends on all three syncs; the partition key determines which sync's table the body reads.
5. Materialize the assets: when you run your Dagster job, you can specify which partition you want to materialize. This ensures that the downstream asset processes the data from the correct Airbyte sync, as shown in the sketch below.
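   For example, a single partition can be launched from Python with Dagster's `materialize` helper. A minimal sketch, assuming the partition keys from step 3:

   ```python
   from dagster import materialize

   # Materialize only the "sync1" partition of the downstream asset
   result = materialize(
       [downstream_asset],
       partition_key="sync1",
   )
   assert result.success
   ```

   In practice you would more often launch individual partitions or backfills from the Dagster UI, or define a partitioned job with `define_asset_job(..., partitions_def=partitions_def)`.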
"connection_id_for_sync1"
with your actual Airbyte Connection IDs, and implement the logic for
get_partition_key_for_sync
and the processing logic inside
downstream_asset
according to your specific use case. Please note that the code examples provided are based on the documentation and may need to be adapted to fit your exact scenario. If you encounter any issues or have further questions, feel free to ask for additional guidance.
b
Is there a way to define an asset's inputs based on the partition?