Yevhen Samoilenko
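08/15/2022, 11:42 AM
```python
from dagster import (
    AssetSelection,
    AssetsDefinition,
    DailyPartitionsDefinition,
    define_asset_job,
)

# START_DATE, ENVS and dg_assets_by_env are defined elsewhere in the project.
DAILY_PARTITIONS_DEF = DailyPartitionsDefinition(start_date=START_DATE)


def _assign_to_group(asset: AssetsDefinition, env: str, group_name: str = "default"):
    # Return a copy of the asset with every key moved into the "<env>_<group_name>" group
    return asset.with_prefix_or_group(
        group_names_by_key={
            asset_key: f"{env}_{group_name}" for asset_key in asset.keys
        }
    )


def check_if_daily(x: AssetsDefinition):
    return x.partitions_def == DAILY_PARTITIONS_DEF


# Regrouped copies of every daily-partitioned asset, per environment
dg_daily_assets_by_env = {
    env: [
        _assign_to_group(asset, env, "daily")
        for asset in dg_assets_by_env[env]
        if check_if_daily(asset)
    ]
    for env in ENVS
}

# One daily-partitioned job per environment, selecting the "<env>_daily" group
dg_daily_jobs_by_env = {
    env: define_asset_job(
        name=f"dg_daily_{env}_job",
        selection=AssetSelection.groups(f"{env}_daily"),
        partitions_def=DAILY_PARTITIONS_DEF,
    )
    for env in dg_daily_assets_by_env.keys()
}
```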
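A plain-Python model of the grouping step may help show what the code above relies on. `ToyAsset`, `assign_to_group` and `select_group` are hypothetical stand-ins for `AssetsDefinition`, `_assign_to_group` and `AssetSelection.groups`; they are not Dagster's API and ignore everything except keys, partitions and groups.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Optional


# Hypothetical toy model of an asset: one key, a partitions label, a group.
# This is NOT Dagster's AssetsDefinition.
@dataclass(frozen=True)
class ToyAsset:
    key: str
    partitions: Optional[str]  # e.g. "daily", "hourly", or None
    group: str = "default"


def assign_to_group(asset: ToyAsset, env: str, group_name: str = "default") -> ToyAsset:
    # Mirrors _assign_to_group: returns a NEW object in group "<env>_<group_name>";
    # the original asset is left unchanged.
    return replace(asset, group=f"{env}_{group_name}")


def select_group(assets: List[ToyAsset], group: str) -> List[ToyAsset]:
    # Mirrors a group-based selection: keep only assets whose group matches.
    return [a for a in assets if a.group == group]


assets_by_env: Dict[str, List[ToyAsset]] = {
    "qa": [
        ToyAsset("qa__daily_a", "daily"),
        ToyAsset("qa__pq_blah", "hourly"),
    ]
}

# Only daily assets are regrouped into "<env>_daily".
daily_assets_by_env = {
    env: [assign_to_group(a, env, "daily") for a in assets if a.partitions == "daily"]
    for env, assets in assets_by_env.items()
}
```

In this model `assign_to_group` returns a new object rather than changing the original, so only the lists in `daily_assets_by_env` carry the `qa_daily` group; a group selection evaluated against the untouched `assets_by_env` lists finds nothing. Assuming `with_prefix_or_group` behaves the same way, which assets definitions are actually handed to the repository decides what the `{env}_daily` selection can see.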
Basically, I assign the group "{env}_daily" (e.g. "qa_daily") to every asset whose partitions_def is the daily one, and then create a daily partitioned job that selects the assets in that group. But Dagster fails with this error:
```
raise CheckError(f"Invariant failed. Description: {desc}")
dagster._check.CheckError: Invariant failed. Description: Assets defined for node 'qa__pq_blah_op' have a partitions_def of Hourly, starting 2021-11-01-00:00 UTC., but job 'dg_daily_qa_job' has non-matching partitions_def of Daily, starting 2021-11-01 UTC..
```
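The invariant in that error can be modeled in plain Python: a job that declares a partitions definition rejects any selected asset whose partitions definition differs. `ToyAsset`, `check_job_partitions` and `InvariantError` are hypothetical names for illustration, not Dagster's API or its actual check; the sketch only shows that the failure requires an hourly asset to be inside the selection.

```python
from dataclasses import dataclass
from typing import List


# Hypothetical toy asset: a key plus a partitions label ("daily", "hourly", ...).
@dataclass(frozen=True)
class ToyAsset:
    key: str
    partitions: str


class InvariantError(Exception):
    """Raised when a selected asset does not match the job's partitions."""


def check_job_partitions(job_name: str, job_partitions: str, selected: List[ToyAsset]) -> None:
    # Model of the rule: every selected asset must use the job's partitions definition.
    for asset in selected:
        if asset.partitions != job_partitions:
            raise InvariantError(
                f"Asset {asset.key!r} has partitions {asset.partitions!r}, "
                f"but job {job_name!r} has non-matching partitions {job_partitions!r}"
            )
```

For example, `check_job_partitions("dg_daily_qa_job", "daily", [ToyAsset("qa__pq_blah", "hourly")])` raises `InvariantError`, while a selection containing only daily assets passes silently.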
So the selection somehow picks up hourly assets for the daily job. Maybe I'm doing something wrong?

Yevhen Samoilenko
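08/15/2022, 2:39 PM
```python
from dagster import AssetGroup, AssetSelection, define_asset_job

# Two alternative ways to build the daily job (both are named "daily_job")
daily_job_1 = AssetGroup(assets=assets, resource_defs=_resource_defs).build_job(
    name="daily_job",
)

daily_job_2 = define_asset_job(
    name="daily_job",
    selection=AssetSelection.assets(*assets),
    partitions_def=DAILY_PARTITIONS_DEF,
)
```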
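A plain-Python sketch of the rule quoted in the error message that follows (an assets job with several partitioned assets needs them all to share one partitions definition), plus a pre-filter that keeps only assets with a single definition. `ToyAsset`, `distinct_partitions` and `only_partitioned_by` are hypothetical names, not Dagster's API, and the sample list assumes `assets` mixes partitions definitions, which the original message does not confirm.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


# Hypothetical toy asset: a key plus an optional partitions label.
@dataclass(frozen=True)
class ToyAsset:
    key: str
    partitions: Optional[str]  # None means unpartitioned


def distinct_partitions(assets: List[ToyAsset]) -> Set[str]:
    # Partitions definitions used by the partitioned assets only.
    return {a.partitions for a in assets if a.partitions is not None}


def only_partitioned_by(assets: List[ToyAsset], partitions: str) -> List[ToyAsset]:
    # Hypothetical pre-filter: keep only assets with the given partitions definition.
    return [a for a in assets if a.partitions == partitions]


assets = [
    ToyAsset("daily_a", "daily"),
    ToyAsset("hourly_b", "hourly"),
    ToyAsset("plain_c", None),
]
```

Here `distinct_partitions(assets)` yields two definitions, so a single job over all of them would break the rule in this model; after `only_partitioned_by(assets, "daily")` only one definition remains.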
daily_job_1 was working perfectly, but with the daily_job_2 setup I get this error: "dagster._core.errors.DagsterInvalidDefinitionError: When an assets job contains multiple partitions assets, they must have the same partitions definitions"

Yevhen Samoilenko
08/15/2022, 2:41 PM

sandy
08/15/2022, 8:39 PM