#ask-community

Maxime Lenormand

03/24/2022, 1:26 PM
Hi there! EDIT: Fixed it. Passing processes=False to my Dask client seems to have fixed it. Leaving this here in case anyone needs it later :)
I'm trying to understand how to use Dagster in combination with Dask and can't figure out how to move forward, so I'm coming here for some help. I started from a pipeline that takes a bunch of data, splits it into individual pieces, processes them, and then merges the outputs together. I'm creating one giant wrapper op on top of the function that handles this whole pipeline. This op is the only thing running in my job, and is defined as follows:
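from dagster import Any, Out, op

@op(
    config_schema={"config_as_json": Any},
    out={"list_all_intermediate_files": Out(), "status_all_intermediate_files": Out(), "merged_file_path": Out()},
)
def process_everything_op(context):
    # The whole dict of parameters and file paths arrives through the op config.
    config_dict = context.op_config["config_as_json"]
    # logger is assumed to be defined at module level (not shown in this thread).
    return process_everything(config_dict, logger=logger)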
I pass a dict of all the parameters and file paths I need as config_as_json. Inside that op I spin up a local Dask cluster and run on it. I've then created a graph and a job that simply take this single op and run it. They look like this:
from dagster import graph, job
from dagster_dask import dask_executor

@graph
def process_everything_graph():
    list_all_intermediate_files, status_all_intermediate_files, merged_file_path = process_everything_op()

@job(executor_def=dask_executor)
def process_everything_job():
    return process_everything_graph()
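Everything runs fine if I call this job from a local main.py file:
import yaml

if __name__ == "__main__":
    with open("config_file_path.yaml") as file:
        config = yaml.safe_load(file)
    # execute_in_process always uses the in-process executor, so dask_executor is not involved here.
    process_everything_job.execute_in_process(run_config=config)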
It creates a local Dask cluster and runs everything perfectly! But when I try to run the same thing through Dagit, I run into:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "process_everything_op":
The above exception was caused by the following exception:
AssertionError: daemonic processes are not allowed to have children
I'm not quite sure what's happening or where to go from here.
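
For anyone landing here with the same AssertionError: the message comes from Python's multiprocessing module, which refuses to let a process flagged as daemonic start child processes. A process-based local Dask cluster started from inside such a worker would hit that check, which is consistent with processes=False (threads instead of worker processes) making the error go away. The sketch below is a stdlib-only illustration, not Dagster or Dask code: it flags the current process as daemonic to simulate running inside a daemonic worker, and the helper names are made up for the example.

```python
import multiprocessing


def _noop():
    """Target for the child process; it never actually runs in this demo."""


def try_spawn_child():
    """Try to start a child process; return the error message, or None on success."""
    child = multiprocessing.Process(target=_noop)
    try:
        child.start()
    except AssertionError as exc:
        return str(exc)
    child.join()
    return None


def spawn_error_inside_daemon():
    """Simulate a daemonic worker by flagging the current process as daemonic."""
    current = multiprocessing.current_process()
    previous = current.daemon
    current.daemon = True  # assumption: stands in for code running inside a daemonic worker
    try:
        return try_spawn_child()
    finally:
        current.daemon = previous  # restore the flag so later code is unaffected
```

Calling spawn_error_inside_daemon() returns the same message as in the traceback above, without any child process being created.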

daniel

03/24/2022, 2:27 PM
Hi Maxime - when you say "Inside that op I spin up a local Dask cluster" - does that mean all the Dask management is happening inside your op? If that's the case, I wouldn't expect you to need the dask_executor.

Maxime Lenormand

03/24/2022, 2:36 PM
Yep, that's the case. I'm not quite sure I understand what that dask_executor does, to be honest.

daniel

03/24/2022, 2:43 PM
This might help as a start: https://docs.dagster.io/deployment/guides/dask#execute-on-dask
The goal with the dask executor is to have Dagster manage the Dask client for you: you specify the Dask configuration, and then you can run the same business logic locally (using the default executor) or in Dask (using the dask executor), without having a bunch of Dask-specific logic in your ops.
But it's totally fine to manage dask in your ops if you want - you just don't need the dask_executor in that case
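
To make the "you specify the Dask configuration" part concrete, here is a rough sketch of run config for a job that uses dask_executor, written as Python dicts. The key layout (execution, config, cluster, then local or existing) is meant to follow the dagster-dask deployment guide linked above and should be checked against the installed Dagster version; the scheduler address is a placeholder and cluster_kind is a made-up helper.

```python
# Run config for a job defined with executor_def=dask_executor.
# Assumption: key names follow the dagster-dask guide and may vary by version.

# Let the executor start a local Dask cluster with default settings.
local_cluster_run_config = {
    "execution": {"config": {"cluster": {"local": {}}}},
}

# Connect to a Dask scheduler that is managed outside of Dagster.
existing_cluster_run_config = {
    "execution": {
        "config": {
            "cluster": {
                "existing": {"address": "tcp://127.0.0.1:8786"},  # placeholder address
            }
        }
    },
}


def cluster_kind(run_config):
    """Return which cluster type a run config selects, e.g. "local" or "existing"."""
    (kind,) = run_config["execution"]["config"]["cluster"].keys()
    return kind
```

If the op manages its own Dask client, none of this is needed: dropping executor_def=dask_executor from the job leaves it on the default executor.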

Maxime Lenormand

03/24/2022, 3:12 PM
I did take a look at that, thanks! I just didn't understand how to build the Dask cluster from there. Do I only need to call Client() without any of the parameters like n_threads, or do I specifically need to pass them from the config when creating the Dask client?

daniel

03/24/2022, 3:17 PM
I think that doc assumes that you are separately managing the dask cluster outside of dagster. If you want to build the cluster up as part of execution, then what you have (or making a separate dask resource that creates the cluster) would make sense
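
On the question of which parameters to pass when the cluster is built as part of execution: one option is to keep the Dask settings in the op config and turn them into keyword arguments for the client. The sketch below assumes dask.distributed is installed; build_client_kwargs, make_client and the shape of the config dict are illustrative names only. processes, n_workers and threads_per_worker are LocalCluster arguments that Client() forwards when no scheduler address is given (there is no n_threads argument), and processes=False is the setting reported at the top of this thread.

```python
def build_client_kwargs(dask_config):
    """Translate a plain config dict into keyword arguments for dask.distributed.Client."""
    kwargs = {
        # Threads instead of worker processes, as in the fix reported in this thread.
        "processes": dask_config.get("processes", False),
    }
    # Only forward sizing options that were actually set, so Dask defaults apply otherwise.
    for key in ("n_workers", "threads_per_worker"):
        if key in dask_config:
            kwargs[key] = dask_config[key]
    return kwargs


def make_client(dask_config):
    """Create a local Dask client; use it as a context manager so it is closed afterwards."""
    # Imported here so this module still loads where dask is not installed.
    from dask.distributed import Client

    return Client(**build_client_kwargs(dask_config))
```

Inside the op this could be used as "with make_client(dask_settings) as client:", where dask_settings would come from a hypothetical extra entry in the op's config_schema.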

Maxime Lenormand

03/25/2022, 8:29 AM
Okay, thanks for taking the time to answer 🙂