Matthew Smicker

11/01/2021, 11:54 PM
I am upgrading to the new core API and making use of nested graphs to abstract away some complexity. I have a fairly simple case: the first op takes two strings and outputs a string; the second op (within the graph) takes that string as input and outputs an instance of a Python class (which is registered as a Dagster type). The step output type check passes, but then I get this error message:
TypeError: cannot pickle '_thread.lock' object
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 327, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 381, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 491, in _store_output
    handle_output_res = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/fs_io_manager.py", line 122, in handle_output
    pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
Any ideas?

yuhan

11/02/2021, 12:02 AM
Hi Matthew, do you mind sharing your nested graph definition and the ops?

Matthew Smicker

11/02/2021, 12:24 AM
Sure
@op(
    ins={
        'project_name': In(dagster_type=String),
        'run_subfolder': In(dagster_type=String),
    },
    out={
        'project_name': Out(dagster_type=String),
    },
)
def get_project_name(context, project_name, run_subfolder):
    unique_project_name = project_name + '-' + run_subfolder
    context.log.debug(unique_project_name)
    return unique_project_name

@op(
    ins={
        'project_name': In(
            dagster_type=String,
            description='Name for the project',
        ),
    },
    out={
        'project': Out(
            dagster_type=Project,
            description='Project object',
        ),
    },
    required_resource_keys={'rest_service'},
)
def create_get_project(context, project_name):
    api = context.resources.rest_service
    
    # Query projects by project name
    project_list = api.projects.query(name=project_name)
    project = None
    if len(project_list) == 0:
        project = api.projects.create(name=project_name)
    else:
        project = project_list[0]
    context.log.debug(str(project))
    
    return project

@graph
def create_unique_project(project_name, run_subfolder):
    unique_project_name = get_project_name(project_name=project_name, run_subfolder=run_subfolder)
    return create_get_project(project_name=unique_project_name)
Ah, it relates to Artur's point. I tested, and the Project object cannot be pickled. Unfortunately, I'm not so clear on how to address that 🙂
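(A minimal reproduction of this failure mode, independent of Matthew's actual Project class: any object holding a live handle such as a threading.Lock cannot go through pickle, which the default fs_io_manager relies on. The class below is a stand-in, not the real Project.)

```python
import pickle
import threading

class Project:
    """Stand-in for an API object that holds a live, unpicklable handle."""
    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()  # threading locks cannot be pickled

try:
    pickle.dumps(Project("demo"))
    picklable = True
except TypeError as err:
    # e.g. "cannot pickle '_thread.lock' object"
    picklable = False
```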

yuhan

11/02/2021, 12:37 AM
when you define a job, you can specify the executor and io_manager like:
my_in_process_job = create_unique_project.to_job(
    executor_def=in_process_executor,
    resource_defs={"io_manager": mem_io_manager},
)

Matthew Smicker

11/02/2021, 12:49 AM
Thanks! My overall job contains many ops along with this graph, so for now I've configured the whole job with the options you listed above. Would there be a way to use the in_process_executor and mem_io_manager for only that single part of the job (the defined graph)?
Or does that not make sense?

yuhan

11/02/2021, 12:59 AM
do you want other parts of the job to be multiprocessing and their outputs to be persisted on disk?

Matthew Smicker

11/02/2021, 12:59 AM
Yes, I was thinking that

yuhan

11/02/2021, 1:06 AM
hmm i don’t think it’s straightforward to have two executors being used in one job. however, if you want the other parts to persist data, you can specify the io_manager per output: https://docs.dagster.io/concepts/io-management/io-managers#per-output-io-manager

Matthew Smicker

11/02/2021, 1:07 AM
Will take a look. In any case I'm up and running. Appreciate it!

yuhan

11/02/2021, 1:10 AM
alternatively, you could write a custom io manager for the Project output so you can specially handle the serialization/deserialization for that type. in that case, you don’t need to switch the entire job to be in-process (multiprocessing requires data to be shared across processes). see an example here: https://docs.dagster.io/concepts/io-management/io-managers#a-custom-io-manager-that-stores-pandas-dataframes-in-tables
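(A hypothetical sketch of that pattern, written without Dagster so it runs standalone: instead of pickling the live Project, which holds unpicklable client state, persist only its stable identifier and rebuild the object through the API on load. In real code this logic would live in a subclass of `dagster.IOManager` registered via `@io_manager`, with the API client injected as a resource; all names below are illustrative.)

```python
import os
import pickle
import tempfile

class ProjectIOManager:
    """Persists only picklable Project state; rebuilds the rest via the API."""

    def __init__(self, api):
        self.api = api  # live REST client, re-created in each process

    def handle_output(self, path, project):
        # Store only the stable identifier, never the client handle.
        with open(path, "wb") as f:
            pickle.dump({"name": project["name"]}, f)

    def load_input(self, path):
        with open(path, "rb") as f:
            state = pickle.load(f)
        # Re-fetch the full, live object through the API in the new process.
        return self.api.get_project(state["name"])

class FakeApi:
    """Stand-in for the rest_service resource."""
    def get_project(self, name):
        return {"name": name, "client": self}

manager = ProjectIOManager(FakeApi())
path = os.path.join(tempfile.mkdtemp(), "project.pkl")
manager.handle_output(path, {"name": "demo-run1"})
restored = manager.load_input(path)
```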