How can I map a config to an `@op` within a `@grap...
# dagster-feedback
s
How can I map a config to an
@op
within a
@graph_asset
? I have an
@op
that yields all the records in an S3 inventory file, taking the path to the inventory file in the config argument:
Copy code
class S3InventoryConfig(Config):
    inventory_url: str = Field(
        None,
        description="URL to the inventory file",
    )

@op(out=DynamicOut())
def s3_inventory(
    context: OpExecutionContext, 
    s3: S3Resource, 
    s3_config: S3InventoryConfig,
) -> Generator[any, None, None]:
    # implementation ...
    pass
I have an
@graph_asset
where I want to use this:
Copy code
@graph_asset()
def some_asset():
   files = s3_inventory().collect()
   return some_other_op(files)
@graph_asset
doesn't seem to take any config params, so I either have to call
AssetDefinitions.from_graph()
myself. So I tried following the ConfigMapping documentation to create a graph with a config mapping and use that with
AssetsDefinition.from_graph()
to create an asset:
Copy code
some_s3_config = S3InventoryConfig(
    inventory_url="<s3://bucket/path/to/inventory.csv>"
)

@config_mapping
def my_graph_mapping(_):
    return RunConfig(
        ops={"s3_inventory": some_s3_config}
    )

@graph(config=my_graph_mapping)
def some_graph():
    files = s3_inventory().collect()
    return some_other_op(files)

some_asset = AssetsDefinition.from_graph(
    some_graph,
)
But this fails with an
Value at the root must be dict
error when validating the config mapping. It seems that
@config_mapping
in this case should return a
dict
instead of a
RunConfig
I'm guessing because it's not annotating an
@job
. So I change it from
RunConfig
to a
dict
, but then
some_s3_config
fails validation because it should also be a
dict
for some unknown reason. So I end up with:
Copy code
@config_mapping
def my_graph_mapping(_):
    return { "ops": {"s3_inventory": some_s3_config.dict() } }
This all ends up being way more verbose than expected and cumbersome to work with.
plus1 1
s
Following... My use case is that I have an asset generated from a graph of 3 ops: 1) upload 2) run 3) delete. The asset essentially uploads a script, then runs it and finally deletes it. I would like to pass a config to the run op so that every time I need to materialise the asset, it will ask me for the config to pass to the run op. Cheers
I think this issue has to be how the config is passed from the UI LaunchPad to the backend code. I get this error
Copy code
dagster._core.errors.DagsterInvalidConfigError: In job __ASSET_JOB at stack testing_asset_graph:
Op "testing_asset_graph" with definition "testing_asset_graph" has a configuration error. It has produced config a via its config_fn that fails to pass validation in the ops that it contains. This indicates an error in the config mapping function itself. It must produce correct config for its constituent ops in all cases. The correct resolution is to fix the mapping function. Details on the error (and the paths on this error are relative to config mapping function "root", not the entire document):
    Error 1: Received unexpected config entries "['execution', 'loggers', 'ops', 'resources']" at the root. Expected: "['delete_op', 'run_op', 'upload_op']."
    Error 2: Missing required config entry "run_op" at the root. Sample config for missing entry: {'run_op': {'config': {'a_param': '...', 'another_param': '...'}}}

  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 499, in get_external_execution_plan_snapshot
    create_execution_plan(
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/execution/api.py", line 722, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(job_def, run_config)
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/objects.py", line 181, in build
    op_config_dict = composite_descent(
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 104, in composite_descent
    return {
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 104, in <dictcomp>
    return {
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 168, in _composite_descent
    _apply_config_mapping(
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 300, in _apply_config_mapping
    raise_composite_descent_config_error(current_stack, mapped_ops_config, evr)
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 356, in raise_composite_descent_config_error
    raise DagsterInvalidConfigError(message, evr.errors, failed_config_value)
This is the LaunchPad configuration
Copy code
ops:
  testing_asset_graph:
    config:
      a_param: Hello
      another_param: Folks
and this is the code snippet to generate the asset for the graph of ops