chrispc
08/16/2021, 3:53 PM

Daniel Salama
08/16/2021, 5:07 PM
from dagster import pipeline, solid, execute_pipeline

@pipeline
def pipe1():
    run_pipe2()

@pipeline
def pipe2():
    # printing debug messages
    ...

@solid
def run_pipe2():
    execute_pipeline(pipe2)
so pipe1 runs pipe2, but in the UI you won't see pipe2 running or printing logs.

Lian Jiang
08/16/2021, 6:24 PM

William Reed
08/17/2021, 2:19 AM
multiprocessing library to spawn child processes. Is there something we're missing? We thought we could use any valid Python in our solids.
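For context, a minimal sketch of the pattern being described: a solid that fans work out to child processes with the standard-library multiprocessing module. The solid and worker names here are illustrative, not from the thread.

from multiprocessing import get_context
from dagster import solid

def _square(x):
    # work done in the child process
    return x * x

@solid
def fan_out_work(context):
    # "spawn" avoids inheriting the parent process state, which tends to be
    # the safer start method inside an orchestrated run
    with get_context("spawn").Pool(processes=4) as pool:
        results = pool.map(_square, range(10))
    context.log.info(f"results: {results}")
    return results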
Utkarsh
08/17/2021, 5:45 AM

George Pearse
08/17/2021, 8:40 AM

Pablo Villalobos
08/17/2021, 10:01 AM
solid(a, b) and I have dynamic lists A and B. Ideally I'd want to do something like zip(A,B).map(lambda a,b: solid(a,b))
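One way to sketch that pairing with dynamic outputs, assuming the DynamicOutputDefinition API from this era of Dagster: a single solid zips the two lists and yields each pair as a DynamicOutput, and the downstream solid is mapped over the pairs. Names and list contents are placeholders.

from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid

@solid(output_defs=[DynamicOutputDefinition()])
def zip_ab(context):
    A = [1, 2, 3]
    B = ["x", "y", "z"]
    for i, (a, b) in enumerate(zip(A, B)):
        # each pair becomes its own dynamic output with a unique mapping key
        yield DynamicOutput((a, b), mapping_key=str(i))

@solid
def process_pair(context, pair):
    a, b = pair
    context.log.info(f"processing {a} with {b}")

@pipeline
def zipped_pipeline():
    zip_ab().map(process_pair)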
doom leika
08/17/2021, 10:10 AM
0.13? I am planning to migrate to the new system but I would like to know the timeline.

Navneet Sajwan
08/17/2021, 11:08 AM

Dylan Hunt
08/17/2021, 11:13 AM

Devaraj Nadiger
08/17/2021, 12:42 PM

Dylan Hunt
08/17/2021, 2:01 PM

Suraj Narwade
08/17/2021, 2:55 PM
dagster-infra namespace, and I installed the user deployment & configured the job namespace in the dagster-user namespace.
Along with this, I populated the PostgreSQL credentials secret in both namespaces as required.
My pipeline example reads an environment variable which comes from a secret. For example, the k8s secret my pipeline example needs is named topsecret, which is present in the dagster-user namespace and is also listed as env_secrets in the dagster-instance configmap in the dagster-user namespace itself.
With this setup my job pod should ideally get the secret injected, but it is not.
It does work when I update the dagster-instance configmap in the dagster-infra namespace (the one consumed by the daemon) with the same env_secrets; after doing that I've noticed these secrets get appended to the user deployments as well as the jobs.
Based on this, I have the following questions:
• Why does the daemon need to know about secrets?
• Why does the user deployment get the secrets injected?
• Is there any other way to tell the daemon that my pipeline needs the given secret?

Lily Grier
08/17/2021, 2:57 PM

Huib
08/17/2021, 3:13 PM

doom leika
08/17/2021, 4:53 PM
make_values_resource feels awkward
https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-solids-in-a-pipeline
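For reference, a minimal sketch of the pattern that page describes, sharing one make_values_resource across two solids; the resource fields and solid names are illustrative.

from dagster import ModeDefinition, make_values_resource, pipeline, solid

@solid(required_resource_keys={"values"})
def solid_one(context):
    context.log.info(f"env: {context.resources.values['env']}")

@solid(required_resource_keys={"values"})
def solid_two(context):
    context.log.info(f"threshold: {context.resources.values['threshold']}")

@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={"values": make_values_resource(env=str, threshold=int)}
        )
    ]
)
def shared_values_pipeline():
    solid_one()
    solid_two()

# run config then supplies both fields once:
# resources:
#   values:
#     config:
#       env: prod
#       threshold: 10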
doom leika
08/17/2021, 4:58 PM

chrispc
08/17/2021, 5:14 PM

Christian Lam
08/17/2021, 5:31 PM
run_status_sensor to run on the completion of multiple pipelines? From what I have tested, passing multiple pipelines to the pipeline_selection parameter will make the sensor function run if any of the selected pipelines completes.
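For reference, a minimal sketch of the setup being described, assuming the run_status_sensor and pipeline_selection API of this release line; as noted above, the sensor fires whenever any one of the selected pipelines reaches the given status, not when all of them have.

from dagster import PipelineRunStatus, run_status_sensor

@run_status_sensor(
    pipeline_run_status=PipelineRunStatus.SUCCESS,
    pipeline_selection=["pipeline_a", "pipeline_b"],
)
def on_success(context):
    # invoked once per completed run of either selected pipeline
    ...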
George Pearse
08/17/2021, 8:27 PM
Operation name: JobMetadataQuery
Message: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout
[SQL: SELECT event_logs.id, event_logs.event
FROM event_logs
WHERE event_logs.run_id = %(run_id_1)s ORDER BY event_logs.id ASC
LIMIT ALL OFFSET %(param_1)s]
[parameters: {'run_id_1': 'ec1ead9e-874a-4de0-b0a8-da5f0b544890', 'param_1': 0}]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
Path: ["pipelineRunsOrError","results",0,"assets"]
Locations: [{"line":60,"column":3}]
Stack Trace:
File "/usr/local/lib/python3.7/site-packages/graphql/execution/executor.py", line 452, in resolve_or_error
return executor.execute(resolve_fn, source, info, **args)
File "/usr/local/lib/python3.7/site-packages/graphql/execution/executors/sync.py", line 16, in execute
return fn(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/schema/pipelines/pipeline.py", line 270, in resolve_assets
return get_assets_for_run_id(graphene_info, self.run_id)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/fetch_assets.py", line 59, in get_assets_for_run_id
records = graphene_info.context.instance.all_logs(run_id)
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 1013, in all_logs
return self._event_storage.get_logs_for_run(run_id, of_type=of_type)
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 234, in get_logs_for_run
events_by_id = self.get_logs_for_run_by_log_id(run_id, cursor, of_type)
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 201, in get_logs_for_run_by_log_id
results = conn.execute(query).fetchall()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1263, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 324, in _execute_on_connection
self, multiparams, params, execution_options
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1462, in _execute_clauseelement
cache_hit=cache_hit,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1815, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1996, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1772, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
But I found the screenshot below when I actually looked through the logs, and I think the error presented in the Dagit UI was just 'directly caused' by this error.

William Reed
08/17/2021, 9:18 PM
serviceAccountName for the job pods that get run via the K8sRunLauncher? I've tried a number of combinations in the tags portion of my pipeline/solids, to no avail.
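One related pattern is the dagster-k8s/config tag, which accepts raw Kubernetes overrides per pipeline; whether service_account_name under pod_spec_config is honoured by the K8sRunLauncher in the version in use is an assumption, so treat this as a sketch rather than a confirmed fix.

from dagster import pipeline

@pipeline(
    tags={
        "dagster-k8s/config": {
            # snake_case fields from the k8s V1PodSpec model (assumption:
            # service_account_name is picked up by the run launcher here)
            "pod_spec_config": {"service_account_name": "my-job-service-account"},
        }
    }
)
def my_pipeline():
    ...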
doom leika
08/17/2021, 11:36 PM
RunLauncher and what's the executor?

Devaraj Nadiger
08/18/2021, 8:15 AM

doom leika
08/18/2021, 9:00 AM

jorge.arada
08/18/2021, 10:43 AM

jorge.arada
08/18/2021, 10:46 AM

jorge.arada
08/18/2021, 10:47 AM

jeremy
08/18/2021, 12:09 PM
dagster.yaml to point to the original instance for the db.
workspace.yaml on the first instance points to the second on port 4000, which is exposed.
Unfortunately I am still getting an inactive grpc channel in the dagit status.
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses" debug_error_string = "{"created":"@1629281721.636797067","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3008,"referenced_errors":[{"created":"@1629281721.636770438","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":397,"grpc_status":14}]}" >
Any idea what I'm missing?

Ritasha Verma
08/18/2021, 1:12 PM

chrispc
08/18/2021, 3:58 PM
FileNotFoundError: [WinError 2] The system cannot find the file specified
File "C:\Users\***\Anaconda3\envs\borrar\lib\site-packages\dagster\core\host_representation\grpc_server_registry.py", line 177, in _get_grpc_endpoint
server_process = GrpcServerProcess(
File "C:\Users\****\Anaconda3\envs\borrar\lib\site-packages\dagster\grpc\server.py", line 1060, in __init__
self.server_process, self.port = open_server_process_on_dynamic_port(
File "C:\Users\****\Anaconda3\envs\borrar\lib\site-packages\dagster\grpc\server.py", line 1011, in open_server_process_on_dynamic_port
server_process = open_server_process(
File "C:\Users\****\Anaconda3\envs\borrar\lib\site-packages\dagster\grpc\server.py", line 978, in open_server_process
server_process = open_ipc_subprocess(subprocess_args)
File "C:\Users\*****\Anaconda3\envs\borrar\lib\site-packages\dagster\serdes\ipc.py", line 195, in open_ipc_subprocess
return subprocess.Popen(parts, creationflags=creationflags, **kwargs)
File "C:\Users\*****\Anaconda3\envs\borrar\lib\subprocess.py", line 858, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "C:\Users\s4957336\Anaconda3\envs\borrar\lib\subprocess.py", line 1311, in _execute_child
hp, ht, pid, tid = _winapi.CreateProcess(executable, args,