Bolin Zhu
03/16/2023, 8:25 AM
dagster.graph.to_job
with different config and resource_defs etc.
If I understand the concept correctly, each asset/graph-backed asset is supposed to be unique and have a unique identifier/key. Do I have to convert the following graph
@graph
def batch_mongo_graph():
    data = extract_mongo_op()
    validated = deserialise_json(data)
    df = validated_to_dataframe(validated)
    load_redshift_batch(df)  # another graph executing commands to load data from s3 to redshift
to something like
@graph_asset
def mongo_extract_load_asset_one():
    data = extract_mongo_op(start=job_is_defined)
    validated = deserialise_json(data)
    df = validated_to_dataframe(validated)
    load_redshift_batch(df)

@graph_asset
def mongo_extract_load_asset_two():
    data = extract_mongo_op(start=job_is_defined)
    validated = deserialise_json(data)
    df = validated_to_dataframe(validated)
    load_redshift_batch(df)
How do I specify run-time configuration for the graph assets, and how do I turn them into jobs?
asset_one = AssetsDefinition.from_graph(...)
asset_one_job = define_asset_job(name='asset_one')
defs = Definitions(
    assets=[asset_one],
    jobs=[asset_one_job]
)
I am getting this error:
dagster._check.CheckError: Invariant failed. Description: All leaf nodes within graph 'batch_mongo_graph' must generate outputs which are mapped to outputs of the graph, and produce assets. The following leaf node(s) are non-asset producing ops: {'load_redshift_batch'}. This behavior is not currently supported because these ops are not required for the creation of the associated asset(s).