Hi! I am trying to create a data pipeline which do...
# ask-community
d
Hi! I am trying to create a data pipeline which does the following: 1. Launch an EMR cluster 2. Add a step to that cluster. I am attempting to launch the cluster using the EmrJobRunner, which has a function called "run_job_flow" that returns an EMR cluster ID after launching a cluster. Then I am attempting to add a step to that cluster by using the emr_pyspark_step_launcher. The step launcher requires the cluster ID as part of the resource definition in order to know which cluster a step should be executed on. I tried launching the step on a pre-existing cluster with the cluster ID hard-coded, and this works as expected. However, I am having issues when trying to launch the cluster programmatically, fetch the cluster ID, and pass it on as a resource definition. I am able to launch the cluster, but then executing the step gives me the following error:
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3".
N.B. I have added the necessary aws-java-sdk-bundle and hadoop-aws JAR files to my user deployment code and placed them in the SPARK_HOME. As I said, when I simply hard-code the cluster ID of an existing cluster and execute the step, it works, so this error is not due to missing JARs. I do, however, believe that I am somehow messing up the resources and that this leads to the issue. Has anyone tried to run a pipeline similar to this before? The EmrJobRunner class has a function which returns a cluster ID if you give it your cluster name. However, this returns "None" when I run it in Dagster, but if I open up a separate terminal and try it myself then I can see the cluster_id. Here is my code; I am posting it here because I think I am misusing the resources but I don't know what is wrong. I am hoping someone can help, thanks in advance 🙂
from pathlib import Path
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.emr.emr import EmrJobRunner
from dagster_aws.s3 import s3_resource
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame
from custom_package import execute_transformation
from dagster import IOManager, graph, io_manager, op, resource, In, Nothing, Out
from utils.configs import get_emr_cluster_config
import logging
class ParquetIOManager(IOManager):
    """IO manager that persists op outputs as Parquet and reloads them through Spark.

    Each output is stored under
    ``<path_prefix>/<run_id>/<step_key>/<output name>``, where ``path_prefix``
    is read from this resource's config.
    """

    def _get_path(self, context):
        """Return the storage path for the output described by ``context``."""
        return "/".join(
            [
                context.resource_config["path_prefix"],
                context.run_id,
                context.step_key,
                context.name,
            ]
        )

    def handle_output(self, context, obj):
        """Write ``obj`` as Parquet to the path derived from ``context``.

        ``obj`` must expose ``.write.parquet(path)`` (presumably a Spark
        DataFrame -- confirm against the ops that use this IO manager).
        """
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        """Read back the Parquet data written for the upstream output.

        Uses the Spark session exposed by the ``pyspark`` resource.
        """
        spark = context.resources.pyspark.spark_session
        return spark.read.parquet(self._get_path(context.upstream_output))
@io_manager(required_resource_keys={"pyspark"}, config_schema={"path_prefix": str})
def parquet_io_manager():
    """Provide a ``ParquetIOManager``.

    Declares a required ``pyspark`` resource and a string ``path_prefix``
    config value.
    """
    manager = ParquetIOManager()
    return manager
def launch_cluster(
    emr: EmrJobRunner, log: logging.Logger, emr_config: dict
) -> str:
    """Launch an EMR cluster and return whatever ``emr.run_job_flow`` returns.

    Args:
        emr: Job runner used to start the cluster.
        log: Logger forwarded to ``run_job_flow``.
        emr_config: Flat settings dict. Must contain ``emr_release_label``,
            ``cluster_name``, ``master_node_instance_type``,
            ``worker_node_instance_type``, ``worker_node_instance_count``,
            ``ec2_subnet_id`` and ``worker_node_spot_bid_price``.

    Returns:
        The value returned by ``run_job_flow`` -- expected to be the new
        cluster's ID string (confirm against the dagster-aws API).

    Raises:
        KeyError: If any of the keys listed above is missing.
    """
    # Build the full cluster config under its own name instead of rebinding
    # the ``emr_config`` parameter, so the caller's settings stay readable.
    cluster_config = get_emr_cluster_config(
        release_label=emr_config["emr_release_label"],
        cluster_name=emr_config["cluster_name"],
        master_node_instance_type=emr_config["master_node_instance_type"],
        worker_node_instance_type=emr_config["worker_node_instance_type"],
        worker_node_instance_count=emr_config["worker_node_instance_count"],
        ec2_subnet_id=emr_config["ec2_subnet_id"],
        bid_price=emr_config["worker_node_spot_bid_price"],
    )
    return emr.run_job_flow(log=log, cluster_config=cluster_config)
@resource
def emr_job_runner(init_context):
    """Provide an ``EmrJobRunner`` bound to the ``eu-central-1`` region."""
    runner = EmrJobRunner(region="eu-central-1")
    return runner
@resource(
    config_schema={"cluster_name": str}, required_resource_keys={"emr_job_runner"}
)
def emr_cluster_id(init_context):
    """Look up a cluster ID from the configured ``cluster_name``.

    Delegates to ``cluster_id_from_name`` on the ``emr_job_runner`` resource
    and returns its result unchanged.
    """
    lookup = init_context.resources.emr_job_runner.cluster_id_from_name
    return lookup(cluster_name=init_context.resource_config["cluster_name"])
@resource(required_resource_keys={"emr_cluster_id"})
def pyspark_step_launcher(init_context):
    """Provide an EMR PySpark step launcher targeting the resolved cluster.

    The cluster ID is read from the ``emr_cluster_id`` resource when this
    resource is initialised.

    Returns:
        The object produced by invoking ``emr_pyspark_step_launcher`` with the
        config below.
    """
    # Imported locally so this block is self-contained with respect to the
    # file's import list.
    from dagster import build_init_resource_context

    launcher_config = {
        "cluster_id": init_context.resources.emr_cluster_id,
        "local_pipeline_package_path": str(Path(__file__).parent.parent),
        "deploy_local_pipeline_package": True,
        "region_name": "eu-central-1",
        "staging_bucket": "analytics-platform-dagster-emr-staging",
        "wait_for_logs": True,
    }
    # ``.configured(...)`` only yields another resource *definition*; returning
    # it would hand ops a definition object rather than a step launcher, so
    # the step would not be dispatched to EMR. Invoke the definition instead
    # to get the initialised launcher.
    # NOTE(review): relies on Dagster's direct resource invocation -- confirm
    # it is available in the pinned Dagster version.
    return emr_pyspark_step_launcher(
        build_init_resource_context(config=launcher_config)
    )
@op(
    config_schema={
        "emr_release_label": str,
        "cluster_name": str,
        "master_node_instance_type": str,
        "worker_node_instance_type": str,
        "worker_node_instance_count": int,
        "ec2_subnet_id": str,
        "worker_node_spot_bid_price": str,
    },
    required_resource_keys={"emr_job_runner"},
)
def launch_emr_cluster(context) -> None:
    """Launch an EMR cluster from this op's config and log its ID.

    The op returns nothing; downstream ops depend on it only for ordering.
    """
    cluster_id = launch_cluster(
        emr=context.resources.emr_job_runner,
        log=context.log,
        emr_config=context.op_config,
    )
    # Plain attribute call: the pasted source had a chat auto-link wrapped
    # around ``context.log.info``, which is not valid Python.
    context.log.info(f"CLUSTER ID: {cluster_id}")
@op(
    ins={"start": In(Nothing)},
    required_resource_keys={"pyspark", "pyspark_step_launcher", "emr_cluster_id"},
    out=Out(io_manager_key="pyspark_io_manager"),
)
def get_dataframe(context) -> DataFrame:
    """Run the transformation on the ``pyspark`` resource's Spark session.

    ``start`` is a ``Nothing`` input: it carries no data and is used only for
    ordering.

    Returns:
        The result of ``execute_transformation``, stored through the
        ``pyspark_io_manager`` IO manager.
    """
    # Interpolate the resource value directly. The previous code nested a
    # second ``context.log.info(...)`` call inside the f-string; logging calls
    # return None, so the message always ended in "None" whatever the real
    # cluster ID was.
    context.log.info(
        f"CLUSTER ID inside transformation step -----  {context.resources.emr_cluster_id}"
    )
    return execute_transformation(spark_session=context.resources.pyspark.spark_session)
@graph
def make_and_filter_data():
    """Wire the graph: launch the EMR cluster, then compute the DataFrame."""
    cluster_launched = launch_emr_cluster()
    get_dataframe(cluster_launched)
# "prod" job: binds the graph to its resources (EMR runner and cluster ID,
# step launcher, Spark session, S3 client, and the Parquet IO manager).
# The chat auto-link markup around ``to_job`` and the angle brackets around
# the S3 URI were paste artifacts and have been removed.
run_data_emr = make_and_filter_data.to_job(
    name="prod",
    resource_defs={
        "pyspark_step_launcher": pyspark_step_launcher,
        "pyspark": pyspark_resource,
        "s3": s3_resource.configured({"region_name": "eu-central-1"}),
        "pyspark_io_manager": parquet_io_manager.configured(
            {"path_prefix": "s3://analytics-platform-dagster-emr-step-results"}
        ),
        "emr_job_runner": emr_job_runner,
        "emr_cluster_id": emr_cluster_id,
    },
)