Thomas Mignon

12/22/2021, 10:11 AM
Hi 🙂 I'm currently trying to build a graph with Job A, Job B, Job C, Job D, Job E -> Job F. Jobs A to E have to run in parallel at the same level and produce no output, but Job F needs to run after all of them. I tried to follow the docs, and my code looks like this for the moment:
import os
from pathlib import Path

from dagster import Any, In, Nothing, fs_io_manager, job, op
from dagster_dask import dask_executor
from semaphore_scripts import template_runner

@op
def adder_purified(purified_ftp: Any, purified_ftp2: Any, purified_eftp1: Any, purified_eftp2: Any, purified_httpvpublic: Any, purified_httpvpublicnew: Any) -> Any:
    return purified_ftp, purified_ftp2, purified_eftp1, purified_eftp2, purified_httpvpublic, purified_httpvpublicnew

def template_runner_op(
    name="default_name",
    ins=None,
    **kwargs,
):
    """
    Args:
        kwargs (any): One or more keyword arguments used to generate the new op.
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.

    Returns:
        function: The new op.
    """

    @op(
        name=name,
        ins=ins or {"start": In(Nothing)},
        config_schema={
            "command": str,
            "command_parameter": str,
            "semaphore_audience_version": str,
            "workspace": str
        },
        required_resource_keys={"io_manager"},
        **kwargs
    )
    def _template_runner_op(context):
        workspace: Path = Path(
            os.path.join(context.op_config["workspace"], f'{context.op_config["command"]}-{os.environ["PBS_JOBID"]}'))

        template_runner.TemplateRunner(
            command=context.op_config["command"],
            version=context.op_config["semaphore_audience_version"],
            command_parameter=context.op_config["command_parameter"],
            workspace=workspace,
            skip_signal=True
        ).run(exit=False)

    return _template_runner_op

purify_ftp = template_runner_op(name="purify_ftp")
purify_ftp2 = template_runner_op(name="purify_ftp2")
purify_httpvpublic = template_runner_op(name="purify_httpvpublic")
purify_httpvpublicnew = template_runner_op(name="purify_httpvpublicnew")
purify_eftp1 = template_runner_op(name="purify_eftp1")
purify_eftp2 = template_runner_op(name="purify_eftp2")

partition = template_runner_op(name="partition")
event_session = template_runner_op(name="event_session")

#Business/Export
business_oceanography = template_runner_op(name="business_oceanography")
export_oceanography = template_runner_op(name="export_oceanography")

business_sextant = template_runner_op(name="business_sextant")
export_sextant = template_runner_op(name="export_sextant")

business_seanoe = template_runner_op(name="business_seanoe")
export_seanoe = template_runner_op(name="export_seanoe")

business_archimer = template_runner_op(name="business_archimer")
export_archimer = template_runner_op(name="export_archimer")

export_smoswind = template_runner_op(name="export_smoswind")

@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    purified_ftp = purify_ftp()
    purified_ftp2 = purify_ftp2()
    purified_eftp1 = purify_eftp1()
    purified_eftp2 = purify_eftp2()
    purified_httpvpublic = purify_httpvpublic()
    purified_httpvpublicnew = purify_httpvpublicnew()

    purified = adder_purified(purified_ftp, purified_ftp2, purified_eftp1, purified_eftp2, purified_httpvpublic, purified_httpvpublicnew)

    event = event_session(partition(purified))

    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    export_smoswind(event)
This gives me this graph (first picture), which looks like what I want, even though the adder_purified op appears in the graph. But running it gives me this error:
2021-12-22 10:02:09 - dagster - ERROR - template_runner_job - 911a185d-cb6a-4fef-ad6a-2ef75a28ac49 - 22690 - RUN_FAILURE - Execution of run for "template_runner_job" failed. An exception was thrown during execution.

dagster.core.errors.DagsterInvariantViolationError: template_runner_job not found at module scope in file /home1/datawork/semexp/workspace/dagit/semaphore-dagster-config/jobs/process_daily.py.
The only way I have found so far to run my graph is to modify template_runner_job so that the ops run sequentially rather than in parallel at the beginning:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    event = event_session(partition(purify_ftp(purify_ftp2(purify_eftp1(purify_eftp2(purify_httpvpublic(purify_httpvpublicnew())))))))

    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    export_smoswind(event)
But this leads to this graph (second picture), which is not what I want.
owen

12/22/2021, 5:21 PM
hi @Thomas Mignon -- not exactly sure why you're getting that template_runner_job not found error in one of those cases but not the other, but one thing to note is that you can pass a list of outputs to a Nothing-type input to encode "only start after all of these are available". So that would look something like:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    event = event_session(partition(start=[purify_ftp(), purify_ftp2(), ... purify_httpvpublicnew()]))

    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    export_smoswind(event)
not completely confident that this will solve your issue, but it's worth a shot (and is a simpler way to get the graph you want regardless)
Thomas Mignon

12/30/2021, 11:04 AM
Hi @owen, with your solution my graph looks the way I wanted, so it's much better 🙂 But when launching my pipeline I get this error:
dagster.core.errors.DagsterInvariantViolationError: template_runner_job not found at module scope in file /home1/datawork/semexp/workspace/dagit/semaphore-dagster-config/jobs/process_daily.py.
Maybe it's because the ops are now in series, instead of in parallel as they were before?