Vinnie
05/11/2023, 12:23 PM
AutoMaterializePolicies are working as intended for multi-partitioned assets. With a multi-partitioned asset that combines a static and a dynamic partition set, creating a new dynamic partition and kicking off a run for all static partitions under the latest dynamic partition only triggers a materialization of the downstream assets (which are set to eager) if all partitions of the upstream asset have been materialized. If one or more of the dynamic partitions haven't been materialized (and aren't queued, so Dagster can't know they will be materialized soon), the downstream run won't be launched.
Vinnie
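05/11/2023, 12:24 PM
from dagster import (
    AutoMaterializePolicy,
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

my_dynamic = DynamicPartitionsDefinition(name="my_test")

@asset
def my_asset(context):
    # Register two new keys on the dynamic dimension.
    context.instance.add_dynamic_partitions(
        my_dynamic.name,
        ["test_one", "test_two"],
    )

@asset(
    partitions_def=MultiPartitionsDefinition(
        {"static": StaticPartitionsDefinition(["a", "b"]), "dynamic": my_dynamic}
    )
)
def my_partitioned_asset(context):
    return context.partition_key

@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def my_downstream_asset(context, my_partitioned_asset):
    # Unpartitioned, so it receives every upstream partition at once.
    return my_partitioned_asset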
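A plain-Python model of the rule described above may help. It is a sketch, not Dagster's implementation, and multi_partition_keys, unpartitioned_downstream_ready and partitioned_downstream_ready are hypothetical names. It assumes the multi-partition key space is the cross product of the two dimensions, written dynamic|static to mirror the keys in johann's reply, and contrasts an unpartitioned downstream (waits until every upstream partition is materialized or queued) with one that shares the upstream's partitions (each partition waits only on its own upstream key).

```python
from itertools import product


def multi_partition_keys(dynamic_keys, static_keys):
    # Every combination of the two dimensions is its own partition,
    # written "<dynamic>|<static>" to mirror the keys in this thread.
    return {f"{d}|{s}" for d, s in product(dynamic_keys, static_keys)}


def unpartitioned_downstream_ready(all_keys, materialized, queued):
    # An unpartitioned downstream reads every upstream partition, so it
    # waits until each one is either materialized or already queued.
    return all_keys <= (materialized | queued)


def partitioned_downstream_ready(all_keys, materialized):
    # With a matching partitions definition, each downstream partition
    # only waits on the upstream partition with the same key.
    return {key for key in all_keys if key in materialized}


keys = multi_partition_keys(["test_one", "test_two"], ["a", "b"])
# Only the partitions under the newest dynamic key have been run.
done = {"test_two|a", "test_two|b"}
print(unpartitioned_downstream_ready(keys, done, queued=set()))
print(sorted(partitioned_downstream_ready(keys, done)))
```

With only the latest dynamic partition materialized, the unpartitioned check prints False, while the partitioned variant can already run test_two|a and test_two|b.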
johann
05/11/2023, 1:33 PM
johann
05/11/2023, 1:40 PM
my_downstream_asset isn't launching until all upstream partitions are filled because my_downstream_asset is unpartitioned. The input it gets from my_partitioned_asset is {'test_one/b': 'test_one|b', 'test_one/a': 'test_one|a', 'test_two/a': 'test_two|a', 'test_two/b': 'test_two|b'}. If my_downstream_asset were also partitioned, it would run upon completion of each upstream partition.
Vinnie
05/11/2023, 2:37 PM
Vinnie
05/11/2023, 2:39 PM
johann
05/11/2023, 2:39 PM
Vinnie
05/11/2023, 2:40 PM
johann
05/11/2023, 2:40 PM
johann
05/11/2023, 2:41 PM
owen
05/11/2023, 2:51 PM
Vinnie
05/11/2023, 3:37 PM
Andras Somi
05/13/2023, 12:16 PM
Andras Somi
05/13/2023, 12:19 PM
owen
05/15/2023, 4:13 PM
With A -> B -> C, if A and B update and then A is updated again, the system won't try to update C until B has pulled in that new change from A.
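A minimal sketch of that ordering rule, assuming each asset records an integer "tick" for its last materialization. PARENTS, is_stale and should_request are hypothetical names, and this is a simplification rather than Dagster's actual reconciliation logic.

```python
# Hypothetical model of the A -> B -> C graph: child -> list of parents.
PARENTS = {"A": [], "B": ["A"], "C": ["B"]}


def is_stale(asset, last_run):
    # Stale: some parent was materialized more recently than this asset.
    return any(last_run[p] > last_run[asset] for p in PARENTS[asset])


def should_request(asset, last_run):
    # Only request a stale asset once its parents have caught up with
    # their own parents, so C waits for B to absorb A's newest change.
    return is_stale(asset, last_run) and not any(
        is_stale(p, last_run) for p in PARENTS[asset]
    )


# A and B updated (ticks 1 and 2), then A updated again (tick 3).
last_run = {"A": 3, "B": 2, "C": 0}
print(should_request("B", last_run), should_request("C", last_run))  # True False

last_run["B"] = 4  # B pulls in A's change
print(should_request("C", last_run))  # True
```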
In the case of unpartitioned -> partitioned dependencies, we assume that all downstream partitions are impacted by the upstream unpartitioned change, meaning C would consider all the historical partitions of A to be in an inconsistent state.
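A toy model of that assumption, using integer timestamps (stale_partitions is a hypothetical helper, not a Dagster API): because an unpartitioned upstream is treated as affecting every downstream partition, a single upstream update leaves every partition materialized before it out of date, historical ones included.

```python
def stale_partitions(upstream_updated_at, partition_updated_at):
    # The unpartitioned upstream is assumed to affect every downstream
    # partition, so any partition last materialized before the upstream
    # update counts as out of date, including old historical ones.
    return sorted(
        key for key, ts in partition_updated_at.items() if ts < upstream_updated_at
    )


daily = {"2023-05-01": 10, "2023-05-02": 20, "2023-05-03": 30}
print(stale_partitions(25, daily))  # ['2023-05-01', '2023-05-02']
```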
This is something we're looking into adjusting (as this is not desirable behavior in this case!), but would be interested in knowing more about your setup and how you expect these things to connect.
If public_comps_daily does not have any unpartitioned upstreams, mind sharing a bit more of your asset graph?
Andras Somi
05/15/2023, 5:31 PM
owen
05/15/2023, 10:05 PM
owen
05/15/2023, 10:06 PM
Andras Somi
05/16/2023, 8:13 AM
> Is it based on the timestamp of the upstream unpartitioned data? I.e., something like “I will only materialize the 2023-05-01 partition once the upstream unpartitioned asset is materialized”?
Yes, this is the case. Here the upstream is essentially a collection of IDs that might grow incrementally with a daily run. Each partition in the downstream contains daily data related to the upstream IDs that were present in the collection at that point in time. I’m not interested in backfilling earlier partitions to contain newly added IDs; I’m happy to keep them as they were. Nevertheless, I’ll rethink the structure. Thanks a lot for your inputs!
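These snapshot semantics can be sketched directly in SQL, assuming the ID collection and the daily data live in the same database. The tables ids and comps_daily, the helper materialize_day and the sample IDs are all hypothetical, and copying the ID stands in for whatever daily data the real asset computes per ID. Each day's partition captures the IDs present when it runs, and IDs added later only show up in later partitions.

```python
import sqlite3


def materialize_day(conn, day):
    # Snapshot the IDs present right now into the given day's partition.
    # The work runs inside the database; no rows are fetched into Python.
    with conn:
        conn.execute("DELETE FROM comps_daily WHERE day = ?", (day,))
        conn.execute(
            "INSERT INTO comps_daily (day, id) SELECT ?, id FROM ids", (day,)
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ids (id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE comps_daily (day TEXT, id TEXT)")

conn.executemany("INSERT INTO ids VALUES (?)", [("AAA",), ("BBB",)])
materialize_day(conn, "2023-05-01")
# The ID collection grows; only the new day's partition sees the new ID.
conn.execute("INSERT INTO ids VALUES ('CCC')")
materialize_day(conn, "2023-05-02")

query = "SELECT day, COUNT(*) FROM comps_daily GROUP BY day ORDER BY day"
for day, n in conn.execute(query):
    print(day, n)
```

Re-running materialize_day for 2023-05-01 at this point would pull in CCC as well, which is exactly the backfill Andras wants to avoid.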
owen
05/16/2023, 5:05 PM
owen
05/16/2023, 5:07 PM
sandy
05/16/2023, 6:38 PM
Andras Somi
05/16/2023, 7:55 PM
The LastPartitionMapping() logic is valid here, but we picked non_argument_deps because the operations are done in SQL and there is no need to move any of the upstream data into memory.
owen
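05/16/2023, 10:05 PM
Change this:
from dagster import asset

@asset(non_argument_deps={"foo"})
def my_asset():
    ...
to:
from dagster import AssetIn, LastPartitionMapping, Nothing, asset

@asset(
    ins={"foo": AssetIn(dagster_type=Nothing, partition_mapping=LastPartitionMapping())}
)
def my_asset():
    ...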
non_argument_deps is essentially just shorthand for the latter, with this AssetIn formulation having the added benefit of allowing you to set additional properties (such as partition_mapping).
Andras Somi
05/17/2023, 5:24 AM
Andras Somi
05/17/2023, 8:36 AM
owen
05/17/2023, 4:30 PM