# ask-community
I'm running into two (possibly related) issues with the DatabricksPySparkStepLauncher (running on Dagster Cloud). The Databricks jobs seem to start fine and run for 5-10 minutes, after which the Dagster process responsible for polling the pickled events file from S3 chokes on an unpickling error:
```
_pickle.UnpicklingError: invalid load key, ' '.
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/etxdagster/databricks/tools.py", line 318, in launch_step
    yield from self.step_events_iterator(
  File "/usr/local/lib/python3.9/site-packages/etxdagster/databricks/tools.py", line 258, in step_events_iterator
    all_events = self.get_step_events(
  File "/usr/local/lib/python3.9/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 259, in get_step_events
    return backoff(
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/backoff.py", line 67, in backoff
    raise to_raise
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/backoff.py", line 61, in backoff
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 254, in _get_step_records
    return deserialize_value(pickle.loads(serialized_records))
```
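For context on what that error means: `invalid load key` is what `pickle` raises when the first byte it reads isn't a recognized opcode, which is what you'd see if the reader starts at the wrong offset or the bytes are partially written or garbled. A minimal stdlib-only reproduction (the payload dict here is just a stand-in):

```python
import pickle

# Stand-in payload; any picklable object works
payload = pickle.dumps({"event": "STEP_OUTPUT"})

# Valid bytes round-trip fine
assert pickle.loads(payload) == {"event": "STEP_OUTPUT"}

# But if the stream starts with a stray byte (here a space, 0x20),
# the unpickler hits an opcode it doesn't recognize:
try:
    pickle.loads(b" " + payload)
except pickle.UnpicklingError as e:
    print(e)  # invalid load key, ' '.
```

A stream truncated mid-write fails similarly (typically `UnpicklingError: pickle data was truncated` or `EOFError`), which is consistent with the poller reading a file that's still being written.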
After running into a few of those in a row, I started seeing these after the Databricks job had been running and successfully sending back events:
```
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 16: dagster.core.errors.DagsterUnknownStepStateError: Execution exited with steps {'generate_params'} in an unknown state to this process.
This was likely caused by losing communication with the process performing step execution.

Stack Trace:
  File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/child_process_executor.py", line 70, in _execute_command_in_child_process
    for step_event in command.execute():
  File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/multiprocess.py", line 82, in execute
    yield from execute_plan_iterator(
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 916, in __iter__
    yield from self.iterator(
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 110, in inner_plan_execution_iterator
    yield hook_event
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 137, in __exit__
    raise DagsterUnknownStepStateError(

  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 822, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/multiprocess.py", line 276, in execute
    raise DagsterSubprocessError(
```
the second issue seems to have started around the same time as the new Dagster release this afternoon. Could that be related at all?
Dagster version 0.15.3
hi @Zach! is 0.15.3 the dagster version that you're launching the step from, or the dagster version of the databricks cluster, or both? I could definitely believe that if the version drifted between those two, that a pickling error could crop up (potentially related to dynamic outputs, but I could also imagine it happening elsewhere).
also, is 5-10 minutes around the amount of time you'd expect the step to take to complete (meaning it's possible that an output-related event sparked the issue), or would you expect those steps to take longer than that?
yeah this is in a step generating dynamic outputs, but the dagster version appears to be 0.15.3 for both the cluster and the workers, same with all other dagster libraries
this step that's failing usually takes somewhere between 5 and 15 minutes
the step is yielding dynamic outputs in chunks, so iterating over a list and yielding a list of dynamic outputs for each element, and we seem to get a bunch of dynamic outputs from the first element in the list before it errors out (can't tell yet if we get all the expected dynamic outputs for that element though)
got it, thanks for the context!
the step appears to complete if there is only one set of dynamic outputs being generated in that iteration I mentioned - meaning when there's only one element to yield dynamic outputs for
more testing has shown that last statement to be incorrect. It seems to be able to make it through multiple iterations of yielding DynamicOutputs; now I'm trying to see if there's a specific number of DynamicOutputs at which it starts to break down. This step is generating thousands of dynamic outputs (up to ~18,000 for this configuration set) in a massive fan-out, but looking at the underlying code there doesn't seem to be any real reason for it to break down on larger sets of dynamic output events, unless there are just some weird pickling issues once the file the Databricks worker is writing to gets larger.
And just to confirm the timeline of when these errors started showing up, did you have this code deployed / working normally originally, then error #1 started to occur, then yesterday error #2 started to surface?
I agree that the size of the pickled events file could potentially be the source of the issue. My current thinking is that it might be getting to a size large enough to require multiple blocks to download.
do you have an easy way to check the size of that file on your end? It might make sense for us to gzip the file. I'll try to replicate this on my end as well with a large set of outputs
Yes, that timeline is correct. I can get the size of the file for you. The theory I'm working through right now is that the file is being written in chunks from databricks_step_main.py and it's big enough that the Dagster instance polls the file while it's incomplete in some manner. The block size thing would make sense too. Interestingly it's fairly inconsistent as to exactly how many DynamicOutputs get yielded before it fails.
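On the partial-read theory: the usual local-filesystem fix for readers observing half-written files is to write to a temp path and rename it into place, since the rename is atomic on POSIX. A sketch of that pattern (S3 semantics differ, and this isn't what `databricks_step_main.py` currently does, it just illustrates the idea):

```python
import os
import pickle
import tempfile

def atomic_write_pickle(path, obj):
    """Write a pickle so a concurrent reader never sees a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Temp file must live on the same filesystem for the rename to be atomic
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(obj, f)
            f.flush()
            os.fsync(f.fileno())
        # Readers see either the old file or the new one, never a partial write
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```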
one of the tries that got a bit further had a file size of 9.3MB, another was 3.4MB
got it ok -- I think it's quite likely to be a file size issue + randomness introduced based on when the polling actually happens. I'm still getting my cluster set up, but after I replicate the issue I'll try a few things to manage that (my hope is that there would be a pretty high compression ratio on the file which would just solve the issue outright, but if not I'll play around with other solutions)
also, did the number of dynamic outputs that you're emitting increase recently, or has it always been around 18k?
it's been relatively recently that we started running this step on Databricks - prior to that we were running it just on the ECS container the Dagster worker runs on. it was working for a day or two on Databricks, but looking back, the number of dynamic outputs for those runs was significantly smaller
long story short it does seem that starting these larger fan outs on Databricks correlates with when we started seeing the problem
I modified the backoff on the read/unpickle operation to be a bit more aggressive and it's looking really good.
also added more max retries
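The retry shape described here, in generic form (not the actual `dagster_databricks` internals; `read_bytes` is a hypothetical callable standing in for the S3 download): each failed unpickle waits exponentially longer before retrying, which gives an in-progress write time to finish.

```python
import pickle
import time

def read_events_with_backoff(read_bytes, max_retries=8, base_delay=0.2):
    """Download and unpickle the events file, retrying with exponential
    backoff, since a partially written file raises until the write completes."""
    for attempt in range(max_retries):
        try:
            return pickle.loads(read_bytes())
        except (pickle.UnpicklingError, EOFError):
            if attempt == max_retries - 1:
                raise  # out of retries; surface the real error
            time.sleep(base_delay * 2 ** attempt)
```

Raising `max_retries` and shrinking `base_delay` is roughly the "more aggressive" tuning described above.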
nice -- for what it's worth, I was able to replicate the issue with a pretty pathological example (just writing 18k outputs in a loop with no delay), and I'm adding several measures to make this more consistent. Gzipping the file seems to give a really great compression ratio (which is nice regardless of the consistency benefits), and I'm also making the reads / writes more atomic.
awesome, seems like both of those could help. I appreciate you digging into this
a combination of gzip + atomic-ish writes + more lenient backoff has allowed my hellish creation to complete successfully 🙂 I'll get that change into the next release, but it seems like your solution will work until then -- thanks for the report!
thanks so much for the quick turnaround!
actually, after more experimentation, just gzipping the file on either end (+ a couple of extra retries) seems to do the trick. Turns out it's around a 100:1 compression ratio on the pickled file, which helps a ton
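A high compression ratio is plausible here because the pickled events are highly repetitive. A stdlib-only sketch of the gzip round-trip (the event dicts below are hypothetical stand-ins for the real serialized records):

```python
import gzip
import pickle

# Hypothetical payload standing in for ~18k pickled dynamic-output events
events = [
    {"step_key": "generate_params", "mapping_key": f"chunk_{i}"}
    for i in range(18_000)
]

raw = pickle.dumps(events)
compressed = gzip.compress(raw)
print(f"{len(raw)} -> {len(compressed)} bytes "
      f"({len(raw) / len(compressed):.0f}:1)")

# Reading side: decompress, then unpickle
assert pickle.loads(gzip.decompress(compressed)) == events
```

Repetitive records like these compress very well; the exact ratio depends on the real event payloads.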