Sean Quinlan
07/11/2023, 4:59 PM
`@op` within a `@graph_asset`?
I have an `@op` that yields all the records in an S3 inventory file, taking the path to the inventory file from its config argument:
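```python
from typing import Any, Generator, Optional

from dagster import Config, DynamicOut, OpExecutionContext, op
from dagster_aws.s3 import S3Resource
from pydantic import Field


class S3InventoryConfig(Config):
    inventory_url: Optional[str] = Field(
        default=None,
        description="URL to the inventory file",
    )


@op(out=DynamicOut())
def s3_inventory(
    context: OpExecutionContext,
    s3: S3Resource,
    s3_config: S3InventoryConfig,
) -> Generator[Any, None, None]:
    # implementation ... (yields one DynamicOutput per inventory record)
    pass
```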
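Because the op is declared with `DynamicOut()`, its body has to yield `DynamicOutput` objects, each with a `mapping_key` that is unique within the step and made only of letters, digits, and underscores. S3 object keys contain `/` and `.`, so they need sanitizing first. The sketch below is plain Python under stated assumptions: the inventory is CSV with bucket and object key as the first two columns, and `iter_inventory_records` is a hypothetical helper, not part of Dagster or `dagster_aws`. Inside the op, each pair would be yielded as `DynamicOutput(record, mapping_key=key)`.

```python
import csv
import io
import re
from typing import Dict, Iterator, Tuple

# Dagster mapping keys may only contain letters, digits and underscores.
_INVALID_KEY_CHARS = re.compile(r"[^A-Za-z0-9_]")


def iter_inventory_records(csv_text: str) -> Iterator[Tuple[str, Dict[str, str]]]:
    """Yield (mapping_key, record) pairs from S3 inventory CSV text.

    Hypothetical helper: assumes each row starts with bucket, object key.
    """
    reader = csv.reader(io.StringIO(csv_text))
    for index, row in enumerate(reader):
        if not row:
            continue  # skip blank lines
        bucket, key = row[0], row[1]
        # Prefix the row index so keys stay unique even if two object keys
        # sanitize to the same string.
        mapping_key = f"{index}_{_INVALID_KEY_CHARS.sub('_', key)}"
        yield mapping_key, {"bucket": bucket, "key": key}


sample = '"my-bucket","path/to/a.csv"\n"my-bucket","path/to/b.csv"\n'
for mapping_key, record in iter_inventory_records(sample):
    print(mapping_key, record["key"])
# 0_path_to_a_csv path/to/a.csv
# 1_path_to_b_csv path/to/b.csv
```

Prefixing the row index is one simple way to guarantee uniqueness; a hash of the full key would also work if stable keys across runs matter more than readability.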
I have a `@graph_asset` where I want to use this:
```python
@graph_asset()
def some_asset():
    files = s3_inventory().collect()
    return some_other_op(files)
```
`@graph_asset` doesn't seem to take any config params, so it looks like I have to call `AssetsDefinition.from_graph()` myself. So I tried following the `ConfigMapping` documentation to create a graph with a config mapping and use that with `AssetsDefinition.from_graph()` to create an asset:
```python
from dagster import AssetsDefinition, RunConfig, config_mapping, graph

some_s3_config = S3InventoryConfig(
    inventory_url="s3://bucket/path/to/inventory.csv"
)


@config_mapping
def my_graph_mapping(_):
    return RunConfig(
        ops={"s3_inventory": some_s3_config}
    )


@graph(config=my_graph_mapping)
def some_graph():
    files = s3_inventory().collect()
    return some_other_op(files)


some_asset = AssetsDefinition.from_graph(
    some_graph,
)
```
But this fails with a `Value at the root must be dict` error when the config mapping is validated. It seems that in this case `@config_mapping` should return a `dict` instead of a `RunConfig`, I'm guessing because it's used with a `@graph` rather than a `@job`. So I changed it from a `RunConfig` to a `dict`, but then `some_s3_config` fails validation because, for some unknown reason, it also has to be a `dict`. So I end up with:
```python
@config_mapping
def my_graph_mapping(_):
    return {"ops": {"s3_inventory": some_s3_config.dict()}}
```
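For comparison, the `Error 2` sample in the traceback further down this thread (`{'run_op': {'config': {...}}}`) suggests that when a graph is turned into an asset, its mapping is validated against the graph's own ops: the root keys are op names and each op's values sit under a `config` key, with no top-level `ops`. A minimal plain-Python sketch of building that shape from `some_s3_config.dict()`; `graph_ops_config` is a hypothetical helper, and the URL is the placeholder from the snippet above.

```python
from typing import Any, Dict, Mapping


def graph_ops_config(op_values: Mapping[str, Mapping[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Hypothetical helper: wrap each op's plain config values under "config".

    op_values maps op name -> field values (e.g. some_s3_config.dict()).
    The result is keyed by op name, with no top-level "ops" entry.
    """
    return {op_name: {"config": dict(values)} for op_name, values in op_values.items()}


# Inside the @config_mapping function this would become something like:
#     return graph_ops_config({"s3_inventory": some_s3_config.dict()})
mapped = graph_ops_config({"s3_inventory": {"inventory_url": "s3://bucket/path/to/inventory.csv"}})
print(mapped)
# {'s3_inventory': {'config': {'inventory_url': 's3://bucket/path/to/inventory.csv'}}}
```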
This all ends up being way more verbose than expected and cumbersome to work with.

Sergio Pintaldi
08/29/2023, 11:01 AM
Sergio Pintaldi
08/30/2023, 2:14 AM
```
dagster._core.errors.DagsterInvalidConfigError: In job __ASSET_JOB at stack testing_asset_graph:
Op "testing_asset_graph" with definition "testing_asset_graph" has a configuration error. It has produced config a via its config_fn that fails to pass validation in the ops that it contains. This indicates an error in the config mapping function itself. It must produce correct config for its constituent ops in all cases. The correct resolution is to fix the mapping function. Details on the error (and the paths on this error are relative to config mapping function "root", not the entire document):
Error 1: Received unexpected config entries "['execution', 'loggers', 'ops', 'resources']" at the root. Expected: "['delete_op', 'run_op', 'upload_op']."
Error 2: Missing required config entry "run_op" at the root. Sample config for missing entry: {'run_op': {'config': {'a_param': '...', 'another_param': '...'}}}
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 499, in get_external_execution_plan_snapshot
    create_execution_plan(
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/execution/api.py", line 722, in create_execution_plan
    resolved_run_config = ResolvedRunConfig.build(job_def, run_config)
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/objects.py", line 181, in build
    op_config_dict = composite_descent(
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 104, in composite_descent
    return {
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 104, in <dictcomp>
    return {
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 168, in _composite_descent
    _apply_config_mapping(
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 300, in _apply_config_mapping
    raise_composite_descent_config_error(current_stack, mapped_ops_config, evr)
  File "/path/to/my/python/venv/lib/python3.9/site-packages/dagster/_core/system_config/composite_descent.py", line 356, in raise_composite_descent_config_error
    raise DagsterInvalidConfigError(message, evr.errors, failed_config_value)
```
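Reading the two errors together: the mapping returned a dict rooted at `execution`, `loggers`, `ops`, and `resources`, which are the top-level keys of a job's run config, but inside `__ASSET_JOB` the graph node's mapping is validated against the graph's own ops, so the root has to be op names, and `run_op` is required. A minimal sketch of a mapping body that produces the shape from the `Error 2` sample, written as plain Python so it can be checked on its own. The function name is hypothetical, the values are the ones from the Launchpad configuration posted with this error, and in Dagster the function would presumably be decorated with `@config_mapping` (with a `config_schema` declaring `a_param` and `another_param`, which is an assumption about how this graph is set up).

```python
from typing import Any, Dict, Mapping


def map_testing_graph_config(val: Mapping[str, Any]) -> Dict[str, Any]:
    """Body of a hypothetical @config_mapping for testing_asset_graph.

    `val` is the graph-level config entered in the Launchpad; the return
    value is keyed by op name (no run-config keys), matching Error 2's sample.
    """
    return {
        "run_op": {
            "config": {
                "a_param": val["a_param"],
                "another_param": val["another_param"],
            }
        }
    }


launchpad_graph_config = {"a_param": "Hello", "another_param": "Folks"}
print(map_testing_graph_config(launchpad_graph_config))
# {'run_op': {'config': {'a_param': 'Hello', 'another_param': 'Folks'}}}
```

`delete_op` and `upload_op` are left out because the error only reports `run_op` as required; they could be added the same way if they need values.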
This is the Launchpad configuration:
```yaml
ops:
  testing_asset_graph:
    config:
      a_param: Hello
      another_param: Folks
```
And this is the code snippet that generates the asset from the graph of ops: