# ask-community
g
Let's assume I have 5 assets: a1, a1_cleaned, a2, a2_cleaned and combined. a1 and a2 are both daily partitioned, and their cleaned counterparts are unpartitioned. However, their state (SCD2) depends on the partitions being consumed. So how can I ensure that the final combined asset only triggers when all the cleaned intermediates have been created for the current partition?
@sandy https://github.com/geoHeil/dagster-ssh-demo/blob/master/SSH_DEMO/sensors/sftp_sensor_asset_real.py#L235 from https://github.com/dagster-io/dagster/discussions/7306 is my current baseline. But that either triggers on ANY update of the intermediates, or when the partitions are there for each of them. However, my problem is that with the cleaned assets there is an intermediate step. How can this be modeled in the sensor?
When thinking about it again, it is perhaps much simpler: as the asset-change events are not extracted from predecessors but by key, I will simply need to include the desired key in the firing condition.
Nonetheless, it would be interesting if predecessors could also be accessed dynamically.
s
🎉 1
g
Though it is a bit more tricky when thinking about it again - or am I making it overly complex? Assuming a1 for 2022-01-01 triggers a1_cleaned (non-partitioned), how can I derive that the ASSET_MATERIALIZATION event of a1_cleaned belongs to, or includes data up to, some specific partition (2022-01-01 in this case)?
Should some additional metadata on the materialization event somehow be used for this?
s
I think that putting metadata on the AssetMaterializations with the latest partition that they include would be the right approach. Currently, this is a little complex, but doable. First step: in the sensor for `a1_cleaned`, put a tag on the run with the partition for `a1`:
Copy code
yield RunRequest(tags={"latest_partition": ...})
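For concreteness, here is a minimal sketch of what such a sensor could look like. The job name `a1_cleaned_job` and the `run_key` scheme are placeholders, not something from the thread:
Copy code
from dagster import AssetKey, RunRequest, asset_sensor

# `a1_cleaned_job` is assumed to be a job that materializes a1_cleaned
# (e.g. built from an asset group like the one shown further down in this thread).
@asset_sensor(asset_key=AssetKey("a1"), job=a1_cleaned_job)
def a1_cleaned_sensor(context, asset_event):
    # Partition of the a1 materialization that triggered this evaluation.
    partition = asset_event.dagster_event.step_materialization_data.materialization.partition
    # Pass the partition down to the run so the IO manager can attach it as metadata.
    yield RunRequest(
        run_key=f"a1_cleaned_{partition}",
        tags={"latest_partition": partition},
    )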
Then, in your IOManager, grab that tag and add it as metadata that will show up on the materialization for `a1_cleaned`:
Copy code
partition_tag_value = context.step_context._plan_data.pipeline_run.tags["latest_partition"]
context.add_output_metadata({"latest_partition": partition_tag_value})
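A minimal sketch of such an IO manager, under the same assumptions - the pickle-to-local-path storage is only a placeholder, and the run-tag lookup goes through the same private `step_context` attribute as above:
Copy code
import os
import pickle

from dagster import IOManager, io_manager

class PartitionTaggingIOManager(IOManager):
    def handle_output(self, context, obj):
        # Tag set by the upstream sensor's RunRequest (private API, as in the snippet above).
        run_tags = context.step_context._plan_data.pipeline_run.tags
        partition_tag_value = run_tags.get("latest_partition")
        if partition_tag_value is not None:
            # Shows up as metadata on the AssetMaterialization event for this output.
            context.add_output_metadata({"latest_partition": partition_tag_value})
        # Placeholder storage: pickle each output to a local path.
        path = os.path.join("/tmp/dagster_storage", context.step_key, context.name)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        path = os.path.join(
            "/tmp/dagster_storage", context.upstream_output.step_key, context.upstream_output.name
        )
        with open(path, "rb") as f:
            return pickle.load(f)

@io_manager
def partition_tagging_io_manager(_):
    return PartitionTaggingIOManager()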
Then, in the sensor for `combined`, you can access the metadata on `a1_cleaned` with something like the following:
Copy code
materialization = record.event_log_entry.dagster_event.event_specific_data.materialization
latest_partition = [me for me in materialization.metadata_entries if me.label == "latest_partition"][0].entry_data.text
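Putting that together, a sketch of the sensor for `combined` could look roughly like this. `combined_job` is an assumed job that materializes the combined asset, and the `latest_partition` metadata is assumed to be attached as above; the sensor only fires once both cleaned assets report the same partition:
Copy code
from dagster import (
    AssetKey,
    DagsterEventType,
    EventRecordsFilter,
    RunRequest,
    sensor,
)

def latest_partition_metadata(instance, asset_key):
    # Newest materialization of the given asset, if any.
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
        ),
        ascending=False,
        limit=1,
    )
    if not records:
        return None
    materialization = records[0].event_log_entry.dagster_event.event_specific_data.materialization
    entries = [me for me in materialization.metadata_entries if me.label == "latest_partition"]
    return entries[0].entry_data.text if entries else None

# `combined_job` is assumed to be a job that materializes the combined asset.
@sensor(job=combined_job)
def combined_sensor(context):
    a1_partition = latest_partition_metadata(context.instance, AssetKey("a1_cleaned"))
    a2_partition = latest_partition_metadata(context.instance, AssetKey("a2_cleaned"))
    # Only request a run once both cleaned assets have caught up to the same partition.
    if a1_partition is not None and a1_partition == a2_partition:
        yield RunRequest(run_key=f"combined_{a1_partition}")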
g
Sounds great - I still have to try this. But it leaves one edge case open: a backfill of older partitions. I.e. if a2 is already there and I am backfilling a1, how can I tell Dagster to reset that partition to wait, such that both a1 and a2 need to fulfill this condition again (after the backfill)?
I think there is one problem in your approach: `tags={"latest_partition": ...}` is ill-defined, as a1 is partitioned, a1_cleaned is not partitioned, and the run sensor which listens to changes has a hard time getting the partition of a1. So where is the latest materialization available for a1 (to listen/subscribe to)?
But I do not want to listen again ... rather pass the partition which was worked on down
rather pass it directly
dummy code:
Copy code
from dagster import AssetGroup, DailyPartitionsDefinition, asset, repository

my_partitions_def = DailyPartitionsDefinition(start_date="2022-04-10")


# Daily-partitioned raw asset.
@asset(partitions_def=my_partitions_def)
def a1():
    return 1


# Unpartitioned cleaned asset; its state depends on the a1 partitions consumed so far.
@asset
def a1_cleaned(a1):
    return 11


# Daily-partitioned raw asset.
@asset(partitions_def=my_partitions_def)
def a2():
    return 2


# Unpartitioned cleaned asset.
@asset
def a2_cleaned(a2):
    return 22


# Final asset combining both cleaned intermediates.
@asset
def a1_a2_combined(a1_cleaned, a2_cleaned):
    return a1_cleaned + a2_cleaned


group_dummy = AssetGroup([a1, a2, a1_cleaned, a2_cleaned, a1_a2_combined])

asset_job_dummy = group_dummy.build_job("dummy_assets")


@repository
def my_job():
    return [asset_job_dummy]
I wonder though: `group_dummy = AssetGroup([a1, a2, a1_cleaned, a2_cleaned, a1_a2_combined])` contains all the assets (and in the launchpad the full run configuration would be triggered for a global refresh of all the assets in the group). Could I get around the sensors and jobs per asset in this case, and thus no longer need this quite complicated handling/propagation of the partitioned metadata?
@claire
c
Hi geoHeil. To answer your question of how the sensor for a1_cleaned recognizes the partitions of a1 that were materialized, you could do something like this within the sensor:
Copy code
from dagster import AssetKey, DagsterEventType, EventRecordsFilter

records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey("upstream_daily_partitioned_asset_1"),
    )
)
materialized_partitions = set(
    record.event_log_entry.dagster_event.step_materialization_data.materialization.partition
    for record in records
)
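As a small follow-up sketch (assuming the `my_partitions_def` from the dummy code above), that set can then be compared against the expected partition keys to find gaps:
Copy code
# Which daily partitions are expected but not yet materialized?
expected_partitions = set(my_partitions_def.get_partition_keys())
missing_partitions = expected_partitions - materialized_partitions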
g
So far I implemented what @sandy suggested here https://github.com/geoHeil/dagster-ssh-demo/blob/master/SSH_DEMO/sensors/transitive_asset_sensors.py#L60 i.e. a round trip of the metadata. But this does not seem to be really intuitive. And I think your approach does not quite work: combined depends on a1_cleaned (which has no partition, but whose state depends on some stateful aggregation for a specific time range/set of partitions), which itself depends on a1 (partitioned). As far as I can see @claire, your approach would not be able to consume this tag / additional metadata, only display the partitions for an existing (partitioned) asset.
c
Hi geoHeil, to answer your question about why sensors/jobs are currently necessary: for now, asset jobs can only be executed if all assets share the same partitions definition, but the functionality to execute assets with different partitions is something we hope to add soon.
Would you mind clarifying what problem you identified with Sandy's approach above? Is it finding the latest materialized partition of `a1` in the sensor for `a1_cleaned`?
g
No, Sandy's approach is fine. But it is quite complex, i.e. potentially hard to maintain, and requires a lot of custom code.
Also, I wonder a) how to perhaps push down more of the filtering using the GraphQL API of Dagster - currently all the partitions for an asset are iterated over, which might not be ideal; and b) about the edge cases in Sandy's approach: when backfilling data (i.e. not the latest partition but older partitions), the metadata tags might no longer be in sync. Do you see any way to cope with this scenario?
c
I would still suggest calling `get_event_records` on the instance rather than using the GraphQL API, because GraphQL API calls are generally just another layer of code that makes calls to the instance. I think that when backfilling and the metadata tags are out of order, you may just have to query for all materializations of `a1`, see from the event log entries which partitions have been materialized, and compare those partitions against the metadata on the `a1_cleaned` materializations.
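A rough sketch of that comparison, assuming the `latest_partition` metadata is attached as discussed above (function names are illustrative): collect the a1 partitions that have been materialized, collect the partitions that a1_cleaned has already recorded in its metadata, and re-trigger a1_cleaned for the difference.
Copy code
from dagster import AssetKey, DagsterEventType, EventRecordsFilter

def a1_partitions_materialized(instance):
    # All partitions of a1 with at least one materialization (including backfills).
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("a1"),
        )
    )
    return set(
        record.event_log_entry.dagster_event.step_materialization_data.materialization.partition
        for record in records
    )

def a1_cleaned_processed_partitions(instance):
    # Partitions that a1_cleaned has already incorporated, according to the
    # "latest_partition" metadata on its materializations.
    records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("a1_cleaned"),
        )
    )
    processed = set()
    for record in records:
        materialization = record.event_log_entry.dagster_event.step_materialization_data.materialization
        for me in materialization.metadata_entries:
            if me.label == "latest_partition":
                processed.add(me.entry_data.text)
    return processed

# Inside the sensor for a1_cleaned, runs could then be requested for:
#   a1_partitions_materialized(context.instance) - a1_cleaned_processed_partitions(context.instance)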