# ask-community
Heber
Hi everyone, I'm a newbie in Dagster and I would appreciate your help. I have a Dagster job, metabase_cards, that uses a Metabase client to update some queries in Metabase. The job usually takes around 1h. I first launched it directly with the dagster command (
dagster job execute -f metabase_cache/jobs/metabase_cards.py
), and it works fine. Then I tried to execute it through dagit: I ran the command
dagit
to bring up the UI, but dagit launches the job without me executing it, like in the attached image below, and after a few seconds the following timeout error appears:
/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_core/workspace/context.py:559: UserWarning: Error loading repository location automatic_metabase_cache.repositories:Exception: Timed out waiting for gRPC server to start with arguments: "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/bin/python -m dagster api grpc --lazy-load-user-code --socket /tmp/tmpgz11jfa2 --heartbeat --heartbeat-timeout 45 --fixed-server-id d9bd49df-b15e-4fa7-9083-f85c15af44e7 --log-level WARNING --package-name automatic_metabase_cache.repositories". Most recent connection error: dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server

Stack Trace:
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/server.py", line 966, in wait_for_grpc_server
    client.ping("")
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/client.py", line 128, in ping
    res = self._query("Ping", api_pb2.PingRequest, echo=echo)
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/client.py", line 115, in _query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: No such file or directory"
	debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-09-15T17:06:43.263451948+02:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: No such file or directory {grpc_status:14, created_time:"2022-09-15T17:06:43.263449118+02:00"}]}"
>

Stack Trace:
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/client.py", line 112, in _query
    response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)


Stack Trace:
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_core/host_representation/grpc_server_registry.py", line 211, in _get_grpc_endpoint
    server_process = GrpcServerProcess(
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/server.py", line 1121, in __init__
    self.server_process = open_server_process(
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/server.py", line 1034, in open_server_process
    wait_for_grpc_server(server_process, client, subprocess_args, timeout=startup_timeout)
  File "/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/lib/python3.8/site-packages/dagster/_grpc/server.py", line 972, in wait_for_grpc_server
    raise Exception(
I understand dagit has a timeout of 30 seconds to load the user code, but I do not understand why dagit is executing the job before I can even see the UI and launch it myself. Any strategy to solve this problem? Note: I do not have a VPC, and when I launched the command
/home/akeneo/workspace/akeneo/quokkas-orchestration-tool/dagster_automatize/bin/python -m dagster api grpc --lazy-load-user-code --socket /tmp/tmplpc3e2m3 --heartbeat --heartbeat-timeout 45 --fixed-server-id dbdea285-4687-4b41-925a-83eab17ac029 --log-level WARNING --package-name automatic_metabase_cache.repositories
found in the error's stack trace, it works without problems (one way to point dagit at such a manually started server is sketched after this message).
I executed
dagster api grpc-health-check -p 3000 -h 127.0.0.1
and I got:
<_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused"
	debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2022-09-15T17:54:27.847120671+02:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused {created_time:"2022-09-15T17:54:27.847115112+02:00", grpc_status:14}]}"
>
Unable to connect to gRPC server: 0
Any idea?
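Regarding the note above that the dagster api grpc command works fine when run manually: one workaround, sketched here with a placeholder port and location name, is to start that server yourself on a fixed host and port and point dagit at it through workspace.yaml, instead of letting dagit spawn it over a socket.

# start the user-code gRPC server on a fixed port (port and location name are placeholders)
dagster api grpc --package-name automatic_metabase_cache.repositories --host 127.0.0.1 --port 4266

# workspace.yaml in the directory where dagit is started
load_from:
  - grpc_server:
      host: 127.0.0.1
      port: 4266
      location_name: automatic_metabase_cache

This only changes how dagit reaches the user code; it would not by itself stop the long Metabase refresh from running while the code is loaded, which is what the replies below address.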
Daniel
Hi Heber - are you possibly invoking the job within your Python code (for example, by calling execute_in_process on it or calling execute_job within the file)? You only want the file that you pass into dagit to define the job, not to execute it.
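A minimal sketch of that distinction, with hypothetical op and job names: the file you pass to dagit should only define the job, and any direct call to execute_in_process should be guarded so it does not run when the file is imported.

from dagster import job, op

@op
def say_hello():
    # hypothetical op, just to illustrate the pattern
    return "hello"

@job
def my_job():
    say_hello()

# my_job.execute_in_process()  # at module scope this would run the job every time the file is loaded

if __name__ == "__main__":
    # running the file directly executes the job; importing it (as dagit does) only defines it
    my_job.execute_in_process()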
Heber
Hello Daniel, thanks for answering. I'm in another timezone, so my answer comes a little late. Inside the job, I execute a client method responsible for updating the different Metabase queries. However, I do not call execute_in_process or execute_job within the file. Here is a copy of my job:
import os
import yaml

from dagster import job

from automatic_metabase_cache.metabaserequester import MetabaseRequester

@job
def run_metabase_cards_queries():
    # Get metabase config file
    metabase_config_path = f"{os.path.expanduser('~')}/.dbt-metabase/config.yml"

    # Parse config file
    try:
        with open(metabase_config_path,"r") as config_file:
            metabase_config = yaml.safe_load(config_file)
    except FileNotFoundError:
        print("Metabase config file could not be found")

    # Get credentials
    metabase_user = metabase_config["config"]["metabase_user"]
    metabase_pwd = metabase_config["config"]["metabase_password"]


    # Authenticate
    requester = MetabaseRequester(metabase_user, metabase_pwd)

    # Force refresh of all card queries
    requester.run_all_cards_queries()
Daniel
Got it - I see the problem. It looks like you have computations happening within the @job function. All of the actual work should live inside functions decorated with @op, and the @job decorator is only used to attach ops together into a graph, like in the example here: https://docs.dagster.io/concepts/ops-jobs-graphs/jobs#using-the-job-decorator. The body of a @job function runs when the code location is loaded, which is why dagit appears to run your Metabase calls on startup and why loading times out. So for your example, you would move the logic into one or more @ops, and then the @job would just call the op function.
from dagster import job, op

@op
def run_metabase_cards_queries():
    ...  # your work here

@job
def metabase_cards_job():
    run_metabase_cards_queries()
    # if that op had an output, you could pass that output as an input into another op here if you wanted
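Applied to the job posted above, the refactor could look roughly like the sketch below; the op name refresh_metabase_cards is made up, and the FileNotFoundError handling from the original snippet is reduced to a comment.

import os

import yaml
from dagster import job, op

from automatic_metabase_cache.metabaserequester import MetabaseRequester

@op
def refresh_metabase_cards():
    # Read Metabase credentials from the dbt-metabase config file
    # (re-add the FileNotFoundError handling from the original snippet as needed)
    metabase_config_path = f"{os.path.expanduser('~')}/.dbt-metabase/config.yml"
    with open(metabase_config_path, "r") as config_file:
        metabase_config = yaml.safe_load(config_file)

    metabase_user = metabase_config["config"]["metabase_user"]
    metabase_pwd = metabase_config["config"]["metabase_password"]

    # Authenticate and force a refresh of all card queries
    requester = MetabaseRequester(metabase_user, metabase_pwd)
    requester.run_all_cards_queries()

@job
def metabase_cards_job():
    refresh_metabase_cards()

With the work inside the op, loading the code location is fast (defining metabase_cards_job only wires up the graph), and the hour-long Metabase refresh runs only when a run is actually launched.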