Manuel Kollegger
02/07/2023, 2:08 PM
code location server (gRPC server):
Our jobs and assets are built dynamically based on some configuration (provided by an API). Now what we want is to rebuild the job/asset graph on a reload. However, we did not get it to work with `dagster api grpc --python-file repo.py`. From what we reverse engineered, repo.py is only evaluated on startup, not on a reload.
repo.py:

```python
from dagster import repository

print("_______ Outside __________")


@repository(name="repo")
def dagster():
    print("_______ Inside the repository function__________")
    return build_dynamic_repository()
```
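`build_dynamic_repository` is the poster's own helper and is not shown in the thread. A hypothetical sketch of the idea (all names and the configuration shape are assumed; the real version would build dagster `JobDefinition`/asset objects rather than strings) could look like:

```python
import json
from typing import Any, Dict, List


def fetch_config() -> List[Dict[str, Any]]:
    # Hypothetical stand-in for the HTTP call to the configuration API.
    return json.loads('[{"name": "ingest"}, {"name": "transform"}]')


def build_dynamic_repository() -> List[str]:
    # One definition per configuration entry; a real implementation would
    # construct dagster definitions here instead of plain names.
    return [entry["name"] for entry in fetch_config()]


print(build_dynamic_repository())  # ['ingest', 'transform']
```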
When I run this locally using `dagster api grpc --python-file repo.py` I get:

```
_______ Outside __________
_______ Inside the repository function__________
dagster.code_server - INFO - Started Dagster code server
```

But when I hit reload in dagit (or GraphQL), it prints neither "Outside" nor "Inside".
Any pointers on how I could set this up differently? Moving to `Definitions` would not help if the whole Python file is never re-evaluated on reload. The only option right now is to restart the gRPC server, which is not feasible for us.

daniel
02/07/2023, 3:36 PM
There's a `RepositoryData` option that you can use here that lets you return an object with a different caching setup for this case: https://docs.dagster.io/_apidocs/repositories#dagster.repository

Manuel Kollegger
02/08/2023, 5:26 AM
So I'd return that object from the function decorated with `@repository`?
Also, this method expects the legacy pipeline definitions. Do you have any pointers in the documentation on how to get from "assets, jobs, schedules" to `PipelineDefinition`s?

`@repository` instantiates a `CachingRepositoryData`, which can be found in repository_definition.py.

daniel
02/08/2023, 12:34 PM

Abhishek Agrawal
05/04/2023, 6:30 AM

Manuel Kollegger
05/04/2023, 10:27 AM
05/04/2023, 10:27 AMclass NonCachingRepositoryData(RepositoryData):
__underlying_repository_data: RepositoryData
def _reload_underlying_repository_data(self):
self.__underlying_repository_data = CachingRepositoryData.from_list(assets, jobs, sensors)
def get_all_jobs(self) -> Sequence[JobDefinition]:
return self.__underlying_repository_data.get_all_jobs()
def get_all_pipelines(self) -> Sequence[PipelineDefinition]:
# We have to reload here
# this is a technical detail, a reload triggers this method first, can be changed in future releases!
self._reload_underlying_repository_data()
return self.__underlying_repository_data.get_all_pipelines()
def get_all_schedules(self) -> Sequence[ScheduleDefinition]:
return self.__underlying_repository_data.get_all_schedules()
def get_all_sensors(self) -> Sequence[SensorDefinition]:
return self.__underlying_repository_data.get_all_sensors()
def get_all_partition_sets(self) -> Sequence[PartitionSetDefinition]:
return self.__underlying_repository_data.get_all_partition_sets()