https://dagster.io/ logo
#announcements
Title
# announcements
p

Pedro Moranga

03/18/2020, 12:37 PM
Hi everyone! I need to build a use case with Dagster to interact with EMR: spawning the cluster, adding steps to it, and shutting it down. I went trough the dagster_aws/emr code, but I could not find any example of usage of EMR. Can someone point me out to an example or the snippet of using the EmrRunJobFlowSolidDefinition ? thanks
a

Auster Cid

03/18/2020, 2:17 PM
Hi Pedro, not one of the Elementl guys, but there is a test using it here: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_solids.py Hope it helps
👍 1
n

nate

03/18/2020, 2:53 PM
you can configure a
@pyspark_solid
to use EMR via a ModeDef that includes the EMR resource
ModeDefinition('prod', resource_defs={'pyspark': emr_pyspark_resource})
a lot of the EMR work we’ve done is still rough / alpha quality so would love feedback as you dig in more!
p

Pedro Moranga

03/18/2020, 4:47 PM
Thank you @nate and @Auster Cid. @nate, We use the EMR to run Scala code on spark. I'll try to build it from the examples you guys gave me.
@nate Can you clarify for me the code related to the emr I saw the implementation of EmrRunJobFlowSolidDefinition, but not of something like "emrrunjob_solid" or something like this. How would I invoke EmrRunJobFlowSolidDefinition ? Or should I implement my own methods using standard @solid and on them call your primitives from class EmrJobRunner ?
n

nate

03/19/2020, 5:20 PM
hey @Pedro Moranga yes I filed https://github.com/dagster-io/dagster/issues/2298 - a miss on our part that this stuff isn’t documented yet. We’ve so far been primarily focused on getting the pyspark <> EMR experience, but the goal is to provide Dagster resources which support both pyspark and Scala spark workloads, and which in turn use the
EmrJobRunner
https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/emr/emr.py to ship work to EMR. It’s an open task on our end to refresh our Scala/Spark APIs, especially for EMR. But in the interim, you can either manually construct an EMR solid by instantiating the
EmrRunJobFlowSolidDefinition
class, but probably better to just implement your own solid for now—something like:
Copy code
@solid(
    config={
        'aws_region': Field(String, is_required=False),
        'job_config': define_emr_run_job_flow_config(),
    },
    input_defs=[InputDefinition(_START, Nothing)],
    output_defs=[OutputDefinition(String)],
)
def emr_solid(context):
    poll_interval_sec = 5
    max_wait_time_sec = 3600

    job_runner = EmrJobRunner(region=context.solid_config.get('aws_region'))
    cluster_id = job_runner.run_job_flow(context, context.solid_config['job_config'])
    <http://context.log.info|context.log.info>('waiting for EMR cluster job flow completion...')

    max_iter = int(math.ceil(max_wait_time_sec / float(poll_interval_sec)))

    done = False
    curr_iter = 0
    while not done and curr_iter < max_iter:
        # This will take a while... cluster creation usually > 5 minutes
        time.sleep(poll_interval_sec)

        cluster = job_runner.describe_cluster(cluster_id)['Cluster']
        <http://context.log.info|context.log.info>('EMR cluster %s state: %s' % (cluster_id, cluster['Status']['State']))
        done = cluster['Status']['State'] in ['TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS', 'WAITING']
        curr_iter += 1
👍 1
3 Views