#ask-community

Chris Roth

03/15/2023, 9:01 PM
I'm just starting to try out partitioning and I'm running into trouble with making static partitions that can be interacted with in dagit. Here's a simple example that reproduces the problem:
assets.py
from sys import getsizeof

from dagster import Output, asset


@asset()
def eat_it(context) -> Output[int]:
    context.log.info(f"Eating {context.partition_key}")
    return Output(
        value=len(context.partition_key),
        metadata={
            "serial": context.partition_key,
            "size": getsizeof(len(context.partition_key)),
        },
    )
and
__init__.py
from dagster import (
    AssetSelection,
    Definitions,
    StaticPartitionsDefinition,
    define_asset_job,
    load_assets_from_modules,
)

from . import assets

fruit_partitions_def = StaticPartitionsDefinition(
    ["apple", "orange", "pineapple", "banana", "grapes"]
)

vegetable_partitions_def = StaticPartitionsDefinition(
    ["carrot", "celery", "potato"]
)

fruit_job = define_asset_job(
    "fruit",
    AssetSelection.all(),
    partitions_def=fruit_partitions_def,
)

vegetable_job = define_asset_job(
    "vegetable",
    AssetSelection.all(),
    partitions_def=vegetable_partitions_def,
)


defs = Definitions(
    assets=load_assets_from_modules(
        [assets],
        group_name="fruits_and_vegetables",
    ),
    jobs=[fruit_job, vegetable_job],
    resources={},
)
I expected dagit would show me partitions I could materialize, but there aren't any:

Edson Henrique

03/15/2023, 9:51 PM
You need to make your asset partitioned: add the partitions_def parameter to the @asset decorator. Looking at your script, you need a dynamic partitions definition, because sometimes it eats fruits and sometimes it eats vegetables. So you can do:
from dagster import DynamicPartitionsDefinition, Output, asset

foodsPartition = DynamicPartitionsDefinition(name="foods")


@asset(partitions_def=foodsPartition)
def eat_it(context) -> Output[int]:
    ...  # body unchanged from the original asset
all the rest remains the same

Chris Roth

03/16/2023, 12:13 AM
Everything except the jobs works, including materializing an asset. For the jobs, it reports
dagster._check.CheckError: Invariant failed. Description: Assets defined for node 'eat_it' have a partitions_def of Dynamic partitions definition foods, but job 'fruit' has non-matching partitions_def of 'apple', 'orange', 'pineapple', 'banana', 'grapes'.
when importing into dagit. Thank you for the help @Edson Henrique
If I change the partitions defs like this
fruit_partitions_def = DynamicPartitionsDefinition(
    name="foods"
)

vegetable_partitions_def = DynamicPartitionsDefinition(
    name="foods"
)
it loads now, but both partition groups are empty. After creating the partitions defs, is there a separate step to initialize them?

yuhan

03/16/2023, 3:29 AM
For the error in the job, I think you can now omit partitions_def on the job definition.

Chris Roth

03/16/2023, 3:32 AM
@yuhan, I'd like a way to populate the partition list, not remove it. Does that make sense?

yuhan

03/16/2023, 4:10 AM
from sys import getsizeof

from dagster import (
    AssetSelection,
    Definitions,
    MultiPartitionsDefinition,
    Output,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
)

multi_p = MultiPartitionsDefinition(
    {
        "fruit": StaticPartitionsDefinition(
            ["apple", "orange", "pineapple", "banana", "grapes"]
        ),
        "vegetable": StaticPartitionsDefinition(["carrot", "celery", "potato"]),
    }
)


@asset(partitions_def=multi_p)
def eat_it(context) -> Output[int]:
    context.log.info(f"Eating {context.partition_key}")
    return Output(
        value=len(context.partition_key),
        metadata={
            "serial": context.partition_key,
            "size": getsizeof(len(context.partition_key)),
        },
    )


fruit_job = define_asset_job(
    "fruit_and_vegetable",
    AssetSelection.all(),
    partitions_def=multi_p,
)


defs = Definitions(
    assets=[eat_it],
    jobs=[fruit_job],
    resources={},
)
MultiPartitionsDefinition might help in your case: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitions#multi-dimensionally-partitioned-assets
The limitation is that, for asset jobs, the job's partitions need to be consistent with the partitions defined on the assets. So if you'd like to vary partitions_def across jobs for the same assets, Dagster will currently error. I'd recommend trying to model it as multi-partitions instead. But because it's multi-dimensional, every partition key needs a (fruit, vegetable) pair, so you may want to add an explicit placeholder to each dimension, something like:
multi_p = MultiPartitionsDefinition(
    {
        "fruit": StaticPartitionsDefinition(
            ["apple", "orange", "pineapple", "banana", "grapes", "no fruit"]
        ),
        "vegetable": StaticPartitionsDefinition(
            ["carrot", "celery", "potato", "no vegetable"]
        ),
    }
)