Charles Lariviere
03/18/2021, 1:52 PM
We run on m5.large nodes (up to 5; CPU 50% threshold), and also have the k8s cluster autoscaler. On the Dagster side, I use the K8sRunLauncher with the QueuedRunCoordinator and a concurrency limit of 10 on backfills.
I ran a backfill job yesterday (~300 hourly partitions), the vast majority of which were not doing any heavy lifting (sending an API request, getting nothing back, and simply running through the Dagster pipeline without doing any computation). Even so, this pushed both nodes to 100% CPU before the autoscaling kicked in, and even after autoscaling, CPU would periodically reach 100% on one or more nodes and cause a few different issues:
• some pipeline runs became zombies
• Dagit was periodically unresponsive
• these “framework errors” and “inactive rpc error” were thrown periodically, causing pipeline failures (see thread)
Is this just a matter of “get larger instances”? I would have expected an m5.large to be able to run through 2~5 “empty” Dagster pipelines in parallel, but maybe that’s too much. Is it a common strategy to run Dagit, the Daemon, and our User Code pods on distinct nodes from the ones handling the pipeline runs? If so, I would absolutely love to hear how to achieve something like this!

An exception was thrown during execution that is likely a framework error, rather than an error in user code.
dagster.check.CheckError: Invariant failed. Description: Pipeline run etl_mailjet_bounce_events_pipeline (eeb7698a-f006-4973-92ac-f29a83bc4ae3) in state PipelineRunStatus.STARTED, expected NOT_STARTED or STARTING
Caught an error for run f43dfc69-3969-4968-9288-26ab3d0c6ceb while removing it from the queue. Marking the run as failed and dropping it from the queue: grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1616030966.874849696","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":5396,"referenced_errors":[{"created":"@1616030966.874844846","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":397,"grpc_status":14}]}"
rex
03/18/2021, 7:59 PM
“Is it a common strategy to run Dagit, the Daemon, and our User Code pods on distinct nodes from the ones handling the pipeline runs?”
This is a pretty good idea! You can accomplish this using node selectors or taints/tolerations.
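For the run pods themselves, one way to pin them to their own nodes is the dagster-k8s/config tag, which the K8sRunLauncher reads when it builds a run's Kubernetes Job; Dagit, the Daemon, and the user-code deployments would get their own nodeSelector/tolerations through the Helm chart values instead. A minimal sketch, assuming a hypothetical node group labeled node-pool=dagster-runs with a matching taint; the solid body is a placeholder, and the exact key spellings should be checked against the dagster-k8s version in use:

```python
from dagster import pipeline, solid


@solid
def send_api_request(_):
    # Placeholder for the lightweight API call each hourly partition makes.
    pass


# Hypothetical dedicated node group for pipeline runs; the label and taint
# names are illustrative, not taken from the thread.
DAGSTER_RUN_NODE_TAGS = {
    "dagster-k8s/config": {
        "pod_spec_config": {
            # Schedule run pods onto the dedicated node group...
            "node_selector": {"node-pool": "dagster-runs"},
            # ...and tolerate the taint that keeps other workloads off it.
            "tolerations": [
                {
                    "key": "dedicated",
                    "operator": "Equal",
                    "value": "dagster-runs",
                    "effect": "NoSchedule",
                }
            ],
        }
    }
}


@pipeline(tags=DAGSTER_RUN_NODE_TAGS)
def etl_mailjet_bounce_events_pipeline():
    send_api_request()
```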
Charles Lariviere
03/18/2021, 9:12 PM
“you can increase the request cpu when the pipeline is launched for two reasons:
1. Node CPU utilization does not approach 100%
2. Your autoscaling gets triggered earlier”
I’m not familiar with that — are you referring to the dagster-k8s/config tag that we can set on solids as described here?
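A sketch of what the suggestion above could look like with that tag: setting an explicit CPU request so the scheduler reserves capacity and the cluster autoscaler reacts before nodes saturate. The resource values are illustrative; the pipeline-level tag is what the K8sRunLauncher applies to the run worker pod, while solid-level tags only take effect with an executor that launches a pod per step (e.g. celery-k8s):

```python
from dagster import pipeline, solid

# Illustrative values; tune requests/limits to the actual workload.
K8S_RESOURCES = {
    "dagster-k8s/config": {
        "container_config": {
            "resources": {
                "requests": {"cpu": "250m", "memory": "128Mi"},
                "limits": {"cpu": "500m", "memory": "512Mi"},
            }
        }
    }
}


@solid(tags=K8S_RESOURCES)  # per-step pods, if the executor creates them
def send_bounce_events(_):
    pass


@pipeline(tags=K8S_RESOURCES)  # the run worker pod launched by K8sRunLauncher
def etl_mailjet_bounce_events_pipeline():
    send_bounce_events()
```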
rex
03/18/2021, 9:14 PM
Charles Lariviere
03/18/2021, 9:14 PM
Can the dagster-k8s/config tag be added to a preset as well, or is it limited to solid / pipeline?
I’m getting errors about invalid tag format when adding it to a preset, which I’m then using to automatically pull tags when defining schedules (as described here) — curious if this is by design, or an error on my part:
dagster.check.CheckError: Value in dictionary mismatches expected type for key dagster-k8s/config. Expected value of type <class 'str'>.
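A minimal reproduction of that error, assuming the APIs of that era; the pipeline, solid, and run_config are placeholders. The same dict is accepted as a pipeline tag but rejected by the preset's tag validation:

```python
from dagster import PresetDefinition, pipeline, solid

K8S_TAG = {
    "dagster-k8s/config": {
        "container_config": {"resources": {"requests": {"cpu": "250m"}}}
    }
}


@solid
def noop(_):
    pass


@pipeline(
    tags=K8S_TAG,  # accepted: pipeline tags may be Dict[str, Any]
    preset_defs=[
        # Raises dagster.check.CheckError at definition time, because
        # PresetDefinition validates tags as Dict[str, str].
        PresetDefinition(name="prod", run_config={}, tags=K8S_TAG),
    ],
)
def example_pipeline():
    noop()
```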
Rubén Lopez Lozoya
03/24/2021, 2:01 PM
Charles Lariviere
03/24/2021, 2:06 PM
The tags parameter in PresetDefinition expects a Dict[str, str], while elsewhere (e.g. pipeline or solid) tags allow Dict[str, Any]. I understood this may have been an oversight and would be updated.
What I ended up doing at first was passing the k8s config to every pipeline. I later found the extraManifests section of values.yaml, which appears to allow setting default `requests`/`limits` (or other configs you may want to pass) at the cluster level. I now just pass the k8s config tag when I want to overwrite the defaults.
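One plausible reading of "default requests/limits at the cluster level" is a Kubernetes LimitRange shipped through extraManifests, since a LimitRange is the object that supplies default requests/limits for containers in a namespace. A hedged sketch of that idea using the Kubernetes Python client rather than the Helm chart; the namespace, object name, and resource values are hypothetical, and in practice the manifest would normally live in values.yaml rather than be created imperatively:

```python
from kubernetes import client, config

# Assumes a local kubeconfig; inside the cluster use config.load_incluster_config().
config.load_kube_config()

# Default requests/limits applied to any container in the namespace that does not
# set its own; run pods can still override them via the dagster-k8s/config tag.
limit_range = client.V1LimitRange(
    metadata=client.V1ObjectMeta(name="dagster-run-defaults"),
    spec=client.V1LimitRangeSpec(
        limits=[
            client.V1LimitRangeItem(
                type="Container",
                default_request={"cpu": "250m", "memory": "128Mi"},  # default requests
                default={"cpu": "500m", "memory": "512Mi"},          # default limits
            )
        ]
    ),
)

client.CoreV1Api().create_namespaced_limit_range(namespace="dagster", body=limit_range)
```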
Rubén Lopez Lozoya
03/24/2021, 2:08 PM