Yichen
02/09/2021, 4:39 PM
Yichen
02/09/2021, 5:39 PM
Noah K
02/09/2021, 6:07 PM
antonl
02/09/2021, 7:58 PM
`configured` to curry some configuration about a solid vs using a `composite_solid`? Is there a best practice when you want to curry inputs or partially configure a solid?
Noah K
02/09/2021, 7:59 PM
Noah K
02/09/2021, 7:59 PM
Noah K
02/09/2021, 7:59 PM
antonl
02/09/2021, 8:08 PM
Charles Lariviere
02/09/2021, 11:35 PM
`DynamicOutput` to a `composite_solid`? My pipeline works fine when I pass the `DynamicOutput` to a single solid, but I would like the `.map()` to execute more than one solid, with the general flow being:
1. Fetch a list of IDs from the database (unknown before execution time).
2. For each ID, query an API, build a dataframe, and output to the database.
But I’m getting the following error when I package step 2 as a `composite_solid`:
dagster.core.errors.DagsterSubprocessError: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.execution.plan.inputs.StepInput'>. Got UnresolvedStepInput(name='id', dagster_type_key='Int', source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='query_records', output_name='id', mapping_key=None), solid_handle=SolidHandle(name='do_multiple_steps', parent=None), input_name='id')) of type <class 'dagster.core.execution.plan.inputs.UnresolvedStepInput'>.
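For reference, the flow in steps 1 and 2 above can be written as plain Python, independent of Dagster. This is only a sketch of the intended behavior, not Dagster's `DynamicOutput` API: `fetch_ids`, `query_api`, `build_rows`, `write_rows`, `process_one`, and `run` are hypothetical stand-ins, and an in-memory list takes the place of the database.

```python
from typing import Dict, List

# Hypothetical stand-ins for the real database and API; this is not Dagster code.


def fetch_ids() -> List[int]:
    """Step 1: the list of IDs is only known at run time."""
    return [1, 2, 3]


def query_api(record_id: int) -> Dict[str, int]:
    """Step 2a: pretend to call an API for a single ID."""
    return {"id": record_id, "value": record_id * 10}


def build_rows(payload: Dict[str, int]) -> List[Dict[str, int]]:
    """Step 2b: shape the API payload into rows (a dataframe in practice)."""
    return [payload]


def write_rows(rows: List[Dict[str, int]], sink: List[Dict[str, int]]) -> int:
    """Step 2c: append the rows to the sink and report how many were written."""
    sink.extend(rows)
    return len(rows)


def process_one(record_id: int, sink: List[Dict[str, int]]) -> int:
    """The multi-step unit that should be applied to each ID."""
    return write_rows(build_rows(query_api(record_id)), sink)


def run(sink: List[Dict[str, int]]) -> int:
    """Fan out over the IDs and apply the whole chain to each one."""
    return sum(process_one(record_id, sink) for record_id in fetch_ids())
```

Here `process_one` plays the role that the `composite_solid` is meant to play: one unit made of several steps, applied once per ID.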
Brian Abelson
02/10/2021, 12:24 AM
`0.10.4`. Everything runs fine, except the scheduler seems to continually shut down after about 2-3 hours with the following error (pasted below). It seems that I have to restart the daemon continually to address this. Is this normal? Is there a way to suppress these errors? I'm invoking `dagster-daemon` via `supervisord` with the simple `run` command.
dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
with RepositoryLocationHandle.create_from_repository_location_origin(
File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
self.grpc_server_process = GrpcServerProcess(
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
self.server_process = open_server_process(
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
wait_for_grpc_server(server_process, output_file)
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
raise DagsterIPCProtocolError(
Josh Taylor
02/10/2021, 6:17 AM
`postgres_url` in `pg_config` is a `str` instead of a `StringSource`? This would allow setting the entire URL via an environment variable in the YAML. The `postgres_db` is set up like this:
"postgres_db": {
    "username": StringSource,
    "password": StringSource,
    "hostname": StringSource,
    "db_name": StringSource,
    "port": Field(IntSource, is_required=False, default_value=5432),
},
It looks like this would work, but it's static?
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url: "postgresql://test:test@{hostname}:5432/test"
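To make the relationship between the separate `postgres_db` fields and a single `postgres_url` concrete, here is a minimal sketch that assembles the URL from environment variables in plain Python. It does not change how Dagster reads `postgres_url`. The helper `postgres_url_from_env` and the variable names (`PG_USERNAME`, `PG_PASSWORD`, `PG_HOSTNAME`, `PG_DB_NAME`, `PG_PORT`) are hypothetical and chosen only for illustration.

```python
import os
from typing import Mapping
from urllib.parse import quote


def postgres_url_from_env(env: Mapping[str, str] = os.environ) -> str:
    """Assemble a postgresql:// URL from environment variables.

    The variable names are hypothetical; adjust them to your deployment.
    """
    # Percent-encode the credentials so characters such as "@" or ":" stay valid.
    username = quote(env["PG_USERNAME"], safe="")
    password = quote(env["PG_PASSWORD"], safe="")
    hostname = env["PG_HOSTNAME"]
    db_name = env["PG_DB_NAME"]
    # Same default port as the postgres_db schema quoted above.
    port = env.get("PG_PORT", "5432")
    return f"postgresql://{username}:{password}@{hostname}:{port}/{db_name}"
```

Passing a plain dict instead of `os.environ` makes the helper easy to try without touching the real environment.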
Hamza Khurshid Butt
02/10/2021, 8:09 AM
`@daily_schedule`. However, when its time comes to run, it does not run; the scheduler simply skips this pipeline. When I run the same pipeline using `cron_schedule`, the scheduler does run it!!! 🤐
Here is the screenshot:
Thank you!
Thomas
02/10/2021, 8:41 AM
Waqas Awan
02/10/2021, 10:08 AM
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      [
        { key:"test", value:"two", limit:2 }
      ]
While running the program, it says the following:
raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for {'max_concurrent_runs': Field(<dagster.config.config_type.Int object at 0x7fd295c8ea30>, default=@, is_required=False), 'tag_concurrency_limits': Field(<dagster.config.config_type.Noneable object at 0x7fd29b096dc0>, default=@, is_required=False), 'dequeue_interval_seconds': Field(<dagster.config.config_type.Int object at 0x7fd295c8ea30>, default=@, is_required=False)}.
Error 1: Received unexpected config entries "['key:"test"', 'limit:2']" at path root:tag_concurrency_limits[0]. Expected: "['key', 'limit', 'value']."
Error 2: Missing required config entries "['key', 'limit']" at path root:tag_concurrency_limits[0]".
Error 3: Received unexpected config entries "['key:"test1"', 'limit:5']" at path root:tag_concurrency_limits[1]. Expected: "['key', 'limit', 'value']."
Error 4: Missing required config entries "['key', 'limit']" at path root:tag_concurrency_limits[1]".
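The entries reported in Error 1 and Error 3 (for example `key:"test"`) look like whole `key:value` strings being read as single keys, which is what can happen when a YAML flow mapping has no space after the colon. That reading is an assumption based on the error text, not something confirmed in this thread. A minimal Python sketch of the kind of check the error describes follows; `check_limit_entry` is a hypothetical helper, and the allowed and required keys are taken from the error message itself.

```python
# Allowed keys, from: Expected: "['key', 'limit', 'value']"
ALLOWED_KEYS = {"key", "value", "limit"}
# Required keys, from: Missing required config entries "['key', 'limit']"
REQUIRED_KEYS = {"key", "limit"}


def check_limit_entry(entry: dict) -> dict:
    """Return the unexpected and missing keys for one tag_concurrency_limits entry."""
    keys = set(entry)
    return {
        "unexpected": sorted(keys - ALLOWED_KEYS),
        "missing": sorted(REQUIRED_KEYS - keys),
    }


# Assumed result of parsing `{ key:"test", value:"two", limit:2 }` (no space after the colons).
misparsed = {'key:"test"': None, 'value:"two"': None, 'limit:2': None}
# Result of parsing `{ key: "test", value: "two", limit: 2 }` (a space after each colon).
well_formed = {"key": "test", "value": "two", "limit": 2}

print(check_limit_entry(misparsed))
print(check_limit_entry(well_formed))
```

If the assumption holds, writing the entry with a space after each colon, as in `{ key: "test", value: "two", limit: 2 }`, would give the config loader the three keys it expects.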
I am using the exact syntax based on the docs: https://docs.dagster.io/overview/pipeline-runs/limiting-run-concurrency#main
jonathan
02/10/2021, 3:17 PM
Simon Späti
02/10/2021, 3:24 PM
`DagsterDataType` from a `PysparkDataFrame`. I have a generic solid `load_delta_table_to_df`, but in my pipeline I'd like to type-check that the returned DataFrame has certain columns (not always the same; see the attached example). I am trying to achieve that with the custom DagsterTypes `NpsDataFrame` and `TagDataFrame` in my pipeline (see attachment), but that does not show the type in Dagit. How could I use a generic solid but return differently typed DataFrames? I'd like to see `NpsDataFrame` and `TagDataFrame` instead of the generic `PySparkDataFrame`. Any best practices? Or should I add an additional parameter to `load_delta_table_to_df` where I define the output DataFrame? Thanks a lot guys!
Andy H
02/10/2021, 5:18 PM
Fran Sanchez
02/10/2021, 6:14 PM
Paul Wyatt
02/10/2021, 6:37 PM
dagster.core.errors.DagsterInvalidDefinitionError: Invalid type: dagster_type must be DagsterType, a python scalar, or a python type that has been marked usable as a dagster type via @usable_dagster_type or make_python_type_usable_as_dagster_type: got typing.NoReturn
on something that was previously working.
Is there a Dagster-typed equivalent of `NoReturn`, or should I mark `NoReturn` as usable?
Yichen
02/10/2021, 6:48 PM
antonl
02/10/2021, 8:43 PM
Paul Wyatt
02/11/2021, 2:55 AM
Operation name: RunsRootQuery
Message: Exactly 5 or 6 columns has to be specified for iteratorexpression.
Path: ["repositoriesOrError","nodes",1,"schedules",1,"futureTicks"]
Locations: [{"line":183,"column":3}]
I'm hopeful that moving off the system cron scheduler will remediate this, but any guidance is nonetheless helpful.
Josh Taylor
02/11/2021, 4:59 AM
dhume
02/11/2021, 2:06 PM
`workspace.yaml`? I tried similar syntax to the `dagster.yaml` and that didn’t seem to work.
Rubén Lopez Lozoya
02/11/2021, 3:58 PM
Brian Abelson
02/11/2021, 6:04 PM
Operation name: SchedulesRootQuery
Message: Invariant failed.
Path: ["repositoryOrError","schedules",0,"futureTicks"]
Locations: [{"line":130,"column":3}]
Stack Trace:
File "/usr/local/lib/python3.8/site-packages/graphql/execution/executor.py", line 452, in resolve_or_error
return executor.execute(resolve_fn, source, info, **args)
File "/usr/local/lib/python3.8/site-packages/graphql/execution/executors/sync.py", line 16, in execute
return fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster_graphql/schema/schedules/schedules.py", line 96, in resolve_futureTicks
tick_times.append(next(time_iter).timestamp())
File "/usr/local/lib/python3.8/site-packages/dagster/utils/schedules.py", line 27, in schedule_execution_time_iterator
check.invariant(len(cron_parts) == 5)
File "/usr/local/lib/python3.8/site-packages/dagster/check/__init__.py", line 172, in invariant
raise_with_traceback(CheckError("Invariant failed."))
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
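The last Dagster frame in the trace is `check.invariant(len(cron_parts) == 5)`, so the schedule's cron string apparently did not split into exactly five fields. Below is a small sketch for checking a cron string ahead of time. `cron_field_count` and `has_five_fields` are hypothetical helpers, and the whitespace split is an assumption about how `cron_parts` is built, not something shown in the trace.

```python
def cron_field_count(cron_schedule: str) -> int:
    """Count the whitespace-separated fields in a cron string."""
    return len(cron_schedule.split())


def has_five_fields(cron_schedule: str) -> bool:
    """True when the string has five fields: minute, hour, day of month, month, day of week."""
    return cron_field_count(cron_schedule) == 5


# A standard five-field expression passes; a six-field one (with seconds) does not.
print(has_five_fields("0 0 * * *"))
print(has_five_fields("0 0 0 * * *"))
```

Running a check like this over each schedule's cron string may help find the one that trips the invariant.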
Michael T
02/11/2021, 6:22 PM
The Dagster type system is independent from the PEP 484 Python type system, although we overload the type annotation syntax on functions to make it easier to specify the input and output types of your solids.
Basil V
02/11/2021, 6:58 PM
`docker-compose up` but something is not working. I'm not getting any errors; however, the application is not available at localhost:3000. Just to confirm, should this example be runnable as-is, or is any other configuration required? Given that I'm not getting any errors, I'm thinking the problem may be with exposing port 3000. I also ran `lsof -i:3000` locally and it shows nothing, making me think the container port isn't being exposed properly. The docker compose file does include the line `expose: - "3000"` in the dagit container though, so I'm not sure what is wrong. Does anybody have any suggestions? Thanks so much!
user
02/11/2021, 8:34 PM
chris
02/11/2021, 8:58 PM