# dagster-plus
g
I have a job which should conduct a backfill for partitioned assets. The assets and the job have the daily partitions definition attached. However, only the normal, non-backfill jobs show the assets as green after materialization. The backfill job successfully materializes the assets, but the partition does not show up as green - and furthermore, a backfill is not even possible ("no backfills for <<<job_name>>>" is written on the page of this job).
s
I'm not entirely following what you mean by "However, only the normal, non-backfill jobs show the assets as green after materialization". Mind expanding? Is there a screenshot you could share? What version are you on? cc @Ben Gotow - I know we had some bugs related to the Launch Backfill dialog recently
g
the latest 1.1.7
I cannot run any backfill - this is the TLDR
not sure what screenshot to share here - I see the normal partitions tab in Dagit, but have no way to launch a backfill (there is no button), or to select any partitions when clicking the Materialize button
for a job based off of assets
d
If this is in cloud, do you have a link to the partitions page in question?
g
sure - I even sent you this link on Friday or Saturday to share it in the dagster private slack ... 😉
d
ah sorry, yes you did
s
The Launch Backfill button is no longer there, but you should be able to replicate the same functionality by selecting partitions when you click the Materialize button on the tab that has the asset graph. Daniel sent me a link to your job. I noticed that the assets inside it don't appear to be partitioned - is that expected? In general, partitioned asset jobs are only meant to be used with partitioned assets
g
Well, these are statefully partitioned SCD2 assets, and they need a backfill.
So it is not idempotent (i.e., delete 2022-01-01 and overwrite); rather, I delete the whole state and backfill from the raw data.
Only the raw data assets are partitioned in the traditional sense
s
In Dagster, backfills by definition work over partitioned assets - the point of a Dagster backfill is to be able to launch separate work for each partition. Do you want to have separate runs for different subcomponents of your SCD2 assets, or just a single run to backfill the entire asset? If the latter, I'd recommend adding boolean configuration options to those assets that you can toggle to recompute them from scratch, instead of using Dagster backfills
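For reference, a minimal sketch of such a toggle - the field name `full_refresh` and the branch bodies are illustrative assumptions, not from this thread:
```python
from dagster import Field, asset


# A boolean config option that toggles recomputing the asset from scratch,
# instead of relying on a Dagster backfill.
@asset(config_schema={"full_refresh": Field(bool, default_value=False)})
def scd2_table(context):
    if context.op_config["full_refresh"]:
        ...  # delete the existing state and rebuild from all raw data
    else:
        ...  # normal incremental update
```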
g
so you mean a boolean config plus a tqdm for loop ... hm this might perhaps be the better solution
Would you suggest creating a config value (which must then be passed to all the assets) or using a tag instead?
s
Our normal recommendation for this kind of thing would be a config value that’s passed to all the assets, but I could see how a tag would be more convenient if you don’t care about being able to validate the config before launching the run
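If you go the tag route instead, a sketch of how an asset could read it - assuming a tag named `full_refresh`; tags are plain strings, hence the comparison against "true":
```python
from dagster import asset


@asset
def scd2_table(context):
    # Run tags are strings and are not validated before launch
    if context.get_tag("full_refresh") == "true":
        ...  # recompute from scratch
    else:
        ...  # normal incremental update
```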
g
One question still remains open for me: where to put the for loop. I would have loved to use the partitions/Dagster itself to run the loop - potentially parallelized, neatly isolated like normal jobs, and nicely logged as well. Are you saying that, with this backfill=true config passed, I should have a manual for loop inside the asset which a) discovers the available raw partitions, b) deletes the existing state, and c) backfills (runs the loop)? Or could/should parts of this live in a sensor which launches jobs against Dagster? What are the best practices for such a case (I guess SCD2 stateful tables are rather common)?
s
I think I'm struggling a bit to understand how you can have executions for different raw partitions happen independently when they're not idempotent. Could you potentially model the SCD2 asset as a partitioned asset where each partition depends on the prior partition?
g
Is this a possibility? How could/would this work?
Indeed, they are certainly not independent. But it would be nice to use the normal backfill UI (with all its nice observability for the individual runs) rather than coding up a second code path (including the for loop) inside each asset or perhaps the IO manager
s
Partition self-dependencies are something that we just added support for. Self-dependencies aren't yet respected in the backfill code, but I'm working on adding that right now. Here's an example:
```python
from dagster import AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"),
    ins={
        # Self-dependency: each partition reads the previous day's partition of this asset
        "a": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    },
)
def a(a):
    ...
```
g
This is super interesting! I was not yet aware of this functionality. Do you already have a timeline when you plan to add this in for backfill?
s
my dream is to get it done this week, but realistically it will be more like early January
The asset reconciliation sensor already supports it - i.e., if partition N fails, it won't try to run partition N + 1 until partition N is filled
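For reference, a minimal sketch of wiring that up, reusing the asset `a` from the example above (the sensor name is illustrative):
```python
from dagster import AssetSelection, Definitions, build_asset_reconciliation_sensor

# Launches runs for missing or stale partitions, respecting self-dependency
# ordering: partition N + 1 is not attempted until partition N has materialized.
reconciliation_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.keys("a"),
    name="a_reconciliation_sensor",
)

defs = Definitions(assets=[a], sensors=[reconciliation_sensor])
```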
g
Really awesome! Please keep me updated on the progress of this feature.
I think this would be the perfect fit for this case
Given a partitions definition like `DailyPartitionsDefinition(start_date='2022-01-01', end_offset=1)`, would the corresponding `TimeWindowPartitionMapping` for each asset still look like this: `partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)`? In particular, I want to double-check the `end_offset`.
s
yes - that should still work
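Putting the two snippets together, a sketch of the combination under discussion (the asset name `a` is illustrative):
```python
from dagster import AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset

# Partitions definition from the question above, including end_offset=1
daily = DailyPartitionsDefinition(start_date="2022-01-01", end_offset=1)


@asset(
    partitions_def=daily,
    ins={
        # Self-dependency on the previous day's partition; per the confirmation
        # above, these mapping offsets are unchanged by the definition's end_offset
        "a": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    },
)
def a(a):
    ...
```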
g
`_TimeWindowPartitionMapping.__new__() got an unexpected keyword argument 'start_offset'`
Something works differently - or you still have the described functionality only on your feature branch
s
it was included in the most recent release - are you possibly on an earlier version?
g
it was 1.1.6 - now with 1.1.7 this works. However, it is still unclear to me how to pass the `TimeWindowPartitionMapping` to `define_asset_job` - I can only pass the partitions definition as an argument.
s
You don’t need to pass it to the job - just set it on the `AssetIn`, like in my example above
The self dependencies are respected by the asset reconciliation sensor when scheduling materializations. They don’t have an effect on scheduled jobs
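A minimal sketch of that split (the job name and selection are illustrative): the mapping lives on the asset's `AssetIn`, while `define_asset_job` only receives the partitions definition.
```python
from dagster import DailyPartitionsDefinition, define_asset_job

daily = DailyPartitionsDefinition(start_date="2022-01-01", end_offset=1)

# The partition mapping never touches the job; it is declared on the AssetIn
# of the asset definition itself (see the examples above).
scd2_job = define_asset_job("scd2_job", selection="a_scd2", partitions_def=daily)
```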
g
My situation is a little bit different: your example has asset a with a self-reference to a. I have a ---> b, where a delivers full copies of the data daily and b is the SCD2 table. b gets a daily update from a and runs the MERGE INTO operation given the fresh data and its own state. What I understand you to be saying here is that I need to explicitly get hold of the existing state as the self-reference ... this seems quite complicated (but perhaps the only sane way)
If you want, I can show and explain my stateful SCD2 IO manager that I am currently using - it implicitly derives the reference to the current state. Perhaps this might be a nice way to deal with SCD2 (once Dagit allows surfacing this self-reference for backfills)
s
My understanding of your original goal was that you wanted to be able to launch a backfill that filled in your SCD2 table in a set of sequential steps, and you had to fill in day N before day N + 1. Is that still right? If so, I don't think you should need to modify your I/O manager. You can specify the self-partition-dep without actually needing to have your I/O manager load that data: you can use a `Nothing` type on the `AssetIn` that corresponds to the self-dep. In our next release (next Thursday), asset backfills will respect the ordering of self-partition-deps
g
Yes this would be ideal
But so far - I am using Delta Lake - there are different code paths, i.e. a plain write for the initial partition and then MERGE INTO (including manual deletions) for the subsequent ones.
Still, I need to combine the state with the fresh partition, and I am confused about how your example already handles this. I can share an extended example with you on a video call if you want
s
I don't think I'll be able to video call this week, but if you have a github gist I could take a look?
g
no worries - we could also do next week; this one is also bad for me. Here you go: https://gist.github.com/geoHeil/12ce1e1403e474b44a84fd267323acb4 - perhaps this alleviates the need for a call
I hope this (`def a_scd2(context, a: pyspark.sql.DataFrame)`) makes my line of thought clearer to you - but perhaps you can now tell me how I am mis-thinking the handling of the self-references.
s
I just took a look at your example. I don't think you should need to change anything except to add a Nothing self-dependency. It would look something like this:
```python
from dagster import asset, DailyPartitionsDefinition, TimeWindowPartitionMapping, AssetIn, Nothing


@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def a():
    ...


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"),
    ins={
        # Nothing-typed self-dependency: enforces partition ordering without
        # the I/O manager having to load the previous partition's data
        "a_scd2": AssetIn(
            dagster_type=Nothing,
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
        )
    },
)
def a_scd2(a):
    ...
```
(there's actually a bug that causes the above to error currently, but it will get fixed in the next release)
g
But is this best practice? Or would you recommend refactoring to retrieve the self-reference from Dagster (without the Nothing dependency)?
Is this the error you are talking about? `@op 'a_scd2' decorated function has parameter 'a_scd2' that is one of the input_defs of type 'Nothing' which should not be included since no data will be passed for it.`
@sandy is the issue fixed now with the latest release?
s
yes - that issue with Nothing arguments to AssetIn is now fixed