# ask-community
j
Hi team, I am trying to get the databricks_pyspark_step_launcher working with a simple pyspark job (which executes successfully using the emr_pyspark_step_launcher) and I am running into some issues. I have been having a lot of trouble since there is no working example of the Databricks step launcher in the dagster documentation, unlike the EMR step launcher. I notice that dagster is able to upload the main file, dagster file, step run ref file, and Databricks configuration to our mounted dbfs:/ path on s3, and I can confirm those files were uploaded to our bucket. However, dagster is looking for stdout and stderr files that are not present, and there doesn't seem to be any code in the databricks_pyspark_step_launcher.py (or databricks.py) modules that is responsible for uploading these files, even if they are supposed to be blank at first. That code lives in databricks_step_main.py, which I have attached a screenshot of to this message. This leads me to believe that the main step module isn't running first, and that the other modules in the package try to read these files before they are created. Attached to this message are an example of our resource configuration, a screenshot of the error with statements showing the other files were uploaded, and a screenshot of the file location in s3 that the error message is displaying. As you can see, all other necessary files were uploaded successfully. I would greatly appreciate any help anyone can provide, as I am currently stuck on this issue and believe it is an issue with the dagster-databricks package.
Copy code
{
    "pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
        {
            "run_config": {
                "run_name": "test dagster",
                "cluster": {"existing": "********************"},
                "libraries": [],
            },
            "databricks_host": "********************",
            "databricks_token": "********************",
            "local_dagster_job_package_path": str(Path(__file__).parent.parent),
            "secrets_to_env_variables": [],
            "wait_for_logs": False,
            "storage": {
                "s3": {
                    "secret_scope": "************",
                    "access_key_key": "***********",
                    "secret_key_key": "******************",
                }
            },
        },
    ),
    "pyspark": pyspark_resource.configured(
        {
            "spark_conf": {
                "spark.executor.memory": "2g",
                "spark.databricks.driver.strace.enabled": True,
                "spark.sql.broadcastTimeout": "300000ms",
            }
        }
    ),
    "db_obj": metadata_resource,
    "dw_obj": warehouse_resource_selector(),
    "db_query_obj": metadata_query_resource,
    "s3": s3_resource.configured({"region_name": "us-east-1"}),
    "io_manager": parquet_io_manager.configured(
        {"path_prefix": "s3://***********/dagster/"}
    ),
}
👍 1
Our Databricks cluster's logging setting sends the cluster logs to dbfs:/cluster-logs/<cluster-id>, and we can read stdout and stderr files under that directory, but there is no way to set this directory as the logging location in our job resource config
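(For reference, the raw Databricks Clusters API sets the log destination with a `cluster_log_conf` field, roughly the shape below, but that only seems to apply when a new cluster is created through the API; for our existing cluster it is configured in the cluster UI, and I don't see a way to pass it through the step launcher resource config.)
Copy code
# rough sketch of the Databricks Clusters API field that controls cluster log delivery;
# not necessarily something the dagster-databricks resource config exposes
cluster_log_conf = {"dbfs": {"destination": "dbfs:/cluster-logs"}}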
z
what version are you on?
j
We are currently on dagster 1.0.9
z
hmm okay. my team is still on 0.15.8 and we use the databricks step launcher heavily, but I know there have been some changes to the step launcher in the 0.16.x/0.17.x versions. I feel like I've seen this come up before in dagster-support and that a fix was released to address it.
is it possible for you to try to reproduce the issue in a newer version?
j
Yes we will try to update to a later version tomorrow
o
hi @Jim Nisivoccia! I believe Zach is correct here (and that a version bump should resolve the issue). When this has come up in the past, it's been more of a "masks the real exception" type of problem rather than a "makes the operation fail when it otherwise would have succeeded" type of thing. I'm hopeful that the update will resolve the issue, but happy to dig in deeper if it does not.
j
Hi Owen, we recently tried pulling the latest version (dagster 1.2.1) and we are still running into some issues. Our resource config is the same as what is defined above, but now it seems like the step launcher isn't able to properly handle the host name we are passing in. We are really struggling to get this working without a working example; would you happen to have an example we could build off of? We were able to properly configure the emr_pyspark_step_launcher using the example provided in the dagster documentation
👍 1
databricks_step_launcher_error_3_13.jpg
o
hi @Jim Nisivoccia -- I haven't seen that error before, it does seem to be some issue with the hostname but it's hard to say for sure. For now, here's some config that I use myself:
Copy code
databricks_pyspark_step_launcher.configured(
    {
        "run_config": {
            "run_name": "test_dagster",
            "cluster": {"existing": "1111-111111-1111aaa1"},
            "libraries": [
                {"pypi": {"package": "dagster==1.2.1"}},
                {"pypi": {"package": "dagster-pyspark==0.18.1"}},
                {"pypi": {"package": "dagster-aws==0.18.1"}},
                {"pypi": {"package": "dagster-databricks==0.18.1"}},
            ],
        },
        "databricks_host": "<http://dbc-1111111-1111.cloud.databricks.com|dbc-1111111-1111.cloud.databricks.com>",
        "databricks_token": "dapi1111111111111111111111111",
        "local_pipeline_package_path": str(Path(__file__).parent),
        "secrets_to_env_variables": [],
        "storage": {
            "s3": {
                "access_key_key": "access_key_key",
                "secret_key_key": "secret_key_key",
                "secret_scope": "secret-scope",
            }
        },
    }
)
it looks mostly identical to what you shared, although it's possible that some of the values that were elided above might not be in the expected format
looking into what might be causing that error message
s
yes i added all the packages in the resource config and it's still giving the same error
o
and what is the format of your `databricks_host`? also cc @rex if you've seen this before
r
This error is because `databricks_host` needs to start with `https://`: we switched to using the official Databricks Python client, which requires this format
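e.g., using the example host Owen shared above (made-up workspace ID):
Copy code
# the official Databricks Python client expects the full workspace URL, including the https:// scheme
databricks_host = "https://dbc-1111111-1111.cloud.databricks.com"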
s
Thanks @rex. `https://` fixed the request issue in the `Uploading main file to DBFS` step. However, we are again getting the same error (attached) in the `Waiting for Databricks run 20643 to complete...` step. Thank you
not sure if this could be a good hint, but the Databricks logs are showing this error: `UnboundLocalError: local variable 'stderr_filepath' referenced before assignment`. i think the file is `databricks_step_main.py`
Hi @Zach, have you seen the above error before?
z
I haven't. I'm having trouble seeing how that error could trigger in `databricks_step_main.py` either, when I look at the latest version on github (https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_step_main.py#L112-L144). Any chance that the `/dagster_staging` prefix still needs to be created? I believe the directory needs to be accessible via /dbfs, so if you can't access it from within a notebook in Databricks then you won't be able to access it using Dagster. Have you tried pointing the `staging_prefix` to a directory you've already mounted with /dbfs?
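for the notebook check, something like this is what I had in mind (assumes the default `/dagster_staging` prefix):
Copy code
# run in a Databricks notebook attached to the cluster; assumes the default /dagster_staging prefix
display(dbutils.fs.ls("dbfs:/dagster_staging"))   # DBFS view

import os
print(os.listdir("/dbfs/dagster_staging"))        # FUSE view, which is how the step code reads the files
and if that fails, I'd try setting `staging_prefix` in the step launcher config to a directory you know is mounted, e.g. somewhere under /mnt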
s
Thanks for the reply @Zach. Yes, I first tried with `s3://`, but it gave an error that it needs to be an absolute path. Then I tried with `/bucket-name/....` and it failed again with the same error. Yes, correct, dbfs is mounted on s3 in our case. How about your case, are you using S3 or Azure? That's a good point you noted; I am going to try next with the built-in Databricks notebook. If it fails in the notebook, then we will have to look into the AWS policies. Another point, though, is that Databricks is able to create `/cluster-logs/..` on S3 (dbfs:/ mounted on S3) while creating a new cluster, which means it should have access to dbfs:/. Thanks once again.
z
I'm using S3. are you using Unity Catalog? I haven't really messed with it much, but it seems possible that some of the changes in how external tables are handled with Unity Catalog may be causing issues. I would consider making a specific mount for dbfs:///dagster_staging/ to somewhere in S3; it's possible that writes to dbfs:///dagster_staging/ are ending up in the Databricks-managed buckets, and it seems that there were some changes with Unity Catalog that make it a lot harder or impossible to access the Databricks-managed buckets. One way to check would be looking to see if there's a `/dagster_staging` prefix where you'd expect it to be in S3 - it's obviously getting uploaded somewhere, since the step launcher gets to the point that it launches the job, but then when Databricks tries to find the entrypoint file it doesn't seem to be able to, which says to me that the location the step launcher is uploading the step resources to is inaccessible from within your Databricks account
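something like this is a quick way to check, from wherever you have AWS credentials (bucket name and prefix are made up, substitute whatever your dbfs:/ mount maps to):
Copy code
# list whatever the step launcher uploaded under the staging prefix in S3
import boto3

s3 = boto3.client("s3", region_name="us-east-1")
resp = s3.list_objects_v2(Bucket="my-dbfs-mount-bucket", Prefix="dagster_staging/")
for obj in resp.get("Contents", []):
    print(obj["Key"])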
s
Hi Zach, no, we are not using Unity Catalog at this point. To answer the first question, yes, the Databricks notebook is able to access the dbfs:/ mount on s3 (attached pic). Yes, `/dagster_staging` is present on s3, but the `stdout` file is not there (attached). Is Databricks supposed to create this new `stdout` file on dbfs and write its contents, or is Databricks supposed to only update an existing file that was previously created by Dagster? Thanks for your help
z
can you confirm you're on `dagster-databricks==0.18.1`? I believe the error you're encountering is happening around [here](https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py#L276-L279), as the exception error message matches, but it should be trapping the exception and continuing without failing the job
do the databricks jobs ever get anywhere or do they just get canceled as soon as you see this error message?
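from memory, the relevant bit looks roughly like this (paraphrased, with a made-up `read_dbfs_file` helper standing in for the real client call, so don't treat it as the actual source):
Copy code
# paraphrased sketch of the log-fetching behavior, not the real dagster-databricks code
def try_load_stdout(read_dbfs_file, dbfs_path: str, step_key: str):
    try:
        return read_dbfs_file(dbfs_path)
    except Exception as exc:
        # logged as a warning; the step itself is expected to keep going
        print(
            f"Encountered exception {exc} when attempting to load stdout logs for "
            f"step {step_key}. Check the databricks console for more info."
        )
        return None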
s
yes, we are using `dagster-databricks==0.18.1` on both our local environment and the Databricks cluster
"do the databricks jobs ever get anywhere or do they just get canceled as soon as you see this error message?" - we are not 100% sure, but we think the error kills the job
z
I think the job run should show as "Canceled" instead of "Failed". I guess I'm just curious because I believe that the error you're seeing logged is occurring within the Dagster worker and not on Databricks. the thing is that, from the code, it doesn't look like it would cancel your job in that exception condition - it actually occurs after the job has already failed or completed. so is it possible that the databricks job is failing from a separate issue on that end? have you been able to look at the driver logs or the compute logs and seen anything? does it ever even get to a running state or does it die while it's still spinning up? how long does Dagster poll the job before you get this message?
s
Hi Zach, we are updating some policies so the clusters are down. we will update the log information once the cluster is up and running. However, yesterday this is what i saw in the logs: `UnboundLocalError: local variable 'stderr_filepath' referenced before assignment`. i guess all your other questions are answered in the attached pic. Thanks
z
were those logs the driver logs or the cluster compute logs that get streamed to S3? for some reason I've found the ones in S3 to be more complete. a stack trace would be helpful if you could find one. it feels like the exception must be happening here; then the error you're seeing in the logs would be from this accessing that variable before it was assigned (due to the exception), and we're actually not seeing the root cause of the failure, which seems like it should be getting printed somewhere due to this call
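in other words, something like this toy example (not the real dagster code, just illustrating the masking):
Copy code
# toy illustration of how an exception raised before `stderr_filepath` is assigned
# surfaces as an UnboundLocalError that hides the original failure
def run_step(fail_early: bool = True):
    try:
        if fail_early:
            raise RuntimeError("the real failure happens here")
        stderr_filepath = "/dbfs/dagster_staging/<run-id>/stderr"
    finally:
        # when the RuntimeError fires, this raises UnboundLocalError instead,
        # which is the error that ends up most visible in the logs
        print(stderr_filepath)

run_step()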
s
Hi Zach, thanks for your questions and input. i think we kind of know what the issue is. we will test it out tomorrow and let you know if we are still stuck. Thank you
z
cool, I hope it works out! I'd be curious as to what your solution is if you're able to share it.
s
We got it working, thanks to Zach and Owen's working example. The issue was that we didn't set up our secrets in the secret scope properly, so dagster couldn't reach out to s3. Thanks all
🎉 1
🎉 1
k
Part of the problem was that we were passing the actual key value, not the key name from the secret scope. In the future, if the documentation had some instructions on setting up the secret scope and using the key names, that would be helpful.
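For anyone else hitting this, the mapping we ended up with looks roughly like the sketch below (the scope and key names are made up):
Copy code
# the storage config references the *names* of secrets stored in a Databricks secret scope,
# not the AWS key values themselves (scope/key names here are made up)
storage_config = {
    "s3": {
        "secret_scope": "dagster-scope",
        "access_key_key": "aws-access-key-id",       # name of the secret holding the AWS access key
        "secret_key_key": "aws-secret-access-key",   # name of the secret holding the AWS secret key
    }
}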
r
Hi @Sai Gopinath, please help me figure out what exactly I'm doing wrong. I have the same problem:
Copy code
Run `49361868` failed with result state: `FAILED`. Message: .
13:03:53.853
test
ERROR
Encountered exception 404 Client Error: Not Found for url: https://HOST/api/2.0/dbfs/read?path=%2Fdagster_local%2Ff046fea8-92e9-458e-940c-0bae6c6df82e%2Ftest%2Fstdout&length=1048576
 Response from server: 
 { 'error_code': 'RESOURCE_DOES_NOT_EXIST',
  'message': 'No file or directory exists on path '
             '/dagster_local/f046fea8-92e9-458e-940c-0bae6c6df82e/test/stdout.'} when attempting to load stdout logs for step test. Check the databricks console for more info.
13:03:54.455
test
ERROR
Encountered exception 404 Client Error: Not Found for url: https://HOST/api/2.0/dbfs/read?path=%2Fdagster_local%2Ff046fea8-92e9-458e-940c-0bae6c6df82e%2Ftest%2Fstderr&length=1048576
 Response from server: 
 { 'error_code': 'RESOURCE_DOES_NOT_EXIST',
  'message': 'No file or directory exists on path '
             '/dagster_local/f046fea8-92e9-458e-940c-0bae6c6df82e/test/stderr.'} when attempting to load stderr logs for step test. Check the databricks console for more info.
This is what is going on when I try to run a job in the local stage, but everything works on the remote side. My Databricks resource config:
Copy code
"local_dagster_job_package_path": str(Path(__file__).parent.parent.parent.parent),
"staging_prefix": "/dagster_local",
"databricks_host": "<https://HOST>",
"databricks_token": {"env": "DATABRICKS_TOKEN"},
"run_config": {
   "run_name": "test_job",
   "cluster": {"existing": "CLUSTER"},
},
"secrets_to_env_variables": [
   {
      "scope": "dagster",
      "name": "DATABRICKS_TOKEN",
      "key": "DATABRICKS_TOKEN",
   },
],
"storage": {
   "s3": {
      "secret_scope": "global",
      "access_key_key": "aws-access-key",
      "secret_key_key": "aws-secret-key",
    }
}
r
hi, i'm also still getting this error and would love to know how it was solved. @Roman Maliushkin, did you by chance get through this error?
Hey Zach, thanks for replying. I did create a scope and added the secrets in there, but I'm still getting the same error:
Copy code
databricks secrets list-secrets test_cluster
[
  {
    "key":"s3_access_key_key",
    "last_updated_timestamp":1688610889511
  },
  {
    "key":"s3_secret_key_key",
    "last_updated_timestamp":1688610951504
  }
]
and my resources are defined as:
Copy code
"pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
            {
                "databricks_host": "<https://dbc-xxxxxx.cloud.databricks.com>",
                "databricks_token": "dapixxxxxxxxx",
                "run_config": {
                    "run_name": "dagster_run",
                    "cluster": {
                        "existing": "existing cluster"
                    },
                    "libraries": [
                        {"pypi": {"package": "dagster==1.2.1"}},
                        {"pypi": {"package": "dagster-pyspark==0.18.1"}},
                        {"pypi": {"package": "dagster-aws==0.18.1"}},
                        {"pypi": {"package": "dagster-databricks==0.18.1"}},
                    ],
                },
                "secrets_to_env_variables": [],
                "local_pipeline_package_path": str(Path(__file__).parent.parent),
                "storage": {
                    "s3": {
                        "secret_scope": "test_cluster",
                        "access_key_key": "s3_access_key_key",
                        "secret_key_key": "s3_secret_key_key"
                    }
                }
            }
        )
I think I did that right
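I guess one way to double-check is reading them back from a notebook attached to that cluster, something like (using the scope and key names from my config above):
Copy code
# run in a notebook attached to the same cluster; the values print as [REDACTED],
# but a missing scope or key raises an error, which is what I'd be looking for
print(dbutils.secrets.get(scope="test_cluster", key="s3_access_key_key"))
print(dbutils.secrets.get(scope="test_cluster", key="s3_secret_key_key"))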
fwiw, i'm getting the same error as @Sai Gopinath
Copy code
UnboundLocalError: local variable 'stderr_filepath' referenced before assignment
I was able to find another clue in the logs @Zach,
Copy code
FileNotFoundError: [Errno 2] No such file or directory: '/dbfs//dagster_staging/a54d6b86-1909-4237-86df-fceec64b4601/databricks_test_table/code.zip'
I'm wondering if the `//` after dbfs means it's looking in the wrong location
ahh, I figured it out. It turns out that I was using a shared cluster in Databricks and that doesn't have access to the root dbfs: https://docs.databricks.com/dbfs/unity-catalog.html
🎉 1