Chris Roth
03/15/2023, 9:01 PM
assets.py
from sys import getsizeof

from dagster import Output, asset

@asset()
def eat_it(context) -> Output[int]:
    context.log.info(f"Eating {context.partition_key}")
    return Output(
        value=len(context.partition_key),
        metadata={
            "serial": context.partition_key,
            "size": getsizeof(len(context.partition_key)),
        },
    )
and
__init__.py
from dagster import (
    AssetSelection,
    Definitions,
    StaticPartitionsDefinition,
    define_asset_job,
    load_assets_from_modules,
)

from . import assets

fruit_partitions_def = StaticPartitionsDefinition(
    ["apple", "orange", "pineapple", "banana", "grapes"]
)
vegetable_partitions_def = StaticPartitionsDefinition(
    ["carrot", "celery", "potato"]
)

fruit_job = define_asset_job(
    "fruit",
    AssetSelection.all(),
    partitions_def=fruit_partitions_def,
)
vegetable_job = define_asset_job(
    "vegetable",
    AssetSelection.all(),
    partitions_def=vegetable_partitions_def,
)

defs = Definitions(
    assets=load_assets_from_modules(
        [assets],
        group_name="fruits_and_vegetables",
    ),
    jobs=[fruit_job, vegetable_job],
    resources={},
)
I expected dagit would show me partitions I could materialize, but there aren't any.
Edson Henrique
03/15/2023, 9:51 PM
partitions_def
Edson Henrique
03/15/2023, 9:56 PM
from dagster import DynamicPartitionsDefinition

foodsPartition = DynamicPartitionsDefinition(name="foods")

@asset(partitions_def=foodsPartition)
def eat_it(context) -> Output[int]:
    ...  # body unchanged
all the rest remains the same
Chris Roth
03/16/2023, 12:13 AM
dagster._check.CheckError: Invariant failed. Description: Assets defined for node 'eat_it' have a partitions_def of Dynamic partitions definition foods, but job 'fruit' has non-matching partitions_def of 'apple', 'orange', 'pineapple', 'banana', 'grapes'.
when importing into dagit.
Thank you for the help @Edson Henrique
Chris Roth
03/16/2023, 12:21 AM
fruit_partitions_def = DynamicPartitionsDefinition(
    name="foods"
)
vegetable_partitions_def = DynamicPartitionsDefinition(
    name="foods"
)
it loads now, but both partition groups are empty. After making the partition defs, is there another way to initialize them?
yuhan
03/16/2023, 3:29 AM
Chris Roth
03/16/2023, 3:32 AM
yuhan
03/16/2023, 4:10 AM
from sys import getsizeof

from dagster import (
    AssetSelection,
    Definitions,
    MultiPartitionsDefinition,
    Output,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
)

# One dimension per food group; every partition key combines one value from each.
multi_p = MultiPartitionsDefinition(
    {
        "fruit": StaticPartitionsDefinition(
            ["apple", "orange", "pineapple", "banana", "grapes"]
        ),
        "vegetable": StaticPartitionsDefinition(["carrot", "celery", "potato"]),
    }
)

@asset(partitions_def=multi_p)
def eat_it(context) -> Output[int]:
    context.log.info(f"Eating {context.partition_key}")
    return Output(
        value=len(context.partition_key),
        metadata={
            "serial": context.partition_key,
            "size": getsizeof(len(context.partition_key)),
        },
    )

# The job uses the same partitions_def as the asset it selects.
fruit_job = define_asset_job(
    "fruit_and_vegetable",
    AssetSelection.all(),
    partitions_def=multi_p,
)

defs = Definitions(
    assets=[eat_it],
    jobs=[fruit_job],
    resources={},
)
yuhan
03/16/2023, 4:13 AM
MultiPartitionsDefinition might help in your case: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#multi-dimensionally-partitioned-assets
the limitation is that for asset jobs, the partitions need to be consistent with the partitions defined on the assets. so if you'd like to vary partitions_def across jobs for the same assets, dagster will currently error.
so i'd recommend trying to model it as multi partitions. but because it's multi-dimensional, every partition key needs a (fruit, vegetable) pair, so you may want to add an explicit placeholder value to each dimension, something like:
multi_p = MultiPartitionsDefinition(
    {
        "fruit": StaticPartitionsDefinition(
            ["apple", "orange", "pineapple", "banana", "grapes", "no fruit"]
        ),
        "vegetable": StaticPartitionsDefinition(
            ["carrot", "celery", "potato", "no vegetable"]
        ),
    }
)
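
To see what the placeholder values buy you, here is a rough sketch in plain Python (no Dagster involved) that enumerates the (fruit, vegetable) pairs such a two-dimensional definition would span. The helper name `partition_pairs` and the filtering logic are illustrative assumptions, not part of Dagster's API; the point is only that a "fruit only" run corresponds to pairing a fruit with "no vegetable", and vice versa.

```python
from itertools import product

# Dimension values taken from the suggestion above, including the placeholders.
fruits = ["apple", "orange", "pineapple", "banana", "grapes", "no fruit"]
vegetables = ["carrot", "celery", "potato", "no vegetable"]


def partition_pairs(fruit_keys, vegetable_keys):
    """Hypothetical helper: every (fruit, vegetable) combination of the two dimensions."""
    return list(product(fruit_keys, vegetable_keys))


pairs = partition_pairs(fruits, vegetables)

# Pairs that stand for "just this fruit": a real fruit with the vegetable placeholder.
fruit_only = [(f, v) for f, v in pairs if f != "no fruit" and v == "no vegetable"]

# Pairs that stand for "just this vegetable": the fruit placeholder with a real vegetable.
vegetable_only = [(f, v) for f, v in pairs if f == "no fruit" and v != "no vegetable"]

print(len(pairs), len(fruit_only), len(vegetable_only))
```

The full grid is the cross product of both dimensions, so most pairs mix a real fruit with a real vegetable; only the placeholder rows and columns map back to the original single-food partitions.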