Matthew Smicker
11/01/2021, 11:54 PM
TypeError: cannot pickle '_thread.lock' object
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 327, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 381, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 491, in _store_output
    handle_output_res = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/fs_io_manager.py", line 122, in handle_output
    pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
Any ideas?
yuhan
11/02/2021, 12:02 AM
Matthew Smicker
11/02/2021, 12:24 AM
from dagster import In, Out, String, graph, op

# Project is a custom Dagster type defined elsewhere (not shown in this thread).


@op(
    ins={
        'project_name': In(dagster_type=String),
        'run_subfolder': In(dagster_type=String),
    },
    out={
        'project_name': Out(dagster_type=String),
    },
)
def get_project_name(context, project_name, run_subfolder):
    # Combine the project name and run subfolder into a unique name
    unique_project_name = project_name + '-' + run_subfolder
    context.log.debug(unique_project_name)
    return unique_project_name


@op(
    ins={
        'project_name': In(
            dagster_type=String,
            description='Name for the project',
        ),
    },
    out={
        'project': Out(
            dagster_type=Project,
            description='Project object',
        ),
    },
    required_resource_keys={'rest_service'},
)
def create_get_project(context, project_name):
    api = context.resources.rest_service
    # Query projects by project name
    project_list = api.projects.query(name=project_name)
    project = None
    if len(project_list) == 0:
        # No match: create the project
        project = api.projects.create(name=project_name)
    else:
        # Reuse the first match
        project = project_list[0]
    context.log.debug(str(project))
    return project


@graph
def create_unique_project(project_name, run_subfolder):
    unique_project_name = get_project_name(project_name=project_name, run_subfolder=run_subfolder)
    return create_get_project(project_name=unique_project_name)
yuhan
11/02/2021, 12:37 AM
my_in_process_job = create_unique_project.to_job(
    executor_def=in_process_executor,
    resource_defs={"io_manager": mem_io_manager},
)
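For context on why this helps: the traceback ends in fs_io_manager calling pickle.dump on the op's output, so the returned Project object most likely holds a thread lock somewhere inside it (for example via the REST client). mem_io_manager keeps outputs in memory instead of pickling them, which is also why it needs the in-process executor. Here is a minimal plain-Python sketch of that difference. FakeProject, store_pickled and store_in_memory are hypothetical stand-ins, not Dagster APIs and not the real Project class.

```python
import pickle
import threading


class FakeProject:
    """Hypothetical stand-in for an API object that holds a lock internally."""

    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()  # thread locks cannot be pickled


def store_pickled(obj):
    """Mimics a filesystem-style store: the value has to survive pickling."""
    return pickle.dumps(obj)


def store_in_memory(store, key, obj):
    """Mimics an in-memory store: keeps a reference, no serialization."""
    store[key] = obj
    return store[key]


project = FakeProject("demo-run1")

# Pickling fails because of the lock attribute.
try:
    store_pickled(project)
    pickle_error = None
except TypeError as exc:
    pickle_error = str(exc)

# Keeping the object in memory works and hands back the very same object.
memory_store = {}
same_project = store_in_memory(memory_store, "project", project)

print("pickle error:", pickle_error)
print("in-memory store returns the same object:", same_project is project)
```

The trade-off is that in-memory outputs only live inside one process, so this setup fits single-process runs. If the output has to cross process boundaries, returning something picklable (such as a project id or name) from the op is the usual alternative.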
Matthew Smicker
11/02/2021, 12:49 AM
yuhan
11/02/2021, 12:59 AM
Matthew Smicker
11/02/2021, 12:59 AM
yuhan
11/02/2021, 1:06 AM
Matthew Smicker
11/02/2021, 1:07 AM
yuhan
11/02/2021, 1:10 AM