#ask-ai

Gareth Brickman

02/12/2024, 2:37 PM
How do I use Dagster’s Run Configuration to set the Databricks job parameters for an op created from `create_databricks_run_now_op`?

Scout

02/12/2024, 2:37 PM
To set the Databricks job parameters for an op created from `create_databricks_run_now_op`, you can pass a dictionary to the `databricks_job_configuration` parameter when creating the op. This dictionary should contain the configuration for triggering a new job run of a Databricks Job, as described in Databricks’ Job API. Here is an example from the documentation on how to use `create_databricks_run_now_op` with the necessary job parameters:
python
from dagster import EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_run_now_op

DATABRICKS_JOB_ID = 1234

# The job configuration is fixed here, at op creation time
run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
    databricks_job_configuration={
        "python_params": [
            "--input",
            "schema.db.input_table",
            "--output",
            "schema.db.output_table",
        ],
    },
)


@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def do_stuff():
    run_now_op()
In this example, `databricks_job_configuration` is a dictionary with `python_params` as a key, whose value is the list of parameters passed to the Databricks job. You can include additional fields as required by your Databricks job configuration. `DATABRICKS_JOB_ID` is the ID of the Databricks Job to be executed; replace `1234` with the actual job ID you intend to run. The `DatabricksClientResource` is configured with the host and token for Databricks, which are read from the environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN`. Finally, the `do_stuff` job is defined with the `databricks` resource, and `run_now_op` is invoked within the job.

Remember to replace the placeholders and example values with your actual Databricks job ID, host, token, and any other job-specific parameters you need to configure.

Phuoc Nguyen

02/12/2024, 4:04 PM
@Scout I want to pass the run config of dagster to databricks_job_configuration

Scout

02/12/2024, 4:04 PM
The `create_databricks_run_now_op` function expects a static `databricks_job_configuration` dictionary at the time of op creation; it is not designed to accept the run config dynamically from the Dagster job execution context. If you need to pass dynamic parameters from the Dagster run config to the Databricks job configuration, you would have to create a custom op that reads the run config and constructs the Databricks job configuration accordingly. Here's a conceptual example of how you might approach this:
python
from typing import List

from dagster import Config, EnvVar, OpExecutionContext, graph, op
from dagster_databricks import DatabricksClientResource


# Config schema for the custom op; values are supplied through the run config
class DatabricksRunNowConfig(Config):
    databricks_job_id: int
    python_params: List[str] = []


# Define a custom op that reads its run config and constructs the Databricks job config
@op
def custom_databricks_op(
    context: OpExecutionContext,
    config: DatabricksRunNowConfig,
    databricks: DatabricksClientResource,
):
    # Construct the Databricks job configuration from the Dagster run config
    databricks_job_config = {
        "python_params": config.python_params,
        # Add other Databricks job configuration parameters as needed
    }

    # Trigger the job through the Databricks SDK client wrapped by the resource.
    # Assumes a dagster-databricks version whose client exposes `workspace_client`.
    jobs_api = databricks.get_client().workspace_client.jobs
    run = jobs_api.run_now(job_id=config.databricks_job_id, **databricks_job_config)

    # Wait for the run to finish (subject to the SDK's default timeout)
    finished_run = run.result()
    context.log.info(f"Databricks run {finished_run.run_id} finished")


# Define a graph that uses the custom op
@graph
def custom_databricks_graph():
    custom_databricks_op()


# Convert the graph to a job with the necessary resources
custom_databricks_job = custom_databricks_graph.to_job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)

# Example run config that would be passed when executing the job
example_run_config = {
    "ops": {
        "custom_databricks_op": {
            "config": {
                "databricks_job_id": 1234,
                "python_params": ["--input", "schema.db.input_table", "--output", "schema.db.output_table"],
            }
        }
    }
}

# Execute the job with the example run config
if __name__ == "__main__":
    result = custom_databricks_job.execute_in_process(run_config=example_run_config)

In this example, `custom_databricks_op` declares a config schema, so its values come from the run config under `ops.custom_databricks_op.config`. The op constructs the Databricks job configuration from those values and triggers the run through the client provided by the `databricks` resource, rather than through `create_databricks_run_now_op`, whose job configuration is fixed when the op is created. An op cannot create and invoke another op while it is executing, which is why the run is triggered directly.

When executing the job, you pass `example_run_config` as the `run_config` argument to the `execute_in_process` method. This run config should match the structure expected by the custom op. Please note that this is a conceptual example and may require adjustments to fit the specifics of your use case and the version of Dagster you are using.
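One way to keep the run config and the Databricks request separate is a small pure function that turns the op's config values into the job ID plus the parameters for the run-now call. The sketch below is an assumption layered on the example above: `split_run_now_config` and `ALLOWED_PARAM_KEYS` are hypothetical names, and the allowed keys are only a subset of the parameter fields in the Databricks Jobs run-now API.

```python
from typing import Any, Dict, Tuple

# Subset of Databricks run-now parameter fields; extend as your job requires
ALLOWED_PARAM_KEYS = {
    "python_params",
    "notebook_params",
    "jar_params",
    "spark_submit_params",
}


def split_run_now_config(op_config: Dict[str, Any]) -> Tuple[int, Dict[str, Any]]:
    """Split op config into (job_id, job_configuration). Hypothetical helper."""
    if "databricks_job_id" not in op_config:
        raise ValueError("op config must include 'databricks_job_id'")

    # Reject keys that would otherwise be silently dropped
    unknown = set(op_config) - ALLOWED_PARAM_KEYS - {"databricks_job_id"}
    if unknown:
        raise ValueError(f"unsupported config keys: {sorted(unknown)}")

    job_id = int(op_config["databricks_job_id"])
    job_configuration = {
        key: value for key, value in op_config.items() if key in ALLOWED_PARAM_KEYS
    }
    return job_id, job_configuration


# Same values as the "config" section of the example run config
op_config = {
    "databricks_job_id": 1234,
    "python_params": ["--input", "schema.db.input_table", "--output", "schema.db.output_table"],
}
job_id, job_configuration = split_run_now_config(op_config)
print(job_id, job_configuration)
```

Because the function only handles plain dictionaries, it can be unit tested without a Databricks workspace or a Dagster run.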