Ansel Boero
01/31/2023, 9:10 AM
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
Thank you very much for your help :)
jamie
01/31/2023, 5:20 PM
Ansel Boero
01/31/2023, 6:04 PM
def handle_output(self, context: OutputContext, obj: Union[DataFrame, None]) -> None:
    if isinstance(obj, DataFrame):
        path = os.path.join(self._base_path, context.run_id, f"{context.step_key}.pq")
        obj.write.parquet(path)

def load_input(self, context: InputContext) -> Union[DataFrame, None]:
    path = os.path.join(self._base_path, context.upstream_output.run_id,
                        f"{context.upstream_output.step_key}.pq")
    return context.resources.pyspark.spark_session.read.parquet(path)
handle_output writes the DataFrame generated by AssetA to a Parquet file, and load_input reads that file back into a DataFrame ready for consumption by AssetB.
The IOManager works well if AssetA and AssetB are run in the same run.
If AssetB is launched in a different run, I get the following error (due to InputContext.upstream_output):
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
Thank you again for your support.
Will Holley
02/13/2023, 7:06 PM
context.step_context.pipeline_run.root_run_id
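A minimal sketch of how the root_run_id suggestion could feed into the path scheme from the IO manager above. It deliberately avoids calling Dagster itself: `storage_run_id` and `artifact_path` are hypothetical helper names, and the two ids are assumed to be read from the run object mentioned in the thread. The sketch also assumes root_run_id is only populated when a run is a re-execution of an earlier run, so it falls back to the run's own id otherwise.

```python
import os
from typing import Optional


def storage_run_id(run_id: str, root_run_id: Optional[str]) -> str:
    """Pick the run id used in artifact paths (hypothetical helper).

    Assumption: root_run_id is set only for re-executions; for a fresh
    run it is None, and the run's own id is used instead.
    """
    return root_run_id if root_run_id else run_id


def artifact_path(base_path: str, run_id: str,
                  root_run_id: Optional[str], step_key: str) -> str:
    """Build the same <base>/<run id>/<step_key>.pq layout as the IO manager above."""
    return os.path.join(base_path, storage_run_id(run_id, root_run_id), f"{step_key}.pq")
```

With this layout, a re-execution resolves to the directory written by the original run. A run that is launched independently has no root run to point back to, so this does not help in that case.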
Ansel Boero
02/13/2023, 7:58 PM
context.step_context.pipeline_run.root_run_id outside the IO Manager's load_input function, right?
Because I don't think the upstream context has access to root_run_id, unfortunately.
Will Holley
02/13/2023, 8:22 PM
load_input. context.upstream_output won't be able to access it, but it's available within the pipeline context.
owen
02/13/2023, 9:06 PM
Will Holley
02/13/2023, 9:08 PM
owen
02/13/2023, 9:24 PM
Will Holley
02/13/2023, 9:24 PM
load_input will query records by the run_id.
owen
02/13/2023, 9:26 PM
Will Holley
02/13/2023, 9:28 PM
owen
02/13/2023, 9:33 PM
run_id. The run_id is random, so you have to rely on dagster machinery to figure out which one is the most recent, but timestamps have the nice property of being ordered, so you could just query for all artifacts with the most recent timestamp inside your load_input function.
some/path/{write_timestamp}/{run_id}/
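The timestamp idea above can be sketched with the standard library alone, as something a load_input implementation might call. The helper name `latest_artifact_dir` is hypothetical, and the sketch assumes the `{write_timestamp}` directory names are formatted so that string order matches chronological order (for example zero-padded or ISO-style timestamps).

```python
from pathlib import Path
from typing import Optional


def latest_artifact_dir(base: Path) -> Optional[Path]:
    """Return <base>/<write_timestamp>/<run_id> for the newest write, or None.

    Assumption: timestamp directory names sort lexicographically in
    chronological order.
    """
    if not base.is_dir():
        return None
    ts_dirs = sorted((p for p in base.iterdir() if p.is_dir()), key=lambda p: p.name)
    # Walk from the newest timestamp backwards until a run directory is found.
    for ts_dir in reversed(ts_dirs):
        run_dirs = sorted((p for p in ts_dir.iterdir() if p.is_dir()), key=lambda p: p.name)
        if run_dirs:
            # If several runs share one timestamp, the choice here is arbitrary.
            return run_dirs[-1]
    return None
```

Because the lookup only depends on the directory layout, it works without knowing the upstream run_id, which is the value that is unavailable across runs.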
Will Holley
02/13/2023, 9:45 PM
owen
02/13/2023, 9:49 PM
Will Holley
02/13/2023, 9:50 PM
1. Scrape a website
2. Transform the HTML
3. *take further steps based on the parsed content*
owen
02/14/2023, 12:36 AM
Will Holley
02/14/2023, 3:28 PM
root_run_id to reference the artifacts materialized in step 1.
owen
02/14/2023, 5:07 PM