Sundara Moorthy
04/25/2022, 12:25 PMdef example_graph(request_json):
graph_def = GraphDefinition(
name="step_isolated_job",
node_defs=get_op_list(proxy_json),
dependencies=get_dg(proxy_json)
)
return graph_def
# Request payload describing a single node, "node1".
request_json = {"ops": {"node1": d}}

# Build the graph from the payload and turn it into a job that uses the local
# resources and the k8s job executor.
step_isolated_job = example_graph(request_json).to_job(
    name="step_isolated_job",
    resource_defs={**RESOURCES_LOCAL},
    executor_def=k8s_job_executor,
)
daniel
04/25/2022, 1:14 PM
Sundara Moorthy
04/25/2022, 1:56 PM
daniel
04/25/2022, 2:00 PM
Sundara Moorthy
04/25/2022, 2:04 PM
daniel
04/25/2022, 2:15 PM
Sundara Moorthy
04/25/2022, 2:21 PM
# create the op which submits the job
# Default run config passed to the job via @job(config=...).
# NOTE(review): the entry is keyed "do_something", but the only op visible in
# this snippet is ``graph_job`` — confirm the key matches the intended op.
default_config = {
    "ops": {
        "do_something": {
            "config": {"config_param": "stuff"},
        },
    },
}
def example_graph(request_json):
    """Return a ``GraphDefinition`` named ``step_isolated_job``.

    The node definitions come from ``get_op_list(request_json)`` and the
    dependency mapping from ``get_dg(request_json)``.
    """
    nodes = get_op_list(request_json)
    deps = get_dg(request_json)
    return GraphDefinition(
        name="step_isolated_job",
        node_defs=nodes,
        dependencies=deps,
    )
#config_schema=dict
@op(config_schema={"do_something": {}})
def graph_job(context):
    """Build the graph from this op's config and return it.

    Logs a message before and after the build, and logs the built graph.

    Args:
        context: Op execution context; used for logging and to read the
            op's config.

    Returns:
        Whatever ``example_graph`` returns for the configured options.
    """
    # create the graph
    context.log.info("starting the graph creation")
    # NOTE(review): the declared config_schema only has a "do_something" key,
    # yet "data" is read here, so this may always be None — confirm the
    # intended schema/config shape.
    options = context.solid_config.get("data")
    step_isolated_job = example_graph(options)
    context.log.info(step_isolated_job)
    context.log.info("stopped the graph creation")
    return step_isolated_job
@job(
    config=default_config,
    resource_defs={**RESOURCES_LOCAL},
    executor_def=k8s_job_executor,
)
def simple_job():
    """Job that runs the ``graph_job`` op with the local resources on the k8s job executor."""
    # Resources and executor are configured on @job itself. Inside a job body,
    # calling graph_job() only wires the op into the job; the value it returns
    # is not a graph, so there is no .to_job() to call on it.
    # NOTE(review): the original also passed name="step_isolated_job" to
    # .to_job(); the job keeps the name "simple_job" here — confirm which name
    # is wanted.
    graph_job()
@repository
def example_repo():
    """Return the definitions exposed by this repository: just ``simple_job``."""
    # Hand back the job object itself; calling simple_job() here would try to
    # invoke the job instead of registering it.
    return [simple_job]
daniel
04/25/2022, 2:22 PM
Sundara Moorthy
04/25/2022, 2:42 PM
daniel
04/25/2022, 2:57 PM