Thomas Mignon
12/22/2021, 10:11 AMfrom dagster import fs_io_manager, job, Any
from dagster_dask import dask_executor
from pathlib import Path
from semaphore_scripts import template_runner
import os
from dagster import Nothing, In, op
# Fan-in op: receives the outputs of the six purify ops and hands them back as
# one tuple, so that a single downstream op can depend on all of them at once.
@op
def adder_purified(
    purified_ftp: Any,
    purified_ftp2: Any,
    purified_eftp1: Any,
    purified_eftp2: Any,
    purified_httpvpublic: Any,
    purified_httpvpublicnew: Any,
) -> Any:
    # Order of the tuple follows the order of the parameters.
    purified_outputs = (
        purified_ftp,
        purified_ftp2,
        purified_eftp1,
        purified_eftp2,
        purified_httpvpublic,
        purified_httpvpublicnew,
    )
    return purified_outputs
def template_runner_op(
    name="default_name",
    ins=None,
    **kwargs,
):
    """Build a Dagster op that runs one ``template_runner.TemplateRunner`` command.

    The command, its parameter, the version and the base workspace are read
    from the op's config at execution time.

    Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Inputs of the new op. When omitted (or falsy), a
            single ``start`` input of type ``Nothing`` is declared.
        **kwargs: Extra keyword arguments forwarded unchanged to ``@op``.

    Returns:
        function: The new op.
    """
    op_inputs = ins or {"start": In(Nothing)}
    op_config_schema = {
        "command": str,
        "command_parameter": str,
        "semaphore_audience_version": str,
        "workspace": str,
    }

    @op(
        name=name,
        ins=op_inputs,
        config_schema=op_config_schema,
        required_resource_keys={"io_manager"},
        **kwargs,
    )
    def _template_runner_op(context):
        config = context.op_config
        # Working directory is <workspace>/<command>-<PBS_JOBID>.
        # Reading PBS_JOBID raises KeyError when the variable is not set.
        run_dir: Path = Path(config["workspace"]) / f'{config["command"]}-{os.environ["PBS_JOBID"]}'
        runner = template_runner.TemplateRunner(
            command=config["command"],
            version=config["semaphore_audience_version"],
            command_parameter=config["command_parameter"],
            workspace=run_dir,
            skip_signal=True,
        )
        runner.run(exit=False)

    return _template_runner_op
# One op per processing step. All of them are built by template_runner_op and
# differ only by name; what each one actually runs comes from its op config.

# Purify steps
purify_ftp = template_runner_op(name="purify_ftp")
purify_ftp2 = template_runner_op(name="purify_ftp2")
purify_httpvpublic = template_runner_op(name="purify_httpvpublic")
purify_httpvpublicnew = template_runner_op(name="purify_httpvpublicnew")
purify_eftp1 = template_runner_op(name="purify_eftp1")
purify_eftp2 = template_runner_op(name="purify_eftp2")

# Partition and event-session steps
partition = template_runner_op(name="partition")
event_session = template_runner_op(name="event_session")

# Business/Export steps (smoswind has an export op only, no business op)
business_oceanography = template_runner_op(name="business_oceanography")
export_oceanography = template_runner_op(name="export_oceanography")
business_sextant = template_runner_op(name="business_sextant")
export_sextant = template_runner_op(name="export_sextant")
business_seanoe = template_runner_op(name="business_seanoe")
export_seanoe = template_runner_op(name="export_seanoe")
business_archimer = template_runner_op(name="business_archimer")
export_archimer = template_runner_op(name="export_archimer")
export_smoswind = template_runner_op(name="export_smoswind")
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor,
)
def template_runner_job():
    # The six purify ops are invoked without inputs, so nothing orders them
    # relative to each other; adder_purified gathers all of their outputs.
    purified = adder_purified(
        purified_ftp=purify_ftp(),
        purified_ftp2=purify_ftp2(),
        purified_eftp1=purify_eftp1(),
        purified_eftp2=purify_eftp2(),
        purified_httpvpublic=purify_httpvpublic(),
        purified_httpvpublicnew=purify_httpvpublicnew(),
    )

    # partition waits for the gathered purify outputs, event_session for partition.
    partitioned = partition(purified)
    event = event_session(partitioned)

    # Each business op depends on the event session and feeds its export op.
    business_export_pairs = (
        (business_oceanography, export_oceanography),
        (business_sextant, export_sextant),
        (business_seanoe, export_seanoe),
        (business_archimer, export_archimer),
    )
    for business_op, export_op in business_export_pairs:
        export_op(business_op(event))

    # smoswind is exported straight from the event session.
    export_smoswind(event)
This gives me this graph (first picture), which looks like what I want, even though the adder_purified op shows up in the graph. However, it produces this error:
2021-12-22 10:02:09 - dagster - ERROR - template_runner_job - 911a185d-cb6a-4fef-ad6a-2ef75a28ac49 - 22690 - RUN_FAILURE - Execution of run for "template_runner_job" failed. An exception was thrown during execution.
dagster.core.errors.DagsterInvariantViolationError: template_runner_job not found at module scope in file /home1/datawork/semexp/workspace/dagit/semaphore-dagster-config/jobs/process_daily.py.
The only way I have found so far to run my graph is to modify template_runner_job
so that the ops at the beginning run sequentially instead of in parallel:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    # Fully sequential variant: every purify op takes the previous op's output
    # as its input, so the purify steps run one after another.
    #
    # The first op of the chain has to be *invoked*: writing the bare name
    # ``purify_httpvpublicnew`` would pass the op definition itself, not an
    # op output, as the upstream of ``purify_httpvpublic``.
    purified = purify_httpvpublicnew()
    for purify_step in (
        purify_httpvpublic,
        purify_eftp2,
        purify_eftp1,
        purify_ftp2,
        purify_ftp,
    ):
        purified = purify_step(purified)

    # partition waits for the last purify op, event_session for partition.
    event = event_session(partition(purified))

    # Each business op depends on the event session and feeds its export op.
    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    # smoswind is exported straight from the event session.
    export_smoswind(event)
But this leads me to this graph (second picture), which is not what I want.
owen
12/22/2021, 5:21 PM@job(
name='template_runner_job',
resource_defs={"io_manager": fs_io_manager},
executor_def=dask_executor
)
def template_runner_job():
event = event_session(partition(start=[purify_ftp(), purify_ftp2(), ... purify_httppublicnew()]))
export_oceanography(business_oceanography(event))
export_sextant(business_sextant(event))
export_seanoe(business_seanoe(event))
export_archimer(business_archimer(event))
export_smoswind(event)
Thomas Mignon
12/30/2021, 11:04 AMdagster.core.errors.DagsterInvariantViolationError: template_runner_job not found at module scope in file /home1/datawork/semexp/workspace/dagit/semaphore-dagster-config/jobs/process_daily.py.
Maybe it is because the ops are now in series, instead of in parallel as they were before?