
Ansel Boero

01/31/2023, 9:10 AM
Hello Dagsters! I have a problem with a job that is executed across multiple runs (OpA: root run, OpB: subsequent run) using a custom IOManager. Let's suppose a job consists of OpA --> OpB. OpA saves a csv in a folder called {run_id} (handled by the IOManager's handle_output). If OpB is executed in a new run, how can the InputContext get the run_id of OpA inside the IOManager's load_input function? Via InputContext I cannot get the root run ID, and I get this error when I try to call InputContext.get_identifier() (in the new run I'm only executing OpB, because OpA already succeeded in the root run):
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
Thank you very much for your help :)

jamie

01/31/2023, 5:20 PM
Hi @Ansel Boero, in general ops aren't meant to use the data created by ops in a different job. Typically we would put the ops together in the same job. If that won't work for your use case, can you elaborate a bit more on what you're trying to achieve? I can help come up with a solution.

Ansel Boero

01/31/2023, 6:04 PM
Hi @jamie, thank you for your support. The assets (or ops) are in the same job. However, it may happen that B fails (for various reasons, connectivity issues for instance) and only B needs to be rerun. I'm actually working with two assets, AssetA --> AssetB. I would like to save the file produced by AssetA (pickle, csv, doesn't matter) with only one constraint: for audit reasons I have to save the file in a folder with a name that can be associated with the run ({run_id} or {root_run_id} seemed ideal to me). I would then like AssetB to be able to open the file saved by AssetA (through the IOManager's load_input). The problem is that AssetB, if launched alone in a run subsequent to the root run, is unable to find the run_id value belonging to the InputContext (AssetA). This is the code used to create the IOManager:
import os
from typing import Union

from dagster import IOManager, InputContext, OutputContext
from pyspark.sql import DataFrame

class MyIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: Union[DataFrame, None]) -> None:
        if isinstance(obj, DataFrame):
            # write the output to a folder named after the current run_id
            path = os.path.join(self._base_path, context.run_id, f"{context.step_key}.pq")
            obj.write.parquet(path)

    def load_input(self, context: InputContext) -> Union[DataFrame, None]:
        # rebuild the path from the upstream output's run_id and step_key
        path = os.path.join(self._base_path, context.upstream_output.run_id,
            f"{context.upstream_output.step_key}.pq")
        return context.resources.pyspark.spark_session.read.parquet(path)
handle_output saves the dataframe generated by AssetA, and load_input takes care of turning the file back into a DataFrame ready for consumption by AssetB. The IOManager works well if AssetA and AssetB are run in the same run. If AssetB is launched in a different run I get the following error (due to InputContext.upstream_output):
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
Thank you again for your support.

Will Holley

02/13/2023, 7:06 PM
Also encountering this issue: Asset A writes an artifact using a custom I/O Manager and its run ID, Asset B's materialization fails and needs to be re-run, but it cannot load the Asset A output because it doesn't have access to the Root Run ID (and in fact, when re-running in a new job, it does not have access to the upstream output's Run ID at all).
@Ansel Boero – I ended up getting this to work using
context.step_context.pipeline_run.root_run_id
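A minimal sketch of how that could slot into the load_input of the IOManager snippet above (this assumes the upstream artifact was written in the root run; root_run_id is None on a normal, non-re-executed run, so it falls back to the current run's id):
    def load_input(self, context: InputContext):
        # root_run_id points at the original run when this is a re-execution;
        # it is None on a normal run, so fall back to the current run's id
        run = context.step_context.pipeline_run
        run_id = run.root_run_id or run.run_id
        path = os.path.join(self._base_path, run_id,
            f"{context.upstream_output.step_key}.pq")
        return context.resources.pyspark.spark_session.read.parquet(path)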

Ansel Boero

02/13/2023, 7:58 PM
Thank you very much @Will Holley for your feedback, but you ran context.step_context.pipeline_run.root_run_id outside the IO Manager's load_input function, right? Because I don't think the upstream context has access to root_run_id, unfortunately.

Will Holley

02/13/2023, 8:22 PM
No, inside of load_input.
You're correct that context.upstream_output won't be able to access it, but it's available within the pipeline context.
I'm considering that custom IO managers may be more trouble than they're worth, and persisting my artifacts outside of them in addition to using the default IO manager.

owen

02/13/2023, 9:06 PM
hi! just at a high level, io managers for assets should not use the run id as part of the location. this is partly because of the issues you are encountering, and partly philosophical (an asset is generally a single entity, so it should have a static location). our built in io managers function this way — i’m wondering if it would be reasonable for your use cases to avoid using run id as well
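A minimal sketch of the static-location pattern described here (class name and path layout are placeholders, not from this thread; the pyspark resource is assumed as in the snippet above):
import os
from dagster import IOManager, InputContext, OutputContext

class StaticPathIOManager(IOManager):
    # one fixed location per asset, independent of run_id
    def __init__(self, base_path: str):
        self._base_path = base_path

    def handle_output(self, context: OutputContext, obj) -> None:
        path = os.path.join(self._base_path, *context.asset_key.path)
        obj.write.mode("overwrite").parquet(path)  # latest materialization wins

    def load_input(self, context: InputContext):
        path = os.path.join(self._base_path, *context.asset_key.path)
        return context.resources.pyspark.spark_session.read.parquet(path)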

Will Holley

02/13/2023, 9:08 PM
@owen thanks for responding. I've been using custom I/O managers to store artifacts for use outside of Dagster (e.g. write a database record) and store each artifact alongside the run ID that generated it and other metadata.

owen

02/13/2023, 9:24 PM
it sounds like that's just on the writing side though, is that right? as in, when the artifact is read in by a downstream asset, it probably doesn't care about the other metadata (it just needs the artifact). If that's the case, I still think it makes sense for the path/physical location of the artifact to be static, and the other metadata to be attached in a different way.
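For instance (a sketch building on the static-path idea above, not code from the thread), the generating run could be recorded as materialization metadata instead of being baked into the location:
    def handle_output(self, context: OutputContext, obj) -> None:
        path = os.path.join(self._base_path, *context.asset_key.path)  # static location
        obj.write.mode("overwrite").parquet(path)
        # attach the generating run and location as metadata instead
        context.add_output_metadata({"run_id": context.run_id, "path": path})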

Will Holley

02/13/2023, 9:24 PM
I've been using the metadata to identify the asset, i.e. load_input will query records by the run_id.
Ultimately I'm killing two birds with one stone: writing the artifact and persisting the asset between steps.

owen

02/13/2023, 9:26 PM
got it -- so the key issue here is that you want to keep multiple of these artifacts around at any given time (one for each time the asset was materialized), but have the load_input only pull in the most recent artifact?

Will Holley

02/13/2023, 9:28 PM
Exactly, primarily because artifacts tend to be some selection of relevant records rather than an entire set, but the relevancy isn't compatible with a partition definition.
Does that make sense?

owen

02/13/2023, 9:33 PM
one thought I had is that it might make things easier to use a timestamp as the "identifier" for these artifacts, rather than the run_id. The run_id is random, so you have to rely on dagster machinery to figure out which one is the most recent, but timestamps have the nice property of being ordered, so you could just query for all artifacts with the most recent timestamp inside your load_input function.
also @Ansel Boero I'm thinking a similar scheme might work for you (after re-reading your original post). Essentially, creating paths of the form some/path/{write_timestamp}/{run_id}/
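A rough sketch of that layout (local-filesystem assumptions; the directory walking in load_input is a placeholder, not from the thread):
import os
import time
from dagster import IOManager, InputContext, OutputContext

class TimestampedIOManager(IOManager):
    def __init__(self, base_path: str):
        self._base_path = base_path

    def handle_output(self, context: OutputContext, obj) -> None:
        # some/path/{write_timestamp}/{run_id}/ -- timestamps sort lexicographically
        ts = time.strftime("%Y%m%dT%H%M%S")
        path = os.path.join(self._base_path, ts, context.run_id, f"{context.step_key}.pq")
        obj.write.parquet(path)

    def load_input(self, context: InputContext):
        # latest timestamped folder wins; no need to know which run produced it
        latest_ts = max(os.listdir(self._base_path))
        ts_dir = os.path.join(self._base_path, latest_ts)
        run_dir = os.path.join(ts_dir, os.listdir(ts_dir)[0])
        path = os.path.join(run_dir, f"{context.upstream_output.step_key}.pq")
        return context.resources.pyspark.spark_session.read.parquet(path)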

Will Holley

02/13/2023, 9:45 PM
My issue there is compatibility with asset sensors, which kick off all of my jobs unless I'm running one-offs or backfills.
Basically the asset sensor indicates to the head op of an asset group which records to use.

owen

02/13/2023, 9:49 PM
mm so you're not always interested in the most recent records? how do you know / specify that a given run_id has the records that you're interested in?

Will Holley

02/13/2023, 9:50 PM
Upstream, record insertion is 1:1 with runs
I considered using the run ids as the primary keys (Postgres), but without sufficient documented context it's probably too confusing.
You can think about it like
1. Scrape a website
2. Transform the HTML
3. *take further steps based on the parsed content*
Each website scrape receives dedicated resources that are set up and torn down for that site. Initially I was running #2 in bulk but it's cheaper compute-wise to process the records as a stream.

owen

02/14/2023, 12:36 AM
sorry, I think I might have lost the thread a bit on this one so apologies if this suggestion is off base. Is (1) the step that you want to store artifacts for? and then (2) is hooked up to an asset sensor that listens for materializations of (1)? If so, could you add the run_id of the materialization of (1) as configuration to the execution of (2) inside your asset sensor? Then the io_manager could read that upstream_run_id config within its load_input function.
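A sketch of what that could look like (downstream_job, MyIOManager, and the upstream_run_id config key are placeholders for your own names):
from dagster import AssetKey, RunRequest, asset_sensor, io_manager

# downstream_job and MyIOManager are placeholders for your own job and manager class
@asset_sensor(asset_key=AssetKey("asset_a"), job=downstream_job)
def asset_a_sensor(context, asset_event):
    # forward the run that materialized asset_a as resource config for the new run
    yield RunRequest(
        run_key=asset_event.run_id,
        run_config={
            "resources": {
                "io_manager": {"config": {"upstream_run_id": asset_event.run_id}}
            }
        },
    )

@io_manager(config_schema={"upstream_run_id": str})
def my_io_manager(init_context):
    # load_input can then build its path from this explicit upstream_run_id
    return MyIOManager(upstream_run_id=init_context.resource_config["upstream_run_id"])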

Will Holley

02/14/2023, 3:28 PM
@owen that's what I'm doing.
Here's the issue: (2) is the head node within an asset group, and if one of the downstream nodes fails, on retry I need to use root_run_id to reference the artifacts materialized in step (1).
Which, IMO, isn't that big of an issue, just convoluted. Overnight I ended up deprecating the I/O managers in question and replacing them with a caching function that writes the aforementioned artifacts, while letting Dagster handle inter-op I/O with the default handler.
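A sketch of that final pattern (cache_artifact, build_dataframe, and the bucket path are hypothetical placeholders):
from dagster import asset

def cache_artifact(df, run_id: str) -> None:
    # hypothetical side-channel write: persist the audit copy under a run-keyed folder
    df.write.parquet(f"s3://audit-bucket/{run_id}/asset_a.pq")

@asset
def asset_a(context):
    df = build_dataframe()               # placeholder for the real computation
    cache_artifact(df, context.run_id)   # audit artifact, written outside the IO manager
    return df                            # default IO manager handles the handoff to asset_b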

owen

02/14/2023, 5:07 PM
ah I see -- I think your solution makes sense in that case!