Gayathiri
11/15/2022, 4:34 PM
Skipped the execution of hook "slack_message_on_failure". It did not meet its triggering condition during the execution of "xxx".
claire
11/15/2022, 9:32 PM
Gayathiri
11/15/2022, 10:34 PM
read_pg[predicti…d_high_effort]
STEP_START
Started execution of step "read_pg[prediction_std_high_effort]".
17:02:32.489
read_pg[predicti…d_high_effort]
LOADED_INPUT
Loaded input "tablename" using input manager "io_manager", from output "result" of step "get_pg_tables"
17:02:32.583
read_pg[predicti…d_high_effort]
STEP_INPUT
Got input "tablename" of type "String". (Type check passed).
17:02:32.606
read_pg[std_claim_actions]
STEP_FAILURE
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '901431a7-45f3-409b-98f7-f6c0283000ba', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'f2f5659b-e562-426b-81ce-d474cea20292', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3b438278-a6eb-4a1a-918a-842f3525d6bb', 'Date': 'Tue, 15 Nov 2022 22:02:33 GMT', 'Content-Length': '284'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dagster-step-24a9f39cdeadc24dda07fa68c8aabcf9-1\" not found","reason":"NotFound","details":{"name":"dagster-step-24a9f39cdeadc24dda07fa68c8aabcf9-1","group":"batch","kind":"jobs"},"code":404}
Stack Trace:
File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/step_delegating/step_delegating_executor.py", line 239, in execute
health_check_result = self._step_handler.check_step_health(
, File "/usr/local/lib/python3.9/site-packages/dagster_k8s/executor.py", line 244, in check_step_health
job = self._batch_api.read_namespaced_job(
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api/batch_v1_api.py", line 2657, in read_namespaced_job
return self.read_namespaced_job_with_http_info(name, namespace, **kwargs) # noqa: E501
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api/batch_v1_api.py", line 2744, in read_namespaced_job_with_http_info
return self.api_client.call_api(
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 373, in request
return self.rest_client.GET(url,
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 241, in GET
return self.request("GET", url,
, File "/usr/local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 235, in request
raise ApiException(http_resp=r)
17:02:33.478
View full message
read_pg[predicti…d_high_effort]
STEP_OUTPUT
Yielded output "df" of type "DataFrame". (Type check passed).
17:02:33.675
read_pg[predicti…d_high_effort]
HANDLED_OUTPUT
Handled output "df" using IO manager "io_manager"
17:02:33.909
read_pg[predicti…d_high_effort]
STEP_OUTPUT
Yielded output "schema" of type "DataFrame". (Type check passed).
17:02:33.926
read_pg[predicti…d_high_effort]
HANDLED_OUTPUT
Handled output "schema" using IO manager "io_manager"
17:02:34.157
read_pg[predicti…d_high_effort]
STEP_SUCCESS
Finished execution of step "read_pg[prediction_std_high_effort]" in 1.54s.
17:02:34.174
read_pg[predicti…d_high_effort]
HOOK_SKIPPED
Skipped the execution of hook "slack_message_on_failure". It did not meet its triggering condition during the execution of "read_pg".
17:02:34.192
read_pg[predicti…d_high_effort]
HOOK_COMPLETED
Finished the execution of hook "slack_message_on_success" triggered for "read_pg".
17:02:34.209
-
RUN_FAILURE
Execution of run for "app_to_cdf" failed. An exception was thrown during execution.
dagster._check.CheckError: Invariant failed. Description: Attempted to mark step read_pg[std_claim_actions] as complete that was not known to be in flight
Stack Trace:
File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 822, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
, File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/step_delegating/step_delegating_executor.py", line 220, in execute
active_execution.handle_event(dagster_event)
, File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 400, in handle_event
self.mark_failed(step_key)
, File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 342, in mark_failed
self._mark_complete(step_key)
, File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 387, in _mark_complete
check.invariant(
, File "/usr/local/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1455, in invariant
raise CheckError(f"Invariant failed. Description: {desc}")
17:02:34.530
View full message
-
ENGINE_EVENT
Process for run exited (pid: 1).
17:02:34.58
Job code :
@dagster.op(
    required_resource_keys={"global_values"}, retry_policy=dagster.RetryPolicy()
)
def load_bq(
    context, tablename: str, df: pd.DataFrame, col_df: pd.DataFrame
) -> pd.DataFrame:
    """Write postgres data to bigquery.

    Massages the data based on the pg column info so that it will fit into bq.

    Args:
        context: Dagster op context; supplies the logger and the
            ``global_values`` resource.
        tablename: Table name, used as the BigQuery table name inside the
            ``<bq_dataset>_app`` dataset.
        df: Rows read from the postgres table.
        col_df: Column metadata for the table; drives both the transform of
            ``df`` and the BigQuery schema.

    Returns:
        The transformed DataFrame that was uploaded.
    """
    context.log.info(f"row count: {df.shape[0]}")
    context.log.info(col_df)
    global_values = context.resources.global_values
    df = _transform_df(df, col_df)
    # if_exists="replace": every run overwrites the destination table.
    df.to_gbq(
        destination_table=f"{global_values['bq_dataset']}_app.{tablename}",
        project_id=global_values["gcp_project_id"],
        chunksize=512,
        if_exists="replace",
        table_schema=_get_bq_schema(col_df),
    )
    return df
@dagster.op(
    required_resource_keys={"global_values"},
    out={"df": dagster.Out(), "schema": dagster.Out()},
    retry_policy=dagster.RetryPolicy(),
)
def read_pg(context, tablename: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch one postgres table together with its column metadata.

    Args:
        context: Dagster op context; supplies the ``global_values`` resource
            used to build the postgres engine.
        tablename: Name of the table in the ``public`` schema to read.

    Returns:
        A pair ``(df, col_df)``:
        - df: the rows in the table
        - col_df: metainformation about the columns in the table
    """
    pg_engine = common.get_postgres_engine(context.resources.global_values)

    # Both statements are built from the table name and run on the same engine.
    rows_query = f"SELECT * FROM public.{tablename}"
    columns_query = _col_query.format(tablename=tablename)

    table_rows = pd.read_sql(rows_query, pg_engine)
    column_info = pd.read_sql(columns_query, pg_engine)
    return table_rows, column_info
@dagster.job(
    name="app_name_a",
    executor_def=autoconfig.get_executor(),
    config=k8s_executor_config_mapping,
    resource_defs={
        "global_values": global_values_resource,
        **autoconfig.get_resource_defs(),
        "slack": slack_resource.configured({"token": get_slack_token()}),
    },
    hooks={slack_message_on_failure},
)
def app_to_cdf():
    """Job graph that reads a postgres table and loads it into BigQuery.

    Only ``slack_message_on_failure`` is attached as a job-level hook here.
    """

    def _read_and_load(tablename: str):
        # Chain the two ops: read_pg's outputs feed straight into load_bq.
        rows, columns = read_pg(tablename)
        return load_bq(tablename, rows, columns)

    # NOTE(review): the snippet as pasted never invokes _read_and_load; the
    # call that fans it out over the table names is presumably cut off.
johann
11/15/2022, 11:08 PM
slack_message_on_success
in your code snippet, could you share where you add it?
Gayathiri
11/16/2022, 3:16 AM
@dagster.job(
name="app_name_a",
executor_def=autoconfig.get_executor(),
config=k8s_executor_config_mapping,
resource_defs={
"global_values": global_values_resource,
**autoconfig.get_resource_defs(),
"slack": slack_resource.configured({"token": get_slack_token()}),
},
hooks={slack_message_on_failure, slack_message_on_success},
)
def app_to_cdf():
def _read_and_load(tablename: str):
df, col_df = read_pg(tablename)
df = load_bq(tablename, df, col_df)
return df
johann
11/16/2022, 1:58 PM
Gayathiri
11/16/2022, 2:01 PM
johann
11/16/2022, 2:02 PM
Gayathiri
11/16/2022, 2:03 PM
johann
11/16/2022, 2:03 PM
claire
11/16/2022, 10:02 PM
Gayathiri
11/17/2022, 5:17 PM
David Tong
12/01/2022, 12:39 AM
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '3e5fc9cb-e799-4adf-8665-6f78f0e8b361', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 29 Nov 2022 23:30:38 GMT', 'Content-Length': '129'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/dagster/_core/executor/step_delegating/step_delegating_executor.py", line 243, in execute
health_check_result = self._step_handler.check_step_health(
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/dagster_k8s/executor.py", line 248, in check_step_health
job = self._batch_api.read_namespaced_job(
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1257, in read_namespaced_job
return self.read_namespaced_job_with_http_info(name, namespace, **kwargs) # noqa: E501
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1352, in read_namespaced_job_with_http_info
return self.api_client.call_api(
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 373, in request
return self.rest_client.GET(url,
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/rest.py", line 239, in GET
return self.request("GET", url,
File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/rest.py", line 233, in request
raise ApiException(http_resp=r)
This error was raised on only one of several pods in a k8s retry job; the other analogous pods completed without issue.