Aleksei Kopylov
08/09/2023, 10:54 AM
```python
import asyncio
from dagster import op, In, Output, List, AssetIn, Any, graph_asset

def get_n_messages(unit, name="get_n_messages"):
    @op(
        name=name,
        ins={"rpc_unit": In(description="unit obj")},
    )
    def _op(unit):
        result = asyncio.run(unit.get_property("n_msgs"))
        return Output(result, metadata={"unit_id": unit.id})
    return _op

@op(ins={"units": In(List, description="Units")})
def build_n_msg_factory(context, units):
    return Output([get_n_messages(u) for u in units])
```
And I'm trying to use it like this:
```python
@graph_asset(
    ins={"units": AssetIn("units", dagster_type=List[Any])}
)
def units_n_msg(units):
    return build_n_msg_factory(units)
```
I'm getting this error:
```
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
```
I'm clearly misunderstanding something, and I haven't found examples or detailed explanations of how to use this pattern in Dagster.
Odette Harary
08/09/2023, 2:24 PM
`ins` in `get_n_messages` that can be passed to the ops?
Zach
08/09/2023, 3:50 PM
`@graph` / `@graph_asset` definition, to create a set of ops for a graph at the time the graph is loaded into memory (before the job is run).
If the `units` value is dynamic at runtime, it might make more sense to use a dynamic graph: iterate over the units in an upstream op and yield a `DynamicOutput` for each one, then have a downstream op that is mapped over the `DynamicOutput`s and retrieves the `n_msgs` property for each unit.