Pedro Moranga

03/18/2020, 12:37 PM
Hi everyone! I need to build a use case with Dagster that interacts with EMR: spawning the cluster, adding steps to it, and shutting it down. I went through the dagster_aws/emr code, but I couldn't find any example of EMR usage. Can someone point me to an example or a snippet that uses EmrRunJobFlowSolidDefinition? Thanks

Auster Cid

03/18/2020, 2:17 PM
Hi Pedro, I'm not one of the Elementl guys, but there is a test that uses it here. Hope it helps!

nate

03/18/2020, 2:53 PM
you can configure a
to use EMR via a ModeDefinition that includes the EMR resource:
ModeDefinition('prod', resource_defs={'pyspark': emr_pyspark_resource})
a lot of the EMR work we’ve done is still rough / alpha quality, so we'd love feedback as you dig in more!
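The point of a mode is that solid code asks for a resource by key (here 'pyspark') and the mode selected at run time decides which implementation backs that key, so the same solid can run locally or against EMR. The following is a plain-Python sketch of that pattern only, not Dagster's API: the MODES dict, the two resource classes, describe_target_solid, and run_solid are hypothetical stand-ins for ModeDefinition, emr_pyspark_resource, a solid, and the pipeline executor.

```python
from types import SimpleNamespace


class LocalPySpark:
    """Hypothetical stand-in for a local pyspark resource."""
    target = 'local'


class EmrPySpark:
    """Hypothetical stand-in for emr_pyspark_resource."""
    target = 'emr'


# Each mode maps resource keys to resource factories, similar in spirit to
# ModeDefinition(name, resource_defs={...}).
MODES = {
    'local': {'pyspark': LocalPySpark},
    'prod': {'pyspark': EmrPySpark},
}


def describe_target_solid(context):
    # The solid only knows the key 'pyspark', not which backend sits behind it.
    return 'running on %s' % context.resources.pyspark.target


def run_solid(solid_fn, mode):
    # Instantiate the resources for the selected mode and expose them on a context.
    resources = {key: factory() for key, factory in MODES[mode].items()}
    context = SimpleNamespace(resources=SimpleNamespace(**resources))
    return solid_fn(context)
```

Switching from 'local' to 'prod' changes which resource the solid receives without touching the solid itself, which is what swapping in emr_pyspark_resource through a mode is meant to give you.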

Pedro Moranga

03/18/2020, 4:47 PM
Thank you @nate and @Auster Cid. @nate, we use EMR to run Scala code on Spark. I'll try to build it from the examples you guys gave me.
@nate, can you clarify the EMR-related code for me? I saw the implementation of EmrRunJobFlowSolidDefinition, but nothing like an "emrrunjob_solid". How would I invoke EmrRunJobFlowSolidDefinition? Or should I implement my own methods using the standard @solid and call the primitives from your EmrJobRunner class inside them?

nate

03/19/2020, 5:20 PM
hey @Pedro Moranga, yes, I filed an issue for this; it's a miss on our part that this stuff isn't documented yet. So far we've been focused primarily on the pyspark <> EMR experience, but the goal is to provide Dagster resources that support both pyspark and Scala Spark workloads and that in turn use the EmrJobRunner to ship work to EMR. Refreshing our Scala/Spark APIs, especially for EMR, is an open task on our end. In the interim, you can either construct an EMR solid manually by instantiating the EmrRunJobFlowSolidDefinition class or, probably better for now, implement your own solid, something like:
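import math
import time

# Import paths assumed from dagster_aws as of early 2020
from dagster import Field, InputDefinition, Nothing, String, solid
from dagster_aws.emr import EmrJobRunner
from dagster_aws.emr.configs import define_emr_run_job_flow_config

_START = 'start'

@solid(
    config={
        'aws_region': Field(String, is_required=False),
        'job_config': define_emr_run_job_flow_config(),
    },
    input_defs=[InputDefinition(_START, Nothing)],
)
def emr_solid(context):
    poll_interval_sec = 5
    max_wait_time_sec = 3600

    # Launch the cluster from the job flow config; EMR returns the new cluster ID
    job_runner = EmrJobRunner(region=context.solid_config.get('aws_region'))
    cluster_id = job_runner.run_job_flow(context, context.solid_config['job_config'])
    context.log.info('waiting for EMR cluster job flow completion...')

    max_iter = int(math.ceil(max_wait_time_sec / float(poll_interval_sec)))

    done = False
    curr_iter = 0
    while not done and curr_iter < max_iter:
        # This will take a while... cluster creation usually > 5 minutes
        time.sleep(poll_interval_sec)

        cluster = job_runner.describe_cluster(cluster_id)['Cluster']
        state = cluster['Status']['State']
        context.log.info('EMR cluster %s state: %s' % (cluster_id, state))
        # WAITING = up and idle, ready for steps; TERMINAT* = shutting down or gone
        done = state in ['TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS', 'WAITING']
        curr_iter += 1

    if not done:
        raise RuntimeError('EMR cluster %s not ready after %d s' % (cluster_id, max_wait_time_sec))
    return cluster_id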
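For the other two parts of the original use case (adding a Scala Spark step to a running cluster and shutting it down), here is a minimal sketch that calls the boto3 EMR client directly instead of going through EmrJobRunner. It assumes you already have the cluster_id obtained in the solid above. The helper names, jar URI, and main class are hypothetical, and the client is passed in as a parameter so it can be a real boto3.client('emr', region_name=...) or a stub in tests. Each helper could become its own solid in a pipeline.

```python
import time

# Step states after which EMR will not change the step's status any further.
TERMINAL_STEP_STATES = {'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'}


def add_scala_spark_step(emr_client, cluster_id, jar_uri, main_class, args=()):
    """Submit one spark-submit step that runs a Scala jar; return the new step ID."""
    # Hypothetical helper: command-runner.jar lets an EMR step invoke spark-submit.
    step = {
        'Name': 'spark-submit %s' % main_class,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster',
                     '--class', main_class, jar_uri] + list(args),
        },
    }
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response['StepIds'][0]


def wait_for_step(emr_client, cluster_id, step_id,
                  poll_interval_sec=30, max_wait_time_sec=3600, sleep=time.sleep):
    """Poll describe_step until the step reaches a terminal state; return that state."""
    waited = 0
    while True:
        status = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = status['Step']['Status']['State']
        if state in TERMINAL_STEP_STATES:
            return state
        if waited >= max_wait_time_sec:
            raise TimeoutError('step %s still %s after %d s' % (step_id, state, waited))
        sleep(poll_interval_sec)
        waited += poll_interval_sec


def terminate_cluster(emr_client, cluster_id):
    """Request cluster shutdown; EMR terminates asynchronously."""
    emr_client.terminate_job_flows(JobFlowIds=[cluster_id])


def run_step_and_terminate(emr_client, cluster_id, jar_uri, main_class, args=()):
    """Run one Scala Spark step, then always shut the cluster down."""
    try:
        step_id = add_scala_spark_step(emr_client, cluster_id, jar_uri, main_class, args)
        return wait_for_step(emr_client, cluster_id, step_id)
    finally:
        terminate_cluster(emr_client, cluster_id)
```

Putting terminate_cluster in a finally block means the cluster is shut down even when the step fails or polling times out, so a failed job does not leave a billable cluster running. terminate_job_flows only requests termination; if you need to confirm shutdown, poll describe_cluster until it reports TERMINATED.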