Abdo Mohamed
09/09/2021, 10:37 AM
kir
09/09/2021, 3:11 PM
Darren Ng
09/09/2021, 3:12 PM
I have added @solid(required_resource_keys={"value"}) to both solids and use the value within the solids via context.resources.value. I have also added mode_defs=[ModeDefinition(resource_defs={"value": make_values_resource()})] to the pipeline decorator, and included run_config={"resources": {"value": {"config": token}}} in execute_pipeline, where token is my token (a string) for both solids. Running it with dagit gives me this error:
/home/dazza/anaconda3/envs/versedai/lib/python3.8/site-packages/dagster/core/workspace/context.py:504: UserWarning: Error loading repository location gather_pipeline.py:dagster.core.errors.DagsterInvalidDefinitionError: Resource key "value" is required by solid def query_NLP_data, but is not provided by mode "default". In mode "default", provide a resource for key "value", or change "value" to one of the provided resources keys: ['io_manager', 'values'].
What have I done wrong here?
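For context: the traceback lists the mode's provided keys as ['io_manager', 'values'] while the solid requires "value", so the key names simply have to agree everywhere. A minimal sketch with consistent keys (the token value is a placeholder):

from dagster import ModeDefinition, execute_pipeline, make_values_resource, pipeline, solid

@solid(required_resource_keys={"value"})
def query_NLP_data(context):
    # the configured string is available here
    token = context.resources.value

@pipeline(mode_defs=[ModeDefinition(resource_defs={"value": make_values_resource()})])
def gather_pipeline():
    query_NLP_data()

# "my-token" stands in for the real token string
execute_pipeline(gather_pipeline, run_config={"resources": {"value": {"config": "my-token"}}})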
chrispc
09/09/2021, 4:48 PM
Jeremy Fisher
09/09/2021, 9:13 PM
Jeremy Fisher
09/09/2021, 9:13 PM
Arun Kumar
09/09/2021, 10:35 PM
kir
09/10/2021, 9:37 AM
David Hyman
09/10/2021, 1:02 PM
I'm on 0.12.9. Does anyone have any hints/tips/corroborative tales?
• I have some solid factories that are invoked at the module level, defining some solids (that happen to yield dynamic outputs)
• typically on the first run through, everything works as expected
• if I start a brand new run for the same pipeline, or attempt a rerun, or try either of those having fully restarted dagit + dagster-daemon, I get one of the two errors below
• from dagster/check/__init__.py, line 170:
dagster.check.CheckError: Invariant failed. Description: pipeline_X has no solid named factory_solid_Y.
• occasionally mid-run (i.e. a runtime failure), from dagster/core/execution/plan/plan.py, line 684:
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 637582: dagster.core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: factory_solid_Y
This is in a development environment, so using QueuedRunCoordinator, SqliteRunStorage, and multiprocess execution.
Repeatedly trying to 'launch execution' or 're-execute all' from dagit can get me through the first error; it might then hit the second error, or might succeed entirely (or get caught on a downstream failure, which is fine).
I have a feeling there's something weird happening with the solids being called once on module import. Seems like valid Python to me. Not yet sure how much time & effort it's going to take to go back to basics for a minimal reproduction...
By way of the simplest example I can imagine (solid_factory could look like the one in the docs, or even simpler):

@solid
def regular_solid():
    # works as expected
    pass

factory_solid_Y = solid_factory(some_closure_args)  # flaky
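For what it's worth, a common trigger for "has no solid named ..." with import-time factories is a generated solid name that isn't deterministic across processes (e.g. derived from an unordered collection or a counter). A minimal sketch of a factory that pins the name explicitly (all names hypothetical):

from dagster import solid

def solid_factory(name, items):
    # An explicit, stable name keeps the definition identical when the
    # module is re-imported by dagit, the daemon, and subprocess workers.
    @solid(name=name)
    def _built(context):
        for item in items:
            context.log.info(str(item))
    return _built

factory_solid_Y = solid_factory("factory_solid_Y", ["a", "b"])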
Carlos Sanoja
09/10/2021, 3:59 PM
@daily_schedule(
    pipeline_name="topic_models_pipeline",
    start_date=datetime.datetime(2021, 1, 1),
    execution_time=datetime.time(11, 47),
    execution_timezone="US/Eastern",
)
def topics_schedule(date):
    return {
        "solids": {
            "load_data": {
                "inputs": {"date": {"value": date.strftime("%Y-%m-%d")}}
            }
        }
    }
However, when I run it, it tells me that the dagster daemon must be running, so I run the following command:
dagster-daemon run
However, nothing happens; dagit does not seem to detect that it is running. From the daemon terminal I get the following output:
2021-09-10 08:46:18 - dagster-daemon - INFO - instance is configured with the following daemons: ['BackfillDaemon', 'SchedulerDaemon', 'SensorDaemon']
2021-09-10 08:46:18 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-09-10 08:46:18 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-09-10 08:46:18 - BackfillDaemon - INFO - No backfill jobs requested.
2021-09-10 08:46:48 - BackfillDaemon - INFO - No backfill jobs requested.
2021-09-10 08:46:48 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-09-10 08:47:18 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-09-10 08:47:18 - BackfillDaemon - INFO - No backfill jobs requested.
2021-09-10 08:47:18 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-09-10 08:47:48 - BackfillDaemon - INFO - No backfill jobs requested.
2021-09-10 08:47:48 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
It is important to note that both processes run in a pipenv environment; that is, in both terminals I must activate the environment first, then run the daemon in one and dagit in the other.
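One thing worth checking in this setup (an assumption, not a diagnosis): dagit and the daemon only see each other when both point at the same DAGSTER_HOME, since that is where the daemon heartbeats and run storage live. A sketch, with a hypothetical path:

# terminal 1
export DAGSTER_HOME=$HOME/dagster_home   # must contain dagster.yaml
pipenv run dagit

# terminal 2 (same DAGSTER_HOME)
export DAGSTER_HOME=$HOME/dagster_home
pipenv run dagster-daemon run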
Carlos Sanoja
09/10/2021, 7:29 PM
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline topic_models_pipeline
Error 1: Missing required config entry "resources" at the root. Sample config for missing entry: {'resources': {'slack': {'config': {'token': '...'}}}}
But I actually do have those configuration parameters, and I can launch the pipeline through the dagit Playground by clicking the Launch button.
When running the schedule, does this process look for these env variables somewhere else?
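One possible explanation, offered as an assumption: schedule functions are evaluated by the daemon process, so env variables set only in the dagit shell won't be visible there. A sketch that injects the token into the run config returned by the schedule (SLACK_TOKEN is a hypothetical variable name):

import datetime
import os

from dagster import daily_schedule

@daily_schedule(
    pipeline_name="topic_models_pipeline",
    start_date=datetime.datetime(2021, 1, 1),
    execution_time=datetime.time(11, 47),
    execution_timezone="US/Eastern",
)
def topics_schedule(date):
    return {
        # read in the daemon's environment, where the schedule is evaluated
        "resources": {"slack": {"config": {"token": os.environ["SLACK_TOKEN"]}}},
        "solids": {
            "load_data": {
                "inputs": {"date": {"value": date.strftime("%Y-%m-%d")}}
            }
        },
    }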
Nathan Saccon
09/10/2021, 8:02 PM
Chris Chan
09/10/2021, 9:20 PM
Arun Kumar
09/11/2021, 1:59 AM
Devaraj Nadiger
09/13/2021, 4:22 AM
pipeline1_solid:
  config:
    param1: temp
    param2: temp2
pipeline2_solid:
  config:
    param1: temp
    param2: temp2
Basically, I want to have the individual solid configuration for each pipeline in a single YAML config file.
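One hypothetical layout for that: a single file keyed by pipeline name, with a YAML anchor for the shared params; a small loader would then pick out the sub-dict to pass as each pipeline's run config (dagster itself still expects one config per run):

shared_params: &shared
  param1: temp
  param2: temp2

pipeline1:
  solids:
    pipeline1_solid:
      config: *shared

pipeline2:
  solids:
    pipeline2_solid:
      config: *shared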
Alexander Shirokov
09/13/2021, 8:07 AM
__init__.py
repositories.py
solids/
    solids.py
pipelines/
    pipelines.py
schedules/
    schedules.py
Are there any other possibilities?
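For reference, a minimal sketch of how repositories.py might tie that layout together (module and definition names are illustrative, not prescribed):

from dagster import repository

from my_project.pipelines.pipelines import my_pipeline
from my_project.schedules.schedules import my_schedule

@repository
def my_repository():
    # pipelines and schedules defined in the sibling modules
    return [my_pipeline, my_schedule]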
Alejandro DE LA CRUZ LOPEZ
09/13/2021, 8:25 AM
configSource?
Benoit Perigaud
09/13/2021, 9:13 AM
Rubén Lopez Lozoya
09/13/2021, 12:03 PM
PIPELINE_FAILURE
Caught an error for run a564083e-8f75-45a9-b9a2-63e5382d5e52 while removing it from the queue. Marking the run as failed and dropping it from the queue: grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1631520273.921130387","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3008,"referenced_errors":[{"created":"@1631520273.921127399","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":397,"grpc_status":14}]}"
>
Stack Trace:
File "/usr/local/lib/python3.7/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 154, in run_iteration
self._dequeue_run(instance, run, workspace)
File "/usr/local/lib/python3.7/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 206, in _dequeue_run
location = workspace.get_location(repository_location_origin)
File "/usr/local/lib/python3.7/site-packages/dagster/cli/workspace/dynamic_workspace.py", line 36, in get_location
location = existing_location if existing_location else origin.create_location()
File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/origin.py", line 255, in create_location
return GrpcServerRepositoryLocation(self)
File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/repository_location.py", line 478, in __init__
list_repositories_response = sync_list_repositories_grpc(self.client)
File "/usr/local/lib/python3.7/site-packages/dagster/api/list_repositories.py", line 13, in sync_list_repositories_grpc
api_client.list_repositories(), (ListRepositoriesResponse, SerializableErrorInfo)
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 143, in list_repositories
res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 89, in _query
response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
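Context for the trace above: the run coordinator failed to reach the user-code gRPC server while dequeuing the run. If the workspace uses a grpc_server entry like the hypothetical one below, that host/port must be reachable from the daemon container, not just from dagit:

load_from:
  - grpc_server:
      host: user_code    # hypothetical service name
      port: 4000
      location_name: "my_location"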
Dylan Hunt
09/13/2021, 2:34 PM
Liu Cao
09/13/2021, 4:28 PM
CeleryK8sRunLauncher allows us to have resource/compute separation of solids within a pipeline, but it seems dagster doesn't allow us to have package/environment separation natively.
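To illustrate the per-solid compute separation mentioned above: with the celery-k8s executor, raw Kubernetes config can be attached to an individual solid via the dagster-k8s/config tag (resource values here are made up):

from dagster import solid

@solid(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "500m", "memory": "1Gi"},
                }
            }
        }
    }
)
def heavy_solid(context):
    ...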
Chris Evans
09/13/2021, 8:59 PM
Regarding tag_concurrency_limits: I am trying to control concurrency of runs via tags. I have set up a test locally with the docker run launcher where I have been manually triggering the pipeline below at approximately the same time via Dagit. While I expect the runs to execute one at a time, each run starts immediately. Do I have this test set up correctly?
dagster.yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: test
        value: value
        limit: 1
repo.py
import time
from typing import Any, List

from dagster import pipeline, repository, solid

@solid
def sleep():
    time.sleep(60)

@pipeline(tags={'test': 'value'})
def sleep_graph():
    sleep()

@repository
def repository() -> List[Any]:
    return [
        sleep_graph,
    ]
Chris Chan
09/13/2021, 9:14 PM
dagster.check.ParameterCheckError: Param "run_launcher" is not a RunLauncher. Got None which is type <class 'NoneType'>.
I'm having a hard time figuring out what I'm doing wrong.
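A guess at the usual cause of that error: a run_launcher key present in dagster.yaml with an empty or mis-indented body deserializes to None. A minimal valid block for comparison:

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher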
Xu Zhang
09/13/2021, 10:00 PM
# completed for context: the client comes from dagster-graphql;
# host/port are assumptions for a local dagit
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("localhost", port_number=3000)
new_run_id: str = client.submit_pipeline_execution(
    "hello_graph",
    run_config={},
    mode="default",
)
Xu Zhang
09/13/2021, 10:03 PM
Navneet Sajwan
09/14/2021, 5:58 AM
Navneet Sajwan
09/14/2021, 6:02 AM
Remi Gabillet
09/14/2021, 10:51 AM
raaid
09/14/2021, 12:53 PM
Can an asset_sensor access the asset metadata? I thought the asset_event input might provide an avenue for that, but I think it is of type EventLogEntry, and that doesn't have the asset's metadata accessible as far as I can tell.
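A sketch of one way to dig the materialization metadata out of the event log entry; the attribute path reflects my reading of the 0.12-era internals, so treat it as an assumption to verify (asset key and pipeline name are hypothetical):

from dagster import AssetKey, RunRequest, asset_sensor

@asset_sensor(asset_key=AssetKey("my_asset"), pipeline_name="downstream_pipeline")
def my_asset_sensor(context, asset_event):
    # EventLogEntry -> DagsterEvent -> StepMaterializationData -> AssetMaterialization
    materialization = asset_event.dagster_event.event_specific_data.materialization
    for entry in materialization.metadata_entries:
        context.log.info(f"metadata entry: {entry.label}")
    yield RunRequest(run_key=context.cursor, run_config={})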
Dylan Hunt
09/14/2021, 2:42 PM