Charles Lariviere
02/08/2021, 9:09 PM
What should a solid return when there is no data: None or an empty list? I have an extract solid that pulls data from a REST API as a list of dicts, which I take as input in a to_df solid to format as a dataframe. The API is not guaranteed to return results for a given partition, which then causes issues downstream with my IO manager and Dagster’s type validation. I could address that in the to_df solid, but curious if there was a better pattern to use — some kind of conditional execution logic in the pipeline definition?
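One pattern that may fit (a sketch, not necessarily the canonical answer; call_api is a hypothetical helper) is to mark the extract solid's output as optional and only yield it when the API returned rows, so downstream solids are skipped for empty partitions:

from dagster import Output, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(name="records", is_required=False)])
def extract(context):
    records = call_api()  # hypothetical REST call returning a list of dicts
    if records:
        # only emit when there is data; downstream solids skip otherwise
        yield Output(records, output_name="records")

@solid
def to_df(context, records):
    ...

@pipeline
def my_pipeline():
    to_df(extract())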
antonl
02/09/2021, 7:58 PM
What's the tradeoff between using configured to curry some configuration for a solid vs. using a composite_solid? Is there a best practice when you want to curry inputs or partially configure a solid?
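For reference, configured curries config values (not runtime inputs) into a new solid definition; a minimal sketch with made-up names:

from dagster import Field, configured, solid

@solid(config_schema={"bucket": Field(str)})
def upload(context, df):
    context.log.info(f"uploading to {context.solid_config['bucket']}")

# a new solid definition with the bucket baked in
upload_to_prod = configured(upload, name="upload_to_prod")({"bucket": "prod-bucket"})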
Charles Lariviere
02/09/2021, 11:35 PM
Is it possible to pass a DynamicOutput to a composite_solid? My pipeline works fine when I pass the DynamicOutput to a single solid, but I would like the .map() to execute more than one solid, with the general flow being:
1. Fetch a list of IDs from the database (unknown before execution time);
2. For each: query an API, build a dataframe, output to database.
But I'm getting the following error when I package step 2 as a composite_solid:
dagster.core.errors.DagsterSubprocessError: dagster.check.CheckError: Member of list mismatches type. Expected <class 'dagster.core.execution.plan.inputs.StepInput'>. Got UnresolvedStepInput(name='id', dagster_type_key='Int', source=FromPendingDynamicStepOutput(step_output_handle=StepOutputHandle(step_key='query_records', output_name='id', mapping_key=None), solid_handle=SolidHandle(name='do_multiple_steps', parent=None), input_name='id')) of type <class 'dagster.core.execution.plan.inputs.UnresolvedStepInput'>.
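The error suggests composite solids can't resolve dynamic outputs in this version. One workaround (a sketch; query_api, build_dataframe, and write_to_db are hypothetical helpers) is to collapse step 2 into a single solid and map over that:

from dagster import pipeline, solid

@solid
def process_one_id(context, record_id: int):
    payload = query_api(record_id)  # step 2a: hit the API
    df = build_dataframe(payload)   # step 2b: shape into a dataframe
    write_to_db(df)                 # step 2c: persist

@pipeline
def my_pipeline():
    query_records().map(process_one_id)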
Brian Abelson
02/10/2021, 12:24 AM
I'm running 0.10.4. Everything runs fine, except the scheduler seems to continually shut down after about 2-3 hours with the following error (pasted below). It seems that I have to restart the daemon continually to address this. Is this normal? Is there a way to suppress these errors? I'm invoking dagster-daemon via supervisord with the simple run command.
dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
with RepositoryLocationHandle.create_from_repository_location_origin(
File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
self.grpc_server_process = GrpcServerProcess(
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
self.server_process = open_server_process(
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
wait_for_grpc_server(server_process, output_file)
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
raise DagsterIPCProtocolError(
Josh Taylor
02/10/2021, 6:17 AM
Is there a reason why postgres_url in pg_config is a str instead of a StringSource? That would allow setting the entire url via an environment variable in the yaml. postgres_db is set up like this:
"postgres_db": {
"username": StringSource,
"password": StringSource,
"hostname": StringSource,
"db_name": StringSource,
"port": Field(IntSource, is_required=False, default_value=5432),
},
It looks like this would work, but it's static?
run_storage:
module: dagster_postgres.run_storage
class: PostgresRunStorage
config:
postgres_url: "<postgresql://test:test@{hostname}:5432/test>"
Hamza Khurshid Butt
02/10/2021, 8:09 AM
I have scheduled a pipeline with @daily_schedule, but when its time comes it does not run; the scheduler simply skips it. However, when I schedule the same pipeline using cron_schedule, the scheduler does run it! 🤐
Here is the screenshot:
Thank You!
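For comparison, a @daily_schedule of this era looks roughly like this (a sketch; the pipeline name and times are placeholders). Skipped ticks are often caused by a start_date in the future or by execution_time being interpreted in UTC:

from datetime import datetime, time
from dagster import daily_schedule

@daily_schedule(
    pipeline_name="my_pipeline",            # placeholder
    start_date=datetime(2021, 1, 1),        # must be in the past
    execution_time=time(hour=3, minute=0),  # UTC unless execution_timezone is set
)
def my_daily_schedule(date):
    # return run config for the given partition date
    return {}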
Waqas Awan
02/10/2021, 10:08 AM
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      [
        { key:"test", value:"two", limit:2 }
      ]
While running the program it says the following:
raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for {'max_concurrent_runs': Field(<dagster.config.config_type.Int object at 0x7fd295c8ea30>, default=@, is_required=False), 'tag_concurrency_limits': Field(<dagster.config.config_type.Noneable object at 0x7fd29b096dc0>, default=@, is_required=False), 'dequeue_interval_seconds': Field(<dagster.config.config_type.Int object at 0x7fd295c8ea30>, default=@, is_required=False)}.
Error 1: Received unexpected config entries "['key:"test"', 'limit:2']" at path root:tag_concurrency_limits[0]. Expected: "['key', 'limit', 'value']."
Error 2: Missing required config entries "['key', 'limit']" at path root:tag_concurrency_limits[0]".
Error 3: Received unexpected config entries "['key:"test1"', 'limit:5']" at path root:tag_concurrency_limits[1]. Expected: "['key', 'limit', 'value']."
Error 4: Missing required config entries "['key', 'limit']" at path root:tag_concurrency_limits[1]".
I am using the exact syntax based on the docs: https://docs.dagster.io/overview/pipeline-runs/limiting-run-concurrency#main
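The errors are consistent with a YAML parsing problem: without a space after the colon, key:"test" is read as one scalar rather than a key/value pair. A corrected sketch of the same config:

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "test"
        value: "two"
        limit: 2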
Simon Späti
02/10/2021, 3:24 PM
What's the best way to create a custom DagsterDataType from a PysparkDataFrame? I have a generic solid load_delta_table_to_df, but in my pipeline I'd like to type-check that the returned DataFrame has certain columns (not always the same, see example attached). I try to achieve that with the custom DagsterTypes NpsDataFrame and TagDataFrame in my pipeline (see attachment), but that will not show the type in Dagit. How could I use a generic solid but return differently typed DataFrames? I'd like to see NpsDataFrame and TagDataFrame instead of the generic PySparkDataFrame. Any best practices? Or should I add an additional parameter to load_delta_table_to_df where I define the output DataFrame? Thanks a lot guys!
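One common approach (a sketch; the column names are invented) is to build DagsterTypes with type_check_fn and attach them to thin wrapper solids, so Dagit shows the specific type while the loading logic stays generic:

from dagster import DagsterType, OutputDefinition, solid

def has_columns(required):
    def check(_, df):
        return set(required).issubset(set(df.columns))
    return check

NpsDataFrame = DagsterType(name="NpsDataFrame", type_check_fn=has_columns(["user_id", "nps_score"]))
TagDataFrame = DagsterType(name="TagDataFrame", type_check_fn=has_columns(["user_id", "tag"]))

@solid(output_defs=[OutputDefinition(NpsDataFrame)])
def load_nps_table(context):
    # delegate to the generic loading logic (refactored into a plain function)
    return load_delta_table_to_df(...)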
Paul Wyatt
02/10/2021, 6:37 PM
I'm getting
dagster.core.errors.DagsterInvalidDefinitionError: Invalid type: dagster_type must be DagsterType, a python scalar, or a python type that has been marked usable as a dagster type via @usable_dagster_type or make_python_type_usable_as_dagster_type: got typing.NoReturn
on something that was previously working.
Is there a Dagster-typed equivalent of NoReturn, or should I mark NoReturn as usable?
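For what it's worth, the closest built-in is usually Dagster's Nothing type; a sketch (the solid name is made up):

from dagster import Nothing, OutputDefinition, solid

@solid(output_defs=[OutputDefinition(Nothing)])
def run_side_effects(context):
    context.log.info("performs side effects; no value to return")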
Yichen
02/10/2021, 6:48 PM
https://youtu.be/lodcK3Z3TUs
Paul Wyatt
02/11/2021, 2:55 AM
Operation name: RunsRootQuery
Message: Exactly 5 or 6 columns has to be specified for iterator expression.
Path: ["repositoriesOrError","nodes",1,"schedules",1,"futureTicks"]
Locations: [{"line":183,"column":3}]
I'm hopeful that moving off the system cron scheduler will remediate, but any guidance is nonetheless helpful.
Josh Taylor
dhume
02/11/2021, 2:06 PM
Is there a way to use environment variables in the workspace.yaml? I tried syntax similar to the dagster.yaml and that didn’t seem to work.
Rubén Lopez Lozoya
Brian Abelson
02/11/2021, 6:04 PM
Operation name: SchedulesRootQuery
Message: Invariant failed.
Path: ["repositoryOrError","schedules",0,"futureTicks"]
Locations: [{"line":130,"column":3}]
Stack Trace:
File "/usr/local/lib/python3.8/site-packages/graphql/execution/executor.py", line 452, in resolve_or_error
return executor.execute(resolve_fn, source, info, **args)
File "/usr/local/lib/python3.8/site-packages/graphql/execution/executors/sync.py", line 16, in execute
return fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster_graphql/schema/schedules/schedules.py", line 96, in resolve_futureTicks
tick_times.append(next(time_iter).timestamp())
File "/usr/local/lib/python3.8/site-packages/dagster/utils/schedules.py", line 27, in schedule_execution_time_iterator
check.invariant(len(cron_parts) == 5)
File "/usr/local/lib/python3.8/site-packages/dagster/check/__init__.py", line 172, in invariant
raise_with_traceback(CheckError("Invariant failed."))
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
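Per the trace, schedule_execution_time_iterator asserts a standard 5-field cron string, so a cron_schedule with more or fewer fields (or stray whitespace) trips this invariant. A well-formed sketch (pipeline name is a placeholder):

from dagster import schedule

@schedule(cron_schedule="0 3 * * *", pipeline_name="my_pipeline")  # minute hour day-of-month month day-of-week
def nightly_schedule(context):
    return {}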
Michael T
02/11/2021, 6:22 PM
The Dagster type system is independent from the PEP 484 Python type system, although we overload the type annotation syntax on functions to make it easier to specify the input and output types of your solids.
sandy
02/11/2021, 6:49 PM
@solid(
    # create_pandas_dataframe_type(...) is where you express column constraints
    input_defs=[InputDefinition("input1", dagster_type=create_pandas_dataframe_type(...))],
    output_defs=[OutputDefinition(dagster_type=create_pandas_dataframe_type(...))],
)
def my_solid(_, input1: pd.DataFrame) -> pd.DataFrame:
    ...
antonl
02/11/2021, 7:16 PMTYPE_CHECKING
block, and create aliases to dagster.Any
otherwise.sandy
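A sketch of that trick, using a PySpark DataFrame as the example type:

from typing import TYPE_CHECKING

from dagster import solid

if TYPE_CHECKING:
    # static type checkers see the real type...
    from pyspark.sql import DataFrame
else:
    # ...while at runtime Dagster's decorator sees its own Any
    from dagster import Any as DataFrame

@solid
def transform(_, df: DataFrame) -> DataFrame:
    ...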
sandy
02/11/2021, 7:19 PM
@solid
def my_solid(_, input1: pd.DataFrame) -> pd.DataFrame:
...
Diff: https://dagster.phacility.com/D5115
Michael T
antonl
02/11/2021, 7:21 PM
The first argument is the solid's context, conventionally written as _ when unused. There is a @lambda_solid decorator that doesn’t have this argument, but I think that may be going away. It makes the API hard to learn if there are multiple (sometimes-equivalent) ways of doing something.
Michael T
02/11/2021, 7:26 PM
(something like with load_config as context:)
antonl
02/11/2021, 7:27 PM
There is a @configured decorator that allows you to define solids with some configuration baked in.
sandy
02/11/2021, 7:28 PM
Would the with load_config as context be outside the solid definition or inside?
sandy
02/11/2021, 7:45 PM
I don't mind the _ argument in the cases where I don't need access to a solid's context. As mentioned above, we used to put more emphasis on APIs like lambda_solid that allowed users to avoid this, but ended up not finding it to be a big sticking point.