Arun Kumar
08/02/2022, 12:05 AM
A `Job not found` error occurs intermittently towards the end of the job runs. Any thoughts on why this could happen?
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 3734: dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline/job {pipeline_name} in repository
Stack Trace:
File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/child_process_executor.py", line 70, in _execute_command_in_child_process
for step_event in command.execute():
File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/multiprocess.py", line 65, in execute
known_state=self.known_state,
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 745, in create_execution_plan
pipeline_def = pipeline.get_definition()
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 152, in get_definition
defn = self.repository.get_definition().get_pipeline(self.pipeline_name)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository_definition.py", line 1083, in get_pipeline
return self._repository_data.get_pipeline(name)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository_definition.py", line 284, in get_pipeline
f"Could not find pipeline/job {pipeline_name} in repository"
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 785, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/multiprocess.py", line 283, in execute
subprocess_error_infos=list(errs.values()),
# Custom RepositoryData subclass whose definitions come from a database.
# The snippet was pasted with its indentation stripped, and the DB-loading
# logic is elided as "..." placeholder lines rather than real code.
# NOTE(review): get_pipeline is not overridden in this snippet; the stack trace
# above shows the failure raised from get_pipeline in repository_definition.py.
# Presumably a child process that rebuilds this repository can see a job set
# that no longer contains the running job if the DB-backed cache changes
# mid-run - confirm against the elided loading code.
class MetricsRepoRepositoryData(RepositoryData):
# Start with empty job/schedule/sensor caches and no load timestamps recorded.
def __init__(self):
# Job cache; the annotation declares str keys mapping to JobDefinition values.
self._jobs: Dict[str, JobDefinition] = {}
self._schedules = []
self._sensors = []
# NOTE(review): the *_load_ts fields presumably record when each cache was
# last refreshed from the DB; the code that sets them is elided - confirm.
self._jobs_load_ts = None
self._schedules_load_ts = None
self._sensors_load_ts = None
# Return the cached job definitions as a list (values of self._jobs).
def get_all_pipelines(self):
... fetch job defns from DB (cached)
return list(self._jobs.values())
# Return the partition sets.
# NOTE(review): job_partition_sets is not bound anywhere in the visible code;
# presumably it is produced by the elided fetch step - confirm.
def get_all_partition_sets(self):
... fetch partition_sets defns from DB (cached)
return job_partition_sets
# Return the cached schedule list (self._schedules itself, not a copy).
def get_all_schedules(self):
... fetch scheule defns from DB (cached)
return self._schedules
# Return the cached sensor list (self._sensors itself, not a copy).
def get_all_sensors(self):
... fetch sensor defns from DB (cached)
return self._sensors
@repository
def metrics_repo():
    """Return a newly constructed ``MetricsRepoRepositoryData`` instance.

    Each call builds a fresh instance, so nothing is shared between calls
    at this level.
    """
    repo_data = MetricsRepoRepositoryData()
    return repo_data
owen
08/02/2022, 4:22 PM
Arun Kumar
08/02/2022, 7:20 PM
owen
08/02/2022, 7:57 PM