
Rohan Kshirsagar

02/05/2022, 6:08 PM
I had an operator that stores a dataframe to a BigQuery table, and when I run locally, I believe the code automatically pulls my credential file from my local home directory to authenticate. How do secrets work with dagster cloud in the local agent configuration? I tested out the same operator in the cloud run and it just worked on the first try

daniel

02/05/2022, 6:27 PM
Since the agent runs your ops, I'd expect that to work the same way as in open source dagster - dagster cloud doesn't need any additional access to your secrets in order to work
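(For reference, a minimal sketch of the kind of op under discussion, writing a dataframe to BigQuery via application default credentials; pandas-gbq is assumed, and the dataset, table, and project id are placeholders:)

import pandas as pd
from dagster import op

@op
def store_to_bigquery(context, df):
    # pandas-gbq authenticates with Google application default credentials,
    # so locally it picks up the credential file in your home directory and,
    # with a local agent, whatever credentials exist on the box running the op.
    # "my_dataset.my_table" and "my-gcp-project" are placeholders.
    df.to_gbq("my_dataset.my_table", project_id="my-gcp-project", if_exists="append")
    context.log.info(f"wrote {len(df)} rows to BigQuery")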

Rohan Kshirsagar

02/05/2022, 6:28 PM
so if I terminate my local agent, do the jobs synced to dagster cloud stop working?
oh I see, the local agent is basically running everything locally on my machine.. so I can't actually shut my laptop down and expect my dagster cloud pipelines to run
so what's the difference between local dagster with dagit, and dagster cloud with the local agent?
sorry for all the questions, I'm trying to figure out the fastest path to scheduling my data/ML pipelines on the cloud, with cloud compute 😅

daniel

02/05/2022, 6:45 PM
That's right, cloud handles scheduling/orchestration/storage/UI, and the parts that require access to your code/data happen via the agent
No problem - One big difference from local dagster with dagit is that you'll never have to worry about anything related to storage or database migrations, or hosting dagit. There are also built-in SSO and RBAC features.

Rohan Kshirsagar

02/05/2022, 6:52 PM
okay I see, thanks, that's helpful and a pretty cool design. so I just need to figure out the simplest way to run an agent with some cloud compute.. I was hoping for a more 'lazy' compute option that only spun up compute on scheduled runs

daniel

02/05/2022, 6:53 PM
If you want your pipelines to run with cloud compute, your best bet is probably to use our kubernetes or ECS agents: https://docs.dagster.cloud/deployment#the-agent
Running the local agent somewhere like EC2 would also be an option depending on your compute needs
👍 1

Rohan Kshirsagar

02/06/2022, 5:43 PM
I’ll probably work on that next week (using google cloud) - perhaps we can work together to generate some docs on that

daniel

02/06/2022, 5:46 PM
that would be great! We definitely could improve the documentation around best use of the local agent

Rohan Kshirsagar

02/27/2022, 8:21 PM
Getting back to this finally. If I'm on Google Cloud, which offering would be the right setup to run the local agent? Is it Google Compute Engine? Wondering if you've seen any examples of this integration

daniel

02/27/2022, 9:20 PM
GCE is probably the way to go. If you're planning to run in the cloud I'd consider trying the Docker Agent - https://docs.dagster.cloud/agents/docker/setup . That has a somewhat better CI/CD story than the local agent. For examples, we're working on a Cloudformation template for the Docker agent, that should be ready sometime this week. That's AWS not Google Cloud but could still function as an example for getting cloud infra up and running.
👍 1

Rohan Kshirsagar

02/28/2022, 4:35 PM
I was able to get it up and running, and the jobs are starting up fine but failing in the middle. I wonder if there are some permissions/firewall issues with gRPC?
The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses"
        debug_error_string = "{"created":"@1646066017.618437961","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3128,"referenced_errors":[{"created":"@1646066017.618436560","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"

daniel

02/28/2022, 4:35 PM
Would it be possible to post the full stack trace?

Rohan Kshirsagar

02/28/2022, 4:35 PM
2022-02-28 16:33:25 +0000 - dagster - ERROR - topic_job - 7596ba94-1f22-4222-8285-49aad84705a9 - 2757 - assign_topics - STEP_FAILURE - Execution of step "assign_topics" failed.

dagster.core.executor.child_process_executor.ChildProcessCrashException

Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/core/executor/multiprocess.py", line 210, in execute
    event_or_none = next(step_iter)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/core/executor/multiprocess.py", line 323, in execute_step_out_of_process
    for ret in execute_child_process_command(multiproc_ctx, command):
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/core/executor/child_process_executor.py", line 158, in execute_child_process_command
    raise ChildProcessCrashException(exit_code=process.exitcode)

2022-02-28 16:33:26 +0000 - dagster - ERROR - topic_job - 7596ba94-1f22-4222-8285-49aad84705a9 - store_to_table - Dependencies for step store_to_table failed: ['assign_topics']. Not executing.
Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 541, in _process_api_request
    api_result = self._handle_api_request(request, instance, user_code_launcher)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 390, in _handle_api_request
    serialized_sensor_data_or_error = client.external_sensor_execution(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 292, in external_sensor_execution
    chunks = list(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 117, in _streaming_query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

The above exception was caused by the following exception:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses"
        debug_error_string = "{"created":"@1646066137.749912668","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3128,"referenced_errors":[{"created":"@1646066137.749911537","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
>

Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 115, in _streaming_query
    yield from response_stream
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/grpc/_channel.py", line 809, in _next
    raise self

daniel

02/28/2022, 4:37 PM
Got it - I think those are two different errors. Where did you see the second one?

Rohan Kshirsagar

02/28/2022, 4:39 PM
gotcha. yeah, now it's erroring with the gRPC error every few seconds. I tried restarting the cloud agent. I'm not sure if that's a function of the cloud server trying to re-execute the job
2022-02-28 16:38:41 +0000 - dagster_cloud - ERROR - dagster.core.errors.DagsterUserCodeUnreachableError: Failure loading server endpoint for location unify-news-dag: Exception: Timed out waiting for gRPC server to start with arguments: "/home/rohan/miniconda3/envs/unify_news_dag/bin/python -m dagster api grpc --lazy-load-user-code --socket /tmp/tmp07zmf4q_ --heartbeat --heartbeat-timeout 60 --log-level WARNING --use-python-environment-entry-point -d /home/rohan/news-bias --package-name unify_news_dag". Most recent connection error: dagster.core.errors.DagsterUserCodeUnreachableError: Could not reach user code server

Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/server.py", line 933, in wait_for_grpc_server
    client.ping("")
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 121, in ping
    res = self._query("Ping", api_pb2.PingRequest, echo=echo)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 108, in _query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses"
        debug_error_string = "{"created":"@1646066257.267542960","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3128,"referenced_errors":[{"created":"@1646066257.267541769","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
>

Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 105, in _query
    response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)


Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py", line 410, in _reconcile
    new_updated_endpoint = self._create_new_server_endpoint(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/workspace/user_code_launcher/process.py", line 128, in _create_new_server_endpoint
    server_process = GrpcServerProcess(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/server.py", line 1091, in __init__
    self.server_process = open_server_process(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/server.py", line 1004, in open_server_process
    wait_for_grpc_server(server_process, client, subprocess_args, timeout=startup_timeout)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/server.py", line 939, in wait_for_grpc_server
    raise Exception(


Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 541, in _process_api_request
    api_result = self._handle_api_request(request, instance, user_code_launcher)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 384, in _handle_api_request
    client = self._get_grpc_client(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 269, in _get_grpc_client
    endpoint = user_code_launcher.get_grpc_endpoint(repository_location_origin)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py", line 511, in get_grpc_endpoint
    raise DagsterUserCodeUnreachableError(

daniel

02/28/2022, 4:40 PM
the ChildProcessCrashException happened because the subprocess that it opened to run your op crashed. The most frequent reason I've seen for this is the subprocess running out of memory. You might get more useful output about the error by running the job using the in_process executor, by adding this to your run_config in the launchpad. That runs all the ops in the same process
execution:
  config:
    in_process:
I think overall the symptoms you're seeing are consistent with your agent and jobs using more resources than you have available on your GCE box - i.e. the operating system seems to be shutting down a bunch of your processes. One of the nice things about using the Docker agent is that it makes it easier to enforce resource limits on the different processes that the agent spins up
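(As an alternative to setting it in the launchpad, the in-process executor can also be pinned on the job definition in code; a sketch, reusing the op and job names from the trace above with placeholder op bodies:)

from dagster import in_process_executor, job, op

@op
def assign_topics():
    ...

@op
def store_to_table(topics):
    ...

# executor_def=in_process_executor runs every op in the run worker process,
# so a crash shows up directly in that process's output instead of as a
# ChildProcessCrashException from the multiprocess executor.
@job(executor_def=in_process_executor)
def topic_job():
    store_to_table(assign_topics())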

Rohan Kshirsagar

02/28/2022, 4:44 PM
yep this makes sense. I don't remember how much memory I spawned the instance with so trying to investigate that now. I figured I'd get it working with the local agent first with ample resources, then figure out how to move to Docker, and then have an automatic build that builds the Docker image
regarding the in_process config - if I already have a parameter to pass in to the op, would it look like this?
ops:
  store_to_table:
    config:
      param: "blah"
execution:
  config:
    in_process:

daniel

02/28/2022, 4:48 PM
exactly
that said, if the process crashes there too due to a memory issue, that may manifest as the run getting stuck in STARTED until you manually terminate it. We're working on improving our monitoring capabilities here for when the run process crashes - fairly soon the agent will detect crashed run worker processes and clean up the state in Dagit

Rohan Kshirsagar

02/28/2022, 4:54 PM
is this error still indicative of getting into a bad state due to the OOM?
Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 541, in _process_api_request
    api_result = self._handle_api_request(request, instance, user_code_launcher)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 390, in _handle_api_request
    serialized_sensor_data_or_error = client.external_sensor_execution(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 292, in external_sensor_execution
    chunks = list(
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster/grpc/client.py", line 117, in _streaming_query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

daniel

02/28/2022, 4:55 PM
It would be indicative of a process that the agent spins up crashing and no longer being reachable - you can tell the agent to spin it back up again by pressing the Redeploy button on the Workspace tab
and the most likely reason I can think of for that process crashing would be a memory issue on the box
the non-local agents are better about crash recovery here - e.g. the kubernetes agent runs those servers as k8s deployments, so if a process crashes, k8s will bring it back up
👍 1

Rohan Kshirsagar

02/28/2022, 4:57 PM
I see a python process taking up 1.6G of memory - is that a background process that dagster-cloud runs?

daniel

02/28/2022, 4:58 PM
Does it say what the command is?

Rohan Kshirsagar

02/28/2022, 4:58 PM
just
python

daniel

02/28/2022, 5:00 PM
do you have a way to run
ps aux | grep <pid>
where <pid> is the pid of that process? Usually that gives more information about the arguments that were passed in

Rohan Kshirsagar

02/28/2022, 5:00 PM
rohan     7954  4.3 40.5 4303820 1637944 pts/1 Sl+  16:55   0:13 /home/rohan/miniconda3/envs/unify_news_dag/bin/python -m dagster api grpc --lazy-load-user-code --socket /tmp/tmpu4rrl7_x --heartbeat --heartbeat-timeout 60 --log-level WARNING --use-python-environment-entry-point -d /home/rohan/news-bias --package-name unify_news_dag
I wonder if the loading of all the modules in the package (ML/NLP heavy) causes the memory bloat
if I start a run, I assume that memory will automatically double even before execution

daniel

02/28/2022, 5:02 PM
Yeah, I suspect that's exactly it. This actually came up in an issue a couple of days ago: https://github.com/dagster-io/dagster/issues/6824
I'd be curious if the workaround suggested there (delaying the heavy imports until the ops that actually use them) feels like an acceptable workaround
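(A sketch of that workaround, with an illustrative heavy dependency - the point is just that the import moves from module scope into the op body, so the standing code server doesn't pay for it:)

from dagster import op

# Module scope stays lightweight: the gRPC code server the agent keeps running
# imports this module only to build the job definitions.

@op
def assign_topics(context, articles):
    # Import the heavy ML/NLP library only when the step actually executes,
    # inside the run worker subprocess. spacy is purely illustrative here.
    import spacy

    nlp = spacy.load("en_core_web_sm")
    return [[ent.text for ent in nlp(article).ents] for article in articles]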

Rohan Kshirsagar

02/28/2022, 5:04 PM
the trade-off is that it could hide issues at initialization

daniel

02/28/2022, 5:04 PM
Yeah, exactly
If you're running in k8s we have some additional tricks we could pull here (for example, using different images or python environment for different steps, with different imports)

Rohan Kshirsagar

02/28/2022, 5:05 PM
I'm not sure how dockerizing this will help, wouldn't that only add memory?
I would love to run in k8s but as a one-man team I'm not willing to invest in that yet

daniel

02/28/2022, 5:10 PM
Dockerizing could help by letting you run each op in its own container, and then customizing the image used for each op so that it uses only the imports it needs. But that's definitely a more complex setup (and in some cases running each op in its own container could result in more memory usage overall). So it's not a silver bullet

Rohan Kshirsagar

02/28/2022, 5:11 PM
gotcha. where do I find an example of that setup?

daniel

02/28/2022, 5:16 PM
I may have spoken slightly too soon. That 'varying the image per op' pattern is currently supported in k8s but not in docker (both solutions also wouldn't really help with the memory usage of this standing server). I think the first thing I would try would be using the default multiprocess executor plus delaying the beefy imports until the bodies of the ops, just to get a baseline number on whether that reduces the memory usage to acceptable levels?

Rohan Kshirsagar

02/28/2022, 5:46 PM
so I got a beefier instance and the job ran successfully! I am seeing a transient error
dagster_cloud.storage.errors.GraphQLStorageError: Could not find a suitable TLS CA certificate bundle, invalid path: /home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/certifi/cacert.pem

Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 169, in run_loop
    self._check_update_workspace(instance, user_code_launcher)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 194, in _check_update_workspace
    self._query_for_workspace_updates(instance, user_code_launcher, upload_results=False)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/agent/dagster_cloud_agent.py", line 242, in _query_for_workspace_updates
    result = instance.graphql_client.execute(WORKSPACE_ENTRIES_QUERY)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/storage/client.py", line 84, in execute
    raise GraphQLStorageError(exc.__str__()) from exc

The above exception was caused by the following exception:
OSError: Could not find a suitable TLS CA certificate bundle, invalid path: /home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/certifi/cacert.pem

Stack Trace:
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/dagster_cloud/storage/client.py", line 69, in execute
    response = self._session.post(self.url, **post_args)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/requests/sessions.py", line 577, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/requests/adapters.py", line 417, in send
    self.cert_verify(conn, request.url, verify, cert)
  File "/home/rohan/miniconda3/envs/unify_news_dag/lib/python3.8/site-packages/requests/adapters.py", line 228, in cert_verify
    raise IOError("Could not find a suitable TLS CA certificate bundle, "
I assume this is a google compute engine setup thing

daniel

02/28/2022, 5:48 PM
Oh interesting. Transient? How frequently are you seeing it?

Rohan Kshirsagar

03/01/2022, 12:48 AM
don't see it right now from the cloud CLI, but I saw it soon after the run happened