i’m trying to chain ops which get dynamically gene...
# ask-community
s
i’m trying to chain ops which get dynamically generated from a config at runtime into a sequence, but i’m getting stuck on the concepts. here’s what i’m generally envisioning, although i want to use your cool .map() chaining method:
```python
@op
def return_b(arg):
    return arg + "b"

@op
def return_a(arg):
    return arg + "a"


def c(init_arg="hello", fns=(return_b, return_a)):
    # feed each step's output into the next step
    result = init_arg
    for fn in fns:
        result = fn(result)
    return result

# expected_output = return_a(return_b('hello'))
```
and here’s my sad attempt at creating an op_factory that’ll give me an iterable with the functions, although how i’ll string it together in a job is beyond me atm
```python
from typing import Iterator

from dagster import OpDefinition, op


def op_factory(steps, **kw) -> Iterator[OpDefinition]:
    # yield an op definition for each configured step name,
    # forwarding any extra op kwargs via **kw
    for step in steps:
        if step.startswith("import"):
            @op(name=step, **kw)
            def importer():
                print("import stuff")

            yield importer

        elif step == "parquet":
            @op(name=step, **kw)
            def to_parquet():
                print("transform stuff")

            yield to_parquet
```
p
Hi Solaris. We support dynamic graphs that can duplicate steps of work horizontally, but not ones that chain sequentially. We need to be able to determine the depth of the graph at definition time (this is how we can show you the shape of the graph without the actual run config), whereas it seems like you’re looking for a way to dynamically generate the graph at runtime.
We do have users who programmatically generate jobs, but this is based off of some schema that is not varied with run config. Here, for example, is a case where a graph is constructed from a YAML file: https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#constructing-dependencies
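For reference, a minimal sketch of that config-driven construction, assuming each chained op takes a single `arg` input (the ops and the ordered list here are illustrative; a real version would read the list from YAML/config when the repository loads, not per-run):
```python
from dagster import DependencyDefinition, GraphDefinition, op

@op
def start() -> str:
    return "hello"

@op
def return_a(arg: str) -> str:
    return arg + "a"

@op
def return_b(arg: str) -> str:
    return arg + "b"

# the chain is fixed at definition time; only its contents come from config
ordered = [start, return_b, return_a]

# wire each op's single "arg" input to the previous op's output
deps = {
    downstream.name: {"arg": DependencyDefinition(upstream.name)}
    for upstream, downstream in zip(ordered, ordered[1:])
}

chained_job = GraphDefinition(
    name="chained",
    node_defs=ordered,
    dependencies=deps,
).to_job()
```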
s
i’m not sure DynamicOutput would work since that spawns parallel processes, whereas i need i/o callable chaining. @prha based on your statement, are you saying i couldn’t even use examples like this, similar to my first pseudocode? bc it wouldn’t work with the UI and other fundamental aspects of dagster atm?
i have a sensor that will kick off these jobs, and the sensor will pass all of the ops that need to run in order at that point in time. so at that point i could feed in a graph, like your example?
p
Yes, that’s right. The dependency on the UI is less important than the fact that the shape of the DAG is determined at definition time.
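For contrast, this is roughly what the supported .map() fan-out looks like (op names here are illustrative). The *width* varies at runtime, but the depth of the chain — fan_out → process → merge — is fixed at definition time:
```python
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def fan_out():
    # each DynamicOutput spawns a parallel copy of the downstream steps
    for key in ["a", "b", "c"]:
        yield DynamicOutput(key, mapping_key=key)

@op
def process(item: str) -> str:
    return item.upper()

@op
def merge(items) -> str:
    return "".join(items)

@job
def fan_out_job():
    merge(fan_out().map(process).collect())
```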
s
also, i could still arbitrarily chain the fns within ONE op right?
p
In Dagster, jobs are standalone and can be kicked off either via backfills, schedules, sensors, or manually.
Yes, you could chain functions within one op.
The compute function of an op is just pure python and you can execute whatever you want within it.
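A minimal sketch of that pattern, with a hypothetical `FNS` registry mapping config names to plain functions (all names here are made up), so the chain order can vary per run via run config:
```python
from dagster import op

def return_a(arg: str) -> str:
    return arg + "a"

def return_b(arg: str) -> str:
    return arg + "b"

# hypothetical registry of chainable plain-Python functions
FNS = {"a": return_a, "b": return_b}

@op(config_schema={"steps": [str]})
def run_chain(context) -> str:
    # plain-Python chaining inside a single op's compute function;
    # the step order comes from run config, so it can differ per run
    result = "hello"
    for name in context.op_config["steps"]:
        result = FNS[name](result)
    return result
```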
s
ok. i’m not familiar with graphs, would i use the sensor to construct the graph definition and then return it or? my sensor only yields a RunRequest right now (which it should…)
p
Sensors create run requests, which provide run config to kick off runs of a specific job. Your repository (which defines the sensor) should also contain a job definition. This job definition is composed of your op or ops.
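Roughly, assuming a single op and job (the names here are illustrative):
```python
from dagster import RunRequest, job, op, repository, sensor

@op
def do_work():
    print("working")

@job
def my_job():
    do_work()

@sensor(job=my_job)
def my_sensor(context):
    # the sensor only supplies run config/tags for an already-defined job;
    # it cannot change the job's shape
    yield RunRequest(run_key=None, run_config={})

@repository
def my_repo():
    return [my_job, my_sensor]
```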
s
seems like i can’t dynamically generate the graph definition from a sensor then?
p
That’s right. Those are separate concepts, both of which must have definitions within a repository.
s
sry, still learning over here. thanks for the guide.
@prha ok. so i went through the guides on GraphDefinition/dependency construction and repo construction using dynamically-generated yamls. is it possible to have a sensor in a repo that runs on a regular interval to pull from a job config db and construct the yamls, which get picked up by the repo to return the job definitions? or is that not possible with sensors because, as you said above, both sensors and graphs must already have definitions within a repo? i am also looking at this example, but i am not sure if the repo gets refreshed frequently, if at all, so any changes to your config db wouldn’t get propagated to the dynamic jobs.
p
It is not possible with sensors, since they must already be defined within the repo.
s
can you speak on the stack overflow idea? does the function in their repo example automagically pick up changes from an external config db? just want to make sure i go down the right path for this undertaking
p
Yes, this does pick up changes from the external db. The code snippet will query that database whenever the repository is loaded.
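A rough sketch of that pattern — `fetch_job_names` and `make_job` are hypothetical stand-ins for the actual db query and job construction:
```python
from dagster import job, op, repository

def fetch_job_names():
    # hypothetical stand-in for the query against your external config db
    return ["job_a", "job_b"]

def make_job(name):
    @op(name=f"{name}_step")
    def step():
        print(f"running {name}")

    @job(name=name)
    def _job():
        step()

    return _job

@repository
def configured_repo():
    # this body executes on every repository load, so it reflects the db
    # as of load time -- changes after that are invisible until a reload
    return [make_job(n) for n in fetch_job_names()]
```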
s
i’ve been thinking: instead of the sensor idea, an external cloud trigger could run dagster jobs via graphql. combining that with the repo code that pulls external configs, will the repo be reloaded upon each job call from the cloud trigger? i’m worried the repo won’t reload the graph defs, based on chris’s answer
p
You are correct, the repo will not automatically reload defs upon the call to launch a job. You can probably force-reload the repo though when you know that the graph shape has changed.
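A hedged sketch of what that force-reload could look like over GraphQL, assuming a Dagit instance at `localhost:3000`. The exact mutation name and response shape can vary by Dagster version, so verify it in Dagit’s GraphQL playground first:
```python
import requests

# assumed mutation shape -- confirm against your Dagster version's schema
RELOAD_MUTATION = """
mutation ReloadLocation($location: String!) {
  reloadRepositoryLocation(repositoryLocationName: $location) {
    __typename
  }
}
"""

def force_reload(location_name: str, dagit_url: str = "http://localhost:3000"):
    resp = requests.post(
        f"{dagit_url}/graphql",
        json={"query": RELOAD_MUTATION, "variables": {"location": location_name}},
    )
    resp.raise_for_status()
    return resp.json()
```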
s
one more try - if we had config events (i.e. updates to metadata) trigger a graphql request to reload the repo, and had the repo code lazy-load sensors and jobs, would that work? i’m hoping the sensors/jobs would run uninterrupted, and update only when the repo gets updated.
p
I’m not sure about the interaction between repo / sensor. I believe that in the sensor daemon, the workspace is loaded around every 60 seconds. This is completely independent of whether or not there has been a graphql request to reload the repo. It’s a little odd to have the repo be lazily loaded and then have sensors / jobs be recalculated. You may lose out on a number of features that rely on the structure of the job being fixed/known, but it is possible to orchestrate this level of dynamism.
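A sketch of the lazy dict form of `@repository` that this would rely on — `make_job` and `make_sensor` are hypothetical helpers, and a real version would derive the dict keys from your config db:
```python
from dagster import RunRequest, job, op, repository, sensor

def make_job(name):
    @op(name=f"{name}_step")
    def step():
        print(f"running {name}")

    @job(name=name)
    def _job():
        step()

    return _job

def make_sensor(job_name):
    @sensor(job_name=job_name, name=f"{job_name}_sensor")
    def _sensor(context):
        yield RunRequest(run_key=None)

    return _sensor

@repository
def lazy_repo():
    # dict form of @repository: each definition is constructed on demand,
    # so a repo reload picks up whatever the config db says at that moment
    return {
        "jobs": {"configured_job": lambda: make_job("configured_job")},
        "sensors": {"configured_sensor": lambda: make_sensor("configured_job")},
    }
```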