Charlie Bini
05/24/2022, 3:33 PM
`Exceeded max_retries of 1` for an Op with `retry_policy=RetryPolicy(max_retries=9, delay=30)` after 1 retry?
Charlie Bini
05/24/2022, 3:36 PM
`raise RetryRequested() from e` override the op's `RetryPolicy`?
sean
05/25/2022, 1:35 PMCharlie Bini
# 05/25/2022, 1:53 PM
@op(
    ins={
        "table": In(dagster_type=Any),
        "projection": In(dagster_type=Optional[String]),
        "query": In(dagster_type=Optional[String]),
        "count": In(dagster_type=Int),
        "n_pages": In(dagster_type=Int),
    },
    out={"gcs_file_handles": Out(dagster_type=List)},
    required_resource_keys={"gcs_fm"},
    retry_policy=RetryPolicy(max_retries=9, delay=30),
    config_schema={"query_timeout": Field(Int, is_required=False, default_value=30)},
    tags={"dagster/priority": 6},
)
def get_data(context, table, projection, query, count, n_pages):
    """Fetch ``n_pages`` pages of ``table`` and upload each as gzipped JSON.

    Each page is queried with ``time_limit_query``, serialized with
    ``json.dumps``, gzip-compressed, and handed to the ``gcs_fm`` resource's
    ``upload_data``. The handle returned by each upload is collected in a
    list.

    On a retry (``context.retry_number > 0``), a page whose object key the
    ``gcs_fm`` resource already reports as present is skipped, so no handle
    is collected for it in that attempt.

    Args:
        context: Dagster op execution context.
        table: Object exposing ``.name``; also passed to ``time_limit_query``.
        projection: Optional projection forwarded to ``time_limit_query``.
        query: Optional query string; when truthy it is appended to the
            file stem and forwarded to ``time_limit_query``.
        count: Declared input; not read anywhere in the visible body.
        n_pages: Number of pages to fetch (queried as pages 1..n_pages).

    Raises:
        RetryRequested: When ``time_limit_query`` raises any ``Exception``;
            chained from the original error.
    """
    file_ext = "json.gz"

    # Drop the query part of the stem when query is None or empty.
    stem_parts = [table.name, str(query or "")]
    file_stem = "_".join(part for part in stem_parts if part)

    gcs_file_handles = []
    for p in range(n_pages):
        object_key = f"{table.name}/{file_stem}_p_{p}.{file_ext}"

        # Only consult the bucket on a retry; the `and` short-circuits on
        # the first attempt so no existence check is made.
        uploaded_earlier = (
            context.retry_number > 0
            and context.resources.gcs_fm._has_object(key=object_key)
        )
        if uploaded_earlier:
            context.log.debug("File already exists from previous try. Skipping.")
            continue

        context.log.debug(f"page:\t{(p + 1)}/{n_pages}")
        try:
            page_data = time_limit_query(
                context=context,
                table=table,
                query=query,
                projection=projection,
                page=(p + 1),
            )
        except Exception as err:
            # Pass the op's own RetryPolicy limits through explicitly so the
            # retry request uses them instead of RetryRequested's defaults.
            policy = context.op_def.retry_policy
            raise RetryRequested(
                max_retries=policy.max_retries,
                seconds_to_wait=policy.delay,
            ) from err

        payload = gzip.compress(json.dumps(page_data).encode("utf-8"))
        handle = context.resources.gcs_fm.upload_data(
            context=context, obj=payload, file_key=object_key
        )
        gcs_file_handles.append(handle)
    # NOTE(review): the snippet ends here without returning/yielding
    # `gcs_file_handles`, although the op declares an output of that name —
    # presumably the paste was truncated; confirm against the real file.
Charlie Bini
05/25/2022, 1:54 PM
`RetryRequested` was superseding `RetryPolicy`; I passed the additional params.
Charlie Bini
05/25/2022, 1:55 PMsean
05/25/2022, 2:43 PMsean
05/25/2022, 2:44 PMRetryRequested
objectDagster Bot
05/25/2022, 2:44 PMCharlie Bini
05/25/2022, 3:22 PM