
Simon Späti

11/17/2021, 8:12 PM
I guess I found a small bug when using `@graph` in combination with a `pyspark.DataFrame`. Not sure if the dynamic output causes the problem or if the graph itself is already the issue. As you can see in the image, when passing a `DataFrame` inside the `get_changed_segments` graph, it persists the parquet files to a path that includes `[]`, which makes them hard to read back. When I remove the brackets, I can read the files with no error. Is there any way to work around this? I guess I now need to remove the `@graph` and then it will work (checking that right now). The stack trace:
pyspark.sql.utils.AnalysisException: Path does not exist: <s3a://mnt-b002/intermediate_storage/storage/27dbdd4a-89fb-4cd0-a070-ee065941610d/get_changed_segments.get_diff_df[update_FactCallSession_0]/result>;
  File "/home/spaeti/.venvs/dagster/lib/python3.8/site-packages/dagster/core/execution/plan/utils.py", line 44, in solid_execution_error_boundary
    yield
  File "/home/spaeti/.venvs/dagster/lib/python3.8/site-packages/dagster/core/execution/plan/inputs.py", line 528, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/home/spaeti/.venvs/dagster/lib/python3.8/site-packages/dagster_generic_code/sac_s3_resources/sac_io_manager_df.py", line 41, in load_input
    df = context.resources.pyspark.spark_session.read.parquet(self._uri_for_key(key))
  File "/home/spaeti/.venvs/dagster/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 353, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
  File "/home/spaeti/.venvs/dagster/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/home/spaeti/.venvs/dagster/lib/python3.8/site-packages/pyspark/sql/utils.py", line 134, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
OK, even without a `@graph` around it, I get the same error. So I assume it's related to the `DynamicOutput` mechanism, could that be? How could I work around this? 🤔

sandy

11/18/2021, 2:16 AM
hey @Simon Späti - the path is determined by your IOManager (based on your stack trace, it looks like it's defined in dagster_generic_code/sac_s3_resources/sac_io_manager_df.py)

Simon Späti

11/19/2021, 11:42 AM
thanks @sandy. I thought we had used or copied the basic s3-io-manager components from dagster, but as you rightly pointed out, it's our own faulty implementation. I simply replaced the special characters in the parquet path and it worked. Thanks a ton!
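For anyone hitting the same thing, here is a rough sketch of the kind of replacement described above. It is not the actual code from `sac_io_manager_df.py`: the helper names `sanitize_key` and `uri_for_key` and the allow-list of characters are assumptions for illustration. The brackets come from the mapping key of the dynamic output, and they likely break the read because `[` and `]` are treated as glob characters when the path is resolved.

```python
import re

# Hypothetical helpers for illustration; not part of Dagster or the original IO manager.
# Anything outside this conservative allow-list is replaced, which covers the
# "[" and "]" that dynamic outputs add to the step key.
_UNSAFE_CHARS = re.compile(r"[^A-Za-z0-9._\-/]")


def sanitize_key(key: str) -> str:
    """Replace every character outside the allow-list with an underscore."""
    return _UNSAFE_CHARS.sub("_", key)


def uri_for_key(bucket: str, prefix: str, key: str) -> str:
    """Build the s3a URI from a sanitized key (bucket and prefix are passed in as-is)."""
    return f"s3a://{bucket}/{prefix}/{sanitize_key(key)}"


if __name__ == "__main__":
    step_key = "get_changed_segments.get_diff_df[update_FactCallSession_0]/result"
    print(sanitize_key(step_key))
    # get_changed_segments.get_diff_df_update_FactCallSession_0_/result
```

The same sanitized key has to be used for both writing and reading, otherwise `load_input` looks in a different place than `handle_output` wrote to.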

sandy

11/19/2021, 4:22 PM
glad that worked out!