# ask-community
d
Hi all, I posted about this a while back - I've been picking this up on and off. I am trying to use Dagster to execute jobs on Databricks.
Copy code
@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")
    data_bricks_job_solid()
This code works. How do I add config to databricks for the job? I tried:
Copy code
data_bricks_job_solid(
        {
            "name": "SparkPi Python job",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2
            },
            "spark_python_task": {
                "python_file": "dbfs:/docs/pi.py",
                "parameters": [
                    "10"
                ]
            }
        }
    )
But I get an exception
s
Hey Darren, as you've noticed, our documentation on this is poor right now, and the best thing to do is to look at the config schema defined in the implementation: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-databricks/dagster_databricks/solids.py#L38 Basically, you need to nest the config that you want to send to Databricks under a "job" entry. I filed an issue to document this better: https://github.com/dagster-io/dagster/issues/4322
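For reference, a minimal sketch (not from the thread) of that nesting, using the spark_job pipeline and "daily_stock" solid from the snippet above. The databricks_client resource fields shown are placeholder assumptions - check the resource's config schema for your version.
```python
from dagster import execute_pipeline

run_config = {
    "solids": {
        "daily_stock": {
            "config": {
                # Everything sent to Databricks nests under this "job" key.
                "job": {
                    "run_name": "SparkPi Python job",
                    "new_cluster": {
                        "spark_version": "7.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "num_workers": 2,
                    },
                    "spark_python_task": {
                        "python_file": "dbfs:/docs/pi.py",
                        "parameters": ["10"],
                    },
                }
            }
        }
    },
    "resources": {
        "databricks_client": {
            # Placeholder values - supply your own workspace host and token.
            "config": {
                "host": "https://<your-workspace>.cloud.databricks.com",
                "token": "<databricks-token>",
            }
        }
    },
}

result = execute_pipeline(spark_job, run_config=run_config)
```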
d
@sandy thanks that’s great. It looks like you have to externalise the config for the job; it’s not passed in as an argument to the factory method. That’s right, yeah?
For example, as a YAML file or through the Playground.
s
that's right. however, you don't need to pass it as a yaml file. you can invoke `.configured` on the returned solid to create a version of the solid that has that configuration included. docs reference: https://docs.dagster.io/concepts/configuration/configured
d
Ah that's great - I really did want to define the configuration as code rather than in external config.
@sandy have I found another bug? In the docs it says I can do something like this:
Copy code
data_bricks_job_solid.configured(
    {"region": "us-east-1", "use_unsigned_session": False}
)
However when running via dagit I get the following exception:
Copy code
TypeError: configured() missing 1 required positional argument: 'name'
/Users/darren.haken/Library/Caches/pypoetry/virtualenvs/dagster-spike-GoqeGS_9-py3.8/lib/python3.8/site-packages/dagster/core/workspace/context.py:475: UserWarning: Error loading repository location hello_databricks.py:TypeError: configured() missing 1 required positional argument: 'name'
I had to do this to get it working:
Copy code
@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")

    @configured(data_bricks_job_solid)
    def dev_s3(_):
        return {
            "job": {
                "name": "SparkPi Python job",
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2
                },
                "spark_python_task": {
                    "python_file": "dbfs:/docs/pi.py",
                    "parameters": [
                        "10"
                    ]
                }
            }
        }

    data_bricks_job_solid()
the alternative was to define the `name` param and the `config_or_config_fn` param
@sandy sorry I'm still lost by this, can you help? The above code doesn’t work - when I look in the Playground the “job” part of the config dict is just {} - empty. Can you share an example which uses the `configured` function? I could not find one online. I have tried the following but the job config is still empty:
Copy code
data_bricks_job_solid.configured(name="daily_stock", config_or_config_fn={
        "job": {
            "run_name": "my spark task",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {
                    "availability": "ON_DEMAND"
                },
                "num_workers": 10
            },
            "notebook_task": {
                "notebook_path": "/Data Engineering/Dagster-Spike/dagster-spike",
                # "revision_timestamp": 1
            }
            # "libraries": [
            #     {
            #         "jar": "dbfs:/my-jar.jar"
            #     },
            #     {
            #         "maven": {
            #             "coordinates": "org.jsoup:jsoup:1.7.2"
            #         }
            #     }
            # ],
            # "spark_jar_task": {
            #     "main_class_name": "com.databricks.ComputeModels"
        }
    })
s
Hi Darren - that looks right to me. Have you tried launching the job using this code? I would not expect the Playground to show the values that you passed to `config_or_config_fn`. Those are essentially "curried in" to the solid, and no longer part of the run configuration. However, they should still be passed to Databricks when the solid is executed.
d
@sandy when I executed the above it did not work, but when I added the values manually into the Playground I did find Databricks launched a job
to clarify - I used the Playground to launch it, not via the CLI. Should that work?
s
ah - one thing that maybe I missed. `x_solid.configured` returns a new solid, it doesn't mutate the original solid. so if you want to include it in a pipeline, you need to invoke the returned solid, not `x_solid`. e.g.
Copy code
@pipeline
def my_pipeline():
    configured_solid = x_solid.configured(...)
    configured_solid()
d
OK let me try that, 1 second
is this the idiomatic way to configure a solid with Dagster? I'm curious to know which way you’d normally recommend
@sandy now that I know you need to invoke the returned solid, it's worked
👍
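For anyone reading along, the pattern that ends up working in this thread looks roughly like the sketch below: call `.configured(...)` with both a config dict and a name for the new solid, then invoke the returned solid inside the pipeline. The configured-solid name and the job values are illustrative, taken from earlier messages.
```python
from dagster import ModeDefinition, pipeline
from dagster_databricks import create_databricks_job_solid, databricks_client


@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")

    # .configured returns a *new* solid; it needs its own name.
    configured_solid = data_bricks_job_solid.configured(
        name="daily_stock_configured",
        config_or_config_fn={
            "job": {
                "run_name": "my spark task",
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
                "spark_python_task": {
                    "python_file": "dbfs:/docs/pi.py",
                    "parameters": ["10"],
                },
            }
        },
    )

    # Invoke the returned solid, not the original one.
    configured_solid()
```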
I've noticed if I use the `configured` function as recommended, the config is now labelled as `Any`. Is that expected?
@sandy and another one from me… Once I have executed a Solid (like Databricks) - how do I then run the next node in the DAG based on its success? Normally in Airflow we’d use sensors. A lot of the examples on the Dagster site are simple Python functions chained together. I will also want to answer the same question when executing Solids on Kubernetes.
s
`Any` is expected. That's the default configuration for all solids without an explicitly-provided config schema. It's a little weird though. I filed an issue to change it: https://github.com/dagster-io/dagster/issues/4373.
The databricks solid will wait for the databricks job to complete, so any solids in the pipeline that depend on the databricks solid will run after the databricks job completes
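A small sketch of what that can look like, assuming a hypothetical downstream solid named `validate_results` and that the Databricks solid exposes an output downstream solids can depend on; the `Nothing`-typed input only expresses ordering, not data flow.
```python
from dagster import InputDefinition, ModeDefinition, Nothing, pipeline, solid
from dagster_databricks import create_databricks_job_solid, databricks_client


# Hypothetical downstream step; the Nothing-typed "start" input carries no data,
# it just makes this solid wait for the upstream Databricks solid to finish.
@solid(input_defs=[InputDefinition("start", Nothing)])
def validate_results(context):
    context.log.info("Databricks run finished; running the next step.")


@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")
    # If the Databricks run fails, the solid raises and validate_results never runs.
    validate_results(start=data_bricks_job_solid())
```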
d
@sandy thanks! Is there something of a best practice I should follow for defining the config schema, or is that not the case given it's a third-party solid? re the job completion - if an early databricks job failed would it cause the pipeline to stop? Kubernetes - are there any docs around using K8 both for workers and for executing solids? I saw blog posts which led me to believe this is an area of investment for the project. We currently use Airflow and K8 pods to execute nodes. This has worked well for encapsulating logic - teams are able to provide a container runtime for their custom job.
And sorry for asking for so much help. I hope you can appreciate it's from a place of interest in the project.
s
@sandy thanks! Is there something of a best practice I should follow for defining the config schema, or is that not the case given it's a third-party solid?
we wouldn't expect someone to define the config schema in this case
Kubernetes - are there any docs around using K8 both for workers and for executing solids?
it would be best to ask about this in a separate top-level thread. I'm not an expert on that part of the system
re the job completion - if an early databricks job failed would it cause the pipeline to stop?
it would cause the pipeline to stop
d
it would be best to ask about this in a separate top-level thread. I’m not an expert on that part of the system
will do
re the job - is there a way to validate the job before continuing to the next step? With Airflow we’d have a sensor to query the Databricks API and validate the job succeeded.
s
if the databricks run fails, the databricks solid will raise an error, and the pipeline won't proceed. is that the desired behavior for you?
d
That's probably good enough to be honest.
s
@Darren Haken - FYI, I added the code you posted to the documentation for `create_databricks_job_solid`: https://github.com/dagster-io/dagster/pull/4569/files
d
@sandy that’s great! always great to see progress on the documentation - often a forgotten part of a project
s
@sandy @Darren Haken The stderr link in the Dagster UI doesn't show the actual error from Databricks. Do we need to configure the Dagster solid to capture the exact error message from Databricks?
m
@Darren Haken - any chance you got your pipeline/job configured? Would appreciate it if you had a working code snippet.
After spending some time debugging, I found an error in the documentation for create_databricks_job_op. I have it kicking off and running Databricks jobs now on new and pooled clusters.