Manny Schneck
12/06/2021, 10:55 PM
Consider using the module-based option `-m` for CLI-based targets or the `python_package` workspace.yaml target.
Ryan Riopelle
12/06/2021, 11:57 PM

Eric
12/07/2021, 12:30 AM
I created a `Resource` that encapsulates all the methods I need for communicating with my REST API. However, the problem I'm running into is that I don't know how to pass arguments to a resource's method. My current workaround is to define `book_name` as `Noneable`, so it would only be defined in jobs that use this method within the resource, but something seems strange about this setup.
Perhaps I'm thinking about / going about this the wrong way?
The example code below shows where I'm stuck trying to pass a `book_name` into the resource's `fetch_book_by_name` method.
import requests
from requests.auth import HTTPBasicAuth

from dagster import Noneable, String, resource


class MyBooksRESTApi:
    def __init__(self, host, username, password):
        self.host = host
        self.username = username
        self.password = password
        self.json_headers = {"Content-Type": "application/json"}

    # how to pass the book_name argument?
    def fetch_book_by_name(self, book_name):
        response = requests.get(
            f"https://www.mycoolsite.com/api/books/{book_name}",
            auth=HTTPBasicAuth(self.username, self.password),
            headers=self.json_headers,
        )
        return response

    # example method that does NOT need book_name defined
    def fetch_all_books(self):
        response = requests.get(
            "https://www.mycoolsite.com/api/books",
            auth=HTTPBasicAuth(self.username, self.password),
            headers=self.json_headers,
        )
        return response


@resource(config_schema={
    "host": String,
    "username": String,
    "password": String,
    "book_name": Noneable(String),
})
def my_books_rest_api_resource(init_context):
    return MyBooksRESTApi(
        init_context.resource_config["host"],
        init_context.resource_config["username"],
        init_context.resource_config["password"],
    )
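A hedged sketch of how the method might be called from an op, with `book_name` supplied as op config instead of resource config (the resource key `books_api` and the op/job names are illustrative, not from the original message):

from dagster import job, op

@op(config_schema={"book_name": str}, required_resource_keys={"books_api"})
def fetch_book(context):
    # The per-run value comes from op config rather than the resource's config.
    return context.resources.books_api.fetch_book_by_name(context.op_config["book_name"])

@job(resource_defs={"books_api": my_books_rest_api_resource})
def books_job():
    fetch_book()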
Thomas Mignon
12/07/2021, 9:25 AM
execution:
  dask:
    config:
      cluster:
        pbs:
          queue: mpi_4
          project: dagster
          walltime: '00:05:00'
          memory: '120gb'
          cores: 28
          resource_spec: 'select=4:ncpus=28:mem=120gb'
          n_workers: 1
I got this error:
raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Error in config for job
Error 1: Received unexpected config entry "dask" at path root:execution. Expected: "{ config: { cluster: { existing: { address: (String | { env: String }) } kube ..."
Error 2: Missing required config entry "config" at path root:execution. Sample config for missing entry: {'config': {'cluster': '<selector>'}}
And my job looks like this:
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    template_runner_op()
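Going by the error message, the job API appears to expect the executor config directly under execution: config:, without the intermediate dask key. A hedged sketch of the reshaped config as a Python run_config dict (all values copied from the YAML above):

run_config = {
    "execution": {
        "config": {
            "cluster": {
                "pbs": {
                    "queue": "mpi_4",
                    "project": "dagster",
                    "walltime": "00:05:00",
                    "memory": "120gb",
                    "cores": 28,
                    "resource_spec": "select=4:ncpus=28:mem=120gb",
                    "n_workers": 1,
                }
            }
        }
    }
}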
Matthias Queitsch
12/07/2021, 5:13 PM
2021-12-07 17:08:59 - SensorDaemon - ERROR - Sensor daemon caught an error for sensor Generic_Usage_Scenario_Sensor : grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "
{
"created": "@1638896939.637237528",
"description": "Failed to pick subchannel",
"file": "src/core/ext/filters/client_channel/client_channel.cc",
"file_line": 3093,
"referenced_errors": [
{
"created": "@1638896939.637236219",
"description": "failed to connect to all addresses",
"file": "src/core/lib/transport/error_utils.cc",
"file_line": 163,
"grpc_status": 14
}
]
}
"
>
Stack Trace:
File "/usr/local/lib/python3.9/site-packages/dagster/daemon/sensor.py", line 191, in execute_sensor_iteration
repo_location = workspace.get_location(origin)
File "/usr/local/lib/python3.9/site-packages/dagster/core/workspace/dynamic_workspace.py", line 36, in get_location
location = existing_location if existing_location else origin.create_location()
File "/usr/local/lib/python3.9/site-packages/dagster/core/host_representation/origin.py", line 271, in create_location
return GrpcServerRepositoryLocation(self)
File "/usr/local/lib/python3.9/site-packages/dagster/core/host_representation/repository_location.py", line 495, in __init__
list_repositories_response = sync_list_repositories_grpc(self.client)
File "/usr/local/lib/python3.9/site-packages/dagster/api/list_repositories.py", line 14, in sync_list_repositories_grpc
deserialize_json_to_dagster_namedtuple(api_client.list_repositories()),
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/client.py", line 163, in list_repositories
res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/client.py", line 110, in _query
response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
And for the dagit service:
/usr/local/lib/python3.9/site-packages/dagster/core/workspace/context.py:538: UserWarning: Error loading repository location analytics-opal-pipelines:grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "
{
"created": "@1638894475.880671826",
"description": "Failed to pick subchannel",
"file": "src/core/ext/filters/client_channel/client_channel.cc",
"file_line": 3093,
"referenced_errors": [
{
"created": "@1638894475.880670730",
"description": "failed to connect to all addresses",
"file": "src/core/lib/transport/error_utils.cc",
"file_line": 163,
"grpc_status": 14
}
]
}
"
>
Stack Trace:
File "/usr/local/lib/python3.9/site-packages/dagster/core/workspace/context.py", line 535, in _load_location
location = self._create_location_from_origin(origin)
File "/usr/local/lib/python3.9/site-packages/dagster/core/workspace/context.py", line 454, in _create_location_from_origin
return origin.create_location()
File "/usr/local/lib/python3.9/site-packages/dagster/core/host_representation/origin.py", line 271, in create_location
return GrpcServerRepositoryLocation(self)
File "/usr/local/lib/python3.9/site-packages/dagster/core/host_representation/repository_location.py", line 495, in __init__
list_repositories_response = sync_list_repositories_grpc(self.client)
File "/usr/local/lib/python3.9/site-packages/dagster/api/list_repositories.py", line 14, in sync_list_repositories_grpc
deserialize_json_to_dagster_namedtuple(api_client.list_repositories()),
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/client.py", line 163, in list_repositories
res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/client.py", line 110, in _query
response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
warnings.warn(
Loading repository...
Serving on http://0.0.0.0:3000 in process 1
David Stefan
12/07/2021, 5:46 PM
dagster api grpc --python-file repository.py --host 0.0.0.0 --port "${PORT}"
All three instances point to a Cloud SQL instance, spin up without errors, and respond OK (e.g. I can access the dagit UI using the default run.app URL). My issue is that dagit cannot connect to the "repository" instance, with the error below:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses" debug_error_string = "{"created":"@1638898936.563075026","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3093,"referenced_errors":[{"created":"@1638898936.563072225","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}" >
File "/usr/local/lib/python3.9/site-packages/dagster/core/workspace/context.py", line 535, in _load_location
location = self._create_location_from_origin(origin)
File "/usr/local/lib/python3.9/site-packages/dagster/core/workspace/context.py", line 454, in _create_location_from_origin
return origin.create_location()
File "/usr/local/lib/python3.9/site-packages/dagster/core/host_representation/origin.py", line 271, in create_location
return GrpcServerRepositoryLocation(self)
File "/usr/local/lib/python3.9/site-packages/dagster/core/host_representation/repository_location.py", line 495, in __init__
list_repositories_response = sync_list_repositories_grpc(self.client)
File "/usr/local/lib/python3.9/site-packages/dagster/api/list_repositories.py", line 14, in sync_list_repositories_grpc
deserialize_json_to_dagster_namedtuple(api_client.list_repositories()),
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/client.py", line 163, in list_repositories
res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/client.py", line 110, in _query
response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
Both dagit and the dagster daemon are configured to connect to the "repository" instance via the instance's default run.app URL and port 443. When I hit that URL directly, I get "upstream connect error or disconnect/reset before headers. reset reason: protocol error", and though that looks like an error, I attributed it to the fact that I'm simply querying it from a browser instead of a gRPC client, and assumed that it's responding OK. At this point I don't have any ideas on how to go on from here, what to inspect, or where to turn on debug mode to make dagit and the daemon find my repository gRPC service.
I would appreciate help if anyone has had a similar experience or can spot whether I'm doing something odd here. Thanks!
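As a hedged debugging sketch, dagster's internal gRPC client (0.13-era API, so subject to change) can probe the server directly; the hostname below is a placeholder for the repository service's run.app address:

from dagster.grpc.client import DagsterGrpcClient

# Placeholder host; substitute the repository service's run.app hostname.
client = DagsterGrpcClient(host="repository-xyz.run.app", port=443)
print(client.ping("hello"))  # raises grpc._channel._InactiveRpcError if unreachable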
Jorge Lima
12/07/2021, 6:24 PM
Is it expected that with `@daily_schedule` the date argument returns the full datetime with hours and minutes instead of just the date? 🤔

Diana Stan
12/07/2021, 7:03 PM

Dmytro Tsyliuryk
12/07/2021, 7:44 PM

Daniel Suissa
12/07/2021, 9:08 PM

Diana Stan
12/07/2021, 9:50 PM
Got an AttributeError: 'DataContextConfig' object has no attribute 'validation_operators' error today; are people pinning to older versions? We were running with everything set at 0.13.10: https://dagster.slack.com/archives/C01U954MEER/p1630077864044000?thread_ts=1630075462.041100&cid=C01U954MEER

Tadas Barzdžius
12/08/2021, 7:49 AM
volumes:
  - name: storage
    cephfs:
      monitors:
        - ip1
        - ip2
        - ip3
        - ip4
        - ip5
      path: /storage
      secretRef:
        name: super-secret-secret
      user: storage
volumeMounts:
  - name: storage
    mountPath: /mnt/storage
And I get:
AttributeError: module 'kubernetes.client.models' has no attribute 'list[str]'
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 1502, in launch_run
self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/launcher.py", line 312, in launch_run
user_defined_k8s_config=user_defined_k8s_config,
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/job.py", line 587, in construct_dagster_k8s_job
volume,
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/models.py", line 51, in k8s_model_from_dict
kwargs[attr] = _k8s_value(value, attr_type, mapped_attr)
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/models.py", line 31, in _k8s_value
return k8s_model_from_dict(klass, data)
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/models.py", line 51, in k8s_model_from_dict
kwargs[attr] = _k8s_value(value, attr_type, mapped_attr)
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/models.py", line 15, in _k8s_value
klass = getattr(kubernetes.client.models, classname)
What am I doing incorrectly?
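For comparison, a hedged sketch of attaching the same cephfs volume per job through the dagster-k8s/config tag; the job name is illustrative and the values are copied from the YAML above:

from dagster import job

@job(
    tags={
        "dagster-k8s/config": {
            "pod_spec_config": {
                "volumes": [
                    {
                        "name": "storage",
                        "cephfs": {
                            "monitors": ["ip1", "ip2", "ip3", "ip4", "ip5"],
                            "path": "/storage",
                            "secretRef": {"name": "super-secret-secret"},
                            "user": "storage",
                        },
                    }
                ],
            },
            "container_config": {
                "volumeMounts": [{"name": "storage", "mountPath": "/mnt/storage"}],
            },
        }
    }
)
def storage_job():
    ...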
Tadas Barzdžius
12/08/2021, 12:04 PM

Amit Arie
12/08/2021, 3:19 PM
We follow the `user-code` container convention of Dagster. When I'm submitting the tasks via `dagit`, I'm getting the following error:
orchestration-user_code-1 | 2021-12-08 15:01:58 - dagster - DEBUG - science - d4e4b91d-a808-4af9-8f1d-b932184b8e45 - 19 - RUN_START - Started execution of run for "science".
orchestration-user_code-1 | 2021-12-08 15:01:59 - dagster - DEBUG - science - d4e4b91d-a808-4af9-8f1d-b932184b8e45 - 19 - ENGINE_EVENT - Submitting celery task for step "collect" to queue "dagster".
orchestration-user_code-1 | 2021-12-08 15:02:00 - dagster - DEBUG - science - d4e4b91d-a808-4af9-8f1d-b932184b8e45 - 19 - ENGINE_EVENT - Encountered error during celery task submission.
orchestration-user_code-1 |
orchestration-user_code-1 | kombu.exceptions.OperationalError: [Errno 111] Connection refused
orchestration-user_code-1 |
orchestration-user_code-1 | Stack Trace:
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/dagster_celery/core_execution_loop.py", line 140, in core_celery_execution_loop
orchestration-user_code-1 | step_results[step.key] = step_execution_fn(
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/dagster_celery/executor.py", line 125, in _submit_task
orchestration-user_code-1 | return task_signature.apply_async(
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/celery/canvas.py", line 219, in apply_async
orchestration-user_code-1 | return _apply(args, kwargs, **options)
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/celery/app/task.py", line 575, in apply_async
orchestration-user_code-1 | return app.send_task(
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/celery/app/base.py", line 787, in send_task
orchestration-user_code-1 | self.backend.on_task_call(P, task_id)
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/celery/backends/rpc.py", line 164, in on_task_call
orchestration-user_code-1 | maybe_declare(self.binding(producer.channel), retry=True)
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/kombu/messaging.py", line 209, in _get_channel
orchestration-user_code-1 | channel = self._channel = channel()
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/kombu/utils/functional.py", line 32, in __call__
orchestration-user_code-1 | value = self.__value__ = self.__contract__()
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/kombu/messaging.py", line 225, in <lambda>
orchestration-user_code-1 | channel = ChannelPromise(lambda: connection.default_channel)
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/kombu/connection.py", line 896, in default_channel
orchestration-user_code-1 | self._ensure_connection(**conn_opts)
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/kombu/connection.py", line 434, in _ensure_connection
orchestration-user_code-1 | return retry_over_time(
orchestration-user_code-1 | File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
orchestration-user_code-1 | self.gen.throw(typ, value, traceback)
orchestration-user_code-1 | File "/usr/local/lib/python3.9/site-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
orchestration-user_code-1 | raise ConnectionError(str(exc)) from exc
orchestration-user_code-1 |
orchestration-user_code-1 | The above exception was the direct cause of the following exception:
orchestration-user_code-1 | ConnectionRefusedError: [Errno 111] Connection refused
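The trace shows the task submission failing to reach the Celery broker. As a hedged sketch, the broker URL can be set explicitly on the executor; the rabbitmq hostname below is a placeholder for the compose service name:

from dagster_celery import celery_executor

# In docker-compose the broker is reachable by service name, not localhost.
configured_executor = celery_executor.configured(
    {"broker": "pyamqp://guest@rabbitmq:5672//"}
)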
Amit Arie
12/08/2021, 3:19 PM

Daniel Suissa
12/08/2021, 7:51 PM

Stefan Adelbert
12/09/2021, 3:10 AM
If I have a resource `A` which depends on another resource, `B`, I can use `required_resource_keys` as described here: https://docs.dagster.io/concepts/resources#resource-to-resource-dependencies.
But let's say that I want to have two instances of resource `A` configured differently, and I want those two instances to depend on two different instances of resource `B`; I don't know how to achieve that. If resource `A` looks like this:
@resource(required_resource_keys={"b"})
def a(init_context):
    return None
then two instances of `A`, `a1` and `a2`, both depend on a resource called `b`, but really I would like `a1` to depend on `b1` and `a2` to depend on `b2`.
Is there a way to achieve this?
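A hedged sketch of one possible pattern: a small factory that stamps out variants of A, each declaring a different upstream resource key (all names and config values here are illustrative):

from dagster import resource

@resource(config_schema={"name": str})
def b_resource(init_context):
    return init_context.resource_config["name"]

def make_a_resource(b_key):
    # Each variant of A declares its own upstream key and resolves it dynamically.
    @resource(required_resource_keys={b_key})
    def _a(init_context):
        upstream_b = getattr(init_context.resources, b_key)
        return f"A wrapping {upstream_b}"  # stand-in for the real A object
    return _a

resource_defs = {
    "b1": b_resource.configured({"name": "first"}),
    "b2": b_resource.configured({"name": "second"}),
    "a1": make_a_resource("b1"),
    "a2": make_a_resource("b2"),
}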
Stefan Adelbert
12/09/2021, 3:10 AM
I'm preconfiguring a resource using `resource.configured`. However, when I click "View configuration" in dagit for a job's run, I don't see the configured config values used for the resource in that run. I also don't have the ability to override the config values for a configured resource when launching a new run in dagit.
I suspect it's because `resource.configured` creates a new resource from an existing resource, "hardcoding" the config values; therefore the new resource has no config.
Could someone suggest a way to preconfigure a resource (or op) such that:
• the config is evident in dagit for an historic run
• the config can be inspected and overridden for a new run
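One hedged alternative sketch: declare defaults in the config schema with Field(..., default_value=...) instead of hardcoding them via .configured; the defaults stay part of the resource's config schema, so dagit can display and override them per run (names and values are illustrative):

from dagster import Field, String, resource

@resource(config_schema={
    "host": Field(String, default_value="api.example.com"),
    "timeout": Field(int, default_value=30),
})
def my_service(init_context):
    # Defaults appear in (and can be edited via) the run config editor.
    return init_context.resource_config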
Amit Arie
12/09/2021, 9:38 AM
1. How can we gracefully stop a running `op`? Or, in general, how can we have a graceful shutdown?
2. What is the most recommended way to update the code itself? The celery workers and the user-code containers share the same volume that includes the code itself. Is it fine to change the code on the fly?

George Pearse
12/09/2021, 9:47 AM

daniel blinick
12/09/2021, 11:20 AM
I'm using the `@usable_as_dagster_type` decorator. Some classes inherit from others, and I want to be able to type an op's inputs using the parent class and pass any of the child classes at runtime. However, when I try this it gives me typing errors. Is there a way around this?
Thanks!
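A hedged sketch of one workaround: a plain DagsterType with an isinstance-based type check, which accepts subclasses (class and op names are illustrative):

from dagster import DagsterType, In, op

class Animal:
    pass

class Dog(Animal):
    pass

# isinstance accepts subclasses, so a Dog satisfies the Animal type check.
AnimalType = DagsterType(
    name="Animal",
    type_check_fn=lambda _context, value: isinstance(value, Animal),
)

@op(ins={"pet": In(AnimalType)})
def greet(pet):
    return pet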
mrdavidlaing
12/09/2021, 11:42 AM
I'm getting a botocore.exceptions.ClientError: An error occurred (SlowDown) when calling the PutObject operation (reached max retries: 4) error when using a (possibly overloaded) internal S3-compatible blobstore to store job compute logs; see 🧵 for details.
Since this doesn't actually cause any of the `op()`s in the pipeline to fail, I'm wondering if there is a way to mark this error as "ignorable"?

George Pearse
12/09/2021, 3:03 PM

Matthias Queitsch
12/09/2021, 3:33 PM

Nick Dellosa
12/09/2021, 7:29 PM

Sa'ar Elias
12/09/2021, 8:29 PM
@graph
def resolve_input(inp):
    sa, sb = match_to_resolver(inp)  # out={'ra': Out(is_required=False), 'rb': Out(is_required=False)}
    sum_all_results(resolver_a(sa), resolver_b(sb))
Stefan Adelbert
12/09/2021, 11:39 PM
I have an `op` which makes 100s of asynchronous calls to an API to build up a dataset, which is then returned as an `Output`. Is there a way for the `op` to signal its progress with those API calls? I'd imagined something like yielding a `ProgressEvent`.
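I'm not aware of a ProgressEvent, but as a hedged sketch, progress can at least be surfaced through the run's event log via context.log (the paged endpoint below is a placeholder for the real API calls):

import requests
from dagster import op

@op
def build_dataset(context):
    results = []
    total = 500
    for i in range(total):
        # Placeholder paged endpoint standing in for the real API calls.
        results.append(requests.get(f"https://api.example.com/items?page={i}").json())
        if i % 100 == 0:
            context.log.info(f"fetched page {i} of {total}")  # visible in dagit's event log
    return results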
Qwame
12/10/2021, 5:18 AM
What is the difference between `dbt_cli_resource` and `make_values_resource`? If I create a resource using `dbt_cli_resource`, can I pass that resource to non-`dbt` jobs in the pipeline?
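On the second question, a hedged sketch: a resource built with dbt_cli_resource.configured(...) is an ordinary resource definition, so it can be attached to any job via resource_defs (paths and the op are illustrative):

from dagster import job, op
from dagster_dbt import dbt_cli_resource

my_dbt = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt_project", "profiles_dir": "path/to/profiles"}
)

@op(required_resource_keys={"dbt"})
def uses_dbt_somehow(context):
    context.resources.dbt.run()  # the resource works even in a non-dbt-centric job

@job(resource_defs={"dbt": my_dbt})
def non_dbt_job():
    uses_dbt_somehow()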
Marc Keeling
12/10/2021, 10:42 PM