# ask-community
s
In the Dagster docs page there are instructions on how to create op factories. After I’ve used the factory to create some ops, how do I create a graph from those factory-defined ops?
Is it best practice to just use a normal Python dynamic lookup, like getattr(sys.modules[__name__], 'my_op_from_factory')? Seems like there would be a more Dagster-ish way to do this.
a
What I've used for jobs, which would presumably also work for graphs, is to also make factory functions for them. I actually moved away from op factories since I was able to accomplish everything I needed solely with job and schedule factories. I don't see why you'd want to use a dynamic function instead of just invoking the factory directly.
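A rough sketch of that job-and-schedule factory pattern (make_job, make_schedule, say_hello, and the cron string are placeholders, not anything from the thread):

from dagster import ScheduleDefinition, job, op


@op
def say_hello(context):
    context.log.info("hello")


def make_job(job_name):
    # Each call builds a distinct job; the body can compose whatever ops you need.
    @job(name=job_name)
    def _job():
        say_hello()

    return _job


def make_schedule(job_name, cron):
    # Pair every generated job with its own schedule.
    return ScheduleDefinition(job=make_job(job_name), cron_schedule=cron)


schedules = [make_schedule(f"sync_job_{i}", "0 * * * *") for i in range(3)]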
s
from dagster import job, op


def get_op():
    @op
    def an_op():
        pass
    return an_op

@job
def a_job():
    get_op()()
☝️ 1
@Sterling Paramore is that what you are looking for?
s
I think I figured out what I was trying to do. I didn’t realize that the op factory functions can be called from anywhere, so I just call them inside the graph body. Here’s my attempt at mocking out two jobs that use the same kind of data flow, get_state -> run -> save_state, but that will eventually be parameterized with different arguments.
from dagster import In, Nothing, graph, job, op


def meltano_get_state_factory(name='not_defined_op', ins=None, **kwargs):
    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        return f'meltano get state for {name}'

    return my_inner_op

def meltano_run_factory(name='not_defined_op', ins=None, **kwargs):
    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        return f'meltano run for {name}'

    return my_inner_op

def meltano_save_state_factory(name='not_defined_op', ins=None, **kwargs):
    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        return f'meltano save state for {name}'

    return my_inner_op

def meltano_graph_factory(name='not_defined_graph', **kwargs):
    # Builds a get_state -> run -> save_state graph whose op names are prefixed with `name`.
    @graph(name=name)
    def my_inner_graph(**kwargs):
        get_state = meltano_get_state_factory(f'{name}_get_state')
        run = meltano_run_factory(f'{name}_run')
        save_state = meltano_save_state_factory(f'{name}_save_state')
        return save_state(
            run(
                get_state()
            )
        )
    return my_inner_graph

@job
def meltano_etahub_to_bigquery_job():
    meltano_graph_factory('etahub_to_bigquery')()

@job
def meltano_msehub_to_bigquery_job():
    meltano_graph_factory('msehub_to_bigquery')()
Still don’t really know if this is the right way to do this kind of thing….
s
What varies between the two jobs in the actual calls to Meltano?
o
My instinct looking at the graph factory is that you might be better served by using resources rather than factories, specifically because of the second point in the docs: "Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually." It feels like the "name" is the only bit that will change between these commands, so having a single resource supply that shared configuration to each of the ops will end up fairly clean:
from dagster import In, Nothing, ResourceDefinition, graph, op


@op(ins={"start": In(Nothing)}, required_resource_keys={"meltano_name"})
def meltano_run(context):
    name = context.resources.meltano_name
    # run command based off of name

# ... same thing for meltano_save_state and meltano_get_state


@graph
def meltano_graph():
    meltano_save_state(start=meltano_run(start=meltano_get_state()))


job_1 = meltano_graph.to_job(
    name="meltano_etahub_to_bigquery_job",
    resource_defs={"meltano_name": ResourceDefinition.hardcoded_resource("etahub_to_bigquery")},
)

job_2 = meltano_graph.to_job(
    name="meltano_msehub_to_bigquery_job",
    resource_defs={"meltano_name": ResourceDefinition.hardcoded_resource("msehub_to_bigquery")},
)
s
Thanks, I haven’t quite gotten through the resources docs yet. There will be more differences between the jobs than just the name, but from that code it looks like I can make that work.
Ok… so I’ve got that working, but I might not have fully stated my ultimate intentions… 😬 Now I’ve got these two jobs that I might want to trigger manually, etahub_to_bigquery and msehub_to_bigquery, but I’d also like a third job which runs the graphs behind these two jobs in parallel and then, when they both complete, runs another graph. But it looks like graphs can only use a resource when they’re part of a job, and jobs cannot be composed of other jobs…
Maybe I should be looking into configs instead of resources… But that’s a next-week problem. Thanks for your help!
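For what it's worth, graphs (unlike jobs) can be invoked inside other graphs and jobs, so the shape described above can be sketched roughly like this, reusing the meltano_graph_factory from earlier and standing in a plain op with two Nothing inputs for the final graph; how the per-pipeline configuration gets supplied is the open question the rest of the thread deals with:

from dagster import In, Nothing, job, op


@op(ins={"etahub_done": In(Nothing), "msehub_done": In(Nothing)})
def final_step():
    # Placeholder for the follow-up graph; the Nothing inputs are order-only
    # dependencies, so this only starts once both upstream graphs finish.
    ...


@job
def combined_job():
    # The two sub-graphs have no dependency on each other, so they run in parallel.
    etahub = meltano_graph_factory('etahub_to_bigquery')()
    msehub = meltano_graph_factory('msehub_to_bigquery')()
    final_step(etahub_done=etahub, msehub_done=msehub)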
o
ooh I see — that does change things a bit, as having a global resource muddies the waters when you want to have different values in play
you're totally correct on config — specifically ConfigMapping on your graph would provide a clean interface for configuring your graph
ty spinny 1
but happy to talk more about that next week if it's helpful :)
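A rough sketch of the ConfigMapping idea, assuming two config-schema'd ops standing in for the Meltano commands; the op names, the single graph-level "name" field, and the per-op config the mapping function returns are all illustrative:

from dagster import ConfigMapping, graph, op


@op(config_schema={"name": str})
def meltano_get_state(context):
    context.log.info(f"meltano get state for {context.op_config['name']}")
    return context.op_config["name"]


@op(config_schema={"name": str})
def meltano_run(context, state):
    context.log.info(f"meltano run for {state}")


def spread_name(cfg):
    # Fan the single graph-level "name" value out to each op's own config.
    per_op = {"config": {"name": cfg["name"]}}
    return {"meltano_get_state": per_op, "meltano_run": per_op}


@graph(config=ConfigMapping(config_schema={"name": str}, config_fn=spread_name))
def meltano_graph():
    meltano_run(meltano_get_state())

Anything that uses the graph, whether a job built from it or an outer graph that composes it, then only has to supply the one graph-level "name" value instead of configuring each op separately.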
s
Getting this going is my top priority for the next month so you’ll definitely be hearing from me again. Thanks again for your prompt and super helpful responses!
o
happy to help!
s
I was able to get the DAG working as I had hoped by using configs and configured!
👍 1
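A minimal sketch of the configured side of that, assuming an op with a config schema like the ones above (all names here are placeholders):

from dagster import configured, job, op


@op(config_schema={"name": str})
def meltano_run(context):
    context.log.info(f"meltano run for {context.op_config['name']}")


# configured() bakes a config value into a renamed copy of the op, so several
# copies can live in the same job without any per-run config.
etahub_run = configured(meltano_run, name="etahub_run")({"name": "etahub_to_bigquery"})
msehub_run = configured(meltano_run, name="msehub_run")({"name": "msehub_to_bigquery"})


@job
def combined_meltano_job():
    # No dependency between the two copies, so they run in parallel.
    etahub_run()
    msehub_run()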