Gareth Brickman
02/12/2024, 2:37 PM
`create_databricks_run_now_op`?
Scout
02/12/2024, 2:37 PM
`create_databricks_run_now_op`, you can pass a dictionary to the `databricks_job_configuration` parameter when creating the op. This dictionary should contain the configuration for triggering a new job run of a Databricks Job, as described in Databricks’ Job API. Here is an example from the documentation on how to use `create_databricks_run_now_op` with the necessary job parameters:
```python
from dagster import EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_run_now_op

DATABRICKS_JOB_ID = 1234

run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
    databricks_job_configuration={
        "python_params": [
            "--input",
            "schema.db.input_table",
            "--output",
            "schema.db.output_table",
        ],
    },
)


@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def do_stuff():
    run_now_op()
```
In this example, `databricks_job_configuration` is a dictionary with a `python_params` key whose value is the list of parameters passed to the Databricks job. You can include additional fields as required by your Databricks job configuration. `DATABRICKS_JOB_ID` is the ID of the Databricks Job to be executed; replace `1234` with the actual job ID you intend to run. The `DatabricksClientResource` is configured with the host and token for Databricks, which are read from the environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN`. Finally, the `do_stuff` job is defined with the `databricks` resource, and `run_now_op` is invoked within the job. Remember to replace the placeholders and example values with your actual Databricks job ID, host, token, and any other job-specific parameters you need to configure.
Phuoc Nguyen
02/12/2024, 4:04 PM
Scout
02/12/2024, 4:04 PM
`databricks_job_configuration` parameter, you would typically define the configuration within your Dagster job and then pass it to the `create_databricks_run_now_op` function. However, `create_databricks_run_now_op` expects a static configuration dictionary at op creation time; it is not designed to accept the run config dynamically from the Dagster job execution context.
If you need to pass dynamic parameters from the Dagster run config to the Databricks job configuration, you would have to create a custom op that takes the run config as input and then constructs the Databricks job configuration accordingly.
Here's a conceptual example of how you might approach this:
```python
from typing import List

from dagster import Config, EnvVar, job, op
from dagster_databricks import DatabricksClientResource


# Run config schema for the custom op (the class name is illustrative)
class DatabricksRunNowConfig(Config):
    databricks_job_id: int
    python_params: List[str] = []


# Define a custom op that accepts run config and constructs the Databricks job config
@op
def custom_databricks_op(config: DatabricksRunNowConfig, databricks: DatabricksClientResource):
    # Construct the Databricks job configuration from the Dagster run config
    databricks_job_config = {
        "python_params": config.python_params,
        # Add other Databricks job configuration parameters as needed
    }
    # An op cannot create and invoke another op at run time, so trigger the run
    # through the Databricks SDK client held by the resource (assumes a
    # dagster-databricks version that exposes workspace_client)
    run = databricks.get_client().workspace_client.jobs.run_now(
        job_id=config.databricks_job_id,
        **databricks_job_config,
    )
    # Wait for the Databricks run to finish
    run.result()


# Define a job that uses the custom op and the necessary resources
@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def custom_databricks_job():
    custom_databricks_op()


# Example run config that would be passed when executing the job
example_run_config = {
    "ops": {
        "custom_databricks_op": {
            "config": {
                "databricks_job_id": 1234,
                "python_params": ["--input", "schema.db.input_table", "--output", "schema.db.output_table"],
            }
        }
    }
}

# Execute the job with the example run config
if __name__ == "__main__":
    result = custom_databricks_job.execute_in_process(run_config=example_run_config)
```
In this example, `custom_databricks_op` receives its settings from the Dagster run config through the `config` parameter, which holds the job ID and parameters needed for the Databricks job. The op then constructs the `databricks_job_configuration` from those values.
Scout
02/12/2024, 4:06 PM
It then triggers the Databricks job with this configuration by calling the Databricks client directly instead of `create_databricks_run_now_op`, because an op cannot be created and invoked from inside another op.
When executing the job, you pass `example_run_config` as the `run_config` parameter to the `execute_in_process` method. This run config should match the structure expected by the custom op.
Please note that this is a conceptual example and may require adjustments to fit the specifics of your use case and the version of Dagster you are using.
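As a rough sketch of what "matching the structure expected by the custom op" means, the run config for `custom_databricks_op` is a nested dictionary: `ops`, then the op name, then `config`. The helpers below, `build_python_params` and `build_run_config`, are hypothetical names used only for illustration; they are not part of Dagster or dagster_databricks, and they use plain Python so the shape is easy to inspect before handing it to a job.

```python
from typing import Any, Dict, List, Mapping


def build_python_params(args: Mapping[str, str]) -> List[str]:
    """Flatten {"input": "t"} into ["--input", "t"], keeping insertion order."""
    params: List[str] = []
    for name, value in args.items():
        params.extend([f"--{name}", value])
    return params


def build_run_config(op_name: str, job_id: int, args: Mapping[str, str]) -> Dict[str, Any]:
    """Nest the op's settings under ops -> <op name> -> config."""
    return {
        "ops": {
            op_name: {
                "config": {
                    "databricks_job_id": job_id,
                    "python_params": build_python_params(args),
                }
            }
        }
    }


# Hypothetical usage: same values as the example run config in this thread
run_config = build_run_config(
    "custom_databricks_op",
    1234,
    {"input": "schema.db.input_table", "output": "schema.db.output_table"},
)
```

The resulting dictionary could then be passed as `run_config` to `execute_in_process`, assuming the op's config fields are named `databricks_job_id` and `python_params` as in the example.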