Dmitry Mogilevsky
06/20/2022, 7:50 AM
I'm trying to use with_resources to create multiple assets from the same graph with varying configs, using the code below:
from dagster import AssetsDefinition, graph, job, make_values_resource, op, repository, with_resources

@op(required_resource_keys={"a"})
def op1(context):
    return context.resources.a["v"] + 1

@op
def op2(context, op1):
    return op1 + 1

@graph
def mygraph():
    return op2(op1())

@job(resource_defs={"a": make_values_resource(v=int)})
def myjob():
    mygraph()

VALUE_OPTIONS = [0, 1, 2]

# One asset per value, each with its own configured copy of resource "a"
myassets = [
    with_resources(
        [AssetsDefinition.from_graph(mygraph)],
        resource_defs={"a": make_values_resource(v=int)},
        resource_config_by_key={"a": {"config": {"v": v}}},
    )[0]
    for v in VALUE_OPTIONS
]

@repository
def test_repo():
    return [*myassets, myjob]
However, I get the following error when I attempt to load it:
dagster.core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'a' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
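The reference-equality requirement named in this error can be illustrated with a small stand-alone sketch. It is a simplified model, not Dagster's implementation: FakeResourceDef, make_fake_values_resource and check_resources are hypothetical stand-ins, and the sketch assumes that binding config to a resource yields a new definition object each time.

```python
class FakeResourceDef:
    """Hypothetical stand-in for a resource definition (not a Dagster class)."""

    def __init__(self, schema, config=None):
        self.schema = schema
        self.config = config

    def configured(self, config):
        # Assumption: binding config produces a new, distinct object.
        return FakeResourceDef(self.schema, config)


def make_fake_values_resource(**schema):
    # Each call returns a fresh object, so two calls never share identity.
    return FakeResourceDef(schema)


def check_resources(assets):
    """Raise if two assets bind different objects to the same resource key."""
    seen = {}
    for _name, resource_defs in assets:
        for key, resource in resource_defs.items():
            # Identity check ("is"), not value equality.
            if key in seen and seen[key] is not resource:
                raise ValueError(
                    f"Conflicting versions of resource with key {key!r}"
                )
            seen[key] = resource


base = make_fake_values_resource(v=int)

# Every asset shares the very same object under key "a": accepted.
shared = [(f"asset{v}", {"a": base}) for v in [0, 1, 2]]

# Every asset gets its own configured copy under key "a": rejected.
per_asset = [(f"asset{v}", {"a": base.configured({"v": v})}) for v in [0, 1, 2]]
```

Under this model, check_resources(shared) passes while check_resources(per_asset) raises, which mirrors why per-asset resource config under a single key is refused here.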
Is it not possible to specify multiple assets from a single graph that vary only in input config?
I realise this is a classic case for Static Partitioning, but I'm trying to do this without using Static Partitioning to get around the fact that I can't partition across multiple directions, as mentioned in a previous thread.
sandy
06/20/2022, 4:40 PM
from dagster import op, graph, AssetsDefinition, repository, config_mapping

@op(config_schema={"v": int})
def op1(context):
    return context.op_config["v"] + 1

@op
def op2(context, op1):
    return op1 + 1

@config_mapping(config_schema={"v": int})
def mygraph_config_mapping(val):
    return {"op1": {"config": val}}

@graph(config=mygraph_config_mapping)
def mygraph():
    return op2(op1())

VALUE_OPTIONS = [0, 1, 2]

myassets = [
    AssetsDefinition.from_graph(mygraph.configured({"v": v}, name=f"mygraph{v}"))
    for v in VALUE_OPTIONS
]

@repository
def test_repo():
    return [*myassets]
Dmitry Mogilevsky
06/20/2022, 4:42 PM