# dagster-serverless
Afternoon all! Running into some very weird issues with our EMR serverless deployment that I can’t quite manage to reproduce elsewhere. We had a materialization run that uses the EMR step launcher, but it ended up being incorrectly marked as failed due to a boto exception. After that, any runs against it end up failing for a different reason. Is this cause for concern, or is it the type of thing that should ‘self resolve’ after some of the runners restart? (Forgive me if I’m not using quite the right term.) Details in thread 🧵

UPDATE: Waited a while before rerunning, and it seems to have resolved. Still a very strange issue 🙂
First Job Failed with:
dagster_aws.emr.emr.EmrError: EMR log file did not appear on S3 after waiting
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/", line 224, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/emr/", line 310, in launch_step
    yield from self.wait_for_completion_and_log(run_id, step_key, emr_step_id, step_context)
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/emr/", line 323, in wait_for_completion_and_log
    self._log_logs_from_s3(step_context.log, emr_step_id)
  File "/opt/dagster/app/common/", line 165, in _log_logs_from_s3
    stdout_log, stderr_log = self.emr_job_runner.retrieve_logs_for_step_id(
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/emr/", line 379, in retrieve_logs_for_step_id
    stdout_log = self.wait_for_log(log, log_bucket, "{prefix}/stdout.gz".format(prefix=prefix))
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/emr/", line 419, in wait_for_log
    raise EmrError("EMR log file did not appear on S3 after waiting") from err
The above exception was caused by the following exception:
botocore.exceptions.WaiterError: Waiter ObjectExists failed: An error occurred (403): Forbidden
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/emr/", line 413, in wait_for_log
  File "/usr/local/lib/python3.8/site-packages/botocore/", line 55, in wait
    Waiter.wait(self, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/", line 357, in wait
    raise WaiterError(
The above error may be a one-time issue (IAM, etc.). I haven’t seen it happen before, but I’m more concerned with the invalid state it may have introduced. I now have both STEP_SUCCESS and STEP_FAILURE in the logs…
Now when I run it again to refresh the partition (just in an attempt to remove the ambiguity) I get an Invariant failed error:
dagster._check.CheckError: Invariant failed. Description: Attempted to mark step poi_scores as complete that was not known to be in flight
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/", line 1085, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/executor/", line 219, in execute
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/", line 400, in handle_event
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/", line 342, in mark_failed
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/", line 387, in _mark_complete
  File "/usr/local/lib/python3.8/site-packages/dagster/_check/", line 1470, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
Hey Zach - I'll go ahead and file an issue for this, as it looks like something the EmrPySparkStepLauncher should be able to recover from better. It looks like what happened was that the EMR step completed (hence the success event), but then fetching the EMR logs from S3 failed and created a STEP_FAILURE event. Failing with a 403: Forbidden is somewhat surprising. I'm not actually sure whether that's an 'expected' failure where we should keep retrying: the intention is that it waits for a while until the object exists, but it's possible the 403 is short-circuiting that. Do you have any idea why it would be 403ing with a permissions issue rather than telling you that the object doesn't exist? The relevant code that we're hitting is here:

Separately, if you don't actually care about these logs, you could set wait_for_logs to False - but that's the default, so I imagine you set it to True intentionally.
From our logs, I think you actually hit the same invariant in both runs - it's getting confused by the STEP_SUCCESS then STEP_FAILURE sequence. That's kind of a secondary problem, though; the main problem is that 403.
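To illustrate why a STEP_SUCCESS followed by a STEP_FAILURE for the same step would trip that invariant, here is a toy model of in-flight step bookkeeping. It is a simplified sketch, not Dagster's actual implementation: the class `ToyActiveExecution` and the helper `replay` are hypothetical names, and only the error message wording is borrowed from the traceback above.

```python
class ToyActiveExecution:
    """Hypothetical, simplified stand-in for an executor's step bookkeeping."""

    def __init__(self):
        self.in_flight = set()
        self.completed = {}

    def start(self, step_key):
        self.in_flight.add(step_key)

    def _mark_complete(self, step_key, outcome):
        # A step may only be completed once; a second terminal event finds
        # the step already removed from the in-flight set.
        if step_key not in self.in_flight:
            raise RuntimeError(
                f"Attempted to mark step {step_key} as complete "
                "that was not known to be in flight"
            )
        self.in_flight.remove(step_key)
        self.completed[step_key] = outcome

    def handle_event(self, event_type, step_key):
        if event_type == "STEP_SUCCESS":
            self._mark_complete(step_key, "success")
        elif event_type == "STEP_FAILURE":
            self._mark_complete(step_key, "failure")


def replay(events, step_key="poi_scores"):
    """Feed terminal events for one step; return the error message, if any."""
    execution = ToyActiveExecution()
    execution.start(step_key)
    try:
        for event_type in events:
            execution.handle_event(event_type, step_key)
    except RuntimeError as err:
        return str(err)
    return None
```

In this model, `replay(["STEP_SUCCESS"])` completes cleanly, while `replay(["STEP_SUCCESS", "STEP_FAILURE"])` returns the "not known to be in flight" message, because the success event has already retired the step by the time the failure event arrives.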
Ah OK, here's a theory - (java not python, but same issue) suggests that this may go away if the IAM role you're using has the s3:List* permission?
Hmm, interesting. I’m not entirely sure why a 403 is occurring here. I haven’t made any changes to the workflow, and running it again seems to resolve the 403. The dagster role we’re running with has List* permissions on the bucket, but not List* for all buckets. (Ironically, I actually noticed some were missing, but not on the buckets that would be used here!) One thing that may be worth noting is that I’m actually subclassing the PysparkStepLauncher to add some extra properties and an override for the
method. (It’s almost the same, just an if/else statement in the record logging loop.)
It's possible that when it worked, you got lucky and the log file already existed the first time that it checked
If you have s3:ListBucket permissions on the bucket in question, I wouldn't expect it to raise a 403 though
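For reference, S3 answers a HEAD request for a missing key with 404 only when the caller has s3:ListBucket on the bucket; without it, the same request comes back as 403. Below is a minimal sketch of a policy document granting both permissions, built as a Python dict. The function name `log_reader_policy`, the bucket name, and the prefix are placeholders, not values from this thread.

```python
import json


def log_reader_policy(bucket, prefix=""):
    """Build an IAM policy document (hypothetical helper) for reading logs.

    s3:ListBucket is granted on the bucket ARN, s3:GetObject on object ARNs.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListLogBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
            {
                "Sid": "ReadLogObjects",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/{prefix}*"],
            },
        ],
    }


# Placeholder bucket and prefix, for illustration only.
print(json.dumps(log_reader_policy("my-emr-log-bucket", "logs/"), indent=2))
```

Note that the two statements target different resources: the list permission applies to the bucket itself, while the read permission applies to the objects under it.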
I'm not sure why the Waiter doesn't give you a mode where it also retries on 403s, if it did we could use it here
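One possible workaround, sketched below, is a hand-rolled polling loop around `head_object` that treats both 403 and 404 as "not there yet". This is an assumption-laden sketch, not what dagster-aws does: `wait_for_object` and its parameters are hypothetical, and the error code is read from the exception's `response` dict in the shape botocore's `ClientError` uses, so the function works with any client object exposing `head_object`.

```python
import time

# HEAD responses carry no body, so the error code is the HTTP status string.
RETRYABLE_CODES = {"403", "404"}


def wait_for_object(s3_client, bucket, key, delay=5.0, max_attempts=20,
                    sleep=time.sleep):
    """Poll until the object exists, retrying on 403 as well as 404."""
    last_error = None
    for attempt in range(max_attempts):
        try:
            s3_client.head_object(Bucket=bucket, Key=key)
            return True
        except Exception as err:
            response = getattr(err, "response", None)
            code = ""
            if isinstance(response, dict):
                code = str(response.get("Error", {}).get("Code", ""))
            if code not in RETRYABLE_CODES:
                raise  # anything else is treated as a real failure
            last_error = err
        if attempt < max_attempts - 1:
            sleep(delay)
    raise TimeoutError(
        f"s3://{bucket}/{key} did not appear after {max_attempts} attempts"
    ) from last_error
```

The trade-off is that a genuine permissions problem is only reported once the attempts run out, so a loop like this is probably best kept to cases where a transient 403 is actually expected.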
Agreed! Many things I wish boto3 had or did differently! I’ll check the IAM again in the morning.