
pdpark

07/29/2021, 6:37 PM
A function called in a repo definition appears to be called multiple times?
@repository
def prod_repo():
    run_config = get_params()
    return [
        my_graph.to_job(
            config={
                "solids": {
                    "do_stuff": {
                        "config": {
                            "a_param": run_config.get("a_param")
                        }
                    }
                }
            }
        )
    ]
The op `do_stuff` is a “mapped” op downstream from a `DynamicOutputDefinition` op:
dynamic_output_op().map(do_stuff)
Each time `do_stuff` is called, `a_param` is different - I was expecting the value to be the same…?

alex

07/29/2021, 6:46 PM
so jobs default to multiprocess execution, and each process has to get to the code it needs to execute, which it does by grabbing the job out of the repository. What is `get_params`?

pdpark

07/29/2021, 6:48 PM
I see. It’s a function that returns a dictionary of params I want to pass as config values to several ops/solids - I want them to all get the same values. The params are dynamic and will be different from one invocation to the next: one is based on the current date/time, for instance.
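The actual `get_params` isn’t shown in the thread, but a minimal hypothetical stand-in matching this description might look like the following (the keys and timestamp format are illustrative assumptions):

```python
from datetime import datetime, timezone

def get_params():
    # Hypothetical stand-in for the get_params discussed above:
    # returns config values that differ on every call, e.g. a
    # timestamp for the current invocation.
    return {
        "a_param": datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S.%f")[:-3] + "Z",
        "env": "prod",
    }
```

Because the body is non-deterministic, every process that evaluates the repository definition sees different values.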

alex

07/29/2021, 6:51 PM
how are you executing? One way to do this would be to fire the config generation once when you submit the run, but that may be cumbersome from dagit

pdpark

07/29/2021, 6:53 PM
currently just using dagster cli but eventually want to schedule or trigger the runs probably through dagit

alex

07/29/2021, 6:56 PM
ah, this is pretty tricky. This pattern would work from a schedule - if you fired `get_params` to get the run_config that the schedule submits. I think this would actually work from dagit as well - since there it would take this blob of config, load it into the editor, and then submit that explicit version. I bet in the dagster CLI it would work too if you did `--preset default`, which would cause the same “submit explicit copy of config” behavior. You are currently getting burned because the job’s “default config” (used when no explicit run config is provided) is part of its in-memory definition, and that definition changes as it is reloaded in each process.

pdpark

07/29/2021, 7:02 PM
I haven’t tried schedules or running in dagit yet, just dagster cli
dagster pipeline launch \
   --pipeline {graph} \
   --workspace work_{env}/workspace.yaml
would I just add `--preset default` to this command?

alex

07/29/2021, 7:05 PM
yea - though i would consider that sort of a workaround since that takes advantage of current implementation details

pdpark

07/29/2021, 7:06 PM
gotchya - so maybe I’ll have to generate a config file on the fly and call dagster cli with that?

alex

07/29/2021, 7:30 PM
that pattern should always work yea
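A sketch of the generate-a-file-then-launch pattern - the helper name is hypothetical, and passing the file to the CLI via a `-c`/`--config` flag is an assumption, not something confirmed in the thread:

```python
from datetime import datetime, timezone
from pathlib import Path

def write_run_config(path="run_config.yaml"):
    # Write the run config once, in the submitting process, so every
    # step process reads the same frozen values from the file.
    graph_run_dts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S.%f")[:-3] + "Z"
    Path(path).write_text(
        "resources:\n"
        "  values:\n"
        "    config:\n"
        f'      graph_run_dts: "{graph_run_dts}"\n'
    )
    return path

config_path = write_run_config()
```

The launch would then look something like `dagster pipeline launch --pipeline {graph} --workspace work_{env}/workspace.yaml -c run_config.yaml`.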

sandy

07/29/2021, 8:05 PM
another approach, which wouldn't require generating a file on the fly, would be to supply a config function to the job's config argument. e.g.
from datetime import datetime

from dagster import ConfigMapping, repository

def config_fn():
    current_time = datetime.now()
    return {
        "solids": {
            "do_stuff": {
                "config": {
                    "a_param": current_time
                }
            }
        }
    }

@repository
def prod_repo():
    return [
        my_graph.to_job(
            config=ConfigMapping(config_fn=config_fn)
        )
    ]

pdpark

07/29/2021, 8:06 PM
Nice! I’ll give that a try.

sandy

07/29/2021, 8:48 PM
@alex pointed out to me that, if you go the config function route, the function will get called in each step process, so that might be less desirable
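The per-step re-evaluation is easy to see with a pure-Python model of a non-deterministic config function (illustrative only; this is not the Dagster `ConfigMapping` machinery itself):

```python
import time
from datetime import datetime, timezone

def config_fn():
    # Each step process evaluates this body itself, so each one
    # gets its own timestamp rather than a shared run-level value.
    return {"solids": {"do_stuff": {"config": {
        "a_param": datetime.now(timezone.utc).isoformat()
    }}}}

step_one = config_fn()   # evaluated in one step process
time.sleep(0.05)
step_two = config_fn()   # evaluated in another
assert step_one != step_two  # the two copies of the run config diverge
```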

pdpark

07/30/2021, 1:55 PM
Generating a config file for the run to set the current date-time value to be used by all ops as a resource, using `make_values_resource`. I’m only specifying the resource value in the config file, not any solid config values, like this:
resources:
    values:
        config:
            graph_run_dts: "20210730T111848.842Z"
Getting this error:
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvalidConfigError: Error in config mapping for pipeline part_graph mode default
    Error 1: Received unexpected config entry "resources" at the root.
In the repo I have several jobs defined (I’m using `graph` and `op`s) that require config values:
a_graph.to_job(config={...})
If I supply a config file, can I just include resources, or do I have to include all the config values as well, which I’m currently setting in the `to_job` call?

alex

08/02/2021, 2:02 PM
hmm, this may be a bug on our end - did you set `required_resource_keys` on the ops for the `values` resource?

pdpark

08/02/2021, 10:07 PM
I do have `required_resource_keys`, but I actually refactored this, so it’s not an issue for me now. I had some values I was passing to several of my `op`s - such as the env (prod, dev) - that I moved to a `values` resource.
Simplified the code nicely, too.