https://dagster.io/
#ask-community

Bennett Norman

10/18/2022, 6:18 PM
Hello! I’ve created an IO Manager that creates a SQLite database for each Dagster run and reads and writes dataframes to the database. The IO Manager works well, but when I rerun a portion of the DAG I get this error:
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
and this warning:
No previously stored outputs found for source StepOutputHandle(step_key='asset1', output_name='result', mapping_key=None). This is either because you are using an IO Manager that does not depend on run ID, or because all the previous runs have skipped the output in conditional execution.
I only get this error when I rerun assets. Is it possible to create an IO Manager that doesn’t overwrite assets on each run? I’m using `context.get_identifier()` to get the run_id in the IO Manager.

chris

10/18/2022, 9:12 PM
hey Bennett! Would you mind posting the full stack trace for that error? Also, when re-executing, what's the mechanism? From Dagit using "from failure", using a subset, etc.?

Bennett Norman

10/19/2022, 4:54 AM
Hi Chris! This happens when I re-execute from a failure or a subset of assets. Here is the full stack trace:
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 322, in core_dagster_event_sequence_for_step
    for event_or_input_value in ensure_gen(
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 501, in load_input_object
    yield from _load_input_with_input_manager(input_manager, load_input_context)
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 867, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/Users/bendnorman/catalyst/pudl/notebooks/work-in-progress/dagster_prototypes/graph_nesting/asset_iomanager.py", line 64, in load_input
    filepath = self._get_path(context)
  File "/Users/bendnorman/catalyst/pudl/notebooks/work-in-progress/dagster_prototypes/graph_nesting/asset_iomanager.py", line 44, in _get_path
    run_id = context.get_identifier()[0]
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 391, in get_identifier
    return self.upstream_output.get_identifier()
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/context/output.py", line 554, in get_identifier
    run_id = self.run_id
  File "/Users/bendnorman/opt/anaconda3/envs/pudl-dev/lib/python3.10/site-packages/dagster/_core/execution/context/output.py", line 221, in run_id
    raise DagsterInvariantViolationError(

chris

10/19/2022, 5:25 AM
Ah, so I think you're going to want to be using `get_asset_identifier`, if the `asset_key` is set on the input/output context.
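Chris's suggestion amounts to choosing the identifier based on whether the context carries an asset key. A minimal sketch, assuming Dagster's `InputContext`/`OutputContext` expose `has_asset_key`, `get_asset_identifier()` and `get_identifier()` as in the 1.x releases (check your installed version); `storage_path` is a hypothetical helper, and `StubContext` is a hypothetical stand-in that only exists so the helper can run without Dagster:

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


def storage_path(context, base_dir: Path) -> Path:
    """Build a storage path from a Dagster-style InputContext or OutputContext.

    Asset outputs use get_asset_identifier(), which is built from the asset key
    (plus the partition key, if any) and never reads run_id. Plain op outputs
    fall back to get_identifier(), which is run-scoped.
    """
    if context.has_asset_key:
        parts = context.get_asset_identifier()
    else:
        parts = context.get_identifier()
    return base_dir.joinpath(*parts)


@dataclass
class StubContext:
    """Hypothetical stand-in for a Dagster context, for trying the helper without a run."""

    asset_key_path: Optional[list[str]] = None
    identifier: list[str] = field(default_factory=list)

    @property
    def has_asset_key(self) -> bool:
        return self.asset_key_path is not None

    def get_asset_identifier(self) -> list[str]:
        return list(self.asset_key_path or [])

    def get_identifier(self) -> list[str]:
        # Mimic the failure from the stack trace when no run_id is available.
        if not self.identifier:
            raise RuntimeError("Attempting to access run_id, but it was not provided")
        return list(self.identifier)
```

Called from `handle_output` and `load_input`, the asset branch never reaches `get_identifier()`, so an asset input on a re-execution cannot hit the `run_id` error through this helper.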

Bennett Norman

10/20/2022, 12:08 AM
Thanks! I added `get_asset_identifier()` to my IO manager but I got the same error. I reread the docs for the `fs_io_manager`:
These filesystem IO managers, along with `fs_io_manager`, store op outputs at a unique path identified by the run ID, step key, and output name. These IO managers will output assets at a unique path identified by the asset key.
I noticed a run_id directory is created for each run when using ops, but assets are just overwritten. Is this a fundamental difference between ops and assets? Is it possible to create an IO manager that saves assets for each run instead of overwriting them?
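The difference in the quoted docs can be illustrated with plain files. This is a sketch of the described layout, not the actual `fs_io_manager` implementation; `op_output_path`, `asset_output_path` and `materialize` are hypothetical helpers, and the run IDs are made up:

```python
import tempfile
from pathlib import Path


def op_output_path(base: Path, run_id: str, step_key: str, output_name: str) -> Path:
    """Op outputs: a unique path per run ID, step key and output name."""
    return base / run_id / step_key / output_name


def asset_output_path(base: Path, asset_key_path: list[str]) -> Path:
    """Asset outputs: one path per asset key, shared by every run."""
    return base.joinpath(*asset_key_path)


def materialize(path: Path, value: str) -> None:
    """Write a value to a path, replacing anything already stored there."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(value)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        for run_id in ["run-1", "run-2"]:
            materialize(op_output_path(base, run_id, "asset1", "result"), f"from {run_id}")
            materialize(asset_output_path(base, ["asset1"]), f"from {run_id}")
        print(sorted(p.name for p in base.iterdir()))  # ['asset1', 'run-1', 'run-2']
        print((base / "asset1").read_text())           # from run-2
```

Two runs leave two run directories for the op output but a single asset file, holding whatever the latest run wrote.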

chris

10/20/2022, 8:24 PM
Hmm, interesting. Same error, same stack trace? Are you still calling `get_identifier` somewhere? `get_asset_identifier` shouldn't ever be asking for the `run_id`.
You're correct that the path-ing is a fundamental difference between assets and ops. The idea is that a software-defined asset represents a single physical "thing" somewhere, and that every run produces that same thing in that same location.
It's technically possible to store outputs at a path based on run_id, but best practice is to have a single place in storage that you materialize to for a given asset key.
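One storage location per asset key, applied to a single SQLite file (roughly the direction Bennett takes next), might look like the sketch below. Assumptions: each asset is one table named by joining its asset key parts with `__`, every table has the same hypothetical two-column schema, and `replace_asset_table`/`read_asset_table` are illustrative names, not Dagster APIs:

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path


def _table_name(asset_key_path: list[str]) -> str:
    # Hypothetical naming scheme: ["my_group", "asset1"] -> "my_group__asset1".
    # Table names are interpolated into SQL, so they must come from trusted asset keys.
    return "__".join(asset_key_path)


def replace_asset_table(db_path: Path, asset_key_path: list[str], rows: list[tuple[int, str]]) -> None:
    """Materialize an asset as one table in a shared database, replacing the previous copy."""
    table = _table_name(asset_key_path)
    # isolation_level=None turns off the sqlite3 module's implicit transactions,
    # so the explicit BEGIN/COMMIT below covers the DROP, CREATE and INSERTs together.
    conn = sqlite3.connect(db_path, isolation_level=None)
    try:
        conn.execute("BEGIN")
        try:
            conn.execute(f'DROP TABLE IF EXISTS "{table}"')
            conn.execute(f'CREATE TABLE "{table}" (id INTEGER PRIMARY KEY, name TEXT)')
            conn.executemany(f'INSERT INTO "{table}" (id, name) VALUES (?, ?)', rows)
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")  # SQLite DDL is transactional: the old table comes back
            raise
    finally:
        conn.close()


def read_asset_table(db_path: Path, asset_key_path: list[str]) -> list[tuple[int, str]]:
    """Load the current materialization of an asset."""
    table = _table_name(asset_key_path)
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(f'SELECT id, name FROM "{table}" ORDER BY id').fetchall()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        db = Path(tmp) / "assets.sqlite"
        replace_asset_table(db, ["asset1"], [(1, "a"), (2, "b")])  # first run
        replace_asset_table(db, ["asset1"], [(3, "c")])            # a later run overwrites it
        print(read_asset_table(db, ["asset1"]))  # [(3, 'c')]
```

Wrapping the drop-and-recreate in one explicit transaction means a failed rerun rolls back to the previous table instead of leaving the asset missing.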

Bennett Norman

10/20/2022, 9:06 PM
I see. I’ll play around with overwriting assets in a single DB, similar to the `snowflake_io_manager` example. Rerunning subsets of our asset DAG will likely result in foreign key constraint errors. What is common practice for users who write to DBs using Dagster? How do they avoid FK errors while rerunning subsets of the DAG?
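The foreign-key failure Bennett expects can be reproduced in an in-memory SQLite database. A minimal sketch, assuming the downstream table already references the upstream one and that re-materializing the upstream asset starts by deleting its old rows; the table names `parent_asset` and `child_asset` and the function `reproduce_fk_error` are hypothetical. It shows the error only, not a remedy:

```python
import sqlite3


def reproduce_fk_error() -> str:
    """Re-materialize an upstream table while a downstream table still references it."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled
        conn.execute("CREATE TABLE parent_asset (id INTEGER PRIMARY KEY, name TEXT)")
        conn.execute(
            "CREATE TABLE child_asset ("
            "id INTEGER PRIMARY KEY, "
            "parent_id INTEGER REFERENCES parent_asset(id))"
        )
        conn.execute("INSERT INTO parent_asset VALUES (1, 'a')")
        conn.execute("INSERT INTO child_asset VALUES (10, 1)")
        conn.commit()
        # Rerunning only the upstream asset: clear its old rows before writing new ones.
        try:
            conn.execute("DELETE FROM parent_asset")
        except sqlite3.IntegrityError as err:
            return str(err)
        return "no error"
    finally:
        conn.close()


if __name__ == "__main__":
    print(reproduce_fk_error())  # FOREIGN KEY constraint failed
```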