Hello, I am having pickling error with `adls2_reso...
# ask-community
o
Hello, I am having a pickling error with `adls2_resource`:
The above exception was caused by the following exception:
TypeError: can't pickle _thread.RLock objects

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 398, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 524, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.7/site-packages/dagster_azure/adls2/io_manager.py", line 82, in handle_output
    pickled_obj = pickle.dumps(obj, PICKLE_PROTOCOL)

Exception ignored in: <generator object snowflake at 0x7f4b1dcbdf50>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/gpdagster/resources/snowflake.py", line 703, in snowflake
  File "/usr/local/lib/python3.7/site-packages/gpdagster/resources/base_db.py", line 124, in __exit__
  File "/usr/local/lib/python3.7/site-packages/dbutils/pooled_db.py", line 380, in close
  File "/usr/lib64/python3.7/threading.py", line 365, in notify_all
  File "/usr/lib64/python3.7/threading.py", line 348, in notify
TypeError: 'NoneType' object is not callable
c
Can you share the code where this error is occurring? It looks like the kind of error caused by using the wrong queue library in a multiprocessing context, but I don't believe we use a queue within adls2: https://stackoverflow.com/questions/44144584/typeerror-cant-pickle-thread-lock-objects
o
I saw the thread on the queuing library too
@op(required_resource_keys={'pyspark_step_launcher', 'emr_snowflake'},
    out={"category_df": Out(dagster_type=DataFrame, io_manager_key="adls2_pickle_io_manager")})
def get_categories(context) -> DataFrame:
    run_date = context.op_config.get('run_date')
    query = f"""
    SELECT id,
       title
    FROM STITCH.MLS_MYSQL.categories
    WHERE created_at < '{run_date}'::DATE;
    """
    spark_session = _get_spark_session()
    category_df = context.resources.emr_snowflake.read_spark_dataframe(spark_session, query) \
        .select("id", "title")

    yield Output(category_df, output_name='category_df')
I think in this particular case it has to do with the fact that the dataframe returned from the snowflake query cannot be pickled.
c
Ahh sorry, I see now that you're using the adls2 io manager. Yes, that's correct. The io manager you're using expects outputs to be pickleable, and I think the error you're seeing comes from deep inside the io manager's attempt to pickle the output.
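For anyone reading later, here is a minimal sketch of the failure mode using only the standard library. `HoldsLock` and `try_pickle` are hypothetical names made up for illustration. The sketch assumes the Spark DataFrame ends up referencing a lock somewhere in its internals, which is what the `_thread.RLock` message in the trace suggests; it does not reproduce the actual Spark or adls2 code path.

```python
import pickle
import threading


class HoldsLock:
    """Hypothetical stand-in for an object that keeps a lock in its state."""

    def __init__(self):
        self._lock = threading.RLock()


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
        return None
    except TypeError as exc:
        return str(exc)


# An object that holds an RLock cannot be pickled; a plain dict can.
error = try_pickle(HoldsLock())
ok = try_pickle({"id": 1, "title": "books"})
print(error)
print(ok)
```

The exact wording of the error message differs between Python versions, but in each case it is a `TypeError` that names the `RLock`.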
You could potentially modify the adls2 io manager to handle Spark DataFrames. We do this kind of handling in our hacker_news example: https://github.com/dagster-io/dagster/blob/master/examples/hacker_news/hacker_news/resources/snowflake_io_manager.py. It would mean combining that logic with the existing write logic in the adls2 io manager.
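A rough sketch of the dispatch idea, written without Dagster so it runs on its own: Spark-like DataFrames are written as Parquet through their own writer, and everything else is pickled as before. The function names, the duck-typed check, and the file suffixes are assumptions for illustration, not the adls2 io manager's actual code. In a real io manager this logic would live in `handle_output`, the check would more likely be `isinstance(obj, pyspark.sql.DataFrame)`, and the target would be an ADLS2 location rather than a local file.

```python
import pickle


def looks_like_spark_dataframe(obj):
    # Duck-typed check (an assumption for this sketch): Spark DataFrames
    # expose a `.write` attribute that has a `.parquet(...)` method.
    return hasattr(obj, "write") and hasattr(obj.write, "parquet")


def handle_output(obj, base_path):
    """Write Spark-like DataFrames as Parquet; pickle everything else.

    Returns the path that was written so a loader can find it later.
    """
    if looks_like_spark_dataframe(obj):
        target = base_path + ".parquet"
        # Let Spark write the data itself instead of pickling the DataFrame.
        obj.write.parquet(target, mode="overwrite")
        return target

    target = base_path + ".pickle"
    with open(target, "wb") as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
    return target
```

The matching load side would need the same branching: read Parquet back through a Spark session for DataFrame outputs, and unpickle otherwise.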
o
Will try it out. This already looks hopeful!
@chris Do you have a link to the adls2 io manager's GitHub page?
o
Thanks