# ask-community

Louis Deschuttere

02/16/2024, 8:48 PM
Hello! I currently have a Dagster pipeline that schedules a dbt run every 5 minutes. I want to improve my dbt run by introducing Dagster partitions. Basically, during each scheduled run, I want to extract a list of customer IDs (through a query, or through S3) and use this result to partition my dbt run (`dbt run --vars "{\"customer_id\": $customer_id}"`). I managed to get it working through a manual run, with the partitions and assets defined as:
def _get_all_customer_ids(database: Database):
    query = """
    SELECT DISTINCT _customer_id AS customer_id
    FROM source
    """
    return [str(row.customer_id) for row in ...]

organizations_partitions_def = StaticPartitionsDefinition(...)

@dbt_assets(manifest=dbt_manifest_path, partitions_def=organizations_partitions_def)
def clickhouse_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    customer_id = context.partition_key
    dbt_vars = json.dumps({"customer_id": customer_id})
    args = ["run", "--vars", dbt_vars]
    yield from dbt.cli(args, context=context).stream()
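To make the `--vars` handling in the asset above concrete, here is a minimal sketch of how the per-partition dbt arguments get built. `build_dbt_args` is a hypothetical helper name, not part of dagster-dbt; it only mirrors the `json.dumps` call in the asset body.

```python
import json
from typing import List


def build_dbt_args(customer_id: str) -> List[str]:
    """Build the dbt CLI arguments for a single customer partition (hypothetical helper)."""
    # json.dumps produces a JSON object string, which handles quoting for us
    dbt_vars = json.dumps({"customer_id": customer_id})
    return ["run", "--vars", dbt_vars]


if __name__ == "__main__":
    print(build_dbt_args("42"))
```

Building the variables with `json.dumps` avoids hand-escaping the quotes that the shell form of the command needs.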
But when I activate the following schedule:
dbt_assets_schedule = build_schedule_from_dbt_selection(
    ...,
    cron_schedule="*/5 * * * *",
)
I get the following error:
dagster._core.errors.DagsterInvariantViolationError: Cannot access partition_key for a non-partitioned run
How can I define a partitioned scheduled run?
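
One way to frame the question: the error suggests the scheduled run was launched without a partition, so `context.partition_key` has nothing to return. A schedule for a partitioned job would need to name a partition for each run it requests. Below is a minimal sketch of that fan-out in plain Python, assuming the customer list is already available when the schedule ticks. `plan_partitioned_runs` is a hypothetical helper; in Dagster, each resulting pair could be passed to `RunRequest(run_key=..., partition_key=...)` from inside a `@schedule` function whose job uses the same partitions definition.

```python
from datetime import datetime
from typing import List, Tuple


def plan_partitioned_runs(customer_ids: List[str], tick: datetime) -> List[Tuple[str, str]]:
    """Return one (run_key, partition_key) pair per customer for a schedule tick."""
    stamp = tick.strftime("%Y%m%dT%H%M")
    # De-duplicate and sort so that every tick yields a stable, predictable plan.
    # The run key combines tick time and customer, so it is unique per tick and partition.
    return [(f"{stamp}-{cid}", cid) for cid in sorted(set(customer_ids))]


if __name__ == "__main__":
    tick = datetime(2024, 2, 16, 20, 45)
    for run_key, partition_key in plan_partitioned_runs(["7", "3", "7"], tick):
        print(run_key, partition_key)
```

This only plans the runs; whether a static or a dynamic partitions definition fits better depends on how often the customer list changes between ticks.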