#ask-community

Mark Fickett

04/28/2022, 2:12 PM
Another testing question: is it possible to use something like `graph.execute_in_process` with a subgraph which takes an input? For example:
@graph
def my_subgraph(value: int):
    a = my_op(value)
    return my_other_op(a)
Looking at the testing docs I see ways to test ops and full graphs, but testing subgraphs would be great too.

alex

04/28/2022, 3:07 PM
You should be able to specify the input via `run_config`. Direct value passing has some prototypes and is something we are working towards.

Mark Fickett

04/28/2022, 3:11 PM
Thanks. So I would just specify the input for the ops within the subgraph; there's not an op name for the graph, right? Looking at https://docs.dagster.io/_apidocs/execution#config-schema, for my example it would be:
ops:
  # these keys align with the names of the ops, or their alias in this job
  my_op:
    # configurably specify input values, keyed by input name
    inputs:
      value:
        # if a dagster_type_loader is specified, that schema must be satisfied here;
        # scalar, built-in types will generally allow their values to be specified directly:
        value: 3
And I'd just repeat that for each op if multiple ops in the subgraph consume the graph's input(s).

alex

04/28/2022, 3:23 PM
You should be able to do top-level `inputs: value: 3`:
my_subgraph.execute_in_process(run_config={'inputs': {'value': 3}})

Mark Fickett

04/28/2022, 3:24 PM
Oh, nice!
That's working well, thanks for all the help! Follow-up: what's the best way to load an input that contains an enum? (I've been looking at the dagster type loader docs.) I have an input `pipe_to_test_id_list` of type `Dict[Pipe, List[str]]`, where `Pipe` is my enum. It works fine in a full test run, but for `execute_in_process` with `inputs` in the config, I get an error that it "can not be loaded from configuration" and that I should "add a dagster_type_loader for the type 'Dict[Pipe,[String]]'". If I add a type loader for the enum, does that resolve the complex type too? Since I'm passing the actual enum value in my config inputs, like `{'inputs': {'pipe_to_test_id_list': {Pipe.my_pipe: ['id1', 'id2']}}}`, I'm not sure what my loader would be.
I tried adding a loader just for my `Pipe` enum but it doesn't seem to work; I get roughly the same error (though it does reference my DagsterType):
@dagster.dagster_type_loader(dagster.Permissive())
def _load_pipe(_context, value):
    return value

_PipeIn = dagster.PythonObjectDagsterType(python_type=Pipe, name="_PipeIn", loader=_load_pipe)
@op(
        ins={
            "pipe_to_test_id_list": In(dagster_type=dagster.Dict[_PipeIn, dagster.List[str]]),

alex

04/28/2022, 8:30 PM
It works fine in a full test run
How are you passing the value there? You may need to use `Pipe.my_pipe.value`,
since run_config is the same input powered by yaml. It expects “scalar” inputs not any complex python object types
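To illustrate the point about scalars: config values have to be expressible in YAML, so an enum-keyed dict would need to be flattened to plain strings before going into `run_config`, and rebuilt on the other side. Below is a minimal sketch in plain Python. The `Pipe` members and the helper names `to_config` and `from_config` are hypothetical and not part of Dagster. This only produces a scalar payload; whether Dagster can load it still depends on the input's type having a loader.

```python
from enum import Enum
from typing import Dict, List


class Pipe(Enum):
    # Hypothetical members; the real enum is not shown in the thread.
    my_pipe = "my_pipe"
    other_pipe = "other_pipe"


def to_config(mapping: Dict[Pipe, List[str]]) -> Dict[str, List[str]]:
    """Replace enum keys with their scalar values so the dict is YAML-friendly."""
    return {pipe.value: list(ids) for pipe, ids in mapping.items()}


def from_config(raw: Dict[str, List[str]]) -> Dict[Pipe, List[str]]:
    """Rebuild enum keys from scalar values; Pipe(value) raises ValueError if unknown."""
    return {Pipe(value): list(ids) for value, ids in raw.items()}


scalar_payload = to_config({Pipe.my_pipe: ["id1", "id2"]})
restored = from_config(scalar_payload)
```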

Mark Fickett

04/28/2022, 8:40 PM
In normal job execution, I'm just returning a value like `{Pipe.my_pipe: ['id1', 'id2']}` from an upstream op, and that works fine. In a unit test, I was trying to do `my_subgraph.execute_in_process(run_config={'inputs': {'pipe_to_test_id_list': {Pipe.my_pipe: ['id1', 'id2']}}})`. I get the same error whether I do that with `Pipe.my_pipe` or `Pipe.my_pipe.value` in the run_config argument: it says "add a dagster_type_loader for the type 'Dict[Pipe,[String]]'".
(By "full test run" I meant if I actually run the job that contains this subgraph via local Dagit.)

alex

04/28/2022, 8:46 PM
Ah, I see. Yeah, type loaders for composite types aren't really a thing. I think you would have to make a type that represents / replaces the whole `dict[pipe, list[str]]` as a single type.

Mark Fickett

04/29/2022, 12:53 AM
I'm not getting that to work either. Here's a full example. If I use the root input manager, I still get an error about the config, and if I add a config too, I get a different error about the config. If the root input manager can work, that would be a great solution.
As a workaround, I can just define an `@op` to produce the input value and a `@graph`
to wrap the subgraph under test, in my unit test. But if there's a way to get something more streamlined working that'd be nice!

alex

04/29/2022, 2:00 PM
This is admittedly very confusing, but for the root input manager version you have to fill out the config at the level of the op input instead of via the graph input mapping:
{'ops': {'consume_value': {'inputs': {'pipe_to_test_id_list': ...}}}}