# ask-community

James Robinson

05/17/2023, 3:54 PM
Hi all, I am developing an asset that is essentially the concatenation of many upstream assets (e.g. a 'USA' asset being the union of several state-level assets). I am keen to update the USA asset on the materialization of any of its upstream, state-level assets. Therefore, I was thinking of partitioning the USA asset on a dynamic state field so that when the MA asset is updated, the USA MA partition is updated. I feel like there should be a simple way of doing this, but I can't seem to find it. Potentially with a `multi_asset_sensor`, or a dynamic `AssetIn` being declared in an `op`, perhaps? Is anyone able to help? Many thanks!

jamie

05/17/2023, 4:36 PM
I think you could do this with a dynamic partition and a partition mapping
```python
from dagster import AllPartitionMapping, AssetIn, DynamicPartitionsDefinition, Definitions, asset

dynamic_states = DynamicPartitionsDefinition(name="dynamic_states")

@asset(partitions_def=dynamic_states)
def states(context):
    return context.asset_partition_key_for_output()


@asset(
    ins={
        "states": AssetIn(
            partition_mapping=AllPartitionMapping(),
        )
    },
)
def country(context, states):
    context.log.info(states)

defs = Definitions(
    assets=[states, country]
)
```
this with an auto-materialization policy should update the `country` asset whenever a new `states` partition is materialized

James Robinson

05/17/2023, 4:44 PM
thanks for responding @jamie - I think auto-materializing assets and dynamic partitions sound useful! How would this work if I have several bespoke state assets:
```python
import pandas as pd
from dagster import asset

@asset
def state_ca():
    # bespoke code required
    return pd.DataFrame([{"state": "CA", "value": 1}])

@asset
def state_ma():
    return pd.DataFrame([{"state": "MA", "value": 1}])
```
and the `country` asset concats them all together

jamie

05/17/2023, 4:53 PM
in that case you would have to declare those assets as inputs to the downstream asset
```python
import pandas as pd
from dagster import asset

@asset
def state_ca():
    # bespoke code required
    return pd.DataFrame([{"state": "CA", "value": 1}])

@asset
def state_ma():
    return pd.DataFrame([{"state": "MA", "value": 1}])

@asset
def country(state_ca: pd.DataFrame, state_ma: pd.DataFrame):
    ...
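The body of that `country` asset could then just stack its inputs. A pandas-only sketch of the compute logic (the `@asset` decorator is omitted here so the function stands alone; `pd.concat` is one reasonable choice for the union):

```python
import pandas as pd

# Compute function for `country`: concatenate the per-state frames into
# one national table. In Dagster this would carry the @asset decorator,
# but the concat logic itself is plain pandas.
def country(state_ca: pd.DataFrame, state_ma: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([state_ca, state_ma], ignore_index=True)
```

With the two example frames above, this yields one row per state in a single frame.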

James Robinson

05/17/2023, 5:02 PM
Ah okay, so if i had 50 separate state assets, they would all have to be declared as inputs?

jamie

05/17/2023, 6:14 PM
yeah if each state asset needs a unique function then you’d need to define them all and list them all as inputs
👍 1

James Robinson

05/18/2023, 8:11 AM
Good to know. Thanks again @jamie!
Hi @jamie, I have been looking into this as it is a big requirement of ours, and I have come up with something using dynamic partitions that works just as needed, without having to load each of the dependent partitions when they are not required. I just wanted to check if there is anything alarming about it. Essentially, I have written a multi-asset sensor based on the asset sensor docs and the partition docs that checks if any asset within my custom definition (all state assets only) has materialized. A `RunRequest` will then trigger the national asset to run with the appropriate partition. When that asset is triggered with the given state partition, it loads the appropriate state asset from the custom definition, and stores it in the appropriate partition of the national table.
```python
import dagster as dag

states_partitions_def = dag.DynamicPartitionsDefinition(name="states")

@dag.asset
def state_ca():
    ...

@dag.asset
def state_ma():
    ...

# Definitions instead of RepositoryDefinition?
state_assets = dag.Definitions(assets=[state_ma, state_ca])
monitored_assets = list(
    state_assets
    .get_repository_def()
    .assets_defs_by_key
    .keys()
)


@dag.asset(partitions_def=states_partitions_def)
def states(context):
    state = context.asset_partition_key_for_output()
    state_asset = state_assets.load_asset_value([f"state_{state}"])
    return state_asset

states_job = dag.define_asset_job(
    name="states_job",
    selection=dag.AssetSelection.keys(["states"]),
    partitions_def=states_partitions_def,
)


@dag.multi_asset_sensor(
    job=states_job,
    monitored_assets=monitored_assets,
)
def state_sensor(context: dag.MultiAssetSensorEvaluationContext):
    states = []
    asset_events = context.latest_materialization_records_by_key()
    for asset_key, materialisation in asset_events.items():
        if materialisation is not None:
            state = asset_key.path[-1].split("_")[-1]
            states.append(state)
            context.advance_cursor({asset_key: materialisation})

    # Register any new state partitions before requesting runs for them
    context.instance.add_dynamic_partitions(partitions_def_name="states", partition_keys=states)
    return [dag.RunRequest(partition_key=state) for state in states]
```
This then meant that when `state_ca` is materialized, the sensor triggers the `states_job` job with the partition `'ca'`, which then triggers the `states` asset with the partition value of `'ca'`. This loads the `state_ca` asset into the `'ca'` partition of the `states` table as expected. It also extends nicely if we wanted to partition the `state_ca` and `state_ma` tables by date, and the `states` table by date and state.

jamie

05/22/2023, 1:37 PM
this looks good to me! Very cool solution, i haven’t seen anything like this before, but i don’t see any reason why it wouldn’t work. All of the Dagster concepts are used as expected, so i think this should be fine. Re: Definitions instead of RepositoryDefinition: Definitions is the right API to use here; RepositoryDefinition is an old concept we are replacing with Definitions. RepositoryDefinition is still supported, just not recommended
🌈 1
❤️ 1

James Robinson

05/22/2023, 1:41 PM
amazing, thanks very much for the feedback throughout. Really appreciate it 🙂