Pedro Moranga
03/18/2020, 12:37 PM
Auster Cid
03/18/2020, 2:17 PM
nate
03/18/2020, 2:53 PM
@pyspark_solid to use EMR via a ModeDef that includes the EMR resource: ModeDefinition('prod', resource_defs={'pyspark': emr_pyspark_resource})
Pedro Moranga
03/18/2020, 4:47 PM
nate
03/19/2020, 5:20 PM
EmrJobRunner https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/emr/emr.py to ship work to EMR.
It’s an open task on our end to refresh our Scala/Spark APIs, especially for EMR. In the interim, you can manually construct an EMR solid by instantiating the EmrRunJobFlowSolidDefinition class, but it's probably better to just implement your own solid for now, something like:
import math
import time

from dagster import Field, InputDefinition, Nothing, OutputDefinition, String, solid
from dagster_aws.emr.emr import EmrJobRunner
# Assumption: the config helper lives in dagster_aws.emr.configs in this version.
from dagster_aws.emr.configs import define_emr_run_job_flow_config

# Assumption: name of the Nothing input used only to sequence this solid after others.
_START = 'start'


@solid(
    config={
        'aws_region': Field(String, is_required=False),
        'job_config': define_emr_run_job_flow_config(),
    },
    input_defs=[InputDefinition(_START, Nothing)],
    output_defs=[OutputDefinition(String)],
)
def emr_solid(context):
    poll_interval_sec = 5
    max_wait_time_sec = 3600
    job_runner = EmrJobRunner(region=context.solid_config.get('aws_region'))
    cluster_id = job_runner.run_job_flow(context, context.solid_config['job_config'])
    context.log.info('waiting for EMR cluster job flow completion...')
    # Cap the number of polls so the solid gives up after max_wait_time_sec.
    max_iter = int(math.ceil(max_wait_time_sec / float(poll_interval_sec)))
    done = False
    curr_iter = 0
    while not done and curr_iter < max_iter:
        # This will take a while... cluster creation usually > 5 minutes
        time.sleep(poll_interval_sec)
        cluster = job_runner.describe_cluster(cluster_id)['Cluster']
        context.log.info('EMR cluster %s state: %s' % (cluster_id, cluster['Status']['State']))
        done = cluster['Status']['State'] in ['TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS', 'WAITING']
        curr_iter += 1
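
The polling loop above can be tried without an AWS account by pulling it into a small helper. This is only a rough sketch, not part of dagster-aws: `wait_for_cluster`, `describe_state` and the injectable `sleep` argument are hypothetical names made up for illustration, and the list of done states is copied from the snippet above. Returning the last state seen lets the caller tell a timeout or a `TERMINATED_WITH_ERRORS` apart from a clean finish.

```python
import math
import time

# States that end the wait, copied from the solid above.
DONE_STATES = {'TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS', 'WAITING'}


def wait_for_cluster(describe_state, poll_interval_sec=5, max_wait_time_sec=3600,
                     sleep=time.sleep):
    """Poll describe_state() until it reports a done state or the wait budget is spent.

    describe_state is a hypothetical zero-argument callable that returns the
    current cluster state as a string. Returns the last state seen, or None
    if the budget allowed no polls at all.
    """
    max_iter = int(math.ceil(max_wait_time_sec / float(poll_interval_sec)))
    state = None
    for _ in range(max_iter):
        sleep(poll_interval_sec)
        state = describe_state()
        if state in DONE_STATES:
            break
    return state


# Fake state sequence and a no-op sleep, so the example finishes instantly.
fake_states = iter(['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'TERMINATED'])
final_state = wait_for_cluster(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)
```

Inside a solid, `describe_state` could wrap the `job_runner.describe_cluster(cluster_id)['Cluster']['Status']['State']` lookup from the snippet above, and the caller could raise if the returned state is not the one it expected.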