Danny
06/30/2020, 3:27 PM
Trying to use the launchPipelineExecution mutation to start a pipeline. I want it to start in process on the dagit instance, but I can't figure out the new 0.8 selector I need to specify. Here's what I have so far:
launchPipelineExecution(
  executionParams: {
    selector: {
      repositoryLocationName: "<<in_process>>",
      repositoryName: "my_repository",
      pipelineName: "my_pipeline",
    },
    mode: "default",
  }
)
Getting this error:
RESPONSE >>> {'data': {'launchPipelineExecution': {'__typename': 'PipelineNotFoundError', 'message': 'Could not find Pipeline <<in_process>>.my_repository.my_pipeline', 'pipelineName': 'my_pipeline'}}}
I'm trying to copy how the dagster_graphql test suite does it, but not succeeding. On a side note, it might be useful to incorporate the "execute pipelines over graphql" machinery into the main dagster_graphql API. It's very useful for use cases like mine, where I have pipelines launching other pipelines, and it's a shame all that useful client code is buried in tests.
I got the <<in_process>> part from https://github.com/dagster-io/dagster/blob/fb351600fb814c681f4c6305bded3dc67548dfe8/python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py#L116. Not sure if that's correct, but I also tried the filename and the path+filename of my repo.py file, which contains
@repository
def my_repository():
    return [my_pipeline]
and none worked.
alex
06/30/2020, 3:38 PM
In your workspace.yaml you can specify location_name on your load_from targets, and then you use that name for repositoryLocationName.
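As a rough sketch of that suggestion: once a load_from target has a location_name, the same string goes into the selector. The snippet below only builds the request body and interprets the response shape quoted in the error above; it does not contact dagit. The location name "my_location", the helper names, the `ExecutionParams` variable type, and the workspace.yaml layout in the comment are assumptions for illustration, not confirmed by this thread.

```python
import json

# Assumed workspace.yaml (the location name "my_location" is hypothetical):
#   load_from:
#     - python_file:
#         relative_path: repo.py
#         location_name: my_location

# Only fields visible in the quoted error response are requested here.
# The "ExecutionParams" type name is an assumption about the 0.8 schema.
LAUNCH_MUTATION = """
mutation($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    __typename
    ... on PipelineNotFoundError {
      message
      pipelineName
    }
  }
}
"""


def build_launch_payload(location_name, repository_name, pipeline_name, mode="default"):
    """Return a JSON-serializable body for a launchPipelineExecution request."""
    return {
        "query": LAUNCH_MUTATION,
        "variables": {
            "executionParams": {
                "selector": {
                    # Must match location_name in workspace.yaml.
                    "repositoryLocationName": location_name,
                    "repositoryName": repository_name,
                    "pipelineName": pipeline_name,
                },
                "mode": mode,
            }
        },
    }


def check_launch_result(response):
    """Raise if the pipeline was not found, otherwise return the result typename."""
    result = response["data"]["launchPipelineExecution"]
    if result["__typename"] == "PipelineNotFoundError":
        raise LookupError(result["message"])
    return result["__typename"]


payload = build_launch_payload("my_location", "my_repository", "my_pipeline")
print(json.dumps(payload["variables"], indent=2))
```

The payload can then be POSTed to dagit's GraphQL endpoint with whatever HTTP client is already in use, and the parsed JSON response passed to check_launch_result.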
Danny
06/30/2020, 3:44 PM
Danny
06/30/2020, 3:45 PM
alex
06/30/2020, 3:52 PM
Danny
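06/30/2020, 3:54 PM
import time
from dagster import DagsterInstance
# Import path assumed for dagster 0.8; adjust if your version differs.
from dagster.core.storage.pipeline_run import PipelineRunsFilter, PipelineRunStatus

def wait_for_run_slot(pipeline_name, max_runs=1):
    instance = DagsterInstance.get()

    def get_active_runs(pipeline_name):
        # Count runs that are executing plus runs that have not started yet.
        started_runs = len(instance.get_runs(PipelineRunsFilter(pipeline_name=pipeline_name, status=PipelineRunStatus.STARTED)))
        not_started_runs = len(instance.get_runs(PipelineRunsFilter(pipeline_name=pipeline_name, status=PipelineRunStatus.NOT_STARTED)))
        return started_runs + not_started_runs

    # Poll every 2 seconds until the number of active runs drops below max_runs.
    while get_active_runs(pipeline_name) >= max_runs:
        time.sleep(2)
This worked nicely when called in the parent pipeline:
wait_for_run_slot("child_pipeline", 10)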
alex
06/30/2020, 3:54 PM
Danny
06/30/2020, 3:55 PM
alex
06/30/2020, 3:58 PM
You're not using executeRunInProcess at all, right? And you are just using the default run launcher?
Danny
06/30/2020, 3:58 PM
launchPipelineExecution mutation request
alex
06/30/2020, 3:59 PM
CliApiRunLauncher if you look at the “Instance Details” in dagit
Danny
06/30/2020, 4:00 PM
Run Launcher:
  module: dagster.core.launcher.cli_api_run_launcher
  class: CliApiRunLauncher
  config:
    {}
alex
06/30/2020, 4:01 PM
Danny
06/30/2020, 4:02 PM
alex
06/30/2020, 4:02 PM
Danny
06/30/2020, 4:02 PM
alex
06/30/2020, 4:03 PM
Danny
06/30/2020, 4:13 PM