# dagster-feedback
v
I’m not sure if `AutoMaterializePolicies` are working as intended for multi-partitioned assets. I have a multi-partitioned asset combining a static and a dynamic partitions definition. After creating a new dynamic partition and kicking off a run for all static partitions under it, the materialization of downstream assets (which are set to eager) is only triggered if *all* partitions of the upstream asset have been materialized. If one or more of the dynamic partitions haven’t been materialized (and aren’t queued, so Dagster can’t know they will be materialized soon), the downstream run won’t be launched.
Minimal implementation to reproduce:
```python
from dagster import (
    AutoMaterializePolicy,
    DynamicPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

my_dynamic = DynamicPartitionsDefinition(name="my_test")


@asset
def my_asset(context):
    # registers the dynamic partition keys on the instance
    context.instance.add_dynamic_partitions(
        my_dynamic.name,
        ["test_one", "test_two"],
    )


@asset(
    partitions_def=MultiPartitionsDefinition(
        {"static": StaticPartitionsDefinition(["a", "b"]), "dynamic": my_dynamic}
    )
)
def my_partitioned_asset(context):
    return context.partition_key


@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def my_downstream_asset(context, my_partitioned_asset):
    return my_partitioned_asset
```
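For completeness, a driver along these lines (my sketch, not from the original report; it assumes an ephemeral instance) can reproduce the "new dynamic partition, then one run per static key" step:
```python
from dagster import DagsterInstance, MultiPartitionKey, materialize

instance = DagsterInstance.ephemeral()
instance.add_dynamic_partitions(my_dynamic.name, ["test_three"])

# kick off a run for every static partition under the new dynamic partition
for static_key in ["a", "b"]:
    materialize(
        [my_partitioned_asset],
        partition_key=MultiPartitionKey({"static": static_key, "dynamic": "test_three"}),
        instance=instance,
    )
```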
j
Taking a look
So the reason that `my_downstream_asset` isn’t launching until all upstream partitions are filled is that `my_downstream_asset` is unpartitioned. The input it gets from `my_partitioned_asset` is `{'test_one/b': 'test_one|b', 'test_one/a': 'test_one|a', 'test_two/a': 'test_two|a', 'test_two/b': 'test_two|b'}`. If `my_downstream_asset` were also partitioned, it would run upon completion of each upstream partition.
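For illustration, a sketch of that alternative (my example, not from the thread; it assumes the downstream reuses the exact same `MultiPartitionsDefinition`, so the default identity partition mapping applies):
```python
@asset(
    partitions_def=MultiPartitionsDefinition(
        {"static": StaticPartitionsDefinition(["a", "b"]), "dynamic": my_dynamic}
    ),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def my_partitioned_downstream_asset(my_partitioned_asset):
    # with matching partitions definitions, each run receives the single
    # corresponding upstream partition value instead of a dict of all of them
    return my_partitioned_asset
```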
v
Is it expected behavior that unpartitioned downstream assets only launch when all upstream partitions finish materializing? I only caught this because of a bug in a script that created a dynamic partition without launching a run for it, and now all subsequently created dynamic partitions don’t trigger automatic materializations.
I’ve since deleted the partition that was created without launching a run, but I was under the impression that any new materializations would have caused downstream assets to be updated.
j
I believe the unpartitioned asset would fail if you materialized it with missing upstream partitions; you could test that out, though.
v
Hm, maybe it could in some cases. In my case it’s just a dbt asset, so it’s a non-argument dep.
j
We also have a UI in progress for auto-materializing that will surface the reasons why a run was or wasn’t kicked off, which should at least clarify things.
Ah, a non-argument dep is interesting. @owen is more familiar here than I am, but I think we’d treat it the same in the auto-materialize case.
o
yep, all of the above is correct — we don’t distinguish between those cases, although I can see how it would be useful here. what are the upstream / downstream assets doing? in general, some assets can function with missing inputs and others can’t, and this isn’t strictly tied to `non_argument_deps`, although it is correlated (the `UPathIOManager` lets you disambiguate this with some special input metadata). right now there’s no first-class way of indicating this to the framework, so we go with the cautious approach, but I can imagine a world in which we could distinguish between these options (maybe via the partition mapping)
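A sketch of that `UPathIOManager` escape hatch, for reference (my example; the asset names are hypothetical, but the `allow_missing_partitions` input metadata is what lets the IO manager skip unmaterialized partitions instead of raising):
```python
from dagster import AssetIn, asset


@asset(
    ins={"my_partitioned_asset": AssetIn(metadata={"allow_missing_partitions": True})}
)
def tolerant_downstream_asset(my_partitioned_asset: dict):
    # the loaded dict only contains partitions that were actually materialized;
    # missing ones are silently skipped by UPathIOManager-based IO managers
    return my_partitioned_asset
```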
v
Makes sense, thanks for the info!
a
I’m piggybacking on this thread for a probably related issue (kick me out if it’s not). I just can’t seem to find the reason why my asset C is not auto-materialized here.
• Of the two parents, A is partitioned (fully green), B is not.
• Both were last auto-materialized at the same time (13 May, 10:32), after their ancestors were refreshed by a scheduled job.
• C depends on both A and B as non-argument dependencies and has an eager auto-materialize policy set up.
• All three assets are really lightweight; they usually run in seconds.
• As far as I can tell, daemons, sensors, etc. were all running fine, and other parts of the asset graph were auto-materialized correctly.
• We are on 1.3.3.
Is it perhaps related to non-argument deps between partitioned and non-partitioned assets? Or to the fact that it has both partitioned and non-partitioned parents? Where should I look for clues?
o
hi @Andras Somi! we're planning on shipping a UI in the next couple weeks that will help a ton with answering these sorts of questions, but in this case I have a suspicion that this has to do with the partitioned parent. does `public_comps_daily` have any upstream unpartitioned assets? right now, the way the logic works is that it will avoid materializing downstream assets if any of their parents are in an "inconsistent state". this means either that they are missing partitions that the downstream asset would consume, or that they have not incorporated the newest versions of their own parents. For example, if you have `A -> B -> C`, and `A` and `B` update, then `A` is updated again, the system won't try to update `C` until `B` has pulled in that new change from `A`. In the case of unpartitioned -> partitioned dependencies, we assume that all downstream partitions are impacted by the upstream unpartitioned change, meaning `C` would consider all the historical partitions of `A` to be in an inconsistent state. This is something we're looking into adjusting (as this is not desirable behavior in this case!), but I'd be interested in knowing more about your setup and how you expect these things to connect. If `public_comps_daily` does not have any unpartitioned upstreams, mind sharing a bit more of your asset graph?
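To make the staleness rule concrete, here's a toy instantiation of that `A -> B -> C` example (my sketch, assuming eager policies so the daemon is driving the downstream assets):
```python
from dagster import AutoMaterializePolicy, asset


@asset
def A():
    return 1


@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def B(A):
    return A + 1


@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def C(B):
    # if A re-materializes, C is held back until B has re-run against the
    # new A; otherwise C would mix stale and fresh upstream data
    return B + 1
```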
a
@owen this is interesting, thanks. After rereading your description a few times I think I get the issue. In fact `public_comps_daily` has only unpartitioned ancestors; it's kinda wedged in between unpartitioned assets in the graph (which looks weird regardless of the issue it causes; I have to find a better way to model this thing). I really look forward to the UI updates. My mental model obviously misses important details.
On the other hand, if I passed the partitioned parent to C via a "normal" argument dep (not `non_argument_deps`), explicitly mapping the latest partition to it, I guess this graph would work, i.e. C would auto-materialize, right? Then the historical partitions wouldn't be in play, regardless of their state.
BTW I'm not sure that a change in an unpartitioned parent should always "invalidate" historical partitions. I think I have legitimate cases where this is not the desired behavior. I also sometimes wonder whether "locking" certain (or all) historical partitions in some way (e.g. by "age") would make sense: they would show up as existing materialized partitions but would be excluded from backfills and from defining the state of the asset (solving the issue above).
o
Totally agree that there are legitimate cases where this is not desirable (and in fact, it's potentially more common for it to be undesirable). The partition-mapping approach would indeed work as a workaround, but I am curious whether we can model this situation more directly. In an ideal world, how would you know when a given partition of your partitioned asset is "ready" to be materialized? Is it based on the timestamp of the upstream unpartitioned data, i.e. something like "I will only materialize the 2023-05-01 partition once the upstream unpartitioned asset is materialized"? This would somewhat align with the upstream unpartitioned asset being calculated in an incremental / append-only manner, where old data is not deleted when the asset is rematerialized.
In my mind, the point of leverage here might be in modeling that connection correctly.
a
> Is it based on the timestamp of the upstream unpartitioned data? I.e. something like "I will only materialize the 2023-05-01 partition once the upstream unpartitioned asset is materialized"?
Yes, this is the case. Here the upstream is essentially a collection of IDs that may grow incrementally with a daily run. Each partition in the downstream contains daily data related to the upstream IDs that were present in the collection at that point in time. I'm not interested in backfilling earlier partitions to contain newly added IDs; I'm happy to keep them as they were. Nevertheless, I'll rethink the structure. Thanks a lot for your input!
o
@Andras Somi thanks for that info! as an example scenario, if the partitioned asset failed one day (for whatever reason), would it essentially be impossible to fill in that missing partition the next day, as the process only knows how to take a snapshot of the unpartitioned data at a certain point in time?
ah, and as another question: does the downstream unpartitioned asset truly depend on all partitions of the upstream partitioned asset, or does it actually just use the latest day's value?
s
btw, here's a github issue that might be relevant @Andras Somi: https://github.com/dagster-io/dagster/issues/14305
a
@sandy Thanks! @owen #1: given that the upstream is a collection of IDs and we know when we added them, it's possible to recalculate historical partitions in the downstream by taking a snapshot of the upstream as it was at that point in time (though I don't think we actually do). #2: it uses the latest value, so the `LastPartitionMapping()` logic is valid here, but we picked `non_argument_deps` because the operations are done in SQL and there is no need to move any of the upstream data into memory.
o
Ah I see (thanks again for answering so many questions here!) -- I think in this case, you can get the best of both worlds by switching from:
```python
from dagster import asset


@asset(non_argument_deps={"foo"})
def my_asset():
    ...
```
to:
```python
from dagster import AssetIn, LastPartitionMapping, Nothing, asset


@asset(
    ins={"foo": AssetIn(dagster_type=Nothing, partition_mapping=LastPartitionMapping())}
)
def my_asset():
    ...
```
`non_argument_deps` is essentially just shorthand for the latter, with the `AssetIn` formulation having the added benefit of letting you set additional properties (such as `partition_mapping`).
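Tying this back to the repro at the top of the thread, the unpartitioned downstream would become something like (my sketch, under the assumptions above):
```python
@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    ins={
        "my_partitioned_asset": AssetIn(
            dagster_type=Nothing, partition_mapping=LastPartitionMapping()
        )
    },
)
def my_downstream_asset():
    # with a Nothing-typed input there is nothing to load into memory, and the
    # LastPartitionMapping means only the most recent upstream partition is
    # considered when deciding whether to auto-materialize
    ...
```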
a
Hm, that's great. This combination never occurred to me, though it really makes sense. I have to adjust my mental model again to not think of non-argument and argument deps as completely different things. Thanks!
o
no problem! :blob-salute: