Hi all, thanks for creating this amazing tool, I'm...
# ask-community
d
Hi all, thanks for creating this amazing tool, I'm new at dagster, I'm trying to build a pipeline that process data from different days, but if a partition is missed that is ok, we can still proceed, I saw this comment in discussions https://github.com/dagster-io/dagster/discussions/10880 about passing
allow_missing_partitions
to the Ins metadata, but seems like its not working with
graph_assets
I'll appreciate any help, and maybe I'm doing things wrong My Code:
Copy code
@graph_asset(
    partitions_def=feeds_partitions_def,
    ins={"daily_feed_articles": AssetIn(metadata={"allow_missing_partitions": True})},
)
def articles_feed(daily_feed_articles):
    return calculate_feed_score(deduplicate_articles(daily_feed_articles))
The error:
Copy code
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "article_partitions" of step "articles_feed.deduplicate_articles":

  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 372, in core_dagster_event_sequence_for_step
    for event_or_input_value in step_input.source.load_input_object(step_context, input_def):
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 187, in load_input_object
    yield from _load_input_with_input_manager(loader, load_input_context)
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 780, in _load_input_with_input_manager
    with op_execution_error_boundary(
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/app/nagster/storage/daily_feed_articles/2023-08-22/6'

  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 787, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 426, in load_input
    return self._load_multiple_inputs(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 326, in _load_multiple_inputs
    obj = self._load_partition_from_path(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 306, in _load_partition_from_path
    raise e
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 295, in _load_partition_from_path
    obj = self.load_from_path(context=context, path=path)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/fs_io_manager.py", line 283, in load_from_path
    with path.open("rb") as file:
         ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/pathlib.py", line 1044, in open
    return io.open(self, mode, buffering, encoding, errors, newline)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The above exception occurred during handling of the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/app/nagster/storage/daily_feed_articles/2023-08-22/6'

  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 290, in _load_partition_from_path
    obj = self.load_from_path(context=context, path=path)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/fs_io_manager.py", line 283, in load_from_path
    with path.open("rb") as file:
         ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/pathlib.py", line 1044, in open
    return io.open(self, mode, buffering, encoding, errors, newline)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
j
hey @Darwing Medina Lacayo - a couple follow up questions 1. if you use plain assets (ie not a graph asset) with
allow_missing_partitions=True
do you see the same issue? 2. what I/O manager are you using?
and 3. what version of dagster are you on?
d
1. I'm going to test with a plain asset 2. I'm using the default file iomanager 3. 1.4.7
j
ok! let me know how that goes. It’ll be helpful to see if this is a problem just for graph_assets, or if there’s a larger issue. the file system io manager should respect
allow_missing_partitions=True
, but there could be an edge case or other complication with graph assets in the mix
❤️ 1
d
hey @jamie I tested with a single asset and got the same error:
Copy code
@asset(partitions_def=daily_feed_partitions_def, compute_kind="request")
def raw_asset(
    context: AssetExecutionContext, db: Database, api: ArticlesResource
) -> Output[list[dict[str, Any]]]:
    return _get_raw_asset(context, db, api, "value")
Copy code
@asset(
    partitions_def=daily_feed_partitions_def,
    ins={
        "raw_asset": AssetIn("raw_asset", metadata={"allow_missing_partitions": True}),
    },
    compute_kind="python"
)
def parsed_asset(context: AssetExecutionContext, raw_asset: list[dict[str, Any]]) -> Output[list[ArticleResult]]:
    return _parse_articles(context, raw_asset, "google")
Copy code
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "raw_asset" of step "parsed_asset":
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 372, in core_dagster_event_sequence_for_step
    for event_or_input_value in step_input.source.load_input_object(step_context, input_def):
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 187, in load_input_object
    yield from _load_input_with_input_manager(loader, load_input_context)
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 780, in _load_input_with_input_manager
    with op_execution_error_boundary(
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/app/nagster/storage/raw_asset/2023-08-22/17'
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 787, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 416, in load_input
    return self._load_single_input(path, context, backcompat_path)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 254, in _load_single_input
    raise e
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 239, in _load_single_input
    obj = self.load_from_path(context=context, path=path)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/fs_io_manager.py", line 283, in load_from_path
    with path.open("rb") as file:
         ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/pathlib.py", line 1044, in open
    return io.open(self, mode, buffering, encoding, errors, newline)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The above exception occurred during handling of the following exception:
FileNotFoundError: [Errno 2] No such file or directory: '/app/nagster/storage/raw_asset/2023-08-22|17'
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 245, in _load_single_input
    obj = self.load_from_path(context=context, path=backcompat_path)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dagster/_core/storage/fs_io_manager.py", line 283, in load_from_path
    with path.open("rb") as file:
         ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/pathlib.py", line 1044, in open
    return io.open(self, mode, buffering, encoding, errors, newline)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
I want to mention that this was executed as a backfill
j
ok - this sounds like a bug. can you open a github issue with your various replication cases and error messages? i’ll get it directed to the right person https://github.com/dagster-io/dagster/issues/new/choose
❤️ 1