Alexander Buck
08/24/2023, 7:23 PM
RunRequest from a @sensor, is there any shortcut for assigning a RunConfig to multiple (or all) assets in the job? So far I’m finding that I have to define:
config = MyJobConfig(...)
yield RunRequest(run_config=RunConfig({
    'asset1': config,
    'asset2': config,
    'asset3': config,
    # etc...
}))
If this is an anti-pattern and I shouldn’t be passing config information to each asset like this, that would be welcome feedback too.
It doesn’t appear that RunConfig({'all': config}) works, but is there something similar to that?
Tim Castillo
08/24/2023, 7:24 PM
Alexander Buck
08/24/2023, 7:25 PM
Tim Castillo
08/24/2023, 7:34 PM
ConfigMapping got an update when we updated our config system. Let me see how it parallels.
Alexander Buck
08/24/2023, 7:35 PM
RunConfig returned by simplified_config would need to look like:
return RunConfig(
    ops={
        "do_something": DoSomethingConfig(config_param=val.simplified_param),
        "do_something_else": DoSomethingConfig(config_param=val.simplified_param),
    }
)
Is that right? And then the job config can just be simplified_config, and it will actually pass along a larger RunConfig that provides a config for each asset.
Alexander Buck
08/24/2023, 7:35 PM
@config_mapping object instead
Tim Castillo
08/24/2023, 7:36 PM
Alexander Buck
08/24/2023, 7:37 PM
ops= part, and it will be inferred to be ops because it’s the first positional input to RunConfig? That’s what it looks like from the RunConfig function signature, at least. The examples aren’t totally consistent, I think.
return RunConfig(
    {
        "do_something": DoSomethingConfig(config_param=val.simplified_param),
        "do_something_else": DoSomethingConfig(config_param=val.simplified_param),
    }
)
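The fan-out a config mapping performs in this exchange can be sketched as a plain function from a simplified config to the nested raw run config Dagster accepts (`ops` → op name → `config`). `OP_NAMES`, `simplified_param`, and `config_param` mirror the snippets above and are otherwise hypothetical; in Dagster the function would carry the `@config_mapping` decorator and, per the snippets above, could return a `RunConfig` rather than a dict.

```python
from typing import Any, Dict, List

# Hypothetical op/asset names, mirroring the snippets above.
OP_NAMES: List[str] = ["do_something", "do_something_else"]


def simplified_config(val: Dict[str, Any]) -> Dict[str, Any]:
    """Expand one simplified value into a per-op raw run config.

    Raw Dagster run config nests op config as {"ops": {name: {"config": {...}}}}.
    """
    return {
        "ops": {
            # A fresh dict per op, so editing one op's config can't affect another.
            name: {"config": {"config_param": val["simplified_param"]}}
            for name in OP_NAMES
        }
    }


full = simplified_config({"simplified_param": "2023-08-24"})
```

Note that the mapping only moves the repetition out of the caller: the op names still have to come from somewhere, here the literal `OP_NAMES` list.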
Alexander Buck
08/24/2023, 7:38 PM
Alexander Buck
08/24/2023, 7:47 PM
simplified_config function with the @config_mapping decorator, but I can’t figure out how to use that in my @sensor RunRequest RunConfig.
Tim Castillo
08/24/2023, 7:48 PM
Alexander Buck
08/24/2023, 7:49 PM
Zach
08/25/2023, 12:40 AM
ops dict argument in a loop by looping over your asset names, or even in a dict comprehension:
{k: DoSomethingConfig(config_param=val.simplified_param) for k in asset_list}
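A self-contained sketch of Zach's suggestion. Everything here is hypothetical: `DoSomethingConfig` is a frozen dataclass standing in for a `dagster.Config` subclass, and instead of typing `asset_list` out, it is derived by walking a hard-coded dependency graph `DEPS` upstream from the last asset, which is the same idea as calling `upstream()` on a selection. In a real project the names would come from the job's asset definitions rather than a literal dict.

```python
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass(frozen=True)
class DoSomethingConfig:
    """Stand-in for a dagster.Config subclass (hypothetical)."""

    config_param: str


# Hypothetical dependency graph: asset name -> its direct upstream asset names.
DEPS: Dict[str, List[str]] = {
    "asset1": [],
    "asset2": ["asset1"],
    "asset3": ["asset1"],
    "asset4": ["asset2", "asset3"],
}


def upstream_of(leaf: str, deps: Dict[str, List[str]]) -> Set[str]:
    """Breadth-first walk returning the leaf plus everything it depends on."""
    seen = {leaf}
    queue = deque([leaf])
    while queue:
        for parent in deps.get(queue.popleft(), []):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen


asset_list = sorted(upstream_of("asset4", DEPS))
shared = DoSomethingConfig(config_param="2023-08-25")
ops = {k: shared for k in asset_list}  # the dict comprehension from the message above
# In Dagster this mapping would be passed as RunConfig(ops=ops) inside the RunRequest.
```

Every entry points at the same frozen object, so all assets share one config; build a fresh object per key instead if some asset needs its own values.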
Maxwell Joslyn
11/03/2023, 10:56 PM
job_x or job_y. Each of them uses a different (large) AssetSelection, but all the assets used in either job take a config object of the same class (with slightly different fields). Calculating the AssetSelection for each job is easy if I just pick the last asset in the chain and call upstream() on it, but it's really tedious to spell out each set of AssetKeys manually, and because business requirements around these jobs are changing rapidly, I don't feel that manually spelling the asset names out is a long-term solution. All I want is for every asset in the job to use the exact same config object. I feel like this should be very easy; I find it hard to believe the only solution is to list them literally in the source.
Maxwell Joslyn
11/03/2023, 11:02 PM
ConfigMapping, but for the life of me I can't tell how it accomplishes anything other than a second level of indirection. The example in the documentation requires spelling out the names of the ops [1], so I'm not sure what the benefit is...
[1] See def simplified_config here: https://docs.dagster.io/concepts/ops-jobs-graphs/op-jobs#config-mapping
Maxwell Joslyn
11/03/2023, 11:03 PM
Maxwell Joslyn
11/03/2023, 11:16 PM
Zach
11/03/2023, 11:17 PM
Zach
11/03/2023, 11:17 PM
Zach
11/03/2023, 11:18 PM
Maxwell Joslyn
11/03/2023, 11:18 PM
datetime.datetime. For job X, it gets set to `datetime.now()`. For job Y, the calculation is a little complicated.
Zach
11/03/2023, 11:19 PMMaxwell Joslyn
11/03/2023, 11:19 PMZach
11/03/2023, 11:19 PMMaxwell Joslyn
11/03/2023, 11:19 PMZach
11/03/2023, 11:20 PMMaxwell Joslyn
11/03/2023, 11:20 PMMaxwell Joslyn
11/03/2023, 11:21 PMZach
11/03/2023, 11:22 PMMaxwell Joslyn
11/03/2023, 11:23 PMMaxwell Joslyn
11/03/2023, 11:23 PMZach
11/03/2023, 11:25 PMZach
11/03/2023, 11:25 PMMaxwell Joslyn
11/03/2023, 11:27 PMZach
11/03/2023, 11:30 PMMaxwell Joslyn
11/03/2023, 11:43 PM
Error defining Dagster config class <class 'pipeline.assets.assets.DatetimeResource'> on field 'start_point'.
Unable to resolve config type <class 'datetime.datetime'> to a supported Dagster config type.
This config type can be a:
- Python primitive type
- int, float, bool, str, list
- A Python Dict or List type containing other valid types
- Custom data classes extending dagster.Config
- A Pydantic discriminated union type (<https://docs.pydantic.dev/usage/types/#discriminated-unions-aka-tagged-unions>)
Pydantic supports datetimes, but I guess Dagster only allows a subset of what Pydantic supports? That's okay. I'll just make the class member a string instead and do the conversion from datetime to string at the call site of RunRequest.
Maxwell Joslyn
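11/03/2023, 11:44 PM
ConfigurableResource code I wrote is just:
import datetime as dt
from dagster import ConfigurableResource
from pydantic import Field
class DatetimeResource(ConfigurableResource):
    # datetime is not a supported Dagster config type, hence the error above
    start_point: dt.datetime = Field(
        description="<snip>"
    )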
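The workaround described above (make start_point a string and convert at the RunRequest call site) can be sketched with the standard library alone. `DatetimeResourceSketch`, `DATE_FORMAT`, and `run_config_for` are hypothetical names: the frozen dataclass stands in for a `ConfigurableResource` with a `start_point: str` field, `__post_init__` plays the role of up-front validation (in a real Pydantic-based resource that would presumably be a validator, an assumption not shown in this thread), and the returned dict mirrors Dagster's raw run config shape for a resource named `foobar`.

```python
import datetime as dt
from dataclasses import dataclass

DATE_FORMAT = "%Y-%m-%d"  # hypothetical format shared by caller and resource


@dataclass(frozen=True)
class DatetimeResourceSketch:
    """Stand-in for a resource whose start_point is stored as a string."""

    start_point: str

    def __post_init__(self) -> None:
        # Validate eagerly: raises ValueError if start_point doesn't match DATE_FORMAT.
        self.as_datetime()

    def as_datetime(self) -> dt.datetime:
        return dt.datetime.strptime(self.start_point, DATE_FORMAT)


def run_config_for(start: dt.datetime) -> dict:
    """Call-site conversion: datetime -> string before building the run config."""
    return {"resources": {"foobar": {"config": {"start_point": start.strftime(DATE_FORMAT)}}}}


cfg = run_config_for(dt.datetime(2023, 11, 3, 23, 43))
res = DatetimeResourceSketch(**cfg["resources"]["foobar"]["config"])
```

A date-only format drops the time of day; `isoformat()` on the caller side and `datetime.fromisoformat()` on the resource side would keep it.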
Maxwell Joslyn
11/03/2023, 11:44 PM
Zach
11/03/2023, 11:45 PM
Maxwell Joslyn
11/03/2023, 11:46 PM
Maxwell Joslyn
11/03/2023, 11:47 PM
DB
11/04/2023, 3:08 PM
from datetime import datetime
from dagster import ConfigurableResource, asset

DATE_FORMAT = "%Y-%m-%d"  # placeholder format string

class Foo(ConfigurableResource):
    date_str: str

    def the_time(self) -> datetime:
        return datetime.strptime(self.date_str, DATE_FORMAT)

@asset
def bar(foo_res: Foo):
    print(foo_res.the_time())
The format string could of course be a constant (as here), another field on the resource, a PrivateAttr, etc., and you could use it to validate date_str before you even start a run.
Maxwell Joslyn
11/06/2023, 6:14 PM
Maxwell Joslyn
11/06/2023, 6:22 PM
dagster._check.ParameterCheckError: Invariant violation for parameter default_value. Description: required arguments should not specify default values
I'm getting that when I try to launch dagster dev for testing purposes. Why? I don't think I am passing default values for anything: there is no default value for the argument in the asset function, for instance. I'm not sure where else Dagster thinks it sees a default argument.
Here is a minimal example:
import datetime as dt

from dagster import (
    AssetSelection,
    ConfigurableResource,
    RunConfig,
    ScheduleDefinition,
    asset,
    define_asset_job,
    multiprocess_executor,
)

# RESOURCE
class DatetimeResource(ConfigurableResource):
    """start_point is a stringified datetime representing the beginning of a time interval (inclusive). Tells assets to operate on all data from this date to the date on which the asset runs."""

    start_point: str

# ASSET
@asset(
    group_name="SHENANIGANS",
    compute_kind="SQL",
)
def finalize_actions(foobar: DatetimeResource) -> None:
    return

# JOB
finalize_actions_job = define_asset_job(
    name="finalize_actions",
    selection=AssetSelection.keys("finalize_actions"),
    executor_def=multiprocess_executor,
)

# SCHEDULE
finalize_actions_daily = ScheduleDefinition(
    job=finalize_actions_job,
    cron_schedule="0 21 * * *",
    execution_timezone="US/Eastern",
    # required_resource_keys="foobar",  # TODO needed?
    run_config=RunConfig(
        resources={
            # must be named foobar because that's the name of the
            # parameter which this job's assets take
            "foobar": DatetimeResource(
                start_point=str(dt.datetime.now().date())
            ),
        },
    ),
)
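One thing worth noting about the schedule above: `dt.datetime.now()` inside `ScheduleDefinition(run_config=...)` is evaluated once, when the module is imported, so every tick would reuse that same date for as long as that code stays loaded. Computing the config per tick avoids this; in Dagster that is presumably the job of a function decorated with `@schedule` that builds a `RunRequest` from the tick's `context.scheduled_execution_time`. The stdlib sketch below only illustrates per-tick evaluation; `config_for_tick` is a hypothetical helper and the dict mirrors the raw resource config shape for `foobar`.

```python
import datetime as dt


def config_for_tick(tick_time: dt.datetime) -> dict:
    """Raw run config for resource "foobar", derived from the tick's own timestamp.

    Called once per tick, unlike an expression passed directly to
    ScheduleDefinition(run_config=...), which runs a single time at import.
    """
    return {"resources": {"foobar": {"config": {"start_point": str(tick_time.date())}}}}


# Two consecutive 21:00 ticks get two different start points.
ticks = [dt.datetime(2023, 11, 6, 21, 0), dt.datetime(2023, 11, 7, 21, 0)]
configs = [config_for_tick(t) for t in ticks]
```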
Maxwell Joslyn
11/07/2023, 2:24 AM
Field(default=None) for a Pydantic field (whether for a config or a configurable resource), a default will get set for you somewhere in the bowels of Dagster, which Dagster does not like. This feels like a bug: if I don't ask for a default, why am I getting one, especially when Dagster then immediately rejects that default with an error? It was incredibly unintuitive and took me probably an hour or two to figure out.
DB
11/07/2023, 7:19 PM
Maxwell Joslyn
11/16/2023, 12:13 AM
= Field() on the start_point member var definition.
2. Move the config to the job rather than the schedule (with some other shenanigans relating to using a RunConfig object versus using a raw dictionary).