Hello :dagster-spin: ! I have struggle in impleme...
# ask-community
a
Hello! I'm struggling to implement and use the op factory pattern. I'm trying to create an op for every Unit object that will request its property and create a new asset. My factory looks like this:
import asyncio

from dagster import op, In, Output, List, AssetIn, Any, graph_asset


def get_n_messages(unit, name="get_n_messages"):
    @op(
        name=name,
        ins={"rpc_unit": In(description="unit obj")},
    )
    def _op(unit):
        result = asyncio.run(unit.get_property("n_msgs"))
        return Output(result, metadata={"unit_id": unit.id})

    return _op

@op(ins={"units": In(List, description="Units")})
def build_n_msg_factory(context, units):
    return Output([get_n_messages(u) for u in units])
And I'm trying to use it like this:
@graph_asset(
    ins={"units": AssetIn("units", dagster_type=List[Any])}
)
def units_n_msg(units):
    return build_n_msg_factory(units)
I'm getting this error:
dagster._core.errors.DagsterInvariantViolationError: Attempting to access run_id, but it was not provided when constructing the OutputContext
I'm clearly misunderstanding something, and I haven't found examples or detailed explanations of how to use this pattern in Dagster.
o
Hi Aleksei - have you tried putting `ins` in `get_n_messages` that can be passed to the ops?
z
It looks like you're calling your op factory from within another op. This essentially leads to ops being called from within other ops, which is a no-go in Dagster. Op factories are usually called from within a `@graph` / `@graph_asset` definition, to create a set of ops for a graph at the time the graph is loaded into memory (before the job is run). If the `units` value is dynamic at runtime, it might make more sense to use a dynamic graph: a first op iterates over the units and yields a `DynamicOutput` for each one, and a downstream op is mapped over those outputs and retrieves the `n_msgs` property for each unit.