w
Hello! I'm testing out Dagster with PySpark. I've got a few solids with DagsterPySparkDataFrame as inputs/outputs. My config sets storage.filesystem.config.base_dir to a directory on my server, and the first run of the pipeline works flawlessly - looks awesome! But when I tried to re-run a segment of the DAG, PySpark raised an error about the intermediate parquet file already existing. I'm assuming there's some way to tell the system to always re-write intermediates with mode="overwrite"?
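For concreteness, the behavior I'm after is plain PySpark's overwrite write mode - a minimal, self-contained sketch (the path and data here are just placeholders):
```python
from pyspark.sql import SparkSession

# Plain PySpark: mode("overwrite") replaces an existing parquet path
# instead of raising "path ... already exists".
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.mode("overwrite").parquet("/tmp/example_intermediate")
```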
a
@sandy
s
Hi Wes - are you able to share some of the path that it's trying to write to? In general, it should write to a new file each time (based on the run ID), so I'm surprised you're hitting an "already exists" error. It's very possible something isn't working correctly, though.
y
If base_dir is set, the old intermediate storage path doesn't include the run_id in the file path - that's likely the cause here. The object store then fails its existence check when it tries to copy the object during re-execution.
This should be resolved by the move to IO managers in the 0.10.0 release.
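For reference, here's a rough sketch of the kind of custom IO manager you could write against 0.10.0+ to always persist PySpark outputs as parquet with mode="overwrite". The resource name, config key, and path layout below are illustrative assumptions, not the built-in implementation:
```python
import os

from dagster import IOManager, io_manager
from pyspark.sql import SparkSession


class OverwritingParquetIOManager(IOManager):
    """Persists each output as a parquet directory under base_dir.

    The step_key/output-name path layout is an assumption for illustration,
    not what Dagster's built-in storage does.
    """

    def __init__(self, base_dir):
        self._base_dir = base_dir

    def _path(self, context):
        return os.path.join(self._base_dir, context.step_key, context.name)

    def handle_output(self, context, obj):
        # mode("overwrite") avoids the "path already exists" failure on re-execution.
        obj.write.mode("overwrite").parquet(self._path(context))

    def load_input(self, context):
        # Read back the parquet written by the upstream step's output.
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._path(context.upstream_output))


@io_manager(config_schema={"base_dir": str})
def overwriting_parquet_io_manager(init_context):
    return OverwritingParquetIOManager(init_context.resource_config["base_dir"])
```
You'd then wire it in as the io_manager resource on the pipeline's mode and point base_dir at the same storage directory.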
s
@Wes Roach - what version are you using?
w
```
dagit                     0.9.21             pyh44b312d_1    conda-forge
dagster                   0.9.21           py37h89c1867_1    conda-forge
dagster-graphql           0.9.21             pyheb06c22_1    conda-forge
dagster-pyspark           0.9.21             pyheb06c22_1    conda-forge
dagster-spark             0.9.21             pyhd8ed1ab_1    conda-forge
```
Config:
```yaml
storage:
  filesystem:
    config:
      base_dir: /u01/projects/hcdev/working/wroach/hcdev/sandbox/dagster/mvp/storage
```
Error:
```
pyspark.sql.utils.AnalysisException: path file:/u01/projects/hcdev/working/wroach/hcdev/sandbox/dagster/mvp/storage/intermediates/staged_eligibility_sdf.compute/result already exists.;
```