Phong Le Ngo Duy
02/27/2023, 4:46 AM
I'm using `@repository` with the lazy-loading method to schedule runs. For example, I have this op and job:
```python
from dagster import In, Out, job, op


@op(
    ins={
        "x": In(dagster_type=int),
        "y": In(dagster_type=int),
    },
    out={
        "sum": Out(dagster_type=int),
    },
)
def calculate_sum_x_y(x, y):
    return x + y


@job
def sum_x_y_job():
    # x and y are not wired to upstream ops; they are supplied through run config.
    calculate_sum_x_y()
```
For this job, I have a number of schedules whose run_config and cron information are stored in an external file named `test_dagster_new.txt`. Then I have the repository definition:
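The parsing in the repository code implies that each line of `test_dagster_new.txt` has the form `x,y,cron`. Below is a minimal sketch of a standalone parser, assuming that format; `parse_schedule_lines` is a hypothetical helper, not part of Dagster. It splits at most twice so that a cron expression containing commas (for example `0,30 9 * * 1-5`) stays intact, and it skips blank lines, which `readlines()` would otherwise pass to `int()` and crash on.

```python
from typing import Iterable, List, Tuple


def parse_schedule_lines(lines: Iterable[str]) -> List[Tuple[int, int, str]]:
    """Parse lines of the assumed form 'x,y,cron' into (x, y, cron) tuples."""
    entries = []
    for raw in lines:
        line = raw.strip()
        if not line:
            # Skip blank lines, e.g. an empty line at the end of the file.
            continue
        # maxsplit=2 keeps any commas that belong to the cron expression itself.
        x_str, y_str, cron = line.split(",", 2)
        entries.append((int(x_str), int(y_str), cron.strip()))
    return entries


sample = ["1,2,0 * * * *\n", "\n", "3,4,0,30 9 * * 1-5\n"]
print(parse_schedule_lines(sample))
# [(1, 2, '0 * * * *'), (3, 4, '0,30 9 * * 1-5')]
```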
```python
from dagster import ScheduleDefinition, repository


@repository
def lazy_loaded_repository():
    # The @repository decorator calls this function when the module is loaded,
    # so the file is read once per load of the code location.
    with open("test_dagster_new.txt", "r") as f:
        lines = [line.strip() for line in f if line.strip()]

    # Each line has the form "x,y,cron"; maxsplit=2 keeps commas inside the cron.
    xycron = []
    for line in lines:
        x, y, cron = line.split(",", 2)
        xycron.append((int(x), int(y), cron))

    schedule_collection = {}
    for idx, (x, y, cron) in enumerate(xycron):
        s_name = f"sum_x_y_job_schedule_{idx}"
        run_config = {
            "ops": {
                "calculate_sum_x_y": {
                    "inputs": {
                        "x": x,
                        "y": y,
                    }
                }
            }
        }
        schedule_collection[s_name] = ScheduleDefinition(
            job=sum_x_y_job,
            name=s_name,
            run_config=run_config,
            cron_schedule=cron,
        )

    return {
        "jobs": {"sum_x_y_job": sum_x_y_job},
        "schedules": schedule_collection,
    }
```
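The repository function builds every schedule from the file in a single pass. Here is a plain-Python sketch of the data that loop produces. It does not import Dagster, and `build_schedule_specs` is a hypothetical helper. It makes clear that the result is a snapshot of the file at the moment the function runs.

```python
from typing import Dict, List, Tuple


def build_schedule_specs(entries: List[Tuple[int, int, str]]) -> Dict[str, dict]:
    """Mirror the repository loop: one schedule spec per (x, y, cron) entry."""
    specs = {}
    for idx, (x, y, cron) in enumerate(entries):
        name = f"sum_x_y_job_schedule_{idx}"
        specs[name] = {
            "cron_schedule": cron,
            # Same run_config shape the repository passes to ScheduleDefinition.
            "run_config": {"ops": {"calculate_sum_x_y": {"inputs": {"x": x, "y": y}}}},
        }
    return specs


specs = build_schedule_specs([(1, 2, "0 * * * *"), (3, 4, "30 9 * * 1-5")])
print(sorted(specs))
# ['sum_x_y_job_schedule_0', 'sum_x_y_job_schedule_1']
```

One design consequence: the names come from line positions. Deleting a line from the file renames every schedule after it, so Dagster would likely treat those as different schedules. Keying names on something stable in each line would avoid that.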
I started Dagster with its instances, and it worked. Next, I expected that once the external file changed, my schedules would also be updated when I reload the repository. I tried both the API client and the Dagit UI, but neither helped. Does anyone have experience with this?
Thank you so much 🙏
sandy
02/27/2023, 5:44 PM
daniel
02/27/2023, 6:01 PM
Phong Le Ngo Duy
02/28/2023, 4:16 AM
My `workspace.yml` configuration:
```yaml
load_from:
  - grpc_server:
      host: pp_analytics_comply_alerts_workspace
      port: 4000
      location_name: "comply_alerts"
```
The workspace runs in a separate container. When I used `python_file` for `load_from` in the workspace instead, it worked as expected. However, my current use cases require running multiple workspaces, so I really need your help. The daemon logs:
```
2023-02-28 04:20:45 +0000 - dagster.daemon.SchedulerDaemon - WARNING - Could not load location comply_alerts to check for schedules due to the following error: dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server
Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/workspace/context.py", line 545, in _load_location
    location = self._create_location_from_origin(origin)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/workspace/context.py", line 470, in _create_location_from_origin
    return origin.create_location()
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/origin.py", line 331, in create_location
    return GrpcServerRepositoryLocation(self)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/repository_location.py", line 573, in __init__
    list_repositories_response = sync_list_repositories_grpc(self.client)
  File "/usr/local/lib/python3.7/site-packages/dagster/_api/list_repositories.py", line 19, in sync_list_repositories_grpc
    api_client.list_repositories(),
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 211, in list_repositories
    res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 141, in _query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses"
    debug_error_string = "{"created":"@1677558042.502459042","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1677558042.502453667","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>
Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 139, in _query
    return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 129, in _get_response
    return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
```
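The `StatusCode.UNAVAILABLE` / "failed to connect to all addresses" part of the log means the daemon could not open a connection to the gRPC server at `pp_analytics_comply_alerts_workspace:4000` at that moment. Possible causes include the server container restarting or the hostname not resolving from the daemon's container. Below is a minimal stdlib check, assuming you can run Python inside the daemon container. `can_reach` is a hypothetical helper. It only tests plain TCP reachability, not whether the gRPC server is healthy.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures (socket.gaierror), refused connections, and timeouts.
        return False


# Example, run inside the daemon container with the values from workspace.yml:
# can_reach("pp_analytics_comply_alerts_workspace", 4000)
```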
daniel
02/28/2023, 3:37 PM
RepositoryData subclass and return that from your @repository function instead, as described here (see the "ComplexRepositoryData" example): https://docs.dagster.io/_apidocs/repositories#dagster.repository
Phong Le Ngo Duy
03/01/2023, 3:24 AM
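Daniel's reply suggests returning a `RepositoryData` subclass from the `@repository` function. With that approach, definitions are produced when Dagster asks for them rather than once when the function body runs. The sketch below is only a plain-Python analogue of that lazy pattern, not Dagster's API. `FileBackedScheduleSource` is a hypothetical class, and it assumes the same `x,y,cron` line format. It shows that re-reading the file on each call picks up edits, whereas a value computed once does not. This sketch does not establish whether or when Dagster re-invokes a `RepositoryData` subclass's methods, for example on a code location reload with a `grpc_server` entry. Check the linked docs for that.

```python
import os
import tempfile
from typing import List, Tuple


class FileBackedScheduleSource:
    """Re-reads the schedule file on every call instead of once at load time."""

    def __init__(self, path: str):
        self._path = path

    def get_entries(self) -> List[Tuple[int, int, str]]:
        entries = []
        with open(self._path) as f:
            for raw in f:
                line = raw.strip()
                if line:
                    x, y, cron = line.split(",", 2)
                    entries.append((int(x), int(y), cron))
        return entries

    def get_schedule_names(self) -> List[str]:
        # Names are recomputed from the current file contents on each call.
        return [f"sum_x_y_job_schedule_{i}" for i, _ in enumerate(self.get_entries())]


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "test_dagster_new.txt")
    with open(path, "w") as f:
        f.write("1,2,0 * * * *\n")
    source = FileBackedScheduleSource(path)
    print(source.get_schedule_names())  # ['sum_x_y_job_schedule_0']
    with open(path, "a") as f:
        f.write("3,4,30 9 * * *\n")
    print(source.get_schedule_names())  # ['sum_x_y_job_schedule_0', 'sum_x_y_job_schedule_1']
```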