Stefan Adelbert
04/06/2022, 2:57 AM
`op` as one of its inputs?
I have an `op` that works over a date range described by `start_date` and `end_date`. The start and end dates are relative to today, but I would prefer not to bake the date range logic into the `op` so that I can reuse it, i.e. I'd like to calculate the start and end dates outside of the `op` and then pass them to the `op` somehow.
I tried the following:

```python
import datetime

import dagster

@dagster.op
def my_op(context, start_date, end_date):
    context.log.info(f"{start_date} - {end_date}")

@dagster.job
def my_job():
    start_date = datetime.date.today()
    end_date = datetime.date.today() + datetime.timedelta(days=1)
    # Fails at definition time: plain values, not op outputs, are passed as inputs
    my_op(start_date=start_date, end_date=end_date)
```

but it failed with the error:

```
In @job my_job, received invalid type <class 'datetime.date'> for input "start_date" (passed by keyword) in op invocation "my_op". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
```
I'm not surprised by this error. I understand there is a composition step in which a job is "solidified", and that is what the error message refers to.
I see two options:
• I could have another `op` calculate the start and end dates and pass those to `my_op`
• I could possibly pass start and end dates as config, although they probably wouldn't be dynamic, i.e. they wouldn't be recalculated on each execution of the job
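The second bullet turns on the difference between a value computed once, when the job is defined, and one recomputed on every run. Here is a minimal plain-Python sketch, with no Dagster involved. `next_day_range`, `trailing_week_range` and `format_range` are hypothetical names. The sketch keeps each date-range calculation as a small function of `today`, so it can be evaluated fresh at run time and unit-tested with a fixed date. Only the reusable op body (`format_range` here) stays ignorant of how the range was chosen.

```python
import datetime
from typing import Optional, Tuple

# A date range is a (start_date, end_date) pair.
DateRange = Tuple[datetime.date, datetime.date]


def next_day_range(today: Optional[datetime.date] = None) -> DateRange:
    """Today to tomorrow, the same range as in the failing example."""
    if today is None:
        today = datetime.date.today()  # evaluated on every call, not once
    return today, today + datetime.timedelta(days=1)


def trailing_week_range(today: Optional[datetime.date] = None) -> DateRange:
    """The seven days ending today (a hypothetical second range)."""
    if today is None:
        today = datetime.date.today()
    return today - datetime.timedelta(days=7), today


def format_range(date_range: DateRange) -> str:
    """Stand-in for my_op's body: it only sees the dates, not how they were chosen."""
    start_date, end_date = date_range
    return f"{start_date} - {end_date}"


if __name__ == "__main__":
    fixed_day = datetime.date(2022, 4, 6)
    print(format_range(next_day_range(fixed_day)))       # 2022-04-06 - 2022-04-07
    print(format_range(trailing_week_range(fixed_day)))  # 2022-03-30 - 2022-04-06
```

Calling `next_day_range()` at module level, or inside a job body that runs only at composition time, would freeze the dates. Calling it when the run executes keeps them dynamic.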
Please help me understand a pattern that would let several different jobs reuse `my_op`, each using different start and end dates that would be dynamic, i.e. recalculated each time the jobs run.

claire
04/06/2022, 4:22 AM

```python
from dagster import In, job, op, root_input_manager

# read_dataframe_from_table is not defined here; it stands for whatever reads your table.

@op(ins={"dataframe": In(root_manager_key="my_root_manager")})
def my_op(dataframe):
    """Do some stuff"""

@root_input_manager
def table1_loader(_):
    return read_dataframe_from_table(name="table1")

@job(resource_defs={"my_root_manager": table1_loader})
def my_job():
    my_op()

@root_input_manager
def table2_loader(_):
    return read_dataframe_from_table(name="table2")

@job(resource_defs={"my_root_manager": table2_loader})
def job_2():
    my_op()
```
Then each job can map the same resource key to a different root input manager, and each of those can pass in a dynamically calculated start date.

Stefan Adelbert
04/06/2022, 5:59 AM
`root_input_manager` for each relevant date range calculation, returning `(start_date, end_date)`. The `op` that needs the date range would simply use a `date_range` input (based on the `root_manager_key`), and it would be up to the job to set the relevant `root_input_manager` for that `root_manager_key`. I realise this is pretty much exactly what you spelled out above. 👍
Could you help me understand when the `root_input_manager` compute_fn actually gets executed? If it behaves like a resource, then it'll be when the job is executed (as part of execution initialisation). Or is it executed lazily, the first time it is needed?

claire
04/06/2022, 6:51 AM
`root_input_manager` behaves like a resource. The `@root_input_manager` decorator yields a `RootInputManagerDefinition`, which is a `ResourceDefinition` that produces a `RootInputManager`.
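One way to picture the two phases is sketched below. This is our reading and should be checked against the Dagster docs for your version. The resource part (building the manager object) would happen when resources are initialised for the run. The decorated function would be what that manager calls when it loads the op's input, just before the op body runs. Either way, it is re-evaluated on every run. The toy below models only that ordering. `ToyRootInputManager`, `toy_root_input_manager` and `toy_run` are made-up names, not Dagster's implementation.

```python
import datetime
from typing import Callable, List, Tuple

DateRange = Tuple[datetime.date, datetime.date]
events: List[str] = []  # records the order in which things happen


class ToyRootInputManager:
    """Plays the part of the manager object the resource produces."""

    def __init__(self, load_fn: Callable[[], DateRange]) -> None:
        events.append("manager built")
        self._load_fn = load_fn

    def load_input(self) -> DateRange:
        events.append("load_fn called")
        return self._load_fn()


def toy_root_input_manager(
    load_fn: Callable[[], DateRange],
) -> Callable[[], ToyRootInputManager]:
    """Toy decorator: returns a 'resource function' that wraps load_fn in a manager."""

    def resource_fn() -> ToyRootInputManager:
        return ToyRootInputManager(load_fn)

    return resource_fn


@toy_root_input_manager
def next_day_range() -> DateRange:
    # Decorating does not call this; it only runs when the input is loaded.
    today = datetime.date.today()
    return today, today + datetime.timedelta(days=1)


def toy_run(resource_fn: Callable[[], ToyRootInputManager]) -> DateRange:
    manager = resource_fn()      # run start: resources are initialised
    events.append("step started")
    return manager.load_input()  # the op's input is loaded just before its body


if __name__ == "__main__":
    toy_run(next_day_range)
    print(events)  # ['manager built', 'step started', 'load_fn called']
```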