# ask-community
m
Hi everyone, we currently operate the open source deployment using Helm. We have an issue with the code location server (gRPC server): our jobs and assets are built dynamically based on some configuration (provided by an API). What we want is to rebuild the job/asset graph on a reload. However, we did not get it to work with `dagster api grpc --python-file repo.py`. From what we reverse engineered, `repo.py` is only evaluated on startup, but not on a reload. repo.py:
```python
from dagster import repository

print("_______ Outside __________")

@repository(name="repo")
def dagster():
    print("_______ Inside the repository function__________")
    # build_dynamic_repository() is our helper that assembles the
    # definitions from the API-provided configuration
    return build_dynamic_repository()
```
When I run this locally using `dagster api grpc --python-file repo.py`, I get
```
_______ Outside __________
_______ Inside the repository function__________
dagster.code_server - INFO - Started Dagster code server
```
But when I hit reload in Dagit (or via GraphQL), it prints neither `Outside` nor `Inside`. Any pointers on how I could set this up differently? Moving to Definitions would not help if the whole Python file is never re-evaluated on reload. The only option right now is to restart the gRPC server, which is not feasible for us.
d
Hey Manuel - it's a bit buried, but there's a `RepositoryData` option that you can use here; it lets you return an object with a different caching setup for this case: https://docs.dagster.io/_apidocs/repositories#dagster.repository
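Roughly something like this (a minimal sketch - `MyRepositoryData` is a placeholder for your own subclass, and the `RepositoryData` import path may vary by Dagster version):
```python
from dagster import repository
from dagster._core.definitions.repository_definition import RepositoryData

class MyRepositoryData(RepositoryData):
    # Placeholder: implement get_all_pipelines / get_all_schedules / etc.
    # with whatever caching or rebuild behavior you need.
    ...

@repository(name="repo")
def repo():
    # Returning a RepositoryData instance instead of a list of definitions
    # gives you control over when the definitions are (re)computed.
    return MyRepositoryData()
```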
Can you share more about why restarting the gRPC server isn't feasible?
m
We built our system so that a user has a UI with very restricted settings, from which we generate the repository. Whenever a configuration changes, we want to reload the repository, as assets/schedules changed. Since we are running on k8s, restarting the gRPC server would mean we would have to kill the user-code-deployment pod on every small onChangeNotification originating from our users
Thanks a lot, I was actually browsing that page a lot but still missed `RepositoryData`. I will try to get it running with your suggestion, thank you
Will something like this be supported in later versions (as we are moving away from `@repository`)? Also, this method expects the legacy pipeline definitions; do you have any pointers in the documentation on how to get from “assets, jobs, schedules” to `PipelineDefinition`s?
Note for others reading this: `@repository` instantiates a `CachingRepositoryData`, which can be found in repository_definition.py
d
Yes, no plans to remove that functionality (or `@repository`) for the foreseeable future
You can return jobs from `get_all_pipelines` (it's a bit confusing). Looking at the public methods on `RepositoryData` may be the easiest way: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/definitions/repository_definition.py#L267-L491
a
@daniel have there been any updates to this? We are on the new versions using code locations and facing the issue of having to restart the gRPC server, which is too much for us. We have assets, jobs, and sensors.
m
We are using the following, @Abhishek Agrawal:
```python
from typing import Sequence

# Import paths follow the repository_definition.py module linked above;
# they may differ between Dagster versions.
from dagster import JobDefinition, ScheduleDefinition, SensorDefinition
from dagster._core.definitions.partition import PartitionSetDefinition
from dagster._core.definitions.pipeline_definition import PipelineDefinition
from dagster._core.definitions.repository_definition import (
    CachingRepositoryData,
    RepositoryData,
)


class NonCachingRepositoryData(RepositoryData):
    __underlying_repository_data: RepositoryData

    def __init__(self):
        # Build the definitions once on startup so the other getters
        # work even before the first reload.
        self._reload_underlying_repository_data()

    def _reload_underlying_repository_data(self):
        # build_definitions() stands in for however you rebuild the assets,
        # jobs, schedules, and sensors from the current configuration;
        # from_list takes a single list of definitions.
        self.__underlying_repository_data = CachingRepositoryData.from_list(
            build_definitions()
        )

    def get_all_jobs(self) -> Sequence[JobDefinition]:
        return self.__underlying_repository_data.get_all_jobs()

    def get_all_pipelines(self) -> Sequence[PipelineDefinition]:
        # We have to reload here: a reload triggers this method first.
        # This is a technical detail and can change in future releases!
        self._reload_underlying_repository_data()
        return self.__underlying_repository_data.get_all_pipelines()

    def get_all_schedules(self) -> Sequence[ScheduleDefinition]:
        return self.__underlying_repository_data.get_all_schedules()

    def get_all_sensors(self) -> Sequence[SensorDefinition]:
        return self.__underlying_repository_data.get_all_sensors()

    def get_all_partition_sets(self) -> Sequence[PartitionSetDefinition]:
        return self.__underlying_repository_data.get_all_partition_sets()
```
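And then the repository just returns an instance of it (a sketch mirroring the repo.py from the top of the thread):
```python
from dagster import repository

@repository(name="repo")
def dagster():
    # Dagit's reload ends up calling get_all_pipelines on this object,
    # which triggers the rebuild in _reload_underlying_repository_data.
    return NonCachingRepositoryData()
```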