https://dagster.io/ logo
#ask-community
Title
# ask-community
a

Arun Kumar

08/02/2022, 12:05 AM
Hi team, I am seeing the following
Job not found
error towards the end of the job runs intermittently. Any thoughts on why this could happen?
Copy code
dagster.core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 3734: dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline/job {pipeline_name} in repository

Stack Trace:
File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/child_process_executor.py", line 70, in _execute_command_in_child_process
for step_event in command.execute():
File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/multiprocess.py", line 65, in execute
known_state=self.known_state,
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 745, in create_execution_plan
pipeline_def = pipeline.get_definition()
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 152, in get_definition
defn = self.repository.get_definition().get_pipeline(self.pipeline_name)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository_definition.py", line 1083, in get_pipeline
return self._repository_data.get_pipeline(name)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/repository_definition.py", line 284, in get_pipeline
f"Could not find pipeline/job {pipeline_name} in repository"

  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 785, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/multiprocess.py", line 283, in execute
    subprocess_error_infos=list(errs.values()),
🤖 1
Also, we have a very specific setup for our repository to allow dynamic reloading of all the Dagster definitions. We extend the RepositoryData class to sync our definitions from DB periodically as below. Not sure if this could be due to this setup
Copy code
class MetricsRepoRepositoryData(RepositoryData):
    def __init__(self):
        self._jobs: Dict[str, JobDefinition] = {}
        self._schedules = []
        self._sensors = []
        self._jobs_load_ts = None
        self._schedules_load_ts = None
        self._sensors_load_ts = None

    def get_all_pipelines(self):
        ... fetch job defns from DB (cached)
        return list(self._jobs.values())

    def get_all_partition_sets(self):
        ... fetch partition_sets defns from DB (cached)
        return job_partition_sets

    def get_all_schedules(self):
        ... fetch scheule defns from DB (cached)
        return self._schedules

    def get_all_sensors(self):
        ... fetch sensor defns from DB (cached)
        return self._sensors


@repository
def metrics_repo():
    return MetricsRepoRepositoryData()
o

owen

08/02/2022, 4:22 PM
hi @Arun Kumar, I think your repository set up is the source of the issues here. Each step (by default) runs in its own process, and will need to get the definition of the pipeline that's being executed (and I believe may need to reload all the definitions). Is name of the missing job in the error the same as the name of the job that you were running, or was it an unrelated job? And is it possible that the job's definition got removed from the db while it was running?
a

Arun Kumar

08/02/2022, 7:20 PM
Thanks @owen for the response. Yes, the job in the error message is the same as the running job. It is not possible that job is not in the DB, but it might be possible that the API used to fetch the definitions from DB may be unavailable. I will check on that
Also, if I run the ops using the in_process executor instead of multi process executor, would Dagster not load the definitions before each step?
o

owen

08/02/2022, 7:57 PM
My guess is that it would not need to load the definitions before each step, but it's definitely possible that it would. In general, dagster doesn't make any guarantees on what will happen if a pipeline is deleted while it is running.
4 Views