Gaetan DELBART
07/22/2020, 9:07 AM
"message": null, "serializable_error_info": {"__class__": "SerializableErrorInfo", "cause": null, "cls_name": "DagsterInvalidConfigError", "message":
dagster.core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for <dagster.config.field_utils.Selector object at 0x7fa311c0c790>.
    Error 1: Post processing at path root:postgres_db:password of original value {'env': 'DAGSTER_PG_PASSWORD'} failed:
    (PostProcessingError) - dagster.config.errors.PostProcessingError: You have attempted to fetch the environment variable "DAGSTER_PG_PASSWORD" which is not set. In order for this execution to succeed it must be set in this environment.

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/config/post_process.py", line 72, in _post_process
    new_value = context.config_type.post_process(config_value)
  File "/usr/local/lib/python3.7/site-packages/dagster/config/source.py", line 42, in post_process
    return str(_ensure_env_variable(cfg))
  File "/usr/local/lib/python3.7/site-packages/dagster/config/source.py", line 23, in _ensure_env_variable
    ).format(var=var)

"stack":
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/ipc.py", line 116, in ipc_write_stream
    yield FileBasedWriteStream(file_path)
  File "/usr/local/lib/python3.7/site-packages/dagster/cli/api.py", line 434, in launch_scheduled_execution
    instance = DagsterInstance.get()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 255, in get
    return DagsterInstance.from_config(_dagster_home())
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 277, in from_config
    return DagsterInstance.from_ref(instance_ref)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 285, in from_ref
    run_storage=instance_ref.run_storage,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/ref.py", line 185, in run_storage
    return self.run_storage_data.rehydrate()
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/__init__.py", line 378, in rehydrate
    config_dict,
}
When I launch the sh script manually, it works great (my user has access to the env variables injected by k8s).
So I suppose it has to do with the "shell" used by cron.
Has anyone got any lead on this, or managed to run scheduled pipelines successfully on a k8s pod?
sashank
07/22/2020, 12:52 PM
There's an environment_vars argument that lets you copy over the required env vars to the shell script that is invoked by the scheduler.
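(For reference, a minimal sketch of what passing environment_vars to a schedule could look like, assuming the variables exist in the environment of the process that registers the schedule; the pipeline and schedule names here are placeholders, not the ones from this thread.)

import os

from dagster import schedule


@schedule(
    pipeline_name='my_pipeline',  # placeholder pipeline name
    cron_schedule='0 * * * *',
    name='my_pipeline_schedule',
    # copy selected variables from the current environment into the
    # shell script that cron invokes for this schedule
    environment_vars={
        key: os.environ[key]
        for key in ['DAGSTER_PG_PASSWORD']
        if key in os.environ
    },
)
def my_pipeline_schedule(context):
    return {}  # run_config for the scheduled run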
Gaetan DELBART
07/22/2020, 12:55 PM
sashank
07/22/2020, 12:58 PM
Gaetan DELBART
07/22/2020, 2:26 PM
Using the k8sRunLauncher => the Kubernetes job is launched, but it can't find the other env vars 🤔
What I mean is:
In our k8sRunLauncher config, we have the following:
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    dagster_home: /opt/dagster/dagster_home
    env_config_maps:
      - dagster-pipeline-env
    env_secrets:
      - dasgter-aws
The dagster-aws secret contains our AWS_ACCESS_KEY_ID & AWS_SECRET_ACCESS_KEY.
When I launch the pipeline with dagit, it can access those env vars without any issue.
But when the schedule launches it, I get the following error: botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden, which means that it cannot access our AWS credentials.
I think if I add those two variables to environment_vars it will work, but I can't understand why my K8s Job is not "given" those envs 🤔
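(A hedged debugging idea, not something from this thread: a throwaway solid that logs whether the expected variables are visible inside the run worker, to narrow down where they go missing.)

import os

from dagster import solid


@solid
def check_env(context):
    # log which of the expected variables actually reach this process
    for key in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'DAGSTER_PG_PASSWORD']:
        context.log.info('{} is set: {}'.format(key, key in os.environ))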
sashank
07/22/2020, 2:29 PM
Titouan
07/22/2020, 2:38 PM
get_checksums is not executed in the k8s job (perhaps executed in the cron subprocess). Seems that's why it cannot resolve env vars such as the AWS creds.
get_checksums is not executed in the jobs coming from launch_schedule_execution; feel free to ask us anything to help debug this. We are investigating to see if we can find more info.
$ kubectl describe jobs/dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926 -n master
Name: dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926
Namespace: master
Selector: controller-uid=bb4d73d0-61d1-46ed-b311-585c0604f9de
Labels: app.kubernetes.io/component=runmaster
app.kubernetes.io/instance=dagster
app.kubernetes.io/name=dagster
app.kubernetes.io/part-of=dagster
app.kubernetes.io/version=0.8.8
Annotations: <none>
Parallelism: 1
Completions: 1
Start Time: Wed, 22 Jul 2020 16:10:06 +0200
Completed At: Wed, 22 Jul 2020 16:10:24 +0200
Duration: 18s
Pods Statuses: 0 Running / 1 Succeeded / 0 Failed
Pod Template:
Labels: app.kubernetes.io/component=runmaster
app.kubernetes.io/instance=dagster
app.kubernetes.io/name=dagster
app.kubernetes.io/part-of=dagster
app.kubernetes.io/version=0.8.8
controller-uid=bb4d73d0-61d1-46ed-b311-585c0604f9de
job-name=dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926
Service Account: dagster
Containers:
dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926:
Image: 107841323531.dkr.ecr.eu-west-3.amazonaws.com/cnty-data-pipelines:master-latest
Port: <none>
Host Port: <none>
Command:
dagster-graphql
Args:
-p
executeRunInProcess
-v
{"repositoryLocationName": "<<in_process>>", "repositoryName": "companies_repository", "runId": "19be3d4c-97b2-4edd-914b-62b3ff35b926"}
--remap-sigterm
Environment Variables from:
dagster-pipeline-env ConfigMap Optional: false
dasgter-aws Secret Optional: false
Environment:
DAGSTER_HOME: /opt/dagster/dagster_home
DAGSTER_PG_PASSWORD: <set to the key 'postgresql-password' in secret 'dagster-postgresql-secret'> Optional: false
Mounts:
/opt/dagster/dagster_home/dagster.yaml from dagster-instance (rw,path="dagster.yaml")
Volumes:
dagster-instance:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: dagster-instance
Optional: false
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreate 43m job-controller Created pod: dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926-l967d
$ kubectl describe jobs/dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60 -n master
Name: dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60
Namespace: master
Selector: controller-uid=07c6fa1f-efd5-4b3a-8d92-c14e1d14ee36
Labels: app.kubernetes.io/component=runmaster
app.kubernetes.io/instance=dagster
app.kubernetes.io/name=dagster
app.kubernetes.io/part-of=dagster
app.kubernetes.io/version=0.8.8
Annotations: <none>
Parallelism: 1
Completions: 1
Start Time: Wed, 22 Jul 2020 16:13:28 +0200
Completed At: Wed, 22 Jul 2020 16:13:59 +0200
Duration: 31s
Pods Statuses: 0 Running / 1 Succeeded / 0 Failed
Pod Template:
Labels: app.kubernetes.io/component=runmaster
app.kubernetes.io/instance=dagster
app.kubernetes.io/name=dagster
app.kubernetes.io/part-of=dagster
app.kubernetes.io/version=0.8.8
controller-uid=07c6fa1f-efd5-4b3a-8d92-c14e1d14ee36
job-name=dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60
Service Account: dagster
Containers:
dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60:
Image: 107841323531.dkr.ecr.eu-west-3.amazonaws.com/cnty-data-pipelines:master-latest
Port: <none>
Host Port: <none>
Command:
dagster-graphql
Args:
-p
executeRunInProcess
-v
{"repositoryLocationName": "companies_repository", "repositoryName": "companies_repository", "runId": "bd3644ff-ab43-47a7-8098-3456acae9b60"}
--remap-sigterm
Environment Variables from:
dagster-pipeline-env ConfigMap Optional: false
dasgter-aws Secret Optional: false
Environment:
DAGSTER_HOME: /opt/dagster/dagster_home
DAGSTER_PG_PASSWORD: <set to the key 'postgresql-password' in secret 'dagster-postgresql-secret'> Optional: false
Mounts:
/opt/dagster/dagster_home/dagster.yaml from dagster-instance (rw,path="dagster.yaml")
Volumes:
dagster-instance:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: dagster-instance
Optional: false
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreate 43m job-controller Created pod: dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60-9lwc8
-v is different:
jobs via scheduler -> "repositoryLocationName": "<<in_process>>"
but jobs via dagit -> "repositoryLocationName": "companies_repository"
sashank
07/22/2020, 3:02 PM
Gaetan DELBART
07/22/2020, 3:03 PM
alex
07/22/2020, 3:17 PM
Is the run_config the same between 19be3d4c-97b2-4edd-914b-62b3ff35b926 and bd3644ff-ab43-47a7-8098-3456acae9b60?
The repositoryLocationName does differ, but that is a red herring for this specific issue.
Gaetan DELBART
07/22/2020, 3:24 PM
execution:
  multiprocess:
    config:
      max_concurrent: 2
So I think it's the multiprocess_executor.
Titouan
07/22/2020, 3:26 PM
execution:
  multiprocess:
    config:
      max_concurrent: 2
intermediate_storage:
  s3:
    config:
      s3_bucket: ***HIDDEN***-storage
solids:
  download_csvs:
    inputs:
      coordinates:
        basepath: /tmp/***HIDDEN***/master/company/search/
        bucket: ***HIDDEN***-storage
        checksums: checksums
        s3path: company/search/
    solids:
      fetch_resources_api:
        config:
          api_key: ****HIDDEN****
  extract_csvs:
    inputs:
      coordinates:
        basepath: /tmp/***HIDDEN***/master/company/search/
        bucket: ***HIDDEN***-storage
        checksums: checksums
        s3path: company/search/
  push_to_elasticsearch:
    config:
      elasticsearch_host: elasticsearch-elasticsearch-search
      elasticsearch_port: '9200'
      elasticsearch_scheme: http
    inputs:
      coordinates:
        basepath: /tmp/***HIDDEN***/master/company/search/
        bucket: ***HIDDEN***-storage
        checksums: checksums
        s3path: company/search/
alex
07/22/2020, 3:27 PM
kubectl logs can be a bit off due to some manipulation we do to stdout. What do you see if you look at the event logs in dagit for the two runs?
Gaetan DELBART
07/22/2020, 3:32 PM
19be3d4c-97b2-4edd-914b-62b3ff35b926 (the one launched in schedule)
bd3644ff-ab43-47a7-8098-3456acae9b60 (the one launched with dagit)
alex
07/22/2020, 3:35 PM
The run_config is different - the one launched from the schedule is using the in-process executor. How did you verify they were the same above?
Titouan
07/22/2020, 3:38 PM
alex
07/22/2020, 3:42 PM
Titouan
07/22/2020, 3:46 PM
@schedule(
    pipeline_name='company_init_pipeline',
    cron_schedule='*/10 * * * *',
    name='company_init_pipeline_schedule',
    mode='kubernetes',
    environment_vars={
        key: os.environ.get(key)
        for key in [
            'DAGSTER_PG_PASSWORD',
            'DAGSTER_K8S_CELERY_BROKER',
            'DAGSTER_K8S_CELERY_BACKEND',
            'DAGSTER_K8S_PIPELINE_RUN_IMAGE',
            'DAGSTER_K8S_PIPELINE_RUN_NAMESPACE',
            'DAGSTER_K8S_INSTANCE_CONFIG_MAP',
            'DAGSTER_K8S_PG_PASSWORD_SECRET',
            'DAGSTER_K8S_PIPELINE_RUN_ENV_CONFIGMAP',
            'DAGSTER_K8S_PIPELINE_RUN_IMAGE_PULL_POLICY',
            'KUBERNETES_SERVICE_HOST',
            'KUBERNETES_SERVICE_PORT',
        ]
        if key in os.environ
    },
)
def daily_company_init_pipeline(instance):
    env = get_env()
    if env != 'dev':
        files = [
            script_relative_path('environments/init/common/*'),
            script_relative_path(f'environments/init/{env}/*')
        ]
    else:
        files = [
            script_relative_path('environments/init/dev/*'),
        ]
    config = PresetDefinition.from_files(
        env,
        mode='kubernetes',
        config_files=files,
    ),
    return config[0].run_config
In the scheduler run_config we see the right config, but not in the jobs.
alex
07/22/2020, 3:49 PM
script_relative_path is getting tripped up when cron runs it from its own weird working directory.
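(If the working directory really is the culprit, one possible workaround, sketched here rather than taken from the thread, is to resolve the config globs against this module's own file, e.g. with dagster's file_relative_path, so the cwd that cron uses no longer matters.)

import os

from dagster.utils import file_relative_path


def config_globs(env):
    # resolve preset config paths relative to this file instead of the
    # working directory of whatever process (cron, dagit, ...) runs it
    return [
        file_relative_path(__file__, 'environments/init/common/*'),
        file_relative_path(__file__, os.path.join('environments', 'init', env, '*')),
    ]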
Titouan
07/22/2020, 3:51 PM
import os
import json


def get_env():
    filename = os.path.join('/', 'tmp', 'cnty.json')
    if os.path.exists(filename):
        # read the file & return the env
        with open(filename, 'rb') as f:
            cnty_envs = json.loads(f.read())
        return cnty_envs['env']
    # Check in env variables
    if os.environ.get('CNTY_ENV') is not None:
        return os.environ.get('CNTY_ENV')
    # backup to default env
    return 'dev'
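(Since get_env falls back to the CNTY_ENV environment variable when /tmp/cnty.json is absent, a small sketch of forwarding it alongside the other keys in the schedule's environment_vars; this is an assumption-laden sketch, not the thread's actual fix.)

import os

# variables to copy into the cron-invoked script; CNTY_ENV is the fallback
# that get_env() reads when /tmp/cnty.json is missing
FORWARDED_KEYS = [
    'DAGSTER_PG_PASSWORD',
    'CNTY_ENV',
    # plus the DAGSTER_K8S_* keys already listed in the schedule definition above
]

environment_vars = {key: os.environ[key] for key in FORWARDED_KEYS if key in os.environ}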
Gaetan DELBART
07/22/2020, 3:52 PM
Titouan
07/22/2020, 3:52 PM
/tmp/cnty.json
alex
07/22/2020, 3:52 PM
Could you add CNTY_ENV to the environment_vars portion of the schedule setup so that it's set when run from cron?
Titouan
07/22/2020, 3:52 PMalex
07/22/2020, 3:53 PMTitouan
07/22/2020, 4:51 PMGaetan DELBART
07/22/2020, 4:51 PMalex
07/22/2020, 5:05 PM