# ask-community
q
I am unable to retry from failure when using `DynamicOut` and the `gcs_pickle_io_manager`. I got this error:
```
dagster._core.errors.DagsterInvariantViolationError: Cannot perform reexecution with in-memory io managers.
To enable reexecution, you can set a persistent io manager, such as the fs_io_manager, in the resource_defs argument on your job: resource_defs={"io_manager": fs_io_manager}
```
I am pretty sure the `gcs_pickle_io_manager` supports retries and re-execution. Is there any reason I would get this error?
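For reference, the error's suggestion amounts to binding a persistent IO manager directly on the job via `resource_defs`. A minimal, self-contained sketch of that pattern with `fs_io_manager` (the op and job names here are illustrative, not from this thread):

```python
from dagster import DynamicOut, DynamicOutput, fs_io_manager, job, op


@op(out=DynamicOut())
def emit_numbers():
    # Fan out three dynamic outputs, one per mapping key.
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))


@op
def double(n: int) -> int:
    return n * 2


# Persistent IO manager bound on the job itself, as the error message suggests.
@job(resource_defs={"io_manager": fs_io_manager})
def fan_out_job():
    emit_numbers().map(double)
```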
z
Feels like maybe the `gcs_pickle_io_manager` isn't being configured right? Can you share where your resources are being attached to your job, and the op where the `DynamicOutput` is being generated?
q
In my `Definitions`, I have:

```python
from dagster import Definitions
from dagster_gcp.gcs import gcs_pickle_io_manager

Definitions(
    resources={
        "io_manager": gcs_pickle_io_manager.configured(
            {
                "gcs_bucket": {"env": "CLOUD_STORAGE_BUCKET"},
                "gcs_prefix": "prod_",
            }
        )
    }
)
```
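As an aside, `gcs_pickle_io_manager` also declares a required `gcs` resource key for the GCS client, so a complete resource mapping usually binds `gcs_resource` alongside the IO manager. A sketch of that fuller wiring (whether the `gcs` resource is already bound elsewhere in this project isn't shown in the thread):

```python
from dagster import Definitions
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

defs = Definitions(
    resources={
        "io_manager": gcs_pickle_io_manager.configured(
            {
                "gcs_bucket": {"env": "CLOUD_STORAGE_BUCKET"},
                "gcs_prefix": "prod_",
            }
        ),
        # GCS client resource required by gcs_pickle_io_manager.
        "gcs": gcs_resource,
    }
)
```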
Then in my job, I have:

```python
from math import ceil

import numpy as np
from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def generate_batches(context, bq_config: ETLConfig):  # ETLConfig is our own config type
    new_metrics = bq_config.new_metrics
    if new_metrics.shape[0] > 0:
        # Split the new metrics into batches of at most 25 rows each.
        batches = np.array_split(
            new_metrics,
            ceil(new_metrics.shape[0] / 25),
        )
        for index, batch in enumerate(batches):
            yield DynamicOutput(
                {"batch": batch, "index": index}, mapping_key=str(index)
            )


@op
def perform_action(context, batch):
    print(batch)
    ...


@job
def dynamic_job():
    generate_batches().map(lambda x: perform_action(x))
```
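For completeness, re-execution from failure can also be launched outside the UI. A sketch, assuming the parent run's `run_id` is known and `dynamic_job` above lives in an importable module (the helper name is illustrative):

```python
from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable


def retry_from_failure(parent_run_id: str):
    # Re-execute only the failed steps, reusing the outputs that the
    # persistent IO manager stored for the steps that already succeeded.
    instance = DagsterInstance.get()
    return execute_job(
        reconstructable(dynamic_job),
        instance=instance,
        reexecution_options=ReexecutionOptions.from_failure(parent_run_id, instance),
    )
```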
z
Have you been able to confirm that your outputs are being written to GCS? If so, then it feels like a bug.
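A quick way to verify this (assuming the bucket and prefix from the `Definitions` above, and that `google-cloud-storage` credentials are available) is to list the pickled outputs under the configured prefix:

```python
import os

from google.cloud import storage

client = storage.Client()
bucket_name = os.environ["CLOUD_STORAGE_BUCKET"]
# List the objects the IO manager wrote under the configured prefix.
for blob in client.list_blobs(bucket_name, prefix="prod_"):
    print(blob.name, blob.size)
```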
q
Yes, my outputs are all written to GCS. I get all the batches.
z
Yeah, that feels like a bug then; it would probably be worth creating an issue in GitHub.
Wish I could be more helpful here - I'm able to do this same thing with the `s3_pickle_io_manager` just fine.
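For comparison, the equivalent wiring with the S3 IO manager mentioned above would look roughly like this (bucket name and env var are illustrative):

```python
from dagster import Definitions
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

defs = Definitions(
    resources={
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": {"env": "S3_BUCKET"}, "s3_prefix": "prod_"}
        ),
        # S3 client resource required by s3_pickle_io_manager.
        "s3": s3_resource,
    }
)
```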