Harry James
07/07/2023, 12:56 PM
from dagster import StaticPartitionsDefinition, asset, AssetIn, AllPartitionMapping, \
    materialize, DagsterInstance

partitions = StaticPartitionsDefinition(["1", "2"])

@asset(partitions_def=partitions)
def asset_a():
    return

@asset(ins={"asset_a": AssetIn("asset_a", partition_mapping=AllPartitionMapping())})
def asset_b(asset_a):
    return

if __name__ == "__main__":
    instance = DagsterInstance.get()
    for partition_key in partitions.get_partition_keys():
        materialize(
            assets=[asset_a, asset_b],
            selection=[asset_a],
            partition_key=partition_key,
            instance=instance,
        )
    materialize(assets=[asset_a, asset_b],
                selection=[asset_b],
                instance=instance)
Currently, for this mock pipeline, I have to call materialize three times: once per partition of asset_a and once for asset_b. My real-world pipeline is much more complicated and would require a very messy set of materialize calls to work properly.
In Dagit, however, this is very straightforward: I click "Materialize all" and it knows to materialize both partitions of asset_a before materializing asset_b. See screenshots.
In an ideal world, I would be able to pass a list of partition keys, as in Dagit, and have Dagster work out the rest, with something like the call below. Not being able to kick this off from Python as described is a real blocker for my workflow, so any help would be greatly appreciated!
materialize(assets=all_assets, partition_key=["1", "2"])
I think it relates to this issue: https://github.com/dagster-io/dagster/issues/14905
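To make the ordering I am after concrete, here is a rough, Dagster-free sketch of the planning step: every partition of an upstream asset runs before anything downstream of it. The helper name plan_runs and the two dict arguments are made up for illustration and are not part of the Dagster API. It uses only the standard library (graphlib, Python 3.9+).

```python
from graphlib import TopologicalSorter


def plan_runs(deps, partition_keys_by_asset):
    """Return an ordered list of (asset_name, partition_key) runs.

    deps maps each asset name to the set of its upstream asset names.
    partition_keys_by_asset maps partitioned asset names to their keys;
    assets missing from it are treated as unpartitioned (key is None).
    Both arguments are hypothetical inputs, not Dagster objects.
    """
    runs = []
    # static_order() yields every upstream asset before its dependents.
    for name in TopologicalSorter(deps).static_order():
        keys = partition_keys_by_asset.get(name)
        if keys:
            # One run per partition of a partitioned asset.
            runs.extend((name, key) for key in keys)
        else:
            # A single run for an unpartitioned asset.
            runs.append((name, None))
    return runs


# Mirrors the mock pipeline: asset_b depends on all partitions of asset_a.
deps = {"asset_a": set(), "asset_b": {"asset_a"}}
plan = plan_runs(deps, {"asset_a": ["1", "2"]})
print(plan)
```

Each (name, key) pair in the plan would then correspond to one materialize call shaped like the ones in my script above, passing partition_key only when key is not None. This only works out the order of the calls; it does not reduce how many there are.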