Darren Haken
07/13/2021, 12:23 PM

```python
@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")
    data_bricks_job_solid()
```

This code works. How do I add config to Databricks for the job?
I tried:

```python
data_bricks_job_solid(
    {
        "name": "SparkPi Python job",
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        "spark_python_task": {
            "python_file": "dbfs:/docs/pi.py",
            "parameters": ["10"],
        },
    }
)
```

But I get an exception.

sandy
07/13/2021, 3:07 PM

Darren Haken
07/13/2021, 6:14 PM

Darren Haken
07/13/2021, 6:14 PM

sandy
07/13/2021, 8:23 PM

You can call `.configured` on the returned solid to create a version of the solid that has that configuration included.
docs reference: https://docs.dagster.io/concepts/configuration/configured
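For reference, the same settings could instead be supplied as run config rather than baked in with `.configured`. This is only a sketch: it assumes the solid's config schema nests the Databricks job spec under a `job` key (as the later snippets in this thread do) and that the solid is named `daily_stock`.

```yaml
solids:
  daily_stock:
    config:
      job:
        name: SparkPi Python job
        new_cluster:
          spark_version: 7.3.x-scala2.12
          node_type_id: i3.xlarge
          num_workers: 2
        spark_python_task:
          python_file: "dbfs:/docs/pi.py"
          parameters: ["10"]
```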
Darren Haken
07/14/2021, 10:18 AM

Darren Haken
07/14/2021, 10:42 AM

```python
data_bricks_job_solid.configured(
    {"region": "us-east-1", "use_unsigned_session": False}
)
```

However when running via dagit I get the following exception:

```
TypeError: configured() missing 1 required positional argument: 'name'
/Users/darren.haken/Library/Caches/pypoetry/virtualenvs/dagster-spike-GoqeGS_9-py3.8/lib/python3.8/site-packages/dagster/core/workspace/context.py:475: UserWarning: Error loading repository location hello_databricks.py:TypeError: configured() missing 1 required positional argument: 'name'
```
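The error above is Python complaining that `configured()` was called without its required `name` argument. A minimal pure-Python analogue (a toy sketch, not the Dagster API; the helper `make_configured` is hypothetical) shows how a required positional parameter produces exactly this kind of TypeError:

```python
# Toy analogue of a .configured()-style helper: `name` is required,
# so omitting it raises the same kind of TypeError seen above.
def make_configured(definition, config, name):
    # Return a NEW definition dict with the config baked in.
    return {"name": name, "config": config, "wraps": definition}

base = {"name": "daily_stock"}

try:
    make_configured(base, {"num_workers": 2})  # missing `name`
except TypeError as exc:
    error_message = str(exc)

configured = make_configured(base, {"num_workers": 2}, name="daily_stock_configured")
print(error_message)           # mentions the missing 'name' argument
print(configured["config"])
```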
Darren Haken
07/14/2021, 2:40 PM

```python
@pipeline(mode_defs=[ModeDefinition(resource_defs={"databricks_client": databricks_client})])
def spark_job():
    data_bricks_job_solid = create_databricks_job_solid(name="daily_stock")

    @configured(data_bricks_job_solid)
    def dev_s3(_):
        return {
            "job": {
                "name": "SparkPi Python job",
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
                "spark_python_task": {
                    "python_file": "dbfs:/docs/pi.py",
                    "parameters": ["10"],
                },
            }
        }

    data_bricks_job_solid()
```
Darren Haken
07/14/2021, 2:41 PM

the `name` param and the `config_or_config_fn` param

Darren Haken
07/17/2021, 1:04 PM

Is there an example of using the `configured` function? I could not find one online.
I have tried the following but the job config is still empty:
```python
data_bricks_job_solid.configured(
    name="daily_stock",
    config_or_config_fn={
        "job": {
            "run_name": "my spark task",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {"availability": "ON_DEMAND"},
                "num_workers": 10,
            },
            "notebook_task": {
                "notebook_path": "/Data Engineering/Dagster-Spike/dagster-spike",
                # "revision_timestamp": 1
            },
            # "libraries": [
            #     {"jar": "dbfs:/my-jar.jar"},
            #     {"maven": {"coordinates": "org.jsoup:jsoup:1.7.2"}},
            # ],
            # "spark_jar_task": {
            #     "main_class_name": "com.databricks.ComputeModels"
            # },
        }
    },
)
```
sandy
07/18/2021, 4:20 PM

The values you pass via `config_or_config_fn` are essentially "curried in" to the solid, and no longer part of the run configuration. However, they should still be passed to Databricks when the solid is executed.

Darren Haken
07/18/2021, 4:31 PM

Darren Haken
07/18/2021, 4:31 PM

sandy
07/18/2021, 4:33 PM

`x_solid.configured` returns a new solid, it doesn't mutate the original solid. So if you want to include it in a pipeline, you need to invoke the returned solid, not `x_solid`. e.g.

```python
@pipeline
def my_pipeline():
    configured_solid = x_solid.configured(...)
    configured_solid()
```
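sandy's point, that `.configured` returns a new solid rather than mutating the original, can be illustrated with a small pure-Python stand-in (a toy `Solid` class, not Dagster's actual implementation):

```python
class Solid:
    """Toy stand-in for a solid definition (not the real Dagster class)."""

    def __init__(self, name, config=None):
        self.name = name
        self.config = config

    def configured(self, config, name):
        # Build and return a NEW Solid with the config curried in;
        # `self` is left untouched.
        return Solid(name=name, config=config)

x_solid = Solid("daily_stock")
configured_solid = x_solid.configured({"num_workers": 2}, name="daily_stock_configured")

print(x_solid.config)            # None: the original is unchanged
print(configured_solid.config)   # {'num_workers': 2}
```

This is why invoking `x_solid()` inside the pipeline still runs the unconfigured version: the baked-in config lives only on the object that `.configured` returned.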
Darren Haken
07/18/2021, 4:34 PM

Darren Haken
07/18/2021, 4:34 PM

Darren Haken
07/18/2021, 4:36 PM

Darren Haken
07/18/2021, 4:36 PM

Darren Haken
07/18/2021, 4:37 PM

The config schema shows `Any`. Is that expected?

Darren Haken
07/19/2021, 11:03 AM

sandy
07/19/2021, 4:53 PM

`Any` is expected. That's the default configuration for all solids without an explicitly-provided config schema. It's a little weird though. I filed an issue to change it: https://github.com/dagster-io/dagster/issues/4373
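The `Any` default can be mimicked with a toy validator (an illustrative sketch, not Dagster's config system): with no declared schema every value is accepted, while an explicit schema rejects mismatches.

```python
def validates(value, schema=None):
    # No schema declared -> behaves like Any: everything is accepted.
    if schema is None:
        return True
    return isinstance(value, schema)

print(validates({"anything": "goes"}))   # True: Any accepts all
print(validates(2, schema=int))          # True
print(validates("two", schema=int))      # False
```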
sandy
07/19/2021, 4:57 PM

Darren Haken
07/19/2021, 6:29 PM

Darren Haken
07/19/2021, 6:29 PM

sandy
07/19/2021, 8:42 PM

> @sandy thanks! Is there something of a best practice I should follow for defining the config schema or is that not the case given it's a third party solid?

We wouldn't expect someone to define the config schema in this case.

sandy
07/19/2021, 8:42 PM

> Kubernetes - are there any docs around using K8 both for workers and for executing solids?

It would be best to ask about this in a separate top-level thread. I'm not an expert on that part of the system.

sandy
07/19/2021, 8:43 PM

> re the job completion - if an early databricks job failed would it cause the pipeline to stop?

It would cause the pipeline to stop.
Darren Haken
07/20/2021, 7:59 AM

> it would be best to ask about this in a separate top-level thread. I'm not an expert on that part of the system

Will do.

Darren Haken
07/20/2021, 8:00 AM

sandy
07/20/2021, 3:20 PM

Darren Haken
07/20/2021, 4:48 PM

sandy
08/18/2021, 4:34 PM

Darren Haken
08/18/2021, 4:59 PM

Sandeep Mankikar
10/08/2021, 9:25 PM

Marcus Simonsen
11/03/2021, 7:40 PM

Marcus Simonsen
11/05/2021, 4:20 PM