Sterling Paramore
07/29/2022, 10:37 PM

Sterling Paramore
07/29/2022, 10:40 PM
getattr(sys.modules[__name__], 'my_op_from_factory')? Seems there would be a dagster-ish way to do this.

Adam Bloom
07/29/2022, 10:45 PM

schrockn
07/29/2022, 10:51 PM
07/29/2022, 10:51 PMfrom dagster import job, op
def get_op():
@op
def an_op():
pass
return an_op
@job
def a_job():
get_op()()
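Sterling's getattr question also works in plain Python, independent of Dagster: a factory product can be attached to the current module and recovered later by name. A minimal stdlib-only sketch (`make_op` and `my_op_from_factory` are illustrative names, not part of any Dagster API):

```python
import sys

def make_op(name):
    # factory: build a function and give it the requested name
    def _op():
        return f"ran {name}"
    _op.__name__ = name
    return _op

# register the factory product as a module-level attribute
setattr(sys.modules[__name__], "my_op_from_factory", make_op("my_op_from_factory"))

# later, look it up by name, as in Sterling's snippet
op_fn = getattr(sys.modules[__name__], "my_op_from_factory")
print(op_fn())  # -> ran my_op_from_factory
```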
schrockn
07/29/2022, 10:51 PM

Sterling Paramore
07/29/2022, 10:59 PM
get_state -> run -> save_state, but will eventually be parameterized with different arguments.
```python
from dagster import In, Nothing, graph, job, op

def meltano_get_state_factory(name='not defined op', ins=None, **kwargs):
    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        return f'meltano get state for {name}'
    return my_inner_op

def meltano_run_factory(name='not defined op', ins=None, **kwargs):
    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        return f'meltano run for {name}'
    return my_inner_op

def meltano_save_state_factory(name='not defined op', ins=None, **kwargs):
    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        return f'meltano save state for {name}'
    return my_inner_op

def meltano_graph_factory(name='not defined graph', **kwargs):
    @graph(name=name)
    def my_inner_graph(**kwargs):
        get_state = meltano_get_state_factory(f'{name}_get_state')
        run = meltano_run_factory(f'{name}_run')
        save_state = meltano_save_state_factory(f'{name}_save_state')
        return save_state(
            run(
                get_state()
            )
        )
    return my_inner_graph

@job
def meltano_etahub_to_bigquery_job():
    meltano_graph_factory('etahub_to_bigquery')()

@job
def meltano_msehub_to_bigquery_job():
    meltano_graph_factory('msehub_to_bigquery')()
```
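Stripped of the Dagster decorators, the factory pattern above is just closures chained in dependency order: each factory bakes a name into a step, and the graph body wires the steps together. A stdlib-only sketch of that shape (all names illustrative):

```python
def make_step(label, name):
    # factory: bake the label and pipeline name into a step function
    def step(upstream=None):
        prior = f"{upstream} -> " if upstream else ""
        return f"{prior}{label} for {name}"
    return step

def make_pipeline(name):
    get_state = make_step("get state", name)
    run = make_step("run", name)
    save_state = make_step("save state", name)
    # same chaining shape as save_state(run(get_state()))
    return lambda: save_state(run(get_state()))

result = make_pipeline("etahub_to_bigquery")()
print(result)
```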
Sterling Paramore
07/29/2022, 10:59 PM

schrockn
07/29/2022, 11:03 PM

owen
07/29/2022, 11:15 PM
"Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually."
It feels like the "name" is the only bit that will change between these commands, so having a single resource that supplies that shared configuration to each of the ops will end up fairly clean:
```python
from dagster import In, Nothing, ResourceDefinition, graph, op

@op(ins={"start": In(Nothing)}, required_resource_keys={"meltano_name"})
def meltano_run(context):
    name = context.resources.meltano_name
    # run command based off of name

# ... same thing for meltano_save_state and meltano_get_state

@graph
def meltano_graph():
    meltano_save_state(start=meltano_run(start=meltano_get_state()))

job_1 = meltano_graph.to_job(
    resource_defs={"meltano_name": ResourceDefinition.hardcoded_resource("etahub_to_bigquery")}
)
job_2 = meltano_graph.to_job(
    resource_defs={"meltano_name": ResourceDefinition.hardcoded_resource("msehub_to_bigquery")}
)
```
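The shape of owen's suggestion can be mimicked in plain Python to see why it stays clean: one shared value is bound per job, and every op reads it through the context instead of being configured individually. Here `SimpleNamespace` stands in for Dagster's resource machinery, and `to_job` is a hypothetical stand-in for `GraphDefinition.to_job` with a hardcoded resource:

```python
from types import SimpleNamespace

def meltano_run(context):
    # each op reads the shared name instead of carrying its own config
    return f"meltano run for {context.resources.meltano_name}"

def to_job(graph_fn, meltano_name):
    # stand-in for to_job + hardcoded_resource: bind one shared value
    # and reuse the same graph body for every job
    context = SimpleNamespace(resources=SimpleNamespace(meltano_name=meltano_name))
    return lambda: graph_fn(context)

job_1 = to_job(meltano_run, "etahub_to_bigquery")
job_2 = to_job(meltano_run, "msehub_to_bigquery")
print(job_1())  # -> meltano run for etahub_to_bigquery
```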
Sterling Paramore
07/29/2022, 11:22 PM

Sterling Paramore
07/30/2022, 12:27 AM
etahub_to_bigquery and msehub_to_bigquery, but I'd like to create a third job which runs the graphs behind these two jobs in parallel and then, when they both complete, runs another graph. But it looks like graphs can only use a resource when they're part of a job, and jobs cannot be composed of other jobs….

Sterling Paramore
07/30/2022, 12:39 AM

owen
07/30/2022, 12:40 AM

owen
07/30/2022, 12:40 AM

owen
07/30/2022, 12:41 AM

Sterling Paramore
07/30/2022, 12:42 AM

owen
07/30/2022, 12:43 AM

Sterling Paramore
08/02/2022, 12:19 AM
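The parallel composition Sterling asks about (two independent branches, then a final step once both finish) can be sketched without Dagster using stdlib concurrency; `branch` and `final_step` are illustrative stand-ins for the two meltano graphs and the follow-up graph. For what it's worth, in Dagster itself a `@graph` body may invoke other graphs, with resources bound only when the outer graph is turned into a job:

```python
from concurrent.futures import ThreadPoolExecutor

def branch(name):
    # stand-in for one of the meltano graphs
    return f"{name} done"

def final_step(results):
    # stand-in for the graph that runs after both branches complete
    return f"combined after {', '.join(sorted(results))}"

def third_job():
    # the two branches share no data dependency, so they can run
    # concurrently; final_step waits on both results
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(branch, ["etahub_to_bigquery", "msehub_to_bigquery"]))
    return final_step(results)

print(third_job())
```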