#announcements

Gaetan DELBART

07/22/2020, 9:07 AM
Hello! I'm trying to run a scheduled pipeline on my k8s cluster. The "setup" (creating the entry in crontab) works great, but when the pipeline is launched with the generated sh script, I get the following error in the logs:
dagster.core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for <dagster.config.field_utils.Selector object at 0x7fa311c0c790>.
    Error 1: Post processing at path root:postgres_db:password of original value {'env': 'DAGSTER_PG_PASSWORD'} failed:
(PostProcessingError) - dagster.config.errors.PostProcessingError: You have attempted to fetch the environment variable "DAGSTER_PG_PASSWORD" which is not set. In order for this execution to succeed it must be set in this environment.

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/config/post_process.py", line 72, in _post_process
    new_value = context.config_type.post_process(config_value)
  File "/usr/local/lib/python3.7/site-packages/dagster/config/source.py", line 42, in post_process
    return str(_ensure_env_variable(cfg))
  File "/usr/local/lib/python3.7/site-packages/dagster/config/source.py", line 23, in _ensure_env_variable
    ).format(var=var)

Stack:
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/ipc.py", line 116, in ipc_write_stream
    yield FileBasedWriteStream(file_path)
  File "/usr/local/lib/python3.7/site-packages/dagster/cli/api.py", line 434, in launch_scheduled_execution
    instance = DagsterInstance.get()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 255, in get
    return DagsterInstance.from_config(_dagster_home())
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 277, in from_config
    return DagsterInstance.from_ref(instance_ref)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 285, in from_ref
    run_storage=instance_ref.run_storage,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/ref.py", line 185, in run_storage
    return self.run_storage_data.rehydrate()
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/__init__.py", line 378, in rehydrate
    config_dict,
When I launch the `sh` script manually, it works great (my user has access to the env variables injected by k8s), so I suppose it has to do with the shell used by cron. Has someone got any lead on this, or managed to run a scheduled pipeline successfully on a k8s pod?
✔️ 1

sashank

07/22/2020, 12:52 PM
Hey @Gaetan DELBART, you are right that this is because cron runs in a separate, mostly empty environment. However, the schedule decorator takes an `environment_vars` argument that lets you copy the required env vars over to the shell script that is invoked by the scheduler.
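For example, something like this (a minimal sketch; the pipeline name, cron string, and var list are placeholders for whatever your setup needs):

import os

from dagster import schedule


@schedule(
    pipeline_name='my_pipeline',  # placeholder: your pipeline's name
    cron_schedule='0 9 * * *',
    environment_vars={
        # Copy the vars the run needs (here, the Postgres password from the
        # error above) from the environment where the schedule is turned on
        # into cron's nearly-empty environment.
        'DAGSTER_PG_PASSWORD': os.environ.get('DAGSTER_PG_PASSWORD'),
    },
)
def my_schedule(_):
    return {}  # run config for each scheduled run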

Gaetan DELBART

07/22/2020, 12:55 PM
🙏 Thx for the example. I'll try with the annotation!

sashank

07/22/2020, 12:58 PM
Sounds great, let us know if you run into any more issues

Gaetan DELBART

07/22/2020, 2:26 PM
Re, I've added the env vars from the example and the "Run" is started 🎉 But we use the `K8sRunLauncher` => the Kubernetes job is launched, but it can't find other env vars 🤔 What I mean is: in our K8sRunLauncher config, we have the following
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    dagster_home: /opt/dagster/dagster_home
    env_config_maps:
      - dagster-pipeline-env
    env_secrets:
      - dasgter-aws
The `dagster-aws` secret contains our `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY`. When I launch the pipeline with dagit, it can access those env vars without any issue. But when the schedule launches it, I get the following error:
botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden
which means that it cannot access our AWS credentials. I think if I add those two variables to `environment_vars` it will work, but I can't understand why my k8s Job is not "given" those envs 🤔

sashank

07/22/2020, 2:29 PM
cc @cat @johann

Titouan

07/22/2020, 2:38 PM
Hey, I'm working with Gaetan, and it seems we have one solid whose compute is not run on the k8s job with the others. The first solid, `get_checksums`, is not executed in the k8s job (perhaps it is executed in the cron subprocess). That seems to be why it cannot resolve env vars such as the AWS creds.
If we look at the job logs, we can see that `get_checksums` is not executed in the job.
If we launch the job through dagit with the K8sRunLauncher, all solids are correctly computed on the k8s job.
It seems a really weird behavior specific to `launch_scheduled_execution`; feel free to ask us anything to help debug this. We are investigating to see if we can find more info.
The k8s job description if launched via the scheduler:
$ kubectl describe jobs/dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926 -n master
Name:           dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926
Namespace:      master
Selector:       controller-uid=bb4d73d0-61d1-46ed-b311-585c0604f9de
Labels:         app.kubernetes.io/component=runmaster
                app.kubernetes.io/instance=dagster
                app.kubernetes.io/name=dagster
                app.kubernetes.io/part-of=dagster
                app.kubernetes.io/version=0.8.8
Annotations:    <none>
Parallelism:    1
Completions:    1
Start Time:     Wed, 22 Jul 2020 16:10:06 +0200
Completed At:   Wed, 22 Jul 2020 16:10:24 +0200
Duration:       18s
Pods Statuses:  0 Running / 1 Succeeded / 0 Failed
Pod Template:
  Labels:           app.kubernetes.io/component=runmaster
                    app.kubernetes.io/instance=dagster
                    app.kubernetes.io/name=dagster
                    app.kubernetes.io/part-of=dagster
                    app.kubernetes.io/version=0.8.8
                    controller-uid=bb4d73d0-61d1-46ed-b311-585c0604f9de
                    job-name=dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926
  Service Account:  dagster
  Containers:
   dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926:
    Image:      107841323531.dkr.ecr.eu-west-3.amazonaws.com/cnty-data-pipelines:master-latest
    Port:       <none>
    Host Port:  <none>
    Command:
      dagster-graphql
    Args:
      -p
      executeRunInProcess
      -v
      {"repositoryLocationName": "<<in_process>>", "repositoryName": "companies_repository", "runId": "19be3d4c-97b2-4edd-914b-62b3ff35b926"}
      --remap-sigterm
    Environment Variables from:
      dagster-pipeline-env  ConfigMap  Optional: false
      dasgter-aws           Secret     Optional: false
    Environment:
      DAGSTER_HOME:         /opt/dagster/dagster_home
      DAGSTER_PG_PASSWORD:  <set to the key 'postgresql-password' in secret 'dagster-postgresql-secret'>  Optional: false
    Mounts:
      /opt/dagster/dagster_home/dagster.yaml from dagster-instance (rw,path="dagster.yaml")
  Volumes:
   dagster-instance:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      dagster-instance
    Optional:  false
Events:
  Type    Reason            Age   From            Message
  ----    ------            ----  ----            -------
  Normal  SuccessfulCreate  43m   job-controller  Created pod: dagster-run-19be3d4c-97b2-4edd-914b-62b3ff35b926-l967d
And if launched via dagit:
$ kubectl describe jobs/dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60 -n master
Name:           dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60
Namespace:      master
Selector:       controller-uid=07c6fa1f-efd5-4b3a-8d92-c14e1d14ee36
Labels:         app.kubernetes.io/component=runmaster
                app.kubernetes.io/instance=dagster
                app.kubernetes.io/name=dagster
                app.kubernetes.io/part-of=dagster
                app.kubernetes.io/version=0.8.8
Annotations:    <none>
Parallelism:    1
Completions:    1
Start Time:     Wed, 22 Jul 2020 16:13:28 +0200
Completed At:   Wed, 22 Jul 2020 16:13:59 +0200
Duration:       31s
Pods Statuses:  0 Running / 1 Succeeded / 0 Failed
Pod Template:
  Labels:           app.kubernetes.io/component=runmaster
                    app.kubernetes.io/instance=dagster
                    app.kubernetes.io/name=dagster
                    app.kubernetes.io/part-of=dagster
                    app.kubernetes.io/version=0.8.8
                    controller-uid=07c6fa1f-efd5-4b3a-8d92-c14e1d14ee36
                    job-name=dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60
  Service Account:  dagster
  Containers:
   dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60:
    Image:      107841323531.dkr.ecr.eu-west-3.amazonaws.com/cnty-data-pipelines:master-latest
    Port:       <none>
    Host Port:  <none>
    Command:
      dagster-graphql
    Args:
      -p
      executeRunInProcess
      -v
      {"repositoryLocationName": "companies_repository", "repositoryName": "companies_repository", "runId": "bd3644ff-ab43-47a7-8098-3456acae9b60"}
      --remap-sigterm
    Environment Variables from:
      dagster-pipeline-env  ConfigMap  Optional: false
      dasgter-aws           Secret     Optional: false
    Environment:
      DAGSTER_HOME:         /opt/dagster/dagster_home
      DAGSTER_PG_PASSWORD:  <set to the key 'postgresql-password' in secret 'dagster-postgresql-secret'>  Optional: false
    Mounts:
      /opt/dagster/dagster_home/dagster.yaml from dagster-instance (rw,path="dagster.yaml")
  Volumes:
   dagster-instance:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      dagster-instance
    Optional:  false
Events:
  Type    Reason            Age   From            Message
  ----    ------            ----  ----            -------
  Normal  SuccessfulCreate  43m   job-controller  Created pod: dagster-run-bd3644ff-ab43-47a7-8098-3456acae9b60-9lwc8
We can see that the `-v` argument is different: jobs via the scheduler get `"repositoryLocationName": "<<in_process>>"`, but jobs via dagit get `"repositoryLocationName": "companies_repository"`.
👍 1
It could be a lead to something. But our first solids are not computed on the k8s jobs created by the scheduler launch.

sashank

07/22/2020, 3:02 PM
What version of dagster are you on?

Gaetan DELBART

07/22/2020, 3:03 PM
0.8.8

alex

07/22/2020, 3:17 PM
what executor are you using?
are there any differences in the `run_config` between `19be3d4c-97b2-4edd-914b-62b3ff35b926` and `bd3644ff-ab43-47a7-8098-3456acae9b60`?
good catch on the `repositoryLocationName`, but that is a red herring for this specific issue

Gaetan DELBART

07/22/2020, 3:24 PM
In our pipeline configuration we have:
execution:
  multiprocess:
    config:
      max_concurrent: 2
So I think it's the `multiprocess_executor`.
👍 1

Titouan

07/22/2020, 3:26 PM
execution:
  multiprocess:
    config:
      max_concurrent: 2
intermediate_storage:
  s3:
    config:
      s3_bucket: ***HIDDEN***-storage
solids:
  download_csvs:
    inputs:
      coordinates:
        basepath: /tmp/***HIDDEN***/master/company/search/
        bucket: ***HIDDEN***-storage
        checksums: checksums
        s3path: company/search/
    solids:
      fetch_resources_api:
        config:
          api_key: ****HIDDEN****
  extract_csvs:
    inputs:
      coordinates:
        basepath: /tmp/***HIDDEN***/master/company/search/
        bucket: ***HIDDEN***-storage
        checksums: checksums
        s3path: company/search/
  push_to_elasticsearch:
    config:
      elasticsearch_host: elasticsearch-elasticsearch-search
      elasticsearch_port: '9200'
      elasticsearch_scheme: http
    inputs:
      coordinates:
        basepath: /tmp/***HIDDEN***/master/company/search/
        bucket: ***HIDDEN***-storage
        checksums: checksums
        s3path: company/search/
The run_config is exactly the same between the two jobs (I just obfuscated some secrets to share this with you).

alex

07/22/2020, 3:27 PM
ok thanks for checking that
so the `kubectl logs` output can be a bit off due to some manipulation we do to stdout. What do you see if you look at the event logs in dagit for the two runs?

Gaetan DELBART

07/22/2020, 3:32 PM
`19be3d4c-97b2-4edd-914b-62b3ff35b926` (the one launched by the schedule)
`bd3644ff-ab43-47a7-8098-3456acae9b60` (the one launched with dagit)

alex

07/22/2020, 3:35 PM
ok so it appears the `run_config` is different - the one launched by the schedule is using the in-process executor. How did you verify they were the same above?
this button in dagit will show what the run DB has

Titouan

07/22/2020, 3:38 PM
in the Schedule view the config shows exactly the same run_config, BUT when we go to the jobs (LATEST RUN) and view the configuration on the latest run, yes, it is missing the `execution` node in the config.
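i.e. the stored config for the scheduled run is missing this block from our pipeline configuration above, which would explain why it defaults to the in-process executor:

execution:
  multiprocess:
    config:
      max_concurrent: 2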

alex

07/22/2020, 3:42 PM
how do you deploy? does this config look like it could be from an old stale cron job?

Titouan

07/22/2020, 3:46 PM
We build a docker image with Dagster/Dagit & the data pipeline and roll out the new Docker image in CI/CD (GitLab). The infrastructure is deployed with Helm on the k8s cluster. Seems it's a config problem on our end.
@schedule(
    pipeline_name='company_init_pipeline',
    cron_schedule='*/10 * * * *',
    name='company_init_pipeline_schedule',
    mode='kubernetes',
    environment_vars={
        key: os.environ.get(key)
        for key in [
            'DAGSTER_PG_PASSWORD',
            'DAGSTER_K8S_CELERY_BROKER',
            'DAGSTER_K8S_CELERY_BACKEND',
            'DAGSTER_K8S_PIPELINE_RUN_IMAGE',
            'DAGSTER_K8S_PIPELINE_RUN_NAMESPACE',
            'DAGSTER_K8S_INSTANCE_CONFIG_MAP',
            'DAGSTER_K8S_PG_PASSWORD_SECRET',
            'DAGSTER_K8S_PIPELINE_RUN_ENV_CONFIGMAP',
            'DAGSTER_K8S_PIPELINE_RUN_IMAGE_PULL_POLICY',
            'KUBERNETES_SERVICE_HOST',
            'KUBERNETES_SERVICE_PORT',
        ]
        if key in os.environ
    },
)
def daily_company_init_pipeline(instance):
    env = get_env()
    if env != 'dev':
        files = [
            script_relative_path('environments/init/common/*'),
            script_relative_path(f'environments/init/{env}/*'),
        ]
    else:
        files = [
            script_relative_path('environments/init/dev/*'),
        ]
    config = PresetDefinition.from_files(
        env,
        mode='kubernetes',
        config_files=files,
    )
    return config.run_config
In the scheduler's run_config we see the right config, but not in the jobs.
a

alex

07/22/2020, 3:49 PM
hmm interesting - i wonder if `script_relative_path` is getting tripped up when cron runs it from its own weird working directory
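e.g. the usual way to make that robust is to anchor paths on the module's file instead of the process cwd (just a sketch of the idea, not Dagster code):

import os

def module_relative_path(relpath):
    # Resolve against this module's directory rather than the current
    # working directory; cron typically starts jobs in $HOME or /, so
    # cwd-relative paths can silently point somewhere else.
    return os.path.join(os.path.dirname(os.path.abspath(__file__)), relpath)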

Titouan

07/22/2020, 3:51 PM
our get_env resolver:
import os
import json


def get_env():
    filename = os.path.join('/', 'tmp', 'cnty.json')
    if os.path.exists(filename):
        # read the file & return the env
        with open(filename, 'rb') as f:
            cnty_envs = json.loads(f.read())
            return cnty_envs['env']

    # Check in env variables
    if os.environ.get('CNTY_ENV') is not None:
        return os.environ.get('CNTY_ENV')

    # backup to default env
    return 'dev'
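a quick way to reproduce what cron sees (a sketch; it assumes /tmp/cnty.json is absent on the scheduler host):

import os
from unittest import mock

# With an empty environment and no /tmp/cnty.json, get_env() falls through
# both branches and returns the 'dev' default, which is what cron gets.
with mock.patch.dict(os.environ, {}, clear=True):
    print(get_env())  # -> 'dev'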

Gaetan DELBART

07/22/2020, 3:52 PM
it seems it always returns 'dev', since the schedule run uses our dev config

Titouan

07/22/2020, 3:52 PM
seems that in the cron runs we can resolve neither the env var nor the file `/tmp/cnty.json`

alex

07/22/2020, 3:52 PM
add `CNTY_ENV` to the `environment_vars` portion of the schedule setup so that it's set when run from cron?
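e.g. a sketch of the one change, extending the environment_vars list from your schedule definition above:

environment_vars={
    key: os.environ.get(key)
    for key in [
        'CNTY_ENV',  # added, so get_env() resolves the right env under cron
        'DAGSTER_PG_PASSWORD',
        'DAGSTER_K8S_PIPELINE_RUN_IMAGE',
        # plus the rest of the DAGSTER_K8S_* / KUBERNETES_* vars as before
    ]
    if key in os.environ
},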

Titouan

07/22/2020, 3:52 PM
yep we're on it

alex

07/22/2020, 3:53 PM
kk seems like you have a good lead - good luck!
👍 1

Titouan

07/22/2020, 4:51 PM
🙏 thanks for your help, that fixed it. cron subprocess hell... 😉

Gaetan DELBART

07/22/2020, 4:51 PM
It was indeed a config issue. Thanks for pointing that out 🙏

alex

07/22/2020, 5:05 PM
ya we have some early prototypes for a scheduler implementation that leverages the k8s scheduling primitives, so keep an eye out for that in the next month or so
😍 3