# deployment-kubernetes
t
I've made quite a bit of progress, but after finding out that we can't really have Dagster execute runs in ECS without some serious workarounds, we are looking at doing it via our EKS clusters. We are not running Dagster/Dagit in EKS; it is on its own EC2 instance. However, we want our jobs and operations to run in EKS, where they can use Fargate profiles so that the jobs actually spin up and execute in Fargate. kubectl and the necessary kubeconfig are installed on the Dagster instance. We are getting an error at run time that I just can't reconcile with the code for our job, so I'm posting here hoping for guidance.
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "extract_tableau":
The above exception was caused by the following exception:
TypeError: execute_k8s_job() missing 1 required positional argument: 'context'
We don't have a run launcher in our dagster.yaml, as we aren't sure which one we would need for this or what information we would need to add to it for this to run the way we need. Below is the code, with certain items redacted for security: https://gist.github.com/telderfts/9a52686c5743a24b9169efb87ff83308 Any assistance is appreciated.
d
You can pass through the context arg from the op to execute_k8s_job:
execute_k8s_job(context, ...)
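For anyone wondering where that TypeError comes from, here is a minimal sketch in plain Python. `run_job_stub` is a hypothetical stand-in, not the dagster_k8s function; the only thing it mirrors is that `context` is a required first positional parameter.

```python
def run_job_stub(context, image, **kwargs):
    """Hypothetical stand-in that, like execute_k8s_job, takes `context` first."""
    return {"context": context, "image": image, **kwargs}


def call_without_context():
    """Call the stub the way the failing op did and return the error text."""
    try:
        run_job_stub(image="example:latest")  # `context` is not passed
    except TypeError as exc:
        return str(exc)
    return None


# Omitting `context` fails before the function body runs.
error_message = call_without_context()

# Passing it first (positionally or as context=...) works.
ok = run_job_stub("fake-context", image="example:latest", timeout=600)
```

Python reports the missing parameter by name, which is why the message points at 'context' rather than at anything Kubernetes-related.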
another thing that jumps out there (not causing this error, but possibly causing future errors) - instead of:
resources={
            "limit_memory": "12Gi",
            "request_memory": "12Gi",
            "limit_cpu": 2,
            "request_cpu": 2,
        },
I think you want:
resources={
            "limits": {"memory": "12Gi", "cpu": "2"},
            "requests": {"memory": "12Gi", "cpu": "2"},
        },
(just stealth-edited "limit" to "limits" and "request" to "requests")
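If the flat keys show up in more than one place, a small conversion helper may be easier than editing each dict by hand. This is only a sketch: `to_k8s_resources` is a hypothetical name, and it assumes the flat keys are exactly the `limit_*` / `request_*` ones from the first snippet.

```python
def to_k8s_resources(flat):
    """Turn {"limit_memory": ...} style keys into nested limits/requests maps."""
    sections = {"limit": "limits", "request": "requests"}
    nested = {}
    for key, value in flat.items():
        # "limit_memory" -> kind "limit", resource "memory"
        kind, _, resource = key.partition("_")
        # Quantities are written as strings, matching the corrected snippet.
        nested.setdefault(sections[kind], {})[resource] = str(value)
    return nested


flat = {
    "limit_memory": "12Gi",
    "request_memory": "12Gi",
    "limit_cpu": 2,
    "request_cpu": 2,
}
resources = to_k8s_resources(flat)
```

The result has the same shape as the corrected `resources` dict, with the CPU values converted to strings.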
t
Awesome, I'll give that a shot. Thanks Daniel!
@daniel So would the code block look like this??
execute_k8s_job(context(
        image=f"xxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/airflow-jobs:tableau_extractor-v0.2.0",
        env_vars=[
            f"AWS_ACCESS_KEY_ID={secretsmanager_secrets_aws_key}",
            f"AWS_SECRET_ACCESS_KEY={secretsmanager_secrets_aws_secret}",
            "EXTRACT_PATH=/opt/extract.hyper",
            "PARQUET_PATH=xxxxxxx/donor_bom.parquet",
            "DATASOURCE_NAME=dagster_test",
            "ORG_EXCLUDE=",
            f"TABLEAU_USER={secretsmanager_secrets_tableau_user}",
            f"TABLEAU_PASS={secretsmanager_secrets_tableau_pass}",
            f"TABLEAU_PROJECT=QA",
        ],
        resources={
            "limits": {"memory": "12Gi", "cpu": "2"},
            "requests": {"memory": "12Gi", "cpu": "2"},
        },
        labels={
            "xxxxxxxx/role": "datasci",
            "xxxxxxxx/app": "airflow",
            "xxxxxxxx/purpose": "",
            "xxxxxxxx/environment": "produsa",
        },
        timeout=600,
    )
    )
Or would we need to format it a different way?
d
context would be the first argument to the function
execute_k8s_job(
        context,
        image=f"xxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/airflow-jobs:tableau_extractor-v0.2.0",
        env_vars=[
        ...
or if you like:
execute_k8s_job(
        context=context,
        image=f"xxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/airflow-jobs:tableau_extractor-v0.2.0",
        env_vars=[
        ...
t
ahhhh, gotcha. Working with Airflow for so long it's taking a little bit to get used to the syntax changes 🙂
d
all good!
t
Unfortunately, I added that in and it won't let me reload the code location: https://gist.github.com/telderfts/1d80ed3b57754151212e0a963b1ffeb2
d
Oh, sorry - I think context needs to be the first argument of the op function?
def extract_tableau(context, orgs):
Error message there could definitely be better
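Until the message improves, a quick local check can catch this before reloading the code location. This is a rough sketch using only the standard library; `context_is_first` is a hypothetical helper, and `bad_op` is a guess at what the failing signature looked like rather than the actual code from the gist.

```python
import inspect


def context_is_first(fn):
    """True if fn has no `context` parameter, or has it in first position."""
    params = list(inspect.signature(fn).parameters)
    return "context" not in params or params[0] == "context"


def bad_op(orgs, context):  # assumed shape of the failing op
    ...


def good_op(context, orgs):  # matches the suggested signature
    ...


bad_ok = context_is_first(bad_op)
good_ok = context_is_first(good_op)
```

Running this over the op functions in a module gives a plain True/False per function instead of a failed reload.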
t
Progress!!! It spat out the error below. I'm guessing that's because we don't have a run launcher in dagster.yaml? Like I mentioned, I'm not sure which of the 2-3 we would need in there, if that is the case.
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "extract_tableau":
The above exception was caused by the following exception:
kubernetes.config.config_exception.ConfigException: Service host/port is not set.
d
The run launcher isn't going to come into play at all if you're using execute_k8s_job
A full stack trace for the error you're seeing will be helpful
t
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "extract_tableau":
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_plan.py", line 268, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 369, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 90, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/compute.py", line 186, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/compute.py", line 155, in _yield_compute_results
    for event in iterate_with_context(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 445, in iterate_with_context
    return
  File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
kubernetes.config.config_exception.ConfigException: Service host/port is not set.
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 443, in iterate_with_context
    next_output = next(iterator)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/compute_generator.py", line 122, in _coerce_solid_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_core/execution/plan/compute_generator.py", line 116, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/opt/dagster/FTS_Data_Science/dagster-jobs/dagster_jobs/dynamic_graph.py", line 77, in extract_tableau
    execute_k8s_job(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster/_annotations.py", line 106, in inner
    return target(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/dagster_k8s/ops/k8s_job_op.py", line 279, in execute_k8s_job
    kubernetes.config.load_incluster_config()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 118, in load_incluster_config
    InClusterConfigLoader(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 54, in load_and_set
    self._load_config()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 62, in _load_config
    raise ConfigException("Service host/port is not set.")
d
If you're not running this within a k8s cluster, you'll want to set the load_incluster_config parameter to False and specify a kubeconfig_file: https://docs.dagster.io/_apidocs/libraries/dagster-k8s#dagster_k8s.execute_k8s_job
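For context on the message itself: the in-cluster loader in the Kubernetes Python client looks for the `KUBERNETES_SERVICE_HOST` and `KUBERNETES_SERVICE_PORT` environment variables, which Kubernetes sets inside pods, and raises "Service host/port is not set." when they are absent. A small sketch of the same check, where `looks_in_cluster` is a hypothetical helper name:

```python
import os


def looks_in_cluster(environ=None):
    """Report whether the variables the in-cluster loader needs are present."""
    env = os.environ if environ is None else environ
    return "KUBERNETES_SERVICE_HOST" in env and "KUBERNETES_SERVICE_PORT" in env


# On a plain EC2 instance these variables are normally absent.
on_ec2 = looks_in_cluster({})

# Inside a pod both are set by Kubernetes (values here are made up).
in_pod = looks_in_cluster(
    {"KUBERNETES_SERVICE_HOST": "10.0.0.1", "KUBERNETES_SERVICE_PORT": "443"}
)
```

On an EC2 host outside the cluster this returns False, which matches the traceback above.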
t
Is that done within the op or the dagster.yaml file?
d
The op
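Roughly, that could look like the sketch below. `load_incluster_config` and `kubeconfig_file` are the `execute_k8s_job` parameters mentioned above; the helper names, the `~/.kube/config` default (the usual kubectl location) and the `timeout=600` value copied from the earlier snippet are assumptions to adjust for your setup.

```python
import os


def k8s_job_kwargs(kubeconfig_file=None):
    """Extra keyword arguments for execute_k8s_job when Dagster runs outside the cluster."""
    # Assumption: fall back to the default kubectl config location.
    path = kubeconfig_file or os.path.expanduser("~/.kube/config")
    return {
        "load_incluster_config": False,  # do not look for in-pod credentials
        "kubeconfig_file": path,         # use this kubeconfig instead
    }


def run_extract(context, image):
    """Body to call from inside the op; `context` is the op's context argument."""
    # Imported here so k8s_job_kwargs can be used without dagster-k8s installed.
    from dagster_k8s import execute_k8s_job

    execute_k8s_job(context, image=image, timeout=600, **k8s_job_kwargs())


kwargs = k8s_job_kwargs("/home/ubuntu/.kube/config")
```

Nothing changes in dagster.yaml for this; the other arguments (`env_vars`, `resources`, `labels`) go into the same call as before.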
t
Cool, got past all of that and it's a matter of permissions now. Thanks Daniel!