# ask-community
j
I'm running through this example of submitting Dagster jobs to EMR: https://docs.dagster.io/integrations/spark#submitting-pyspark-ops-on-emr. The job runs fine from the Dagit UI; however, when I try to run the job from a Python file (e.g.
make_and_filter_data_emr.execute_in_process()
) I get the following error:
Copy code
def inst_param(
    obj: T, param_name: str, ttype: TypeOrTupleOfTypes, additional_message: Optional[str] = None
  ) -> T:
    if not isinstance(obj, ttype):
>     raise _param_type_mismatch_exception(
        obj, ttype, param_name, additional_message=additional_message
      )
E     dagster.check.ParameterCheckError: Param "recon_pipeline" is not a ReconstructablePipeline. Got <dagster.core.definitions.pipeline_base.InMemoryPipeline object at 0x11675f0a0> which is type <class 'dagster.core.definitions.pipeline_base.InMemoryPipeline'>.

../../../../opt/miniconda3/envs/nebulon/lib/python3.9/site-packages/dagster/check/__init__.py:505: ParameterCheckError
I've tried converting the job into a reconstructable pipeline as indicated, but that doesn't seem to be helping. Any idea what's going on here?
z
I ran into similar issues trying to use .execute_in_process() to test the pyspark databricks launcher. I might be wrong, but I think it's a limitation of mixing the in-process executor used by .execute_in_process() with a step launcher that runs steps remotely, like the emr_pyspark_step_launcher. That vaguely makes sense: the in-process executor is supposed to execute all steps linearly in a single process, so it may not be equipped to hand steps off to a remote process via a remote step launcher. If you need to test the job using .execute_in_process(), you might have to swap out the emr_pyspark_step_launcher for a default step launcher (one way to do that swap is sketched below), although it's unclear to me which step launcher (or whether one exists) you'd use, since this part of the API seems a little under-documented. If this is possible, I'd love to know the solution as well.
👍 1
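For reference, here is a sketch of doing that swap at job-definition time. The op, graph, and job names are placeholders (not from the thread), the no-op launcher uses the none_resource approach sandy confirms below, and the real EMR job would also need the pyspark/s3 resources and launcher config from the linked docs example.
Copy code
from dagster import ResourceDefinition, graph, op
from dagster_aws.emr import emr_pyspark_step_launcher

@op(required_resource_keys={"pyspark_step_launcher"})
def make_data():
    return list(range(10))

@graph
def make_and_filter_data():
    make_data()

# Job that ships steps to EMR (launcher, pyspark, and s3 config omitted here).
emr_job = make_and_filter_data.to_job(
    name="make_and_filter_data_emr",
    resource_defs={"pyspark_step_launcher": emr_pyspark_step_launcher},
)

# Job that runs the same graph locally with a no-op step launcher.
local_job = make_and_filter_data.to_job(
    name="make_and_filter_data_local",
    resource_defs={"pyspark_step_launcher": ResourceDefinition.none_resource()},
)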
s
@Zach you're exactly right. For testing without the emr_pyspark_step_launcher, you can use
none_resource
.
Copy code
from dagster import ResourceDefinition

# Override just the step launcher resource with a no-op resource at execution time.
x.execute_in_process(
    resource_defs={"pyspark_step_launcher": ResourceDefinition.none_resource()}
)
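As a concrete usage sketch, the same override can back a unit test: make_and_filter_data_emr is the job from the linked docs example, execute_in_process() returns a result whose .success flag can be asserted on, and the resource_defs keyword is copied verbatim from the snippet above (it may be spelled differently in other Dagster versions).
Copy code
from dagster import ResourceDefinition

def test_make_and_filter_data_locally():
    # Swap in the no-op launcher so every step runs inside the test process.
    result = make_and_filter_data_emr.execute_in_process(
        resource_defs={"pyspark_step_launcher": ResourceDefinition.none_resource()}
    )
    assert result.success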
j
Thanks for confirming @sandy. Do you have any recommendations on running the job from a Python file if I wanted to keep the emr_pyspark_step_launcher?
s
There's a legacy API that allows you to do this:
Copy code
from dagster import job, reconstructable, execute_pipeline

@job(...)
def my_job():
    ...

execute_pipeline(reconstructable(my_job))
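A sketch of how that might look when called from a script, continuing the my_job example above. The __main__ guard and the persistent DagsterInstance (which requires DAGSTER_HOME to be set) are precautionary assumptions on my part, not requirements confirmed in this thread.
Copy code
from dagster import DagsterInstance, execute_pipeline, reconstructable

if __name__ == "__main__":
    # Guard the call so the module can be re-imported safely when the job is
    # reconstructed in another process.
    result = execute_pipeline(
        reconstructable(my_job),
        # run_config for the emr_pyspark_step_launcher resource (cluster id,
        # staging bucket, etc.) would be passed here, as in the docs example.
        instance=DagsterInstance.get(),
    )
    assert result.success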
The reason we've been hesitant to support it is that, due to the complexities of reproducing jobs in remote processes, it only works if your job is defined using the
@job
decorator. E.g. the following would not work:
Copy code
from dagster import graph, reconstructable, execute_pipeline

@graph(...)
def my_graph():
    ...

my_job = my_graph.to_job(...)
# This fails: reconstructable() can't reproduce a job built via to_job() in another process
execute_pipeline(reconstructable(my_job))
Another approach would be to have a Python process invoke the command-line interface to execute your job.
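For example (a sketch: it assumes a Dagster version that ships the dagster job execute CLI command, whereas older releases used dagster pipeline execute, and the file/job names are placeholders):
Copy code
import subprocess

# Shell out to the Dagster CLI from Python instead of calling execute_in_process().
subprocess.run(
    ["dagster", "job", "execute", "-f", "my_file.py", "-j", "my_job"],
    check=True,
)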