Auster Cid
03/17/2020, 10:29 PM

@pipeline
def crawlers_pipeline():
    running_jobs = run_crawlers()
    b = wait_for_jobs(running_jobs)
    c = get_results(b)
    write_results_to_s3(c)

@pipeline
def comparativos_pipeline():
    running_jobs = run_crawlers()
    for crawler in Crawlers:
        a_wait_for_jobs = wait_for_jobs.alias("wait_for_{}".format(crawler))
        a_get_results = get_results.alias("get_{}_results".format(crawler))
        a_write_results_to_s3 = write_results_to_s3.alias("write_{}_results".format(crawler))
        b = a_wait_for_jobs(running_jobs[crawler])
        c = a_get_results(b)
        a_write_results_to_s3(c)

alex
03/17/2020, 10:50 PM

Auster Cid
03/17/2020, 10:53 PM

alex
03/17/2020, 11:14 PM

* OutputDefinitions for each named job and yielding Output(job, 'job_name') for each job
* setting is_required=False on OutputDefinition to be able to turn off a job by not yielding the Output (toggled via config)
* in the composition function - we return a namedtuple for multiple outputs - you will interact with that to wire up the named job output to its aliased downstream dag

from dagster import Field, Output, OutputDefinition, composite_solid, pipeline, solid

@composite_solid
def handle_job(job_spec: CrawlerJob):  # CrawlerJob: the user-defined dagster type for a job spec
    b = wait_for_jobs(job_spec)
    c = get_results(b)
    write_results_to_s3(c)

def generate_crawlers_pipeline():
    job_specs = get_jobs()  # dict[str, job]
    output_defs = [
        OutputDefinition(CrawlerJob, name, is_required=False)
        for name in job_specs.keys()
    ]

    @solid(
        config={'skip': Field([str], is_required=False, default_value=[])},
        output_defs=output_defs,
    )
    def run_crawlers(context):
        for name, job in job_specs.items():
            # a skipped job never yields its Output, so (with
            # is_required=False) its downstream solids do not run
            if name not in context.solid_config['skip']:
                job = do_work(job)
                yield Output(job, name)

    @pipeline
    def crawlers_pipeline():
        outputs = run_crawlers()
        # convert namedtuple to dict for iteration
        # might be better way to do this
        for name, handle in outputs._asdict().items():
            handle_job.alias('handle_{}'.format(name))(job_spec=handle)

    return crawlers_pipeline
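
Toggling a crawler off via config would then look something like the sketch below. This is a minimal example under a couple of assumptions: it uses the 0.7-era execute_pipeline / environment_dict API, and 'foo' stands in for whatever crawler names get_jobs() actually returns.

from dagster import execute_pipeline

result = execute_pipeline(
    generate_crawlers_pipeline(),
    environment_dict={
        'solids': {
            'run_crawlers': {
                'config': {'skip': ['foo']}  # 'foo' is a hypothetical crawler name
            }
        }
    },
)
# run_crawlers never yields the 'foo' output, so the aliased
# handle_foo composite downstream is skipped rather than failing
assert result.success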