Louis Deschuttere
02/16/2024, 8:48 PM
I would like to get a list of customer_ids (through a query, or through S3), and use this result to partition my dbt run (dbt run --vars "{\"customer_id\": $customer_id}").
I managed to get it working through a manual run, with the partitions and assets defined as:
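import json

from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets


def _get_all_customer_ids(database: Database):
    # Fetch every distinct customer ID; each one becomes a partition key.
    query = """
        SELECT DISTINCT _customer_id AS customer_id
        FROM source
    """
    return [str(row.customer_id) for row in database.select(query)]


# One static partition per customer ID, resolved when the code location loads.
organizations_partitions_def = StaticPartitionsDefinition(
    _get_all_customer_ids(database=clickhouse_database)
)


@dbt_assets(manifest=dbt_manifest_path, partitions_def=organizations_partitions_def)
def clickhouse_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Pass the current partition's customer ID to dbt as a variable.
    customer_id = context.partition_key
    dbt_vars = json.dumps({"customer_id": customer_id})
    args = ["run", "--vars", dbt_vars]
    yield from dbt.cli(args, context=context).stream()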
But when I activate the following schedule:
dbt_assets_schedule = build_schedule_from_dbt_selection(
    [clickhouse_dbt_assets],
    job_name="name",
    cron_schedule="*/5 * * * *",
)
I get the following error:
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
How can I define a partitioned scheduled run?