#ask-community

Yevhen Samoilenko

08/15/2022, 11:42 AM
Hi. I have a problem upgrading to version 1.0.3. The basic setup looks like this:
DAILY_PARTITIONS_DEF = DailyPartitionsDefinition(start_date=START_DATE)

def _assign_to_group(asset: AssetsDefinition, env: str, group_name: str = "default"):
    return asset.with_prefix_or_group(
        group_names_by_key={
            asset_key: f"{env}_{group_name}" for asset_key in asset.keys
        }
    )
    
def check_if_daily(x: AssetsDefinition):
    return x.partitions_def == DAILY_PARTITIONS_DEF
    
dg_daily_assets_by_env = {
    env: [
        _assign_to_group(asset, env, "daily")
        for asset in dg_assets_by_env[env]
        if check_if_daily(asset)
    ]
    for env in ENVS
}

dg_daily_jobs_by_env = {
    env: define_asset_job(
        name=f"dg_daily_{env}_job",
        selection=AssetSelection.groups(f"{env}_daily"),
        partitions_def=DAILY_PARTITIONS_DEF,
    )
    for env in dg_daily_assets_by_env.keys()
}
Basically, I assign the group "{env}_daily" to all assets with a daily partitions_def, then create a daily partitioned job for the assets in that group. But Dagster fails with this error:
raise CheckError(f"Invariant failed. Description: {desc}")
dagster._check.CheckError: Invariant failed. Description: Assets defined for node 'qa__pq_blah_op' have a partitions_def of Hourly, starting 2021-11-01-00:00 UTC., but job 'dg_daily_qa_job' has non-matching partitions_def of Daily, starting 2021-11-01 UTC..
So the selection somehow picks up hourly assets for the daily job. Maybe I'm doing something wrong?
Also, I noticed that in this case:
daily_job_1 = AssetGroup(assets=assets, resource_defs=_resource_defs).build_job(
    name="daily_job",
)

daily_job_2 = define_asset_job(
    name="daily_job",
    selection=AssetSelection.assets(*assets),
    partitions_def=DAILY_PARTITIONS_DEF,
)
daily_job_1 worked perfectly, but with the daily_job_2 setup I get this error: "dagster._core.errors.DagsterInvalidDefinitionError: When an assets job contains multiple partitions assets, they must have the same partitions definitions"
So it turns out that AssetSelection.assets(*assets) != AssetGroup(assets=assets), although as I understand it both should select the same assets.

sandy

08/15/2022, 8:39 PM
That's unexpected. Would you be able to share how you're constructing your repository? Or, ideally, a code example I could try out myself to reproduce the issue?
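While putting a repro together, it may help to list which assets handed to the job carry a partitions_def other than the one the job expects. The sketch below is only a diagnostic idea, not Dagster's own selection logic: it reads nothing but the `.keys` and `.partitions_def` attributes already used in the snippets above, and `FakeAsset` with its string partition labels is a hypothetical stand-in so the example runs without Dagster installed.

```python
from typing import Any, Iterable, List, NamedTuple


class FakeAsset(NamedTuple):
    """Hypothetical stand-in exposing the two attributes the thread's code reads."""

    keys: tuple           # asset keys produced by this definition
    partitions_def: Any   # a plain string here; a partitions definition in real code


def mismatched_keys(assets: Iterable[Any], expected_partitions_def: Any) -> List[Any]:
    """Return the keys of every asset whose partitions_def differs from the expected one."""
    return [
        key
        for asset in assets
        if asset.partitions_def != expected_partitions_def
        for key in asset.keys
    ]


# Made-up asset names, purely for illustration.
candidates = [
    FakeAsset(keys=("qa/daily_table",), partitions_def="daily"),
    FakeAsset(keys=("qa/pq_blah",), partitions_def="hourly"),
]

print(mismatched_keys(candidates, "daily"))  # prints ['qa/pq_blah']
```

With real definitions, the same function could be called on the full asset list passed to the repository with DAILY_PARTITIONS_DEF as the expected value, assuming the partitions definitions compare with `!=` the same way `check_if_daily` compares them with `==`.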