# ask-community
a
When creating a `RunRequest` from a `@sensor`, is there any shortcut for assigning a RunConfig to multiple (or all) assets in the job? So far I’m finding that I have to define
Copy code
config = MyJobConfig(...)
yield RunRequest(run_config=RunConfig({
    'asset1': config,
    'asset2': config,
    'asset3': config,
    # etc.
}))
If this is an anti-pattern and I shouldn’t be passing config information to each asset like this, that would be welcome feedback too. It doesn’t appear that `RunConfig({'all': config})` works, but is there something similar to that?
t
The example uses ops, but have you seen our docs on ConfigMapping?
a
I hadn’t found that specific part of the docs. I’d read the API for ConfigMapping and I couldn’t make heads or tails of it tbh. I can try again and come back with more specific questions
t
I'm gonna acknowledge that I don't think `ConfigMapping` got an update when we updated our config system. Let me see how it parallels.
a
Okay, if I’m reading that example correctly, IF that example was extended to have multiple ops, then the `RunConfig` returned by `simplified_config` would need to look like:
Copy code
return RunConfig(
        ops={
            "do_something": DoSomethingConfig(config_param=val.simplified_param),
            "do_something_else": DoSomethingConfig(config_param=val.simplified_param)
        }
    )
Is that right? And then the job config can just be `simplified_config`, and it will then actually pass along a larger `RunConfig` that provides a config for each asset. You still have to write the config for each asset, but you can do it once in a `@config_mapping` object instead
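A minimal stand-in sketch of the fan-out a is describing (plain Python, no Dagster imports, so it runs anywhere; `DoSomethingConfig` and `simplified_config` are the names from the thread, while `Simplified` is an invented stand-in for the simplified schema). In real Dagster code `simplified_config` would carry the `@config_mapping` decorator and return a `RunConfig` rather than a raw dict:

```python
from dataclasses import dataclass


@dataclass
class DoSomethingConfig:
    """Stand-in for a dagster.Config subclass."""
    config_param: str


@dataclass
class Simplified:
    """Stand-in for the simplified schema the mapping receives."""
    simplified_param: str


def simplified_config(val: Simplified) -> dict:
    # One simplified input fans out to an identical config for every op.
    return {
        op_name: DoSomethingConfig(config_param=val.simplified_param)
        for op_name in ("do_something", "do_something_else")
    }
```

The point of the indirection is that callers only ever supply `Simplified`; the per-op repetition lives in exactly one place.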
t
Ah, you're right. my bad. misread the question. let me see what else we have
a
And I could leave out the `ops=` part, and it will be inferred to be ops because it's the first positional input to `RunConfig`? That's what it looks like from the RunConfig function signature, at least. The examples aren’t totally consistent, I think.
Copy code
return RunConfig(
        {
            "do_something": DoSomethingConfig(config_param=val.simplified_param),
            "do_something_else": DoSomethingConfig(config_param=val.simplified_param)
        }
    )
no worries @Tim Castillo, this is really helpful already!
oh actually, now that I’m trying to use this, I’m not sure how to do it with the pythonic configs. So I’ve created a `simplified_config` function with the `@config_mapping` decorator, but I can’t figure out how to use that in my `@sensor` RunRequest RunConfig.
t
Yeah, that's what I was looking into...not sure what translates or is relevant.
a
It’s looking like it’s just easier to write out all the assets at this point. 🤷 It’s adding a fair bit of boilerplate overhead to map it out, and I still have to write them all out anyways. The only benefit was I could define the mapping in my assets.py where the asset names were easily readable. Now I’ll just have to pop back and forth. Oh well.
z
You could also just build the `ops` dict argument in a loop over your asset names. Or even in a dict comprehension:
{k:DoSomethingConfig(config_param=val.simplified_param) for k in asset_list}
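Expanded into a runnable sketch (stdlib only; `MyJobConfig` stands in for the thread’s `dagster.Config` subclass, and the Dagster calls are shown only in comments since they need Dagster installed):

```python
from dataclasses import dataclass


@dataclass
class MyJobConfig:
    """Stand-in for the thread's dagster.Config subclass."""
    simplified_param: str


asset_list = ["asset1", "asset2", "asset3"]

# Same config values for every asset, built in one expression instead of
# writing each entry by hand:
ops = {k: MyJobConfig(simplified_param="2023-01-01") for k in asset_list}

# In the sensor you would then do (requires Dagster):
#   yield RunRequest(run_config=RunConfig(ops))
```

The asset names can come from wherever you already compute your selection, so adding an asset to the job doesn’t require touching the sensor.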
💯 1
m
I'm facing this issue too. I have a sensor that creates a run request for either `job_x` or `job_y`. Each of them uses a different (large) `AssetSelection`, but all the assets used in either job take a config object of the same class (with slightly different fields). Calculation of the `AssetSelection` for each job is easy if I just pick the last one in the chain and call `upstream()` on it, but it's really tedious to spell each set of `AssetKeys` out manually -- and because business requirements around these jobs are changing rapidly, I don't feel that manually spelling the asset names out is a long-term solution. All I want is for every asset in the job to use the exact same config object. I feel like this should be very easy... I find it hard to believe the only solution is to list them literally in the source.
I've tried to understand the use of `ConfigMapping`, but for the life of me I can't tell how it accomplishes anything other than a second level of indirection. The example in the documentation requires spelling out the name of ops[1], so I'm not sure what the benefit is... [1] see `def simplified_config` here: https://docs.dagster.io/concepts/ops-jobs-graphs/op-jobs#config-mapping
Hopefully, I'm just missing something in the documentation 🙂
Thanks for the reply. Oh man, you deleted it?
z
It wasn't quite correct for your scenario
Are the values of the config object different for the different assets in the job?
If so, you might consider using a ConfigurableResource as a global config object for the job so you don't have to specify config separately for each asset
m
RE values of object: Yes, different. The config object only has one field, which is a `datetime.datetime`. For job X, it gets set to `datetime.now()`. For job Y, the calculation is a little complicated.
z
But all the assets in the job receive the same datetime?
m
yep
z
So the difference in config is really just from job to job
m
That's right
z
In which case I would use a ConfigurableResource to model this config, that way you only have to set the config in one place on the RunRequest, and not enter it specifically for each asset / op
m
Okay, thanks very much. I'll go check out the documentation and try that out.
That should also be good practice for using resources in general 🙂
z
Yeah this is a relatively common pattern for setting "global" / shared configuration across assets / ops in a job.
m
Looks like this example shows a typical setup: https://docs.dagster.io/concepts/resources#with-ops-and-jobs
Oh, except that there the resource is at the top level definition and not per job. I'm sure I can set them per job though
z
Yeah you configure resources at the Definition level, and then Dagster figures out what asset needs what resource based on type hints. I think you'll need to use configure_at_launch to defer actually setting the configuration until your sensor fires
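A sketch of the wiring z describes, assuming Dagster’s Pythonic resources (this one does need dagster installed to run; `my_asset`, `my_job`, and `my_sensor` are invented names, while `DatetimeResource` and the `foobar` key come from the thread). The resource is declared with `configure_at_launch()` at the Definitions level, and the sensor supplies the actual value once, on the RunRequest:

```python
from dagster import (
    ConfigurableResource,
    Definitions,
    RunConfig,
    RunRequest,
    asset,
    define_asset_job,
    sensor,
)


class DatetimeResource(ConfigurableResource):
    start_point: str


@asset
def my_asset(foobar: DatetimeResource) -> None:
    # Every asset in the job reads the same shared value.
    print(foobar.start_point)


my_job = define_asset_job(name="my_job", selection="my_asset")


@sensor(job=my_job)
def my_sensor():
    # Config is supplied here, once per run -- not per asset.
    yield RunRequest(
        run_config=RunConfig(
            resources={"foobar": DatetimeResource(start_point="2024-01-01")}
        )
    )


defs = Definitions(
    assets=[my_asset],
    jobs=[my_job],
    sensors=[my_sensor],
    # configure_at_launch() defers setting start_point until the sensor fires.
    resources={"foobar": DatetimeResource.configure_at_launch()},
)
```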
m
Great, thanks a lot, that's plenty for me to go on.
z
No problem, may all your job runs be green and your data error free
m
It seems as if a datetime is not a valid member for a class that inherits from ConfigurableResource:
Copy code
Error defining Dagster config class <class 'pipeline.assets.assets.DatetimeResource'> on field 'start_point'.
Unable to resolve config type <class 'datetime.datetime'> to a supported Dagster config type.


This config type can be a:
    - Python primitive type
        - int, float, bool, str, list
    - A Python Dict or List type containing other valid types
    - Custom data classes extending dagster.Config
    - A Pydantic discriminated union type (<https://docs.pydantic.dev/usage/types/#discriminated-unions-aka-tagged-unions>)
Pydantic supports datetimes, but I guess Dagster only allows a subset of what Pydantic supports? That's okay. I'll just make the class member a string instead, and do the conversion from datetime to string at the call site of `RunRequest`. The `ConfigurableResource` code I wrote is just:
Copy code
class DatetimeResource(ConfigurableResource):
    start_point: dt.datetime = Field(
        description="<snip>"
    )
(ignore silly naming pls)
z
Yeah, you'll have to use a string. The config types that you put in a Config or ConfigurableResource have to be something that can reasonably be rendered in YAML so that it can be configured in the launchpad. A datetime object has no obvious way to be encoded in YAML form.
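One stdlib way to do the string round trip (my suggestion rather than anything prescribed in the thread) is ISO 8601 via `isoformat`/`fromisoformat`:

```python
from datetime import datetime

# At the RunRequest call site: serialize the datetime to the string field.
start_point = datetime(2024, 5, 1, 12, 30).isoformat()  # "2024-05-01T12:30:00"

# Inside the asset: parse it back into a datetime.
restored = datetime.fromisoformat(start_point)
```

ISO 8601 has the nice property of being unambiguous and sorting lexicographically, so the string is also readable as-is in the launchpad.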
m
ah, gotcha. thanks again
🎉 2
dagster yay 1
Oh hey, so that means I could change these in the launchpad and run a job with an ad hoc value for this resource's field, huh? My boss is going to love that!
d
@Maxwell Joslyn In terms of getting a proper datetime-object from your Resource, you can always do this:
Copy code
from datetime import datetime

from dagster import ConfigurableResource, asset

class Foo(ConfigurableResource):
  date_str: str

  def the_time(self) -> datetime:
    return datetime.strptime(self.date_str, format)

@asset
def bar(foo_res: Foo):
  print(foo_res.the_time())
`format` could of course be a constant, another field on the Resource, a PrivateAttr, etc., and you could use it to validate `date_str` before you even start a run.
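Concretely, with `format` pinned to a date-only pattern (the format string is my assumption, and a plain class stands in for `ConfigurableResource` so this runs without Dagster):

```python
from datetime import datetime

DATE_FMT = "%Y-%m-%d"  # assumed format; DB's snippet leaves `format` open


class Foo:
    """Plain-class stand-in for a ConfigurableResource with one string field."""

    def __init__(self, date_str: str):
        self.date_str = date_str

    def the_time(self) -> datetime:
        # Raises ValueError on a malformed date_str, which is exactly the
        # early-validation hook DB mentions.
        return datetime.strptime(self.date_str, DATE_FMT)
```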
m
@DB that's a good point, thanks DB!
I am trying to make a ConfigurableResource to do what I need WRT passing the same resource (just a dumb bag of data) to every asset in a job. @Zach was very helpful last week and I think this approach will work, but I'm having trouble at the last step. Observe:
dagster._check.ParameterCheckError: Invariant violation for parameter default_value. Description: required arguments should not specify default values
I'm getting that when I try to launch `dagster dev` for testing purposes. Why? I don't think I am passing default values for anything: there is no default value for the argument in the asset function, for instance. Not sure where else Dagster is thinking it sees a default argument for something. Here is a minimal example:
Copy code
# RESOURCE
class DatetimeResource(ConfigurableResource):
    """start_point is a stringifed datetime representing the beginning of a time interval (inclusive). Tells assets to operate on all data from this date, to the date on which the asset runs."""
    start_point: str


# ASSET
@asset(
    group_name="SHENANIGANS",
    compute_kind="SQL",
)
def finalize_actions(foobar: DatetimeResource) -> None:
    return


# JOB
finalize_actions_job = define_asset_job(
    name="finalize_actions",
    selection=AssetSelection.keys("finalize_actions"),
    executor_def=multiprocess_executor,
)

# SCHEDULE
finalize_actions_daily = ScheduleDefinition(
    job=finalize_actions_job,
    cron_schedule="0 21 * * *",
    execution_timezone="US/Eastern",
    # required_resource_keys="foobar", # TODO needed?
    run_config=RunConfig(
        resources={
            # must be named foobar because that's the name of the
            # parameter which this job's assets take
            "foobar": DatetimeResource(
                start_point=str(dt.datetime.now().date())
            ),
        },
    ),
)
I think I figured it out. At least, now Dagster starts up without issues... If you don't explicitly use `Field(default=None)` for a Pydantic field (whether for a config or a configurable resource), a default will get set for you somewhere in the bowels of Dagster, which Dagster does not like. This feels like a bug: if I don't ask for a default, why am I getting one -- especially when Dagster then immediately rejects that default with an error? It was incredibly unintuitive and took me probably an hour or two to figure out.
d
I always use resources without Field(), and it works fine (see my post regarding getting a datetime from resource). In fact, I actively avoid setting defaults, because resources with defaults don't make the Launchpad appear when you materialize assets, and I usually do want that to happen. Does your example from above produce an error or not? It looks fine to me (apart from the job name colliding with the asset name, and the missing "Definitions" object, which you need if you have resources).
m
@DB Thanks for the help, sorry I didn't see it faster. You are right that not setting defaults is useful because it makes the launchpad appear! The example above, when furnished with a Definitions object, as you say, does indeed produce the error `dagster._check.ParameterCheckError: Invariant violation for parameter default_value. Description: required arguments should not specify default values`. The example above will work if I: 1. use `= Field()` on the `start_point` member var definition. 2. move the config to the job rather than the schedule (with some other shenanigans relating to using a RunConfig object versus using a raw dictionary)
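m’s first fix, applied to the earlier `DatetimeResource`, might look like the following (needs dagster and pydantic installed; whether the explicit `Field()` is required has varied across Dagster versions, so treat this as the thread’s workaround rather than a guaranteed rule):

```python
from dagster import ConfigurableResource
from pydantic import Field


class DatetimeResource(ConfigurableResource):
    """start_point is a stringified datetime marking the start of the interval (inclusive)."""

    # Explicit Field() with no default: the field stays required, which is
    # what the Dagster invariant check expects.
    start_point: str = Field(description="<snip>")
```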