Stephen Bailey
06/21/2022, 1:37 AM
1. With AssetsDefinition.from_graph(...), how do I specify resource configurations? It doesn't look like there's a kwarg for resource_defs, but I could be wrong.
2. I've become quite fond of the my_graph.to_job(...) pattern. Would it make sense to add a my_graph.to_asset(...) pattern? I might be thinking about this in the wrong way -- still very much in an "op to graph to job/asset" mindset. Not sure if I should be working backwards.
geoHeil
06/21/2022, 7:37 AM
Stephen Bailey
06/21/2022, 2:07 PM
sandy
06/21/2022, 2:10 PM
My problem is that I am thinking about things one click higher: 1 graph/job == 1 asset
Hey @Stephen Bailey - AssetsDefinition.from_graph is built exactly for this pattern of multiple ops per asset
sandy
06/21/2022, 2:14 PM
sandy
06/21/2022, 2:15 PM
Stephen Bailey
06/21/2022, 2:15 PM
with_resources could definitely use some documentation in that "upgrading to SDAs" page
sandy
06/21/2022, 2:16 PM
sandy
06/21/2022, 2:16 PM
Stephen Bailey
06/21/2022, 2:18 PM
AssetsDefinition.from_graph resulted in resource errors: dagster.core.errors.DagsterInvalidDefinitionError: resource with key 'snowflake' required by op 'load_partition_graph.create_schema_op' was not provided....
It sounds like it should be:
@op(required_resource_keys={"snowflake"})
def do_something_op():
    return [1, 2, 3]

@graph
def do_something_graph():
    return do_something_op()

asset = with_resources(
    [AssetsDefinition.from_graph(do_something_graph)],
    resource_config_by_key={
        "snowflake": {
            "config": {"bar": ...}
        }
    }
)
I was expecting the same job API: graph.to_asset(config=..., resource_defs=...)
Stephen Bailey
06/21/2022, 2:18 PM
Stephen Bailey
06/21/2022, 2:52 PM
#%%
from dagster import (
    op,
    graph,
    with_resources,
    define_asset_job,
    AssetKey,
    repository,
    AssetsDefinition,
)
from core_workflows.common import whatnot_snowflake_resource

@op(required_resource_keys={"snowflake"}, config_schema={"number": int})
def do_something_op(context):
    return [x + context.op_config["number"] for x in [1, 2, 3]]

@graph
def do_something_graph():
    return do_something_op()

asset_def = AssetsDefinition.from_graph(
    graph_def=do_something_graph,
    keys_by_output_name={"result": AssetKey("foo")},
)

assets_configured = with_resources(
    definitions=[asset_def],
    resource_defs={"snowflake": whatnot_snowflake_resource},
)

asset_job = define_asset_job(
    "foo_job",
    selection="foo",
    config={
        "ops": {
            "do_something_graph": {
                "ops": {"do_something_op": {"config": {"number": 1}}}
            }
        }
    },
)

@repository
def repo():
    return assets_configured + [asset_job]
However, what's missing here is that the AssetsDefinition does not have config embedded into it for do_something_op, so when I run foo_job, it works, but if I try to "Materialize Asset" from the UI, it does not. Am I missing something in the AssetsDefinition?
sandy
06/21/2022, 3:13 PM
Either call configured on the op before putting it inside the graph, or give the graph a config mapping and call configured on it before calling AssetsDefinition.from_graph on it
sandy
06/21/2022, 3:15 PM
from dagster import op, graph, AssetsDefinition, repository, config_mapping

@op(config_schema={"v": int})
def op1(context):
    return context.op_config["v"] + 1

@op
def op2(context, op1):
    return op1 + 1

@config_mapping(config_schema={"v": int})
def mygraph_config_mapping(val):
    return {"op1": {"config": val}}

@graph(config=mygraph_config_mapping)
def mygraph():
    return op2(op1())

VALUE_OPTIONS = [0, 1, 2]

myassets = [
    [AssetsDefinition.from_graph(mygraph.configured({"v": v}, name=f"mygraph{v}"))][0]
    for v in VALUE_OPTIONS
]
sandy
06/21/2022, 3:15 PM
Stephen Bailey
06/21/2022, 3:42 PM
sandy
06/21/2022, 3:42 PM