I'm running through the Dagster docs example of submitting jobs to EMR (https://docs.dagster.io/integrations/spark#submitting-pyspark-ops-on-emr). The job runs fine from the Dagit UI, but when I try to run it from a Python file (e.g. …), I get the following error:
```
def inst_param(
    obj: T, param_name: str, ttype: TypeOrTupleOfTypes, additional_message: Optional[str] = None
) -> T:
    if not isinstance(obj, ttype):
>       raise _param_type_mismatch_exception(
            obj, ttype, param_name, additional_message=additional_message
E       dagster.check.ParameterCheckError: Param "recon_pipeline" is not a ReconstructablePipeline. Got <dagster.core.definitions.pipeline_base.InMemoryPipeline object at 0x11675f0a0> which is type <class 'dagster.core.definitions.pipeline_base.InMemoryPipeline'>.

../../../../opt/miniconda3/envs/nebulon/lib/python3.9/site-packages/dagster/check/__init__.py:505: ParameterCheckError
```
I've tried converting the job into a reconstructable pipeline, as the error indicates, but that doesn't seem to help. Any idea what's going on here?
I ran into similar issues trying to use .execute_in_process() to test the PySpark Databricks step launcher. I might be wrong, but I think it's a limitation of mixing the in-process executor used by .execute_in_process() with a step launcher that runs steps remotely, like the emr_pyspark_step_launcher. That vaguely makes sense: the in-process executor is supposed to execute all steps sequentially in a single process, so it may not be equipped to hand a step off to the remote process a remote step launcher needs. If you need to test the job with .execute_in_process(), you might have to swap out the emr_pyspark_step_launcher for a default step launcher, although it's unclear to me which step launcher (if any) you would use, since this part of the API seems a little under-documented. If this is possible, I'd love to know the solution as well.
@Zach, you're exactly right. For testing without the emr_pyspark_step_launcher, you can use:
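```python
from dagster import ResourceDefinition

# `x` is the job under test; the step launcher resource is replaced with one that provides None
x.execute_in_process(resource_defs={"pyspark_step_launcher": ResourceDefinition.none_resource()})
```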
Thanks for confirming, @sandy. Do you have any recommendations for running the job from a Python file while keeping the emr_pyspark_step_launcher?
There's a legacy API that allows you to do this:
```python
from dagster import job, reconstructable, execute_pipeline

@job
def my_job():
    ...  # your ops go here

execute_pipeline(reconstructable(my_job))
```
The reason we've been hesitant to support it is that, due to the complexities of reproducing jobs in remote processes, it only works if your job is defined using the @job decorator. For example, the following would not work:
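```python
from dagster import graph, reconstructable, execute_pipeline

@graph
def my_graph():
    ...  # your ops go here

my_job = my_graph.to_job(...)
execute_pipeline(reconstructable(my_job))  # not supported for jobs built with to_job()
```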
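Why an in-memory job can't follow a step out to EMR is easier to see with a stdlib analogy (this is not Dagster's code): Python's `pickle` stores a function by reference, as its module plus qualified name, so it only round-trips if the receiving side can re-import it under that name. A remote step launcher faces a similar constraint, which is why it wants a reconstructable pointer rather than an in-memory object. `roundtrip` and `build_local_function` are hypothetical helpers for illustration.

```python
import os.path
import pickle


def roundtrip(obj):
    """Serialize and deserialize obj, as if shipping it to another process."""
    return pickle.loads(pickle.dumps(obj))


def build_local_function():
    """Return a function created at runtime inside another function."""
    def add_one(x):
        return x + 1
    return add_one


# A module-level function is pickled as a reference (module + name),
# so unpickling re-imports the very same object.
restored = roundtrip(os.path.join)
print(restored is os.path.join)  # True

# A function created inside another function has no importable name,
# so pickle refuses to serialize it.
try:
    roundtrip(build_local_function())
    print("pickled")
except (pickle.PicklingError, AttributeError) as exc:
    print("cannot pickle:", type(exc).__name__)
```

The analogy has limits: per this thread, Dagster's own rules for what can be reconstructed (e.g. only @job-decorated jobs at the time) are stricter than pickle's.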
Another approach would be to have a Python process invoke the command-line interface to execute your job.
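A minimal sketch of that approach using only the standard library, assuming the job lives in a file called `my_jobs.py` and is named `my_job` (both hypothetical). The `-f`/`-j`/`-c` flags are what I'd expect `dagster job execute` to accept in recent releases, but verify them with `dagster job execute --help` for your version. Since the CLI loads the job from a file, Dagster should be able to build a reconstructable pointer to it, which is what the EMR step launcher needs.

```python
import shutil
import subprocess
from typing import List, Optional


def build_execute_command(
    python_file: str, job_name: str, config_file: Optional[str] = None
) -> List[str]:
    """Build the argv for `dagster job execute` (flags assumed; check --help)."""
    cmd = ["dagster", "job", "execute", "-f", python_file, "-j", job_name]
    if config_file is not None:
        # Run config (e.g. EMR cluster settings) supplied as a YAML file.
        cmd += ["-c", config_file]
    return cmd


def run_job(python_file: str, job_name: str, config_file: Optional[str] = None) -> int:
    """Invoke the Dagster CLI in a subprocess and return its exit code."""
    if shutil.which("dagster") is None:
        raise RuntimeError("the `dagster` CLI is not on PATH")
    cmd = build_execute_command(python_file, job_name, config_file)
    # check=False so the caller decides how to handle a failed run.
    completed = subprocess.run(cmd, check=False)
    return completed.returncode


# Example (hypothetical file and job names):
# exit_code = run_job("my_jobs.py", "my_job", "emr_config.yaml")
```

Keeping the argv builder separate from the subprocess call makes the command easy to inspect or log before anything is launched on EMR.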