Maxime Lenormand
03/24/2022, 1:26 PM
Adding `processes=False` to my Dask client seems to have fixed it. Leaving this here in case anyone needs it later :)
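A likely explanation for why this works, sketched with the standard library only: Python's `multiprocessing` refuses to let a daemonic process start child processes, and that refusal is exactly the `AssertionError` in the traceback further down. Under Dagit, the `dask_executor` runs the op inside a Dask worker process (daemonic under Dask's default worker settings, which is an assumption worth checking for your version), so a process-based local cluster created inside the op cannot start its own workers. `execute_in_process` ignores the job's executor and runs the op in the calling process, which is not daemonic, so the same code works from `main.py`. `processes=False` makes the nested cluster use threads instead of processes. The helper names below (`start_nested_worker`, `run_inside_daemon`) are hypothetical, and the `fork` start method is assumed so the snippet runs on Linux/macOS without extra guards.

```python
import multiprocessing as mp
import threading

# Assumption: the "fork" start method (Linux/macOS), so children need no re-import of this module.
CTX = mp.get_context("fork")


def start_nested_worker(use_threads):
    """Start one nested worker, the way a local Dask cluster starts its workers."""
    if use_threads:
        # Roughly what processes=False does: workers are threads in the current process.
        worker = threading.Thread(target=lambda: None)
    else:
        # The default: workers are separate child processes.
        worker = CTX.Process(target=lambda: None)
    try:
        worker.start()
        worker.join()
        return "ok"
    except AssertionError as exc:
        return f"AssertionError: {exc}"


def _daemon_body(queue, use_threads):
    # Runs inside the daemonic process and reports the outcome back to the parent.
    queue.put(start_nested_worker(use_threads))


def run_inside_daemon(use_threads):
    """Run start_nested_worker inside a daemonic process (standing in for a Dask worker)."""
    queue = CTX.Queue()
    proc = CTX.Process(target=_daemon_body, args=(queue, use_threads), daemon=True)
    proc.start()
    result = queue.get()
    proc.join()
    return result


if __name__ == "__main__":
    print(run_inside_daemon(use_threads=False))  # AssertionError: daemonic processes are not allowed to have children
    print(run_inside_daemon(use_threads=True))   # ok
```

In the op itself, the corresponding change is the one-argument fix above: `Client(processes=False)` from `dask.distributed`.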
I'm trying to understand how to use Dagster together with Dask and can't figure out how to move forward, so I'm coming here for some help.
I started from an existing pipeline that takes a bunch of data, splits it into individual pieces, processes each piece, and then merges the outputs together.
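A minimal sketch of that split/process/merge shape, assuming a toy pipeline: the input is a list of numbers, "processing" a piece means writing its sum to an intermediate file, and the merge adds those sums up. A `ThreadPoolExecutor` stands in for the Dask cluster. The function returns three values so it lines up with the three outputs that `process_everything_op` declares (intermediate files, their statuses, merged file path). The config keys `data`, `out_dir`, and `n_pieces` and the helpers `process_piece` and `merge` are hypothetical.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def process_piece(index, piece, out_dir):
    """Hypothetical per-piece work: write the sum of one chunk to its own file."""
    path = Path(out_dir) / f"piece_{index}.txt"
    path.write_text(str(sum(piece)))
    return str(path), "done"


def merge(paths, out_dir):
    """Hypothetical merge: add up the per-piece results into a single file."""
    merged = Path(out_dir) / "merged.txt"
    merged.write_text(str(sum(int(Path(p).read_text()) for p in paths)))
    return str(merged)


def process_everything(config_dict, logger=None):
    data, out_dir = config_dict["data"], config_dict["out_dir"]
    n_pieces = config_dict.get("n_pieces", 4)
    # Split: deal the input round-robin into n_pieces chunks.
    pieces = [data[i::n_pieces] for i in range(n_pieces)]
    # Process: run the pieces concurrently (a thread pool stands in for Dask here).
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(process_piece, range(n_pieces), pieces, [out_dir] * n_pieces))
    files = [path for path, _ in results]
    statuses = [status for _, status in results]
    if logger is not None:
        logger.info(f"Merging {len(files)} intermediate files")
    # Merge, then return one value per output the op declares.
    return files, statuses, merge(files, out_dir)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        files, statuses, merged = process_everything({"data": list(range(10)), "out_dir": tmp, "n_pieces": 3})
        print(len(files), statuses, Path(merged).read_text())  # 3 ['done', 'done', 'done'] 45
```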
I'm creating one giant wrapper `op` on top of my function that handles this whole pipeline. This op is the only thing running in my job, and it's defined as follows:
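```python
from dagster import Any, Out, op

@op(
    config_schema={"config_as_json": Any},
    out={"list_all_intermediate_files": Out(), "status_all_intermediate_files": Out(), "merged_file_path": Out()},
)
def process_everything_op(context):
    config_dict = context.op_config["config_as_json"]
    # process_everything must return a 3-tuple, one value per declared output; context.log is Dagster's run logger
    return process_everything(config_dict, logger=context.log)
```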
I pass a dict with all the parameters and file paths I need as `config_as_json`.
Inside that `op` I spin up a local Dask cluster and run everything on it.
I've then created a `graph` and a `job` that simply take this single `op` and run it. They look like this:
```python
from dagster import graph, job
from dagster_dask import dask_executor

@graph
def process_everything_graph():
    list_all_intermediate_files, status_all_intermediate_files, merged_file_path = process_everything_op()

@job(executor_def=dask_executor)
def process_everything_job():
    return process_everything_graph()
```
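Everything runs fine if I call this job from a local `main.py` file:
```python
import yaml

if __name__ == "__main__":
    with open("config_file_path.yaml") as file:
        config = yaml.safe_load(file)
    # execute_in_process ignores the job's executor_def and runs every op in this process
    process_everything_job.execute_in_process(run_config=config)
```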
It creates a local Dask cluster and runs everything perfectly!
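For reference, a sketch of the run-config shape this job most likely expects, written as a Python dict. Because the job body calls `process_everything_graph()`, the op is nested inside that graph, so (assuming Dagster's nested-graph run-config layout) its config sits one `ops` level deeper, under the graph's name. The keys inside `pipeline_params` are hypothetical placeholders; the YAML file loaded in `main.py`, or the config pasted into Dagit's Launchpad, would need the same nesting.

```python
import json

# Hypothetical pipeline parameters: the real keys are whatever process_everything reads.
pipeline_params = {
    "input_paths": ["data/part_a.csv", "data/part_b.csv"],
    "output_dir": "outputs/",
}

# Graph name -> "ops" -> op name -> "config" -> the op's config_schema fields.
run_config = {
    "ops": {
        "process_everything_graph": {
            "ops": {
                "process_everything_op": {
                    "config": {"config_as_json": pipeline_params},
                },
            },
        },
    },
}

if __name__ == "__main__":
    # Print the structure so it can be compared with the YAML file.
    print(json.dumps(run_config, indent=2))
```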
But when I try to run the same thing through Dagit, I run into:
```
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "process_everything_op":
The above exception was caused by the following exception:
AssertionError: daemonic processes are not allowed to have children
```
I'm not quite sure what's happening or where to go from here.
daniel
03/24/2022, 2:27 PM
"Inside that `op` I spin up a local Dask cluster": does that mean all the Dask management is happening inside your op? If so, I wouldn't expect you to need the `dask_executor`.
Maxime Lenormand
03/24/2022, 2:36 PM
daniel
03/24/2022, 2:43 PM
daniel
03/24/2022, 2:43 PM
Maxime Lenormand
03/24/2022, 3:12 PM
…`client()` without any of the parameters like `n_threads`, or do I specifically need to pass them in from the config when calling Dask?
daniel
03/24/2022, 3:17 PM
Maxime Lenormand
03/25/2022, 8:29 AM