Hello! I currently have a Dagster pipeline that schedules a dbt run every 5 minutes. I want to improve it by introducing Dagster partitions. During each scheduled run, I want to fetch a list of customer_ids (through a query, or from S3) and use that list to partition my dbt run (dbt run --vars '{"customer_id": "<customer_id>"}'). I got it working for a manual run, with the partitions and assets defined as:
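import json

from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets

# Database, clickhouse_database and dbt_manifest_path are defined elsewhere in the project.


def _get_all_customer_ids(database: Database):
    query = """
    SELECT DISTINCT _customer_id AS customer_id
    FROM source
    """
    return [str(row.customer_id) for row in database.select(query)]


# Evaluated once, when the code location is loaded, not on every scheduled run.
organizations_partitions_def = StaticPartitionsDefinition(
    _get_all_customer_ids(database=clickhouse_database)
)


@dbt_assets(manifest=dbt_manifest_path, partitions_def=organizations_partitions_def)
def clickhouse_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Pass the current partition's customer ID to dbt as a var.
    customer_id = context.partition_key
    dbt_vars = json.dumps({"customer_id": customer_id})
    args = ["run", "--vars", dbt_vars]
    yield from dbt.cli(args, context=context).stream()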
But when I have the following scheduler activated:
from dagster_dbt import build_schedule_from_dbt_selection

dbt_assets_schedule = build_schedule_from_dbt_selection(
    [clickhouse_dbt_assets],
    job_name="name",
    cron_schedule="*/5 * * * *",
)
I get the following error:
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
How can I define a partitioned scheduled run?