Hi all, I have a question about reload repository....
# ask-community
p
Hi all, I have a question about reload repository. I’m using Dagster for my Noti-center app, and facing up with some problems with repository reloading. My current approach is scheduling runs from the external configuration. I used
@repository
with the lazy loading method to schedule runs. For example, I have these op and job:
Copy code
import os
from dagster import *

@op(
    ins={
        "x": In(dagster_type=int),
        "y": In(dagster_type=int)
    },
    out={
        "sum": Out(dagster_type=int)
    },
)
def calculate_sum_x_y(x, y):
    return x + y


@job
def sum_x_y_job():
    calculate_sum_x_y()
With it, I’ll have a bunch of schedules having run_config and cron information stored in an external file named
test_dagster_new.txt
. Then, I have the repository definition:
Copy code
@repository
def lazy_loaded_repository():
    names = []
    with open('test_dagster_new.txt', 'r') as f:
        names = f.readlines()
        xycron = [(int(line.rstrip('\n').split(',')[0]), int(line.rstrip('\n').split(',')[1]), line.rstrip('\n').split(',')[2]) for line in names]
    schedule_collection = {}
    for idx, x_y_cron, in enumerate(xycron):
        x, y, cron = x_y_cron
        s_name = f'sum_x_y_job_schedule_{idx}'
        run_config = {
                "ops": {
                    "calculate_sum_x_y": {
                        "inputs": {
                            "x": x,
                            "y": y
                }}}
            }
        s = ScheduleDefinition(job=sum_x_y_job, name=s_name, run_config=run_config, cron_schedule=cron)
        schedule_collection.update(
            {
                s_name: s
            }
        )
    return {
        'jobs': {'sum_x_y_job': sum_x_y_job},
        'schedules': schedule_collection
    }
I started Dagster with its instances, and it worked. Next step, I expected that once the external file has changed, my schedules will be updated also if I call reloading the repository. I tried to use API client and Dagit UI but it didn’t help. Anyone has experiences for that? Thank you so much 🙏
s
I would definitely expect the schedules to update after you click the button in Dagit to reload the repository. Are you able to put a print statement inside the module to make sure that it's getting imported?
d
Just one followup question to sandy's question - are you possibly running your code in a separate grpc server process or docker container?
p
@sandy I put stdout into repository loading, and it just actually prints once after container started; when clicking reload on UI, nothing happens on Docker log (UI show the pop-up with a Successful message). @daniel I run it on both docker container & a separate grpc server with
workspace.yml
configuration:
Copy code
load_from:
  - grpc_server:
      host: pp_analytics_comply_alerts_workspace
      port: 4000
      location_name: "comply_alerts"
And workspace is being run on a separate container.
One more piece of information, when I use
python_file
for load_from workspace, it worked as expected. However, I need to run on multiple workspaces my current use-cases. So really need your help
Hmm, I found this warning log for Daemon instance, should it be a reason?
Copy code
2023-02-28 04:20:45 +0000 - dagster.daemon.SchedulerDaemon - WARNING - Could not load location comply_alerts to check for schedules due to the following error: dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/workspace/context.py", line 545, in _load_location
    location = self._create_location_from_origin(origin)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/workspace/context.py", line 470, in _create_location_from_origin
    return origin.create_location()
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/origin.py", line 331, in create_location
    return GrpcServerRepositoryLocation(self)
  File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/repository_location.py", line 573, in __init__
    list_repositories_response = sync_list_repositories_grpc(self.client)
  File "/usr/local/lib/python3.7/site-packages/dagster/_api/list_repositories.py", line 19, in sync_list_repositories_grpc
    api_client.list_repositories(),
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 211, in list_repositories
    res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 141, in _query
    raise DagsterUserCodeUnreachableError("Could not reach user code server") from e

The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1677558042.502459042","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1677558042.502453667","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 139, in _query
    return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 129, in _get_response
    return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
It may related to this reported issue (?) https://github.com/dagster-io/dagster/issues/4289
d
I filed a feature request issue that summarizes the problem that you're running into here: https://github.com/dagster-io/dagster/issues/12581 Right now the definitions inside the @repository are cached, and when you reload them in Dagit the function doesn't actually run again. We'd like to fix that in general in the future, but in the short-term you can create a
RepositoryData
subclass and return that from your @repository function instead, as described here (like the "ComplexRepositoryData" example here): https://docs.dagster.io/_apidocs/repositories#dagster.repository
👍 1
p
Oh it worked for me now. 😄 Thank you for your support @daniel @sandy 🙏
condagster 1