a
Hi again guys, I've got a question on pipelines today.
I've got a pipeline that looks like this:
@pipeline
def crawlers_pipeline():
    running_jobs = run_crawlers()
    b = wait_for_jobs(running_jobs)
    c = get_results(b)
    write_results_to_s3(c)
run_crawlers is a solid that starts several crawler jobs using an API, and returns a dict containing objects used in the following steps to access job status and results
My problem is that some of the crawlers take a lot longer to run than the others, and wait_for_jobs has to wait for them all to finish
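Roughly, the relevant solids look like this (simplified sketch - the crawler names and the crawler_api call are placeholders for our internal API):
from dagster import solid

CRAWLER_NAMES = ['crawler_a', 'crawler_b', 'crawler_c']  # placeholder names

@solid
def run_crawlers(context):
    # kick off every crawler through the external API (placeholder call);
    # the returned dict maps crawler name -> job handle
    return {name: crawler_api.start(name) for name in CRAWLER_NAMES}

@solid
def wait_for_jobs(context, running_jobs):
    # blocks until *all* jobs are done, so the slowest crawler
    # holds up every downstream step
    for job in running_jobs.values():
        job.wait()
    return running_jobs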
I'm wondering if I could use solid aliases to "branch out" of run_crawlers and execute the following solids once for each of the crawlers
something like:
@pipeline
def comparativos_pipeline():
    running_jobs = run_crawlers()
    for crawler in Crawlers:
        a_wait_for_jobs = wait_for_jobs.alias("wait_for_{}".format(crawler))
        a_get_results = get_results.alias("get_{}_results".format(crawler))
        a_write_results_to_s3 = write_results_to_s3.alias("write_{}_results".format(crawler))
        b = a_wait_for_jobs(running_jobs[crawler])
        c = a_get_results(b)
        a_write_results_to_s3(c)
Is this possible? I'm wondering how to yield and "unpack" the results from run_crawlers
a
ya we don't have generic map/reduce support yet, but you can set up fixed-width parallelism - here is a toy example https://github.com/dagster-io/dagster/blob/master/examples/dagster_examples/toys/sleepy.py
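a sketch of what that fixed-width fan-out can look like (the CRAWLERS list, the start_crawler solid and crawler_api are made up for the example - the other solids are yours):
from dagster import Field, pipeline, solid

# hypothetical: the set of crawlers is fixed when the pipeline is defined
CRAWLERS = ['crawler_a', 'crawler_b', 'crawler_c']

@solid(config={'crawler': Field(str)})
def start_crawler(context):
    # placeholder for the API call that kicks off a single crawler job
    return crawler_api.start(context.solid_config['crawler'])

@pipeline
def fanned_out_crawlers_pipeline():
    # one aliased chain per crawler, so each branch only waits on its own job
    # and a slow crawler no longer blocks the fast ones
    # (assumes wait_for_jobs / get_results accept a single job here)
    for crawler in CRAWLERS:
        job = start_crawler.alias('start_{}'.format(crawler))()
        done = wait_for_jobs.alias('wait_for_{}'.format(crawler))(job)
        results = get_results.alias('get_{}_results'.format(crawler))(done)
        write_results_to_s3.alias('write_{}_results'.format(crawler))(results)
each aliased start_crawler instance is then configured under its alias name ('start_crawler_a' etc.) in the run config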
do you know how many jobs are going to happen at pipeline creation time? you could also just dynamically generate a pipeline that fans out for every expected job
a
Ah yes, that should work perfectly! Thank you!
The jobs are known at creation time and yes, I'm attempting to generate it dynamically
With maybe the possibility of turning jobs on and off via config
a
things to look at if you want to do more generation:
* creating OutputDefinitions for each named job and yielding Output(job, 'job_name') for each job
* setting is_required=False on OutputDefinition to be able to turn off a job by not yielding the Output (toggled via config)
* in the composition function we return a namedtuple for multiple outputs - you will interact with that to wire up each named job output to its aliased downstream DAG
from dagster import (
    Field,
    Output,
    OutputDefinition,
    composite_solid,
    pipeline,
    solid,
)


@composite_solid
def handle_job(job_spec: CrawlerJob):
    b = wait_for_jobs(job_spec)
    c = get_results(b)
    write_results_to_s3(c)


def generate_crawlers_pipeline():
    job_specs = get_jobs()  # dict[str, job]

    # one named, optional output per expected job
    output_defs = [
        OutputDefinition(CrawlerJob, name, is_required=False)
        for name in job_specs.keys()
    ]

    @solid(
        config={'skip': Field([str], is_required=False, default_value=[])},
        output_defs=output_defs,
    )
    def run_crawlers(context):
        for name, job in job_specs.items():
            if name not in context.solid_config['skip']:
                job = do_work(job)
                # yielding on the named output wires this job to its branch;
                # jobs listed in 'skip' never yield, so their downstream solids are skipped
                yield Output(job, name)

    @pipeline
    def crawlers_pipeline():
        outputs = run_crawlers()

        # convert namedtuple to dict for iteration
        # might be better way to do this
        for name, handle in outputs._asdict().items():
            handle_job.alias('handle_{}'.format(name))(job_spec=handle)

    return crawlers_pipeline
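and if you go the config toggle route, turning a job off at launch would look something like this (sketch - 'crawler_b' is a made-up job name; older dagster versions call the kwarg environment_dict instead of run_config):
from dagster import execute_pipeline

# anything listed under 'skip' never has its Output yielded,
# so its handle_* branch is skipped entirely
execute_pipeline(
    generate_crawlers_pipeline(),
    run_config={
        'solids': {
            'run_crawlers': {
                'config': {'skip': ['crawler_b']}
            }
        }
    },
)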