Kasper Ramström
12/15/2022, 11:04 AMdef run_node(node):
if node.has_child_nodes():
for child_node in node.child_nodes():
yield run_node(child_node)
yield {"res": my_func(node)}
def my_job():
    """Load the graph and drive ``run_node`` over each of its items.

    ``run_node`` is a generator function, so merely calling it executes
    nothing; each returned generator is iterated to completion here.
    """
    graph = load_graph()
    for node in graph:
        # Drain the generator so that its body actually executes.
        for _ in run_node(node):
            pass
Thomas
12/15/2022, 11:06 AM
jamie
12/15/2022, 3:31 PM
Kasper Ramström
12/16/2022, 7:29 AM
`AssetsDefinition.from_graph` to create assets lineage?
It looks like dagster-dbt is doing something similar to what I want -- loading some input/config and building a runnable lineage in dagster from this.
I tried reproducing the dagster-dbt way but I'm not sure how to use `AssetsDefinition` and `AssetKey` correctly. The following minimal example just gives a `KeyError` when creating the `AssetsDefinition` for the `keys_by_output_name`:
from typing import Dict, Tuple
from dagster import (
AssetKey,
AssetsDefinition,
In,
Nothing,
Out,
op,
)
def get_op(op_name, k, ins, outs):
    """Build an op named *op_name* that prints *k* when executed.

    Args:
        op_name: Name given to the op.
        k: Value printed by the op body.
        ins: Mapping of input name to ``In``, forwarded to ``@op``.
        outs: Mapping of output name to ``Out``, forwarded to ``@op``.

    Returns:
        The decorated op.
    """

    # Forward the declared inputs/outputs to the decorator. Previously they
    # were accepted but dropped, so the op did not expose the input/output
    # names that the caller later maps to asset keys.
    @op(name=op_name, ins=ins, out=outs)
    def my_op():
        print(k)

    return my_op
def get_assets() -> AssetsDefinition:
    """Assemble an ``AssetsDefinition`` backed by a single generated op.

    The input asset key ``foo`` is bound to the op input ``some_input`` and
    the output asset key ``bar`` to the op output ``some_output``.

    Returns:
        The ``AssetsDefinition`` wrapping the op produced by ``get_op``.
    """
    asset_ins: Dict[AssetKey, Tuple[str, In]] = {
        AssetKey("foo"): ("some_input", In(Nothing)),
    }
    asset_outs: Dict[AssetKey, Tuple[str, Out]] = {
        AssetKey("bar"): ("some_output", Out()),
    }

    # Each value is a (name, definition) pair, so dict() turns the values
    # into the name -> definition mappings handed to get_op.
    node_def = get_op(
        op_name="some_output",
        k="foo",
        ins=dict(asset_ins.values()),
        outs=dict(asset_outs.values()),
    )

    # Invert the lookups: op input/output name -> asset key.
    input_keys = {}
    for asset_key, (input_name, _) in asset_ins.items():
        input_keys[input_name] = asset_key

    output_keys = {}
    for asset_key, (output_name, _) in asset_outs.items():
        output_keys[output_name] = asset_key

    return AssetsDefinition(
        keys_by_input_name=input_keys,
        keys_by_output_name=output_keys,
        node_def=node_def,
    )
# Module-level call: builds the AssetsDefinition as soon as this snippet runs.
get_assets()
from dagster import AssetIn, asset
def load_graph():
    """Return a hard-coded sample graph as nested dicts.

    The top level maps a root name to ``{"children": [...], "op": None}``.
    Every child is a single-key dict mapping the child's name to a node of
    the same shape with no children of its own.
    """

    def leaf(child_name):
        # A childless node wrapped under its own name.
        return {child_name: {"children": [], "op": None}}

    layout = (
        ("a1", ("a1b1", "a1b2")),
        ("b1", ("b1b1", "b1b2")),
    )
    return {
        root: {"children": [leaf(child) for child in children], "op": None}
        for root, children in layout
    }
def get_asset(name):
    """Create the asset *name* together with the input asset it consumes.

    Args:
        name: Name of the main asset; the input asset is named
            ``f"{name}__input"``.

    Returns:
        A two-element list: the input asset, then the main asset.
    """
    upstream_name = f"{name}__input"

    @asset(name=upstream_name)
    def input_asset():
        return "input"

    # The main asset receives the input asset's value as ``parents_asset``.
    @asset(name=name, ins={"parents_asset": AssetIn(upstream_name)})
    def generic_asset(parents_asset):
        print(parents_asset)

    definitions = [input_asset, generic_asset]
    return definitions
def get_assets():
    """Create the assets for every top-level entry returned by ``load_graph``.

    Returns:
        A flat list containing, for each top-level key, the definitions
        returned by ``get_asset(key)``.
    """
    loaded_assets = []
    # Iterating a dict yields its keys. Unpacking each key as ``key, value``
    # would split the string into characters (e.g. "a1" -> "a", "1") and
    # fail for any key that is not exactly two characters long, so iterate
    # the keys directly.
    for key in load_graph():
        loaded_assets.extend(get_asset(key))
    return loaded_assets
# Module-level list of every asset definition produced by get_assets().
multi_assets = get_assets()
which seems to kind of work: the graph is created and looks correct in Dagit. However, I get GraphQL errors instead:
Operation name: AssetGraphLiveQuery
Message: maximum recursion depth exceeded while calling a Python object
Path: ["assetNodes",2,"projectedLogicalVersion"]
Locations: [{"line":44,"column":3}]
jamie
12/16/2022, 4:09 PM
Kasper Ramström
12/19/2022, 7:50 AM