Qwame
06/19/2023, 9:40 PM
I'm using DynamicOut and the gcs_pickle_io_manager. I got this error:
dagster._core.errors.DagsterInvariantViolationError: Cannot perform reexecution with in-memory io managers.
To enable reexecution, you can set a persistent io manager, such as the fs_io_manager, in the resource_defs argument on your job: resource_defs={"io_manager": fs_io_manager}
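Roughly, re-executing a job (from failure, or a subset of steps) launches a new run that reuses the outputs of steps it does not rerun, loading them from wherever the parent run stored them. Those outputs therefore have to outlive the parent run's process, which a per-process in-memory store cannot do. The toy sketch below illustrates the idea with pickles on local disk. `DiskPickleStore` and its path layout (`run_id/step_key/output_name[/mapping_key]`) are hypothetical and are not Dagster's io manager API.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Optional


class DiskPickleStore:
    """Hypothetical store (not Dagster's API): pickles each output to a file
    keyed by run id, step key, output name and, for dynamic outputs, mapping key."""

    def __init__(self, base_dir: Path) -> None:
        self.base_dir = base_dir

    def _path(self, run_id: str, step_key: str, output_name: str,
              mapping_key: Optional[str] = None) -> Path:
        parts = [run_id, step_key, output_name]
        if mapping_key is not None:
            parts.append(mapping_key)  # one file per dynamic output
        return self.base_dir.joinpath(*parts)

    def handle_output(self, obj: Any, run_id: str, step_key: str,
                      output_name: str, mapping_key: Optional[str] = None) -> None:
        path = self._path(run_id, step_key, output_name, mapping_key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(obj))

    def load_input(self, run_id: str, step_key: str, output_name: str,
                   mapping_key: Optional[str] = None) -> Any:
        path = self._path(run_id, step_key, output_name, mapping_key)
        return pickle.loads(path.read_bytes())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Parent run: the upstream step writes one output per mapping key.
        parent = DiskPickleStore(Path(tmp))
        for i in range(3):
            parent.handle_output({"index": i}, "run_1", "generate_batches", "result", str(i))

        # Re-execution: a fresh store object (standing in for a new process)
        # can still read the parent run's outputs because they live on disk.
        child = DiskPickleStore(Path(tmp))
        print(child.load_input("run_1", "generate_batches", "result", "1"))
```

A GCS-backed io manager follows the same principle with a bucket in place of the local directory.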
I am pretty sure the gcs_pickle_io_manager supports retries and reexecution. Any reason I got this error?
Zach
06/19/2023, 10:03 PM
Is it possible the gcs_pickle_io_manager isn't being configured right? Can you share where your resources are being attached to your job, and the op where the DynamicOutput is being generated?
Qwame
06/19/2023, 10:26 PM
In my Definitions, I have:
from dagster import Definitions
from dagster_gcp.gcs import gcs_pickle_io_manager

Definitions(
    resources={
        "io_manager": gcs_pickle_io_manager.configured(
            {
                "gcs_bucket": {"env": "CLOUD_STORAGE_BUCKET"},
                "gcs_prefix": "prod_",
            }
        )
    }
)
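In the config above, `{"env": "CLOUD_STORAGE_BUCKET"}` tells Dagster to read the bucket name from an environment variable at runtime instead of hard-coding it. The helper below is a hypothetical stand-in that mimics that substitution with the standard library, which can be handy for checking locally that the variable is actually set. `resolve_env_sources` is an illustrative name, not Dagster's own resolver.

```python
import os
from typing import Any


def resolve_env_sources(config: Any) -> Any:
    """Hypothetical helper (not Dagster's implementation): recursively replace
    {"env": NAME} with the value of os.environ[NAME]."""
    if isinstance(config, dict):
        if set(config) == {"env"}:
            name = config["env"]
            if name not in os.environ:
                raise KeyError(f"environment variable {name!r} is not set")
            return os.environ[name]
        return {key: resolve_env_sources(value) for key, value in config.items()}
    if isinstance(config, list):
        return [resolve_env_sources(item) for item in config]
    return config


if __name__ == "__main__":
    # Placeholder value, only used if the variable is not already set.
    os.environ.setdefault("CLOUD_STORAGE_BUCKET", "my-example-bucket")
    print(resolve_env_sources({
        "gcs_bucket": {"env": "CLOUD_STORAGE_BUCKET"},
        "gcs_prefix": "prod_",
    }))
```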
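Then in my job, I have:
from math import ceil

import numpy as np
from dagster import DynamicOut, DynamicOutput, job, op

# ETLConfig is defined elsewhere in the project (not shown here)


@op(out=DynamicOut())
def generate_batches(context, bq_config: ETLConfig):
    new_metrics = bq_config.new_metrics
    if new_metrics.shape[0] > 0:
        # Split into ceil(n / 25) roughly equal batches, one DynamicOutput each
        batches = np.array_split(
            new_metrics,
            ceil(new_metrics.shape[0] / 25),
        )
        for index, batch in enumerate(batches):
            yield DynamicOutput(
                {"batch": batch, "index": index}, mapping_key=str(index)
            )


@op
def perform_action(context, batch):
    print(batch)
    ...


@job
def dynamic_job():
    # Fan out: one perform_action step per dynamic output
    generate_batches().map(lambda x: perform_action(x))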
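For reference, `np.array_split(rows, k)` with `k = ceil(n / 25)` returns `k` batches whose sizes differ by at most one (the first `n % k` batches get one extra row), so no batch exceeds 25 rows, and each batch becomes one mapped `perform_action` step keyed by `str(index)`. The pure-Python sketch below reproduces that arithmetic on a plain list so it can be checked without NumPy; `split_into_batches` is a hypothetical name, not part of the original job.

```python
from math import ceil
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_into_batches(rows: Sequence[T], max_size: int = 25) -> List[Tuple[str, List[T]]]:
    """Hypothetical helper: mimic np.array_split(rows, ceil(len(rows) / max_size))
    and pair each batch with a string mapping key."""
    n = len(rows)
    if n == 0:
        return []  # the op above yields nothing for an empty frame
    k = ceil(n / max_size)        # number of batches
    base, extra = divmod(n, k)    # the first `extra` batches get one more row
    batches = []
    start = 0
    for index in range(k):
        size = base + (1 if index < extra else 0)
        batches.append((str(index), list(rows[start:start + size])))
        start += size
    return batches


if __name__ == "__main__":
    for key, batch in split_into_batches(list(range(53))):
        print(key, len(batch))  # 0 18, then 1 18, then 2 17
```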
Zach
06/19/2023, 10:31 PM
Qwame
06/19/2023, 10:32 PM
Zach
06/19/2023, 10:32 PM
Zach
06/19/2023, 10:33 PM