# ask-community
c
hey folks, I'm trying to learn different options to authenticate from dagster to Databricks (azure), does anyone know if I can use MSI or service principal?
z
If you're using create_databricks_run_now_op or create_databricks_submit_run_op, you should be able to use a service principal. The databricks_pyspark_step_launcher currently does not allow using a service principal, although there is a PR out for it, so it should be possible soon.
c
thank you!
What if I use JobsApi instead?
from databricks_cli.jobs.api import JobsApi  # JobsApi comes from the legacy databricks-cli package

jobs_client = JobsApi(context.resources.databricks_client.api_client)
run_dbx_job = jobs_client.run_now(....)
z
Yeah, if you're using the api_client exposed by the databricks_client resource, you can do whatever the Databricks Jobs API exposes. However, be aware that the api_client property is deprecated in dagster-databricks==0.20.10 and later versions, as it uses the old third-party databricks-api library. It's preferred to use context.resources.databricks_client.workspace_client, as this exposes the WorkspaceClient class from the databricks-sdk, which is the new-ish SDK that Databricks supports. The api_client property looks like it'll probably be removed in the next minor version update, which sounds like it's scheduled for late this month.
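For example, the run_now call above would look roughly like this with the new client (a sketch only - jobs.run_now(job_id=...) is the databricks-sdk method, and my_job_id stands in for whatever job id you're triggering):
Copy code
workspace_client = context.resources.databricks_client.workspace_client
# run_now returns a waiter object you can block on or inspect
run = workspace_client.jobs.run_now(job_id=my_job_id)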
c
@Zach thanks for all of your help. Now I'm wondering where the right place to define the service principal is. I'm using "databricks_client" to define host and token and passing it in the @job decorator. Should I use the same structure to define the service principal? If so, what would the JSON keys be?
databricks_client_instance = databricks_client.configured(
    {
        "host": "<HOST>",
        "token": "<TOKEN>",
    }
)

@job(resource_defs={"databricks_client": databricks_client_instance})
def hist_load_dynamic_graph():
    .......

@op(ins={"start_after": In(Nothing)}, required_resource_keys={"databricks_client"})
def dbx_historical_load_op(context, config: DbxConfig):
    jobs_client = JobsApi(context.resources.databricks_client.api_client)
    ......
    ......
z
Ah okay I apologize, it seems I was mistaken in my assumption that you'd be able to instantiate the JobsApi using a service principal. I actually haven't used the DatabricksClientResource a lot myself. My PR to enable oauth just landed so I think the functionality will be enabled in the next release which should come out tomorrow. Sorry about that, looking at the current version again I don't think it's possible to interact with the Databricks API using a service principal. If you're unable to upgrade to the latest version tomorrow when it comes out, you could just pull the code for the new client implementation out of the Dagster repo here.
c
Hi @Zach, thanks for all the information. I'm trying to use the service principal authentication available in the latest release, and I'd like to ask if you have an example of how to define the configuration... I'm trying it this way:
databricks_client_instance = databricks_client.configured(
    {
        "host": "<HOST>",
        "oauth_credentials": {"client_id": "<sp_id>", "client_secret": "<secret>"},
    }
)

@job(resource_defs={"databricks_client": databricks_client_instance})
def hist_load_dynamic_graph():
    dbx_historical_load_op()
And I'm also getting an error related to a missing module; do I need to install an additional module to use this feature?
ValueError: default auth: runtime: default auth: cannot configure default credentials. Config: host=<HOST>, client_id=<sp_id>, client_secret=***
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/errors.py", line 286, in user_code_error_boundary
yield
File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/resources_init.py", line 324, in single_resource_event_generator
resource_def.resource_fn(context)
File "/usr/local/lib/python3.10/dist-packages/dagster_databricks/resources.py", line 85, in databricks_client
return DatabricksClientResource.from_resource_context(init_context).get_client()
File "/usr/local/lib/python3.10/dist-packages/dagster_databricks/resources.py", line 70, in get_client
return DatabricksClient(
File "/usr/local/lib/python3.10/dist-packages/dagster_databricks/databricks.py", line 46, in __init__
self._workspace_client = WorkspaceClient(
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/__init__.py", line 108, in __init__
config = client.Config(host=host,
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 521, in __init__
raise ValueError(message) from e
The above exception was caused by the following exception:
ValueError: default auth: runtime: default auth: cannot configure default credentials
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 516, in __init__
self._init_auth()
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 836, in _init_auth
raise ValueError(f'{self._credentials_provider.auth_type()} auth: {e}') from e
The above exception was caused by the following exception:
ValueError: runtime: default auth: cannot configure default credentials
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 831, in _init_auth
self._header_factory = self._credentials_provider(self)
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 438, in __call__
raise ValueError(f'{auth_type}: {e}') from e
The above exception was caused by the following exception:
ValueError: default auth: cannot configure default credentials
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 432, in __call__
header_factory = provider(cfg)
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 62, in wrapper
return func(cfg)
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 95, in runtime_native_auth
from databricks.sdk.runtime import (init_runtime_legacy_auth,
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 101, in <module>
dbutils = RemoteDbUtils()
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/dbutils.py", line 169, in __init__
self._config = Config() if not config else config
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 521, in __init__
raise ValueError(message) from e
The above exception was caused by the following exception:
ValueError: default auth: cannot configure default credentials
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 516, in __init__
self._init_auth()
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 836, in _init_auth
raise ValueError(f'{self._credentials_provider.auth_type()} auth: {e}') from e
The above exception was caused by the following exception:
ValueError: cannot configure default credentials
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 831, in _init_auth
self._header_factory = self._credentials_provider(self)
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 439, in __call__
raise ValueError('cannot configure default credentials')
The above exception occurred during handling of the following exception:
NameError: name 'spark' is not defined
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 98, in <module>
from .stub import *
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/stub.py", line 7, in <module>
sc = spark.sparkContext
The above exception occurred during handling of the following exception:
ModuleNotFoundError: No module named 'dbruntime'
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 76, in <module>
from dbruntime import UserNamespaceInitializer
z
Hi @Cristian Fuentes - I'll take a look into this today. I'm not sure about the missing module part, that's very odd and I don't recognize the dbruntime package at all - it's not installed in my environment. But I think I know what the auth issue is and will hopefully have a fix out for this week's release
Hmm, actually I'm not able to reproduce the error. I copied your example code with a simple op and it initialized the DatabricksClientResource successfully. Is that the entire traceback that you provided? Also, one thing to try would be to recreate your Python environment. What version of databricks-sdk do you have installed?
c
Hi @Zach, I'm using "databricks-sdk==0.6.0". I was checking the source code of the databricks-sdk and I realized that the try/except is not working properly; I'm trying to understand the reason. https://github.com/databricks/databricks-sdk-py/blob/cef100c3a3c9dce91bc2bc8d6c59a93febd8f707/databricks/sdk/runtime/__init__.py#L76
z
Hmm interesting, yeah that's the same version I'm using. That's the entire traceback too, right?
It feels like it's not recognizing the oauth credentials and is instead trying to import your credentials from the environment or from the databricks config file. Do you happen to have databricks auth environment variables set up?
c
Yes, that is the entire traceback... as far as I know I haven't set up any Databricks auth environment variables. How can I check that? By the way, I'm deploying Dagster in Kubernetes; not sure if this information is useful.
z
Hmm alright. I'll continue to try to poke at it later today when I get some time. I don't think the kubernetes deployment part should matter, but it would be interesting to know if you get a similar error with a local Dagster deployment if you have a way to set one up
c
Ok, thank you!
z
I'll echo here to say we're also running into the same issue. Running locally and trying to use a simple host & access token ends up failing when urllib can't make a connection to DATABRICKS_METADATA_SERVICE_URL, which is somehow set during runtime to an address like 127.0.0.1:<port>/<some random id/token>. Where or how this variable is being set, I haven't been able to figure out; searching for it only leads me to the Go SDK docs.
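For what it's worth, a quick stdlib-only way to see which DATABRICKS_* variables the running process actually has set:
Copy code
import os

# print every Databricks-related variable visible to this process
for key, value in os.environ.items():
    if key.startswith("DATABRICKS"):
        print(key, "=", value)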
z
Can you provide the code you're using?
Also, was this code that you had working when using a previous version of Dagster?
@Cristian Fuentes were you able to reproduce the error in a local deployment?
z
We use the databricks_pyspark_step_launcher, and have been for several months now. It seems like it's breaking with the new databricks-sdk:
Copy code
def db_cluster_config(
    cluster_name:str,
    n_workers: int,
) -> dict:
    return {
        "run_config": {
            "run_name": cluster_name,
            "cluster": {
                "new": {
                    "init_scripts": [
                        {
                            "s3": {
                                "destination": (
                                    "our_s3_location"
                                ),
                                "endpoint": "<https://s3.amazonaws.com>",
                                "region": "us-east-1",
                            }
                        },
                    ],
                    "ssh_public_keys": [],  # spec up to 10 keys here.
                    "enable_elastic_disk": True,
                    "size": {"num_workers": n_workers},
                    "spark_version": "13.3.x-photon-scala2.12",
                    "nodes": {
                        "node_types": {
                            # For some reason, these get inverted?
                            "node_type_id": "m6id.16xlarge"
                        }
                    },  # May also define instance pool id here instead.
                    "aws_attributes": {
                        "first_on_demand": 1,
                        "availability": "SPOT_WITH_FALLBACK",
                        "instance_profile_arn": (
                            "arn::our_instance_profile"
                        ),
                    },
                    "cluster_log_conf": {"dbfs": {"destination": "dbfs:/dagster_staging"}},
                    "custom_tags": [{"key": "team", "value": "dataeng"}],
                }
            },
            "libraries": [
                ... # various libs
            ],
            "install_default_libraries": True,
        },
        "permissions": {
            "job_permissions": {"CAN_VIEW": [{"group_name": "Data Engineers"}]},
            "cluster_permissions": {"CAN_ATTACH_TO": [{"group_name": "Data Engineers"}]},
        },
        "secrets_to_env_variables": [
            {
                "name": "DATABRICKS_HOST",
                "key": "DATABRICKS_HOST",
                "scope": "dagster",
            },
            {
                "name": "DATABRICKS_TOKEN",
                "key": "DATABRICKS_TOKEN",
                "scope": "dagster",
            },
        ],
        "databricks_host": {"env": "DATABRICKS_HOST"},
        "databricks_token": {"env": "DATABRICKS_TOKEN"},
        "local_pipeline_package_path": str(Path(__file__).parent.parent),
        "env_variables": {
           k: v for k, v in secure_get_from_ssm()  # gets some needed tokens to connect to other services
        },
        "wait_for_logs": True,
    }
# configured() takes the config dict itself, so call the helper first:
databricks_pyspark_step_launcher.configured(db_cluster_config(cluster_name, n_workers))
z
Can you show the traceback you get when this happens? And what specific version are you using? And what is your Databricks host? Does it start with https://?
I've tested the step launcher in the latest version with both token auth and oauth and was able to launch jobs successfully, so it feels like it could be an environment-specific thing.
z
We're trying to update to dagster 1.4.12; our db host is in the form https://<our id>.cloud.databricks.com
z
Dang, was hoping that'd be it.
z
Pulling up the stack trace atm
Copy code
ValueError: default auth: metadata-service: HTTPConnectionPool(host='127.0.0.1', port=65262): Max retries exceeded with url: /<Some UUID?> (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1535d6590>: Failed to establish a new connection: [Errno 61] Connection refused')). Config: host=https://our_host.cloud.databricks.com, token=***, auth_type=metadata-service, metadata_service_url=***. Env: DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_AUTH_TYPE, DATABRICKS_METADATA_SERVICE_URL
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 286, in user_code_error_boundary
    yield
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 324, in single_resource_event_generator
    resource_def.resource_fn(context)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 189, in databricks_pyspark_step_launcher
    return DatabricksPySparkStepLauncher(**context.resource_config)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 255, in __init__
    self.databricks_runner = DatabricksJobRunner(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/dagster_databricks/databricks.py", line 327, in __init__
    self._client: DatabricksClient = DatabricksClient(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/dagster_databricks/databricks.py", line 46, in __init__
    self._workspace_client = WorkspaceClient(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/__init__.py", line 108, in __init__
    config = client.Config(host=host,
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 521, in __init__
    raise ValueError(message) from e
The above exception was caused by the following exception:
ValueError: default auth: metadata-service: HTTPConnectionPool(host='127.0.0.1', port=65262): Max retries exceeded with url: /<some uuid?> (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1535d6590>: Failed to establish a new connection: [Errno 61] Connection refused'))
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 516, in __init__
    self._init_auth()
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 836, in _init_auth
    raise ValueError(f'{self._credentials_provider.auth_type()} auth: {e}') from e
The above exception was caused by the following exception:
ValueError: metadata-service: HTTPConnectionPool(host='127.0.0.1', port=65262): Max retries exceeded with url: /<some uuid?> (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1535d6590>: Failed to establish a new connection: [Errno 61] Connection refused'))
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 831, in _init_auth
    self._header_factory = self._credentials_provider(self)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 438, in __call__
    raise ValueError(f'{auth_type}: {e}') from e
The above exception was caused by the following exception:
requests.exceptions.ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=65262): Max retries exceeded with url: /<some uuid> (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1535d6590>: Failed to establish a new connection: [Errno 61] Connection refused'))
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 432, in __call__
    header_factory = provider(cfg)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 62, in wrapper
    return func(cfg)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 400, in metadata_service
    token_source.token()
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/oauth.py", line 140, in token
    self._token = self.refresh()
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/databricks/sdk/core.py", line 372, in refresh
    resp = requests.get(self.url,
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/requests/api.py", line 73, in get
    return request("get", url, params=params, **kwargs)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/requests/api.py", line 59, in request
    return session.request(method=method, url=url, **kwargs)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/requests/adapters.py", line 519, in send
    raise ConnectionError(e, request=request)
The above exception occurred during handling of the following exception:
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='127.0.0.1', port=65262): Max retries exceeded with url: /<ugh the same uuid> (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1535d6590>: Failed to establish a new connection: [Errno 61] Connection refused'))
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connectionpool.py", line 798, in urlopen
    retries = retries.increment(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
The above exception occurred during handling of the following exception:
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x1535d6590>: Failed to establish a new connection: [Errno 61] Connection refused
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connectionpool.py", line 714, in urlopen
    httplib_response = self._make_request(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connectionpool.py", line 415, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connection.py", line 244, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py", line 1283, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py", line 1329, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py", line 1278, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py", line 1038, in _send_output
    self.send(msg)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py", line 976, in send
    self.connect()
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
The above exception occurred during handling of the following exception:
ConnectionRefusedError: [Errno 61] Connection refused
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/Users/zpaden/Workspace/locallogic-dagster/.venv/lib/python3.10/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
z
One thing that would be interesting for you to try, if possible, would be to remove DATABRICKS_HOST and DATABRICKS_TOKEN from the environment and temporarily hardcode them
z
Interesting, I'll give it a shot. I'm also seeing what happens if I manually set DATABRICKS_METADATA_SERVICE_URL to "".
z
The Databricks SDK will look through a number of different places that auth values could be stored, and it feels like it's seeing some environment variables being set and trying to authenticate using those. In your case it seems to be trying to default to the oauth flow which is strange
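Roughly, the chain behaves like this (a simplified sketch for intuition only - the provider names and ordering here are illustrative, not the real databricks-sdk code):
Copy code
# Illustrative default-credentials chain: the first provider whose config keys
# are present wins, even if the values are stale or empty strings.
def pat_auth(cfg):
    if cfg.get("token") is None:
        return None
    return lambda: {"Authorization": f"Bearer {cfg['token']}"}

def metadata_service_auth(cfg):
    if cfg.get("metadata_service_url") is None:
        return None
    return lambda: {"Authorization": "Bearer <token fetched from the metadata service>"}

def default_credentials(cfg):
    # the real chain has many more providers (basic, oauth m2m, azure, runtime, ...)
    for provider in (pat_auth, metadata_service_auth):
        header_factory = provider(cfg)
        if header_factory is not None:
            return header_factory
    raise ValueError("cannot configure default credentials")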
z
Yeah, I've been debugging in DefaultCredentials of databricks.sdk.core and it seems to be something going on there.
z
Yeah, it's a bit confusing in there, but it feels like somehow the PAT isn't getting picked up. It would be interesting for you to drop some breakpoints into that part of databricks.sdk.core to see what kwargs are getting passed into that Config class. I'd also be curious whether you run into the same issue if you try instantiating a WorkspaceClient manually:
Copy code
from databricks.sdk import WorkspaceClient

workspace_client = WorkspaceClient(
    host=host,
    token=token,
    product="dagster-databricks",
    product_version=__version__,
)
If the same error results, then it would be interesting to try to force PAT auth like this:
Copy code
workspace_client = WorkspaceClient(
    host=host,
    token=token,
    product="dagster-databricks",
    product_version=__version__,
    auth_type="pat",
)
z
Yeah, just running WorkspaceClient() works well. At first I just assumed my token was expired.
Strangely, I've unset DATABRICKS_HOST and the token, but then when I launch dagster, somehow, somewhere the value is populated again with a non-https value? I'm now super confused where this is coming from 🤯
z
That feels highly suspicious
Maybe try working your way out - you could try instantiating the DatabricksClient class from dagster-databricks and see if that works, then the DatabricksJobRunner
I'm surprised that instantiating the WorkspaceClient manually worked well - was this before or after you unset the env vars?
z
Before I unset!
After unsetting, the workspace client can't auth.
z
Could you show me your instantiation call?
z
Before attempting to unset;
Copy code
wc = WorkspaceClient()
Just attempted to create a DatabricksClient from dagster in a new setup (e.g. not running dagster-webserver) and was able to do it with hardcoded values
z
Ah okay, I'm specifically interested in what happens when a token and host are provided explicitly to the WorkspaceClient AND the variables are present in the environment
z
ahh let me try that
Providing both seems to work (e.g. running WorkspaceClient(host="https://***.cloud.databricks.com", token="xxx").catalogs.list() resolves to our catalogs)
I do recall seeing some other kwargs being passed; going to go back and see what those were.
z
Okay, hmm. I wish I could get this to reproduce; I've tried adding the env vars on my end and haven't been able to force the same issue yet.
When I remove the token argument I get the same error as @Cristian Fuentes, so potentially their issue is related to missing environment variables being used to configure the DatabricksClient
z
Hmmm, I think you may be on to something re: https://. In .zshrc I have it set to an https URL, but somehow when launching dagster-webserver it gets reset to the same URL without https://
I'm not aware of anything in our code base that would set this (doesn't mean there couldn't be, but I've tried to look and came up with nothing)
c
Hi folks, in my case when I use a token, it's working fine:
databricks_client_instance = databricks_client.configured(
    {
        "host": "https://<id>.azuredatabricks.net",
        "token": "<MY_TOKEN>",
    }
)
My problem is when I try to use oauth_credentials:
databricks_client_instance = databricks_client.configured(
    {
        "host": "https://<id>.azuredatabricks.net",
        "oauth_credentials": {"client_id": "<Service_principal>", "client_secret": "<Secret>"},
    }
)
z
Okay, I think this is an issue with VSCode on my end.
Swapping to just using iTerm, I'm able to connect and upload everything to start a job, and the environment variables don't get mangled.
z
Thanks for the clarification @Cristian Fuentes - have you been able to check whether there are any Databricks-related environment variables set in your execution environment? You could check this by using a test op that just logs the environment variables:
Copy code
import os

@op
def something(context):
    context.log.info(f"environment: {os.environ}")
Also, have you had a chance to try instantiating the DatabricksClient from dagster-databricks locally? It would be interesting to see if you run into the same issue locally, to determine whether this error is specific to your kubernetes environment:
Copy code
def test():
    from databricks_cli.jobs.api import JobsApi
    from dagster_databricks import DatabricksClient

    databricks_client = DatabricksClient(host="<your-host>", oauth_client_id="<your-client-id>", oauth_client_secret="<your-client-secret>")
    databricks_client.workspace_client.jobs.list()
c
Ok, I'll check using the test op. I haven't tested locally; I need to set up my local environment first.
z
Okay, you should be able to run the test code I sent locally without Dagster
c
oh ok, you're right, I'll try
Copy code
{
  "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
  "HOSTNAME": "dagster-run-2178b608-86a7-4fc1-a3d6-aee77f10b061-jzhgr",
  "ASPNETCORE_URLS": "",
  "DOTNET_RUNNING_IN_CONTAINER": "true",
  "DOTNET_VERSION": "6.0.18",
  "ASPNET_VERSION": "6.0.18",
  "DOTNET_GENERATE_ASPNET_CERTIFICATE": "false",
  "DOTNET_NOLOGO": "true",
  "DOTNET_SDK_VERSION": "6.0.410",
  "DOTNET_USE_POLLING_FILE_WATCHER": "true",
  "NUGET_XMLDOC_MODE": "skip",
  "POWERSHELL_DISTRIBUTION_CHANNEL": "PSDocker-DotnetSDK-Ubuntu-22.04",
  "DAGSTER_PG_PASSWORD": "test",
  "DAGSTER_RUN_JOB_NAME": "nrt_load_dynamic_graph",
  "DAGSTER_HOME": "/opt/dagster/dagster_home",
  "ETL_DAGSTER_TEST_1_SERVICE_PORT": "3031",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP_PORT": "5432",
  "CDC_TOOL_INT_SERVICE_HOST": "10.0.102.243",
  "DPF_DAGSTER_REPLICATION_2_SERVICE_PORT": "3032",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP_PORT": "80",
  "CDC_TOOL_INT_SERVICE_PORT": "5001",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_3_SERVICE_PORT_GRPC": "3030",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP_PORT": "3030",
  "ETL_DAGSTER_TEST_2_SERVICE_HOST": "10.0.182.10",
  "DPF_DAGSTER_RELEASE12_DAGIT_SERVICE_HOST": "10.0.27.57",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_SERVICE_HOST": "10.0.188.187",
  "ETL_DAGSTER_TEST_1_SERVICE_HOST": "10.0.124.61",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP_PORT": "3030",
  "KUBERNETES_PORT_443_TCP_PROTO": "tcp",
  "DPF_DAGSTER_REPLICATION_1_SERVICE_PORT_GRPC": "3032",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP_PROTO": "tcp",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP_PORT": "8080",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP_PORT": "3031",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_SERVICE_PORT": "5432",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP_PORT": "5432",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP_ADDR": "10.0.7.221",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP": "<tcp://10.0.28.124:8090>",
  "CDC_TOOL_INT_PORT": "<tcp://10.0.102.243:5001>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_SERVICE_PORT_TCP_POSTGRESQL": "5432",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP_PROTO": "tcp",
  "DPF_DAGSTER_REPLICATION_2_PORT": "<tcp://10.0.176.12:3032>",
  "CONFLUENT_OPERATOR_PORT_7778_TCP": "<tcp://10.0.100.94:7778>",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP_PROTO": "tcp",
  "DPF_NRT_CONNECT_SRVC_SERVICE_PORT_KAFKA_CONNECT": "80",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_SERVICE_PORT_TCP_POSTGRESQL": "5432",
  "CONFLUENT_OPERATOR_PORT_7778_TCP_PORT": "7778",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT": "<tcp://10.0.27.57:80>",
  "DPF_DAGSTER_RELEASE_DAGIT_SERVICE_HOST": "10.0.162.2",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP_ADDR": "10.0.27.57",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP_ADDR": "10.0.217.5",
  "DPF_PGADMIN_SERVICE_SERVICE_PORT_DPF_PGADMIN": "8090",
  "K8S_EXAMPLE_USER_CODE_3_SERVICE_PORT": "3030",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP_ADDR": "10.0.176.12",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP_ADDR": "10.0.182.10",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT": "<tcp://10.0.7.221:5432>",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE_DAGIT_SERVICE_PORT_HTTP": "80",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP": "<tcp://10.0.176.12:3032>",
  "ETL_DAGSTER_TEST_2_SERVICE_PORT": "3031",
  "ETL_DAGSTER_TEST_2_PORT": "<tcp://10.0.182.10:3031>",
  "DPF_NRT_CONNECT_SRVC_SERVICE_PORT": "8080",
  "DPF_NRT_CONNECT_SRVC_PORT": "<tcp://10.0.105.184:8080>",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP_PROTO": "tcp",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP_PORT": "8090",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP_ADDR": "10.0.28.124",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP": "<tcp://10.0.218.187:3030>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP_PORT": "5432",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP_PROTO": "tcp",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_SERVICE_PORT": "9187",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP_PORT": "80",
  "DPF_DAGSTER_REPLICATION_1_SERVICE_PORT": "3032",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_SERVICE_HOST": "10.0.7.221",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_SERVICE_PORT": "5432",
  "PROMETHEUS_BLACKBOX_EXPORTER_SERVICE_HOST": "10.0.12.120",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_SERVICE_HOST": "10.0.35.98",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_SERVICE_PORT_TCP_POSTGRESQL": "5432",
  "KUBERNETES_SERVICE_PORT_HTTPS": "443",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP_PORT": "3032",
  "CONFLUENT_OPERATOR_SERVICE_PORT": "7778",
  "CONFLUENT_OPERATOR_SERVICE_PORT_HTTP_METRIC": "7778",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT": "<tcp://10.0.35.98:5432>",
  "K8S_EXAMPLE_USER_CODE_2_SERVICE_PORT": "3030",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP_PORT": "3032",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP": "<tcp://10.0.217.5:3032>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_SERVICE_PORT_HTTP_METRICS": "9187",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP": "<tcp://10.0.162.2:80>",
  "ETL_DAGSTER_TEST_1_SERVICE_PORT_GRPC": "3031",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT": "<tcp://10.0.18.91:5432>",
  "KUBERNETES_PORT": "<tcp://10.0.0.1:443>",
  "CDC_TOOL_INT_PORT_5001_TCP_PORT": "5001",
  "DPF_DAGSTER_REPLICATION_1_PORT": "<tcp://10.0.217.5:3032>",
  "DPF_NRT_CONNECT_SRVC_SERVICE_HOST": "10.0.105.184",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT": "<tcp://10.0.188.187:9187>",
  "KUBERNETES_PORT_443_TCP": "<tcp://10.0.0.1:443>",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP_ADDR": "10.0.105.184",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP_PORT": "9187",
  "KUBERNETES_PORT_443_TCP_PORT": "443",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP": "<tcp://10.0.124.61:3031>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP": "<tcp://10.0.35.98:5432>",
  "K8S_EXAMPLE_USER_CODE_2_SERVICE_PORT_GRPC": "3030",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP": "<tcp://10.0.221.135:3030>",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP": "<tcp://10.0.7.221:5432>",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP_ADDR": "10.0.162.2",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP_PROTO": "tcp",
  "DPF_DAGSTER_REPLICATION_2_SERVICE_HOST": "10.0.176.12",
  "KUBERNETES_SERVICE_PORT": "443",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP": "<tcp://10.0.105.184:80>",
  "CONFLUENT_OPERATOR_PORT": "<tcp://10.0.100.94:7778>",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP_PORT": "3031",
  "DPF_PGADMIN_SERVICE_PORT": "<tcp://10.0.28.124:8090>",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT": "<tcp://10.0.162.2:80>",
  "PROMETHEUS_BLACKBOX_EXPORTER_SERVICE_PORT_HTTP": "9115",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP": "<tcp://10.0.12.120:9115>",
  "K8S_EXAMPLE_USER_CODE_2_SERVICE_HOST": "10.0.221.135",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP_ADDR": "10.0.221.135",
  "DPF_DAGSTER_REPLICATION_2_SERVICE_PORT_GRPC": "3032",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP_ADDR": "10.0.35.98",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP": "<tcp://10.0.27.57:80>",
  "DPF_PGADMIN_SERVICE_SERVICE_HOST": "10.0.28.124",
  "CONFLUENT_OPERATOR_PORT_7778_TCP_PROTO": "tcp",
  "CDC_TOOL_INT_PORT_5001_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_3_SERVICE_HOST": "10.0.218.187",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP_ADDR": "10.0.218.187",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_SERVICE_PORT": "5432",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP_ADDR": "10.0.12.120",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP_ADDR": "10.0.18.91",
  "DPF_DAGSTER_RELEASE_DAGIT_SERVICE_PORT": "80",
  "PROMETHEUS_BLACKBOX_EXPORTER_SERVICE_PORT": "9115",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP": "<tcp://10.0.105.184:8080>",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP_PROTO": "tcp",
  "CONFLUENT_OPERATOR_SERVICE_HOST": "10.0.100.94",
  "K8S_EXAMPLE_USER_CODE_2_PORT": "<tcp://10.0.221.135:3030>",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP_PROTO": "tcp",
  "DPF_DAGSTER_REPLICATION_1_SERVICE_HOST": "10.0.217.5",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP_ADDR": "10.0.105.184",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT": "<tcp://10.0.12.120:9115>",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP_PORT": "80",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP_ADDR": "10.0.188.187",
  "CDC_TOOL_INT_PORT_5001_TCP": "<tcp://10.0.102.243:5001>",
  "CDC_TOOL_INT_PORT_5001_TCP_ADDR": "10.0.102.243",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP_ADDR": "10.0.124.61",
  "KUBERNETES_SERVICE_HOST": "10.0.0.1",
  "ETL_DAGSTER_TEST_2_SERVICE_PORT_GRPC": "3031",
  "DPF_DAGSTER_RELEASE12_DAGIT_SERVICE_PORT_HTTP": "80",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP_PROTO": "tcp",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP_PORT": "9115",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP": "<tcp://10.0.18.91:5432>",
  "KUBERNETES_PORT_443_TCP_ADDR": "10.0.0.1",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP": "<tcp://10.0.182.10:3031>",
  "ETL_DAGSTER_TEST_1_PORT": "<tcp://10.0.124.61:3031>",
  "CONFLUENT_OPERATOR_PORT_7778_TCP_ADDR": "10.0.100.94",
  "DPF_PGADMIN_SERVICE_SERVICE_PORT": "8090",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP": "<tcp://10.0.188.187:9187>",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE12_DAGIT_SERVICE_PORT": "80",
  "DPF_NRT_CONNECT_SRVC_SERVICE_PORT_CONNECT_JMX": "8080",
  "K8S_EXAMPLE_USER_CODE_3_PORT": "<tcp://10.0.218.187:3030>",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_SERVICE_HOST": "10.0.18.91",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP_PROTO": "tcp",
  "HOME": "/root",
  "LC_CTYPE": "C.UTF-8",
  "DATABRICKS_TOKEN": ""
}
z
Is that DATABRICKS_TOKEN env var actually empty? Or did you just edit it out of the text so it wasn't exposed?
c
It's empty, and I'm trying to check where this variable is set; I don't have any reference to it in my project or Dagster deployment.
z
Yeah there's a strong possibility that's where your error is coming from when you're using oauth - I think it's detecting that key and then trying to authenticate using a token whose value ends up just being an empty string
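Note that an empty value still counts as "set" as far as the environment is concerned:
Copy code
import os

os.environ["DATABRICKS_TOKEN"] = ""
print("DATABRICKS_TOKEN" in os.environ)  # True - an empty string is still a set variable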
c
hmmm ok, I'll figure out where this variable is set.. 🤷‍♂️
z
Yeah that's super odd, especially since it's the only one. Zach P seemed to be running into some weird things with Databricks env vars being set unexpectedly too, I wonder if the databricks-sdk is doing something weird under the hood with setting env vars on the fly. I'll dig into the source some more and see if I can find something to that effect. When you ran this test op to print the environment, did that same Dagster environment contain the original job that prompted this whole thread? More specifically, was
Copy code
databricks_client_instance = databricks_client.configured(
    {
        "host": "<HOST>",
        "oauth_credentials": {"client_id":"<sp_id>", "client_secret":"<secret>"}
    }
)
@job(resource_defs={"databricks_client": databricks_client_instance})
def hist_load_dynamic_graph():
       dbx_historical_load_op()
also loaded into Dagster? Also, it's a bit of a hack but you could try something like this:
Copy code
import os
del os.environ["DATABRICKS_TOKEN"]

databricks_client_instance = databricks_client.configured(
    {
        "host": "<HOST>",
        "oauth_credentials": {"client_id":"<sp_id>", "client_secret":"<secret>"}
    }
)
@job(resource_defs={"databricks_client": databricks_client_instance})
def hist_load_dynamic_graph():
       dbx_historical_load_op()
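A slightly safer variant is os.environ.pop, since del raises a KeyError when the variable isn't set:
Copy code
import os

os.environ.pop("DATABRICKS_TOKEN", None)  # no-op if the variable is absent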
c
Yes, that is actually what I'm doing right now:
del os.environ......
I added an operation to delete the env variable, prior to the databricks op, but still the same problem.
Copy code
{
  "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
  "HOSTNAME": "dagster-run-34a461b9-c614-463f-a11c-3851bf7017ac-qlzb8",
  "ASPNETCORE_URLS": "",
  "DOTNET_RUNNING_IN_CONTAINER": "true",
  "DOTNET_VERSION": "6.0.18",
  "ASPNET_VERSION": "6.0.18",
  "DOTNET_GENERATE_ASPNET_CERTIFICATE": "false",
  "DOTNET_NOLOGO": "true",
  "DOTNET_SDK_VERSION": "6.0.410",
  "DOTNET_USE_POLLING_FILE_WATCHER": "true",
  "NUGET_XMLDOC_MODE": "skip",
  "POWERSHELL_DISTRIBUTION_CHANNEL": "PSDocker-DotnetSDK-Ubuntu-22.04",
  "DAGSTER_RUN_JOB_NAME": "nrt_load_dynamic_graph",
  "DAGSTER_HOME": "/opt/dagster/dagster_home",
  "DAGSTER_PG_PASSWORD": "test",
  "KUBERNETES_SERVICE_PORT_HTTPS": "443",
  "DPF_NRT_CONNECT_SRVC_SERVICE_PORT_CONNECT_JMX": "8080",
  "CDC_TOOL_INT_PORT_5001_TCP_ADDR": "10.0.102.243",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP": "<tcp://10.0.35.98:5432>",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP_PORT": "8090",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP": "<tcp://10.0.162.2:80>",
  "ETL_DAGSTER_TEST_2_PORT": "<tcp://10.0.182.10:3031>",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_SERVICE_PORT_TCP_POSTGRESQL": "5432",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP_ADDR": "10.0.124.61",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP_ADDR": "10.0.221.135",
  "DPF_DAGSTER_REPLICATION_2_SERVICE_HOST": "10.0.176.12",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP_PORT": "3030",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP_ADDR": "10.0.162.2",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP_PORT": "3031",
  "PROMETHEUS_BLACKBOX_EXPORTER_SERVICE_PORT_HTTP": "9115",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_SERVICE_HOST": "10.0.7.221",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_SERVICE_PORT_TCP_POSTGRESQL": "5432",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP_ADDR": "10.0.105.184",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP_PORT": "5432",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_3_SERVICE_PORT": "3030",
  "CONFLUENT_OPERATOR_SERVICE_HOST": "10.0.100.94",
  "DPF_DAGSTER_RELEASE12_DAGIT_SERVICE_HOST": "10.0.27.57",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_SERVICE_PORT": "5432",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP_ADDR": "10.0.217.5",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP": "<tcp://10.0.7.221:5432>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT": "<tcp://10.0.188.187:9187>",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_SERVICE_HOST": "10.0.18.91",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP": "<tcp://10.0.105.184:8080>",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP_PROTO": "tcp",
  "KUBERNETES_PORT_443_TCP_ADDR": "10.0.0.1",
  "ETL_DAGSTER_TEST_1_SERVICE_PORT": "3031",
  "DPF_NRT_CONNECT_SRVC_SERVICE_PORT": "8080",
  "PROMETHEUS_BLACKBOX_EXPORTER_SERVICE_HOST": "10.0.12.120",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP_ADDR": "10.0.7.221",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP": "<tcp://10.0.176.12:3032>",
  "CONFLUENT_OPERATOR_SERVICE_PORT": "7778",
  "K8S_EXAMPLE_USER_CODE_2_PORT": "<tcp://10.0.221.135:3030>",
  "DPF_PGADMIN_SERVICE_SERVICE_PORT": "8090",
  "PROMETHEUS_BLACKBOX_EXPORTER_SERVICE_PORT": "9115",
  "K8S_EXAMPLE_USER_CODE_3_PORT": "<tcp://10.0.218.187:3030>",
  "DPF_NRT_CONNECT_SRVC_PORT": "<tcp://10.0.105.184:8080>",
  "CDC_TOOL_INT_SERVICE_PORT": "5001",
  "CDC_TOOL_INT_PORT_5001_TCP_PORT": "5001",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_SERVICE_PORT": "5432",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT": "<tcp://10.0.7.221:5432>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_SERVICE_PORT_HTTP_METRICS": "9187",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP_ADDR": "10.0.18.91",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP": "<tcp://10.0.27.57:80>",
  "ETL_DAGSTER_TEST_2_SERVICE_PORT": "3031",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP_PORT": "3030",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP": "<tcp://10.0.124.61:3031>",
  "ETL_DAGSTER_TEST_2_SERVICE_HOST": "10.0.182.10",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_SERVICE_PORT_TCP_POSTGRESQL": "5432",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT_80_TCP_PORT": "80",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP": "<tcp://10.0.12.120:9115>",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP_ADDR": "10.0.218.187",
  "DPF_DAGSTER_REPLICATION_2_SERVICE_PORT_GRPC": "3032",
  "KUBERNETES_PORT_443_TCP_PORT": "443",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT": "<tcp://10.0.18.91:5432>",
  "CONFLUENT_OPERATOR_PORT_7778_TCP": "<tcp://10.0.100.94:7778>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP": "<tcp://10.0.188.187:9187>",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP_PORT": "9115",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP_PROTO": "tcp",
  "DPF_PGADMIN_SERVICE_PORT": "<tcp://10.0.28.124:8090>",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP": "<tcp://10.0.217.5:3032>",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT": "<tcp://10.0.12.120:9115>",
  "KUBERNETES_PORT_443_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE_DAGIT_PORT": "<tcp://10.0.162.2:80>",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP": "<tcp://10.0.182.10:3031>",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP_PROTO": "tcp",
  "DPF_DAGSTER_REPLICATION_2_PORT": "<tcp://10.0.176.12:3032>",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP": "<tcp://10.0.28.124:8090>",
  "DPF_DAGSTER_REPLICATION_1_SERVICE_HOST": "10.0.217.5",
  "CDC_TOOL_INT_PORT_5001_TCP_PROTO": "tcp",
  "DPF_PGADMIN_SERVICE_SERVICE_PORT_DPF_PGADMIN": "8090",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_SERVICE_PORT": "9187",
  "CONFLUENT_OPERATOR_PORT": "<tcp://10.0.100.94:7778>",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP_ADDR": "10.0.105.184",
  "CDC_TOOL_INT_SERVICE_HOST": "10.0.102.243",
  "DPF_DAGSTER_RELEASE_DAGIT_SERVICE_HOST": "10.0.162.2",
  "DPF_DAGSTER_RELEASE_DAGIT_SERVICE_PORT": "80",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP_PORT": "80",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP": "<tcp://10.0.105.184:80>",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP_PORT": "80",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP_PORT": "3032",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP_PORT": "5432",
  "CONFLUENT_OPERATOR_PORT_7778_TCP_PORT": "7778",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_3_SERVICE_HOST": "10.0.218.187",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP_PORT": "9187",
  "DPF_NRT_CONNECT_SRVC_PORT_8080_TCP_PORT": "8080",
  "DPF_PGADMIN_SERVICE_SERVICE_HOST": "10.0.28.124",
  "DPF_PGADMIN_SERVICE_PORT_8090_TCP_ADDR": "10.0.28.124",
  "DPF_DAGSTER_REPLICATION_1_SERVICE_PORT": "3032",
  "K8S_EXAMPLE_USER_CODE_2_SERVICE_PORT": "3030",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP_PROTO": "tcp",
  "KUBERNETES_SERVICE_PORT": "443",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP_ADDR": "10.0.188.187",
  "ETL_DAGSTER_TEST_1_SERVICE_HOST": "10.0.124.61",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP_ADDR": "10.0.35.98",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP": "<tcp://10.0.218.187:3030>",
  "DPF_DAGSTER_REPLICATION_2_SERVICE_PORT": "3032",
  "DPF_DAGSTER_RELEASE12_DAGIT_SERVICE_PORT_HTTP": "80",
  "DPF_NRT_CONNECT_SRVC_SERVICE_PORT_KAFKA_CONNECT": "80",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT": "<tcp://10.0.35.98:5432>",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP_PORT": "3031",
  "CONFLUENT_OPERATOR_PORT_7778_TCP_ADDR": "10.0.100.94",
  "CONFLUENT_OPERATOR_SERVICE_PORT_HTTP_METRIC": "7778",
  "CONFLUENT_OPERATOR_PORT_7778_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT_80_TCP_ADDR": "10.0.27.57",
  "DPF_NRT_CONNECT_SRVC_SERVICE_HOST": "10.0.105.184",
  "CDC_TOOL_INT_PORT_5001_TCP": "<tcp://10.0.102.243:5001>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_SERVICE_HOST": "10.0.35.98",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_SERVICE_PORT": "5432",
  "ETL_DAGSTER_TEST_1_SERVICE_PORT_GRPC": "3031",
  "KUBERNETES_PORT_443_TCP": "<tcp://10.0.0.1:443>",
  "PROMETHEUS_BLACKBOX_EXPORTER_PORT_9115_TCP_ADDR": "10.0.12.120",
  "KUBERNETES_PORT": "<tcp://10.0.0.1:443>",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_SERVICE_HOST": "10.0.188.187",
  "DPF_DAGSTER_REPLICATION_1_SERVICE_PORT_GRPC": "3032",
  "DPF_DAGSTER_RELEASE_POSTGRESQL_PORT_5432_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_2_PORT_3030_TCP": "<tcp://10.0.221.135:3030>",
  "DPF_DAGSTER_RELEASE_DAGIT_SERVICE_PORT_HTTP": "80",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_METRICS_PORT_9187_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP_PROTO": "tcp",
  "DPF_DAGSTER_RELEASE12_DAGIT_SERVICE_PORT": "80",
  "DPF_POSTGRES_SRVR_DEV_POSTGRESQL_PORT_5432_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_2_SERVICE_PORT_GRPC": "3030",
  "ETL_DAGSTER_TEST_2_SERVICE_PORT_GRPC": "3031",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP_ADDR": "10.0.176.12",
  "ETL_DAGSTER_TEST_1_PORT_3031_TCP_PROTO": "tcp",
  "DPF_NRT_CONNECT_SRVC_PORT_80_TCP_PROTO": "tcp",
  "DPF_DAGSTER_REPLICATION_1_PORT_3032_TCP_PROTO": "tcp",
  "K8S_EXAMPLE_USER_CODE_3_PORT_3030_TCP_PROTO": "tcp",
  "CDC_TOOL_INT_PORT": "<tcp://10.0.102.243:5001>",
  "DPF_DAGSTER_REPLICATION_1_PORT": "<tcp://10.0.217.5:3032>",
  "DPF_DAGSTER_RELEASE12_DAGIT_PORT": "<tcp://10.0.27.57:80>",
  "K8S_EXAMPLE_USER_CODE_2_SERVICE_HOST": "10.0.221.135",
  "ETL_DAGSTER_TEST_2_PORT_3031_TCP_ADDR": "10.0.182.10",
  "KUBERNETES_SERVICE_HOST": "10.0.0.1",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP": "<tcp://10.0.18.91:5432>",
  "DPF_DAGSTER_RELEASE12_POSTGRESQL_PORT_5432_TCP_PORT": "5432",
  "ETL_DAGSTER_TEST_1_PORT": "<tcp://10.0.124.61:3031>",
  "DPF_DAGSTER_REPLICATION_2_PORT_3032_TCP_PORT": "3032",
  "K8S_EXAMPLE_USER_CODE_3_SERVICE_PORT_GRPC": "3030",
  "HOME": "/root",
  "LC_CTYPE": "C.UTF-8"
}
and by the way this is working fine locally.
Copy code
def test():
    from databricks_cli.jobs.api import JobsApi
    from dagster_databricks import DatabricksClient
    databricks_client = DatabricksClient(host="<your-host>", oauth_client_id="<your-client-id>", oauth_client_secret="<your-client-secret>")
    databricks_client.workspace_client.jobs.list()
z
Bummer, I was really hoping that'd fix it. So it feels like there's still something about your k8s environment that's causing issues... I never asked you to clarify: does the error occur when your Dagster code is being loaded, or when you go to execute a job that uses the DatabricksClientResource?
c
When I execute a job. I'm gonna try using DatabricksClient (as in the test code) instead of using
@job(resource_defs={"databricks_client": databricks_client_instance})
z
Yeah that's a good idea. I'd also be curious to see if you're able to instantiate the WorkspaceClient directly using oauth from within your op in your k8s environment
c
I tested this code:
Copy code
from dagster import Config, op, Failure, In, Nothing, DynamicOut, DynamicOutput
from databricks_cli.jobs.api import JobsApi
from dagster_databricks import DatabricksClient
import os

class DbxJobConf(Config):
    job_name: str

@op(ins={"start_after": In(Nothing)})  # , required_resource_keys={"databricks_client"}
def dbx_get_job_id(context, config: DbxJobConf):
    del os.environ['DATABRICKS_TOKEN']
    databricks_client = DatabricksClient(host="<host>", oauth_client_id="<service_principal>", oauth_client_secret="<secret>")
    context.log.info(f"environment: {os.environ}")
    # jobs_client = JobsApi(context.resources.databricks_client.api_client)
    jobs_client = JobsApi(databricks_client.api_client)
    jobs = jobs_client.list_jobs()
But I'm still having the same problem. What I don't understand about this error is the following part of the trace:
NameError: name 'spark' is not defined
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 50, in <module>
from .stub import *
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/stub.py", line 7, in <module>
sc = spark.sparkContext
The above exception occurred during handling of the following exception:
ModuleNotFoundError: No module named 'dbruntime'
File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 28, in <module>
from dbruntime import UserNamespaceInitializer
This indicates a missing module, but if you check the code of databricks-sdk, all imports of dbruntime have a try/except, so it seems that isn't working.
z
Yeah, it's confusing. If you dig into the part around that code, it looks like it's trying a number of different authentication mechanisms, one of which seems to be meant for use when you're executing from within a Databricks cluster, where the spark session will already be available as a global variable; it seems like there's also some dbruntime package they include when running in a spark cluster.
That stuff is only tangentially related to the error you're getting; there are more relevant errors higher up in the stack that indicate it's trying to use token access, potentially from the environment, instead of the oauth credentials. What are you passing into the required resources for databricks_client in this last test code you just showed?
c
Do you mean this?
#, required_resource_keys={"databricks_client"}
I just commented it out; my understanding is that it uses what I'm defining in
@job(resource_defs={"databricks_client": databricks_client_instance})
which is basically this:
Copy code
databricks_client_instance = databricks_client.configured(
    {
        "host": "<HOST>",
        "oauth_credentials": {"client_id": "<sp_id>", "client_secret": "<secret>"}
    }
)
z
Oh, that's the thing that's failing in the first place though. So to do a better test I'd remove that as a required resource key for now.
The error you just posted is almost certainly still from trying to initialize the databricks_client resource, so dbx_get_job_id probably isn't actually getting to the point in the op where you manually initialize the DatabricksClient.
Also - this would be a separate issue that would only occur after we get the DatabricksClient to initialize with oauth credentials - when using oauth with the DatabricksClient you won't be able to use databricks_client.api_client, because that api client only supports token authentication, as it uses a legacy Databricks API. You'll need to switch to using databricks_client.workspace_client. But let's not concern ourselves with that part yet; we're still not getting the workspace client to initialize.
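In other words, once the oauth initialization works, the listing call would go through the workspace client, roughly like this (same oauth arguments as in the earlier test snippet):
Copy code
from dagster_databricks import DatabricksClient

databricks_client = DatabricksClient(host="<host>", oauth_client_id="<sp_id>", oauth_client_secret="<secret>")
jobs = databricks_client.workspace_client.jobs.list()  # workspace_client replaces the legacy api_client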
c
"Oh that's the thing that's failing in the first place though. So to do a better test I'd remove that as a required resource key for now", do you mean this code?
Copy code
databricks_client_instance = databricks_client.configured(
    {
        "host": "<HOST>",
        "oauth_credentials": {"client_id":"<sp_id>", "client_secret":"<secret>"}
    }
)

OR 

@job(resource_defs={"databricks_client": databricks_client_instance})

This last code I have already commented

@job#(resource_defs={"databricks_client": databricks_client_instance})
def nrt_load_dynamic_graph():
    dbx_get_job_id()
z
Sorry, I didn't see the comment there 🤦
I'm assuming you'll get the same error if you do a test op with instantiating the workspace client directly like this:
Copy code
from databricks.sdk import WorkspaceClient

@op
def dbx_get_job():
  client = WorkspaceClient(
      host="https://...",
      client_id="<your_client_id>",
      client_secret="<oauth_client_secret>",
  )
  client.jobs.list()
c
This is the complete error:
Copy code
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "dbx_get_job_id":

  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 474, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 157, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 93, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute.py", line 203, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute.py", line 172, in _yield_compute_results
    for event in iterate_with_context(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 444, in iterate_with_context
    with context_fn():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
ValueError: default auth: runtime: default auth: cannot configure default credentials. Config: host=https://adb-4757284179629494.14.azuredatabricks.net, client_id=8c87326a-535e-4b2f-a4ce-b4692628310c, client_secret=***

  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 446, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute_generator.py", line 126, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute_generator.py", line 120, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/dpf_dagster_replication/dpf_dagster_replication/ops/dbx_jobs_admin.py", line 12, in dbx_get_job_id
    databricks_client = DatabricksClient(host="https://adb-4757284179629494.14.azuredatabricks.net", oauth_client_id="8c87326a-535e-4b2f-a4ce-b4692628310c", oauth_client_secret="<secret>")
  File "/usr/local/lib/python3.10/dist-packages/dagster_databricks/databricks.py", line 46, in __init__
    self._workspace_client = WorkspaceClient(
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/__init__.py", line 106, in __init__
    config = client.Config(host=host,
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 517, in __init__
    raise ValueError(message) from e

The above exception was caused by the following exception:
ValueError: default auth: runtime: default auth: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 512, in __init__
    self._init_auth()
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 822, in _init_auth
    raise ValueError(f'{self._credentials_provider.auth_type()} auth: {e}') from e

The above exception was caused by the following exception:
ValueError: runtime: default auth: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 817, in _init_auth
    self._header_factory = self._credentials_provider(self)
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 434, in __call__
    raise ValueError(f'{auth_type}: {e}') from e

The above exception was caused by the following exception:
ValueError: default auth: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 428, in __call__
    header_factory = provider(cfg)
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 61, in wrapper
    return func(cfg)
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 94, in runtime_native_auth
    from databricks.sdk.runtime import init_runtime_native_auth
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 53, in <module>
    dbutils = RemoteDbUtils()
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/dbutils.py", line 169, in __init__
    self._config = Config() if not config else config
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 517, in __init__
    raise ValueError(message) from e

The above exception was caused by the following exception:
ValueError: default auth: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 512, in __init__
    self._init_auth()
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 822, in _init_auth
    raise ValueError(f'{self._credentials_provider.auth_type()} auth: {e}') from e

The above exception was caused by the following exception:
ValueError: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 817, in _init_auth
    self._header_factory = self._credentials_provider(self)
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 435, in __call__
    raise ValueError('cannot configure default credentials')

The above exception occurred during handling of the following exception:
NameError: name 'spark' is not defined

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 50, in <module>
    from .stub import *
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/stub.py", line 7, in <module>
    sc = spark.sparkContext

The above exception occurred during handling of the following exception:
ModuleNotFoundError: No module named 'dbruntime'

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/runtime/__init__.py", line 28, in <module>
    from dbruntime import UserNamespaceInitializer
z
Although it'd be interesting to see if we can force the auth type like this:
Copy code
from dagster import op
from databricks.sdk import WorkspaceClient

@op
def dbx_get_job():
    client = WorkspaceClient(
        host="https://...",
        client_id="<your_client_id>",
        client_secret="<oauth_client_secret>",
        auth_type="oauth-m2m",  # force oauth-m2m instead of auth auto-detection
    )
    client.jobs.list()
if that works, then I could add the auth_type argument to the WorkspaceClient instantiation within
DatabricksClient
to fix this
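It'd be roughly this change inside dagster_databricks/databricks.py (a sketch, not the actual diff; the real class takes more parameters than this):
Copy code
from typing import Optional

from databricks.sdk import WorkspaceClient

class DatabricksClient:
    def __init__(
        self,
        host: str,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        auth_type: Optional[str] = None,  # new: forwarded straight through
    ):
        self._workspace_client = WorkspaceClient(
            host=host,
            token=token,
            client_id=oauth_client_id,
            client_secret=oauth_client_secret,
            auth_type=auth_type,  # e.g. "oauth-m2m" to bypass auth auto-detection
        )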
c
Ok I'll try
z
If that doesn't work it might be worth making an issue in the databricks-sdk github as we'll have pinned down that the issue is specific to the
WorkspaceClient
, and they should have more knowledge on how to debug this
c
We are digging into an internal Kubernetes configuration related to MSI authentication that might be the cause. I tested the code you sent anyway, and I'm getting a different error.
Copy code
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "dbx_get_job_id":

  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 273, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 474, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 157, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/execute_step.py", line 93, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute.py", line 203, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute.py", line 172, in _yield_compute_results
    for event in iterate_with_context(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 444, in iterate_with_context
    with context_fn():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
ValueError: default auth: cannot configure default credentials. Config: host=<>, auth_type=oauth-m2m. Env: DATABRICKS_HOST

  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.10/dist-packages/dagster/_utils/__init__.py", line 446, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute_generator.py", line 126, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/usr/local/lib/python3.10/dist-packages/dagster/_core/execution/plan/compute_generator.py", line 120, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/dpf_dagster_replication/dpf_dagster_replication/ops/dbx_jobs_admin.py", line 39, in dbx_get_job_id
    client = WorkspaceClient(
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/__init__.py", line 106, in __init__
    config = client.Config(host=host,
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 517, in __init__
    raise ValueError(message) from e

The above exception was caused by the following exception:
ValueError: default auth: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 512, in __init__
    self._init_auth()
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 822, in _init_auth
    raise ValueError(f'{self._credentials_provider.auth_type()} auth: {e}') from e

The above exception was caused by the following exception:
ValueError: cannot configure default credentials

  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 817, in _init_auth
    self._header_factory = self._credentials_provider(self)
  File "/usr/local/lib/python3.10/dist-packages/databricks/sdk/core.py", line 435, in __call__
    raise ValueError('cannot configure default credentials')
@Zach after a couple of tries this code is working! Client ID and secret work either as env vars or as parameters. 👍
Copy code
import os

from dagster import op
from databricks.sdk import WorkspaceClient


@op
def dbx_get_job_id(context, config: DbxJobConf):
    # os.environ["DATABRICKS_HOST"] = "<host>"
    os.environ["ARM_TENANT_ID"] = "<tenant_id>"
    # os.environ["ARM_CLIENT_ID"] = "<client_id>"
    # os.environ["ARM_CLIENT_SECRET"] = "<secret_id>"
    # pop instead of del so this doesn't raise if the token env var is unset;
    # a leftover DATABRICKS_TOKEN confuses the SDK's auth auto-detection
    os.environ.pop("DATABRICKS_TOKEN", None)
    context.log.info(f"environment: {os.environ}")  # debug only: this logs env vars
    client = WorkspaceClient(
        host="<host>",
        azure_client_id="<client_id>",
        azure_client_secret="<secret_id>",
        auth_type="azure-client-secret",
    )
    jobs = client.jobs.list()
z
I wonder if it was an issue with using azure oauth credentials in place of the oauth-m2m credentials administered by Databricks. Seems like that would be a weird way to fail if that were the case though
I'll try to get azure oauth set up as an option for
dagster-databricks
before the next release
c
It would be useful to have an optional auth_type parameter as part of the DatabricksClientResource, in order to build the configuration like this:
Copy code
databricks_client_instance = databricks_client.configured(
     {
         "host": "<>",
         "oauth_credentials": {"client_id":"<>", "client_secret":"<>"},
         "auth_type": "<auth_type>"
     }
 )
Do you think it's doable?
z
I was thinking like this:
Copy code
databricks_client_instance = databricks_client.configured(
     {
         "host": "<>",
         "azure_oauth_credentials": {"client_id":"<>", "client_secret":"<>"},
     }
 )
and then having
auth_type
be inferred. Mainly because it feels more explicit than setting two separate parameters just to indicate that the oauth credentials are Azure credentials and need to be passed to different underlying parameters when instantiating the WorkspaceClient. The
auth_type
would then be inferred and also added to the
WorkspaceClient
initialization call at runtime. Does that sound reasonable?
❤️ 1
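Under the hood the inference would be something like this (sketch only; the helper name and config shape are hypothetical):
Copy code
from databricks.sdk import WorkspaceClient

# sketch: pick WorkspaceClient kwargs based on which credential block was configured
def _build_workspace_client(config: dict) -> WorkspaceClient:
    kwargs = {"host": config["host"]}
    if "oauth_credentials" in config:
        creds = config["oauth_credentials"]
        kwargs.update(
            client_id=creds["client_id"],
            client_secret=creds["client_secret"],
            auth_type="oauth-m2m",  # Databricks-managed service principal
        )
    elif "azure_oauth_credentials" in config:
        creds = config["azure_oauth_credentials"]
        kwargs.update(
            azure_client_id=creds["client_id"],
            azure_client_secret=creds["client_secret"],
            auth_type="azure-client-secret",  # Azure AD service principal
        )
    elif "token" in config:
        kwargs["token"] = config["token"]
    return WorkspaceClient(**kwargs)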
Another possible option could be
Copy code
databricks_client_instance = databricks_client.configured(
     {
         "host": "<>",
         "oauth_credentials": {"client_id":"<>", "client_secret":"<>", "client_type": "DATABRICKS|AZURE"},
     }
 )
Although maybe we should make better support for environment / profile configuration of credentials. I'll have to think about that
Seems like azure also requires setting ARM_TENANT_ID, which makes me think azure credentials should be handled separately from normal oauth credentials, since normal oauth doesn't need a tenant id.
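For reference, a pure-environment route for azure would look something like this (a sketch; all values are placeholders):
Copy code
import os

from databricks.sdk import WorkspaceClient

# Azure service-principal auth driven entirely by env vars (placeholders):
os.environ["DATABRICKS_HOST"] = "<host>"
os.environ["ARM_TENANT_ID"] = "<tenant_id>"  # required for azure auth
os.environ["ARM_CLIENT_ID"] = "<client_id>"
os.environ["ARM_CLIENT_SECRET"] = "<secret>"
os.environ.pop("DATABRICKS_TOKEN", None)  # a stale token breaks auto-detection

client = WorkspaceClient(auth_type="azure-client-secret")
client.jobs.list()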
c
yes you are right!