g

Gayathiri

11/15/2022, 4:34 PM
Hi All, we are trying to use @failure_hook (applied at the job level) to send failure messages for our ops to our Slack channel. We added this failure hook to a job that has multiple ops in it. One of the ops failed with the message below, but it didn't invoke the failure hook. Can you please help us understand why the hook wasn't even called? Note: for other op failures, we saw hook invocations in the Dagster logging console after the op was executed. But for this particular op, there were no log events showing the hook being called or skipped, and the job went on to execute the next op. E.g.:
Skipped the execution of hook "slack_message_on_failure". It did not meet its triggering condition during the execution of "xxx".
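(For reference, the failure hook itself looks roughly like this. This is a minimal sketch, since the actual hook body isn't shown in this thread; the channel name is a placeholder.)

import dagster

@dagster.failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: dagster.HookContext):
    """Post to Slack when the op this hook is attached to fails."""
    # "#data-alerts" is a placeholder channel name.
    context.resources.slack.chat_postMessage(
        channel="#data-alerts",
        text=f"Op {context.op.name} failed in run {context.run_id}",
    )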
c

claire

11/15/2022, 9:32 PM
Hi Gayathiri, would you mind sharing a code snippet containing your job so I can reproduce?
As well as any steps taken to reach this error?
g

Gayathiri

11/15/2022, 10:34 PM
Logs for the job from Dagit are:
read_pg[predicti…d_high_effort]
STEP_START
Started execution of step "read_pg[prediction_std_high_effort]".
17:02:32.489

read_pg[predicti…d_high_effort]
LOADED_INPUT
Loaded input "tablename" using input manager "io_manager", from output "result" of step "get_pg_tables"
17:02:32.583

read_pg[predicti…d_high_effort]
STEP_INPUT
Got input "tablename" of type "String". (Type check passed).
17:02:32.606

read_pg[std_claim_actions]
STEP_FAILURE
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '901431a7-45f3-409b-98f7-f6c0283000ba', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'f2f5659b-e562-426b-81ce-d474cea20292', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3b438278-a6eb-4a1a-918a-842f3525d6bb', 'Date': 'Tue, 15 Nov 2022 22:02:33 GMT', 'Content-Length': '284'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dagster-step-24a9f39cdeadc24dda07fa68c8aabcf9-1\" not found","reason":"NotFound","details":{"name":"dagster-step-24a9f39cdeadc24dda07fa68c8aabcf9-1","group":"batch","kind":"jobs"},"code":404}



Stack Trace:
  File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/step_delegating/step_delegating_executor.py", line 239, in execute
    health_check_result = self._step_handler.check_step_health(
  File "/usr/local/lib/python3.9/site-packages/dagster_k8s/executor.py", line 244, in check_step_health
    job = self._batch_api.read_namespaced_job(
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api/batch_v1_api.py", line 2657, in read_namespaced_job
    return self.read_namespaced_job_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api/batch_v1_api.py", line 2744, in read_namespaced_job_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 241, in GET
    return self.request("GET", url,
  File "/usr/local/lib/python3.9/site-packages/kubernetes/client/rest.py", line 235, in request
    raise ApiException(http_resp=r)
17:02:33.478
read_pg[predicti…d_high_effort]

STEP_OUTPUT
Yielded output "df" of type "DataFrame". (Type check passed).
17:02:33.675

read_pg[predicti…d_high_effort]
HANDLED_OUTPUT
Handled output "df" using IO manager "io_manager"
17:02:33.909

read_pg[predicti…d_high_effort]
STEP_OUTPUT
Yielded output "schema" of type "DataFrame". (Type check passed).
17:02:33.926

read_pg[predicti…d_high_effort]
HANDLED_OUTPUT
Handled output "schema" using IO manager "io_manager"
17:02:34.157

read_pg[predicti…d_high_effort]
STEP_SUCCESS
Finished execution of step "read_pg[prediction_std_high_effort]" in 1.54s.
17:02:34.174

read_pg[predicti…d_high_effort]
HOOK_SKIPPED
Skipped the execution of hook "slack_message_on_failure". It did not meet its triggering condition during the execution of "read_pg".
17:02:34.192

read_pg[predicti…d_high_effort]
HOOK_COMPLETED
Finished the execution of hook "slack_message_on_success" triggered for "read_pg".
17:02:34.209

-
RUN_FAILURE
Execution of run for "app_to_cdf" failed. An exception was thrown during execution.
dagster._check.CheckError: Invariant failed. Description: Attempted to mark step read_pg[std_claim_actions] as complete that was not known to be in flight

Stack Trace:
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 822, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/executor/step_delegating/step_delegating_executor.py", line 220, in execute
    active_execution.handle_event(dagster_event)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 400, in handle_event
    self.mark_failed(step_key)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 342, in mark_failed
    self._mark_complete(step_key)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/active.py", line 387, in _mark_complete
    check.invariant(
  File "/usr/local/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1455, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")
17:02:34.530
-
ENGINE_EVENT
Process for run exited (pid: 1).
17:02:34.58
We see this behavior only for jobs that have a retry policy set.
Job code:
@dagster.op(
    required_resource_keys={"global_values"}, retry_policy=dagster.RetryPolicy()
)
def load_bq(
    context, tablename: str, df: pd.DataFrame, col_df: pd.DataFrame
) -> pd.DataFrame:
    """Write postgres data to bigquery.

    Massages the data based on the pg column info so that it will fit into bq.
    """
    context.log.info(f"row count: {df.shape[0]}")
    context.log.info(col_df)
    global_values = context.resources.global_values

    df = _transform_df(df, col_df)

    df.to_gbq(
        destination_table=f"{global_values['bq_dataset']}_app.{tablename}",
        project_id=global_values["gcp_project_id"],
        chunksize=512,
        if_exists="replace",
        table_schema=_get_bq_schema(col_df),
    )
    return df

@dagster.op(
    required_resource_keys={"global_values"},
    out={"df": dagster.Out(), "schema": dagster.Out()},
    retry_policy=dagster.RetryPolicy(),
)
def read_pg(context, tablename: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read data from postgres.

    Returns:
        - df: the rows in the table
        - col_df: metainformation about the columns in the table
    """
    global_values = context.resources.global_values
    engine = common.get_postgres_engine(global_values)

    df = pd.read_sql(f"SELECT * FROM public.{tablename}", engine)
    col_df = pd.read_sql(_col_query.format(tablename=tablename), engine)

    return df, col_df


@dagster.job(
    name="app_name_a",
    executor_def=autoconfig.get_executor(),
    config=k8s_executor_config_mapping,
    resource_defs={
        "global_values": global_values_resource,
        **autoconfig.get_resource_defs(),
        "slack": slack_resource.configured({"token": get_slack_token()}),
    },
    hooks={slack_message_on_failure},
)
def app_to_cdf():

    def _read_and_load(tablename: str):
        df, col_df = read_pg(tablename)
        df = load_bq(tablename, df, col_df)
        return df
We expect the failure hook to be triggered when there is a step_failure event and a run_failure event. But the failure hook is skipped and the success hook is triggered instead.
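(A minimal way to check the expected behavior in isolation; this is a sketch with made-up op/job/hook names, not our production job.)

import dagster

@dagster.failure_hook
def print_on_failure(context: dagster.HookContext):
    context.log.info(f"failure hook fired for {context.op.name}")

@dagster.success_hook
def print_on_success(context: dagster.HookContext):
    context.log.info(f"success hook fired for {context.op.name}")

@dagster.op(retry_policy=dagster.RetryPolicy(max_retries=1))
def always_fails():
    raise Exception("boom")

@dagster.job(hooks={print_on_failure, print_on_success})
def hook_check_job():
    always_fails()

# Expectation: only the failure hook fires for always_fails.
result = hook_check_job.execute_in_process(raise_on_error=False)
assert not result.success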
j

johann

11/15/2022, 11:08 PM
What version of dagster are you running?
We had a recent fix for the Kubernetes 404 on step retries in 1.0.17/0.16.17. The behavior of the hooks is unexpected. I don't see slack_message_on_success in your code snippet; could you share where you add it?
g

Gayathiri

11/16/2022, 3:16 AM
Hi, we are running dagster 1.0.16.
The slack_message_on_success hook was added to the same job that I shared earlier with the failure hook. I removed it later so that we could debug the behavior of the failure hook.
@dagster.job(
    name="app_name_a",
    executor_def=autoconfig.get_executor(),
    config=k8s_executor_config_mapping,
    resource_defs={
        "global_values": global_values_resource,
        **autoconfig.get_resource_defs(),
        "slack": slack_resource.configured({"token": get_slack_token()}),
    },
    hooks={slack_message_on_failure, slack_message_on_success},
)
def app_to_cdf():

    def _read_and_load(tablename: str):
        df, col_df = read_pg(tablename)
        df = load_bq(tablename, df, col_df)
        return df
@johann Could you please give us the issue number for this fix?
j

johann

11/16/2022, 1:58 PM
1.0.17 for the kubernetes fix
g

Gayathiri

11/16/2022, 2:01 PM
Thanks. I was looking for documentation or release notes so that we could check whether our 404s are due to the same issue.
j

johann

11/16/2022, 2:02 PM
Gotcha, let me find it. But it looks like the same scenario
g

Gayathiri

11/16/2022, 2:03 PM
Thank you so much. Also, any inputs on the hook behavior with retries that we reported would be much appreciated.
j

johann

11/16/2022, 2:03 PM
c

claire

11/16/2022, 10:02 PM
@Gayathiri were you able to see if the issue still exists on 1.0.17?
g

Gayathiri

11/17/2022, 5:17 PM
Hi, we upgraded our dev environment to 1.0.17 and we don't see the 404s anymore. We will confirm whether this upgrade also fixed the inconsistent failure hook behavior that we were experiencing. Thank you!
:rainbow-daggy: 1
We don't see the inconsistent hook behavior after upgrading to 1.0.17. Thank you!
:rainbow-daggy: 1
d

David Tong

12/01/2022, 12:39 AM
@johann would you know if 1.0.17 would also fix the following similar error (k8s 401 instead of 404)?
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '3e5fc9cb-e799-4adf-8665-6f78f0e8b361', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 29 Nov 2022 23:30:38 GMT', 'Content-Length': '129'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}


  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/dagster/_core/executor/step_delegating/step_delegating_executor.py", line 243, in execute
    health_check_result = self._step_handler.check_step_health(
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/dagster_k8s/executor.py", line 248, in check_step_health
    job = self._batch_api.read_namespaced_job(
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1257, in read_namespaced_job
    return self.read_namespaced_job_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1352, in read_namespaced_job_with_http_info
    return self.api_client.call_api(
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/rest.py", line 239, in GET
    return self.request("GET", url,
  File "/opt/conda/envs/akuna/lib/python3.8/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)
This error was raised on only one pod of several in a k8s retry job; the other analogous pods were able to complete without issue.