Kaushik Tadikonda
02/22/2024, 5:37 PM
Would making the start_cluster operation an asset, and defining dependencies using deps in downstream pyspark-based assets, be the correct way to achieve this? Relevant code in 🧵
02/22/2024, 5:39 PM
start_cluster "asset":
from dagster import AssetExecutionContext, asset

@asset(
    required_resource_keys={"databricks_config", "databricks_client"},
    description="Starts a Databricks cluster and returns its ID",
    compute_kind="databricks",
)
def start_cluster(context: AssetExecutionContext) -> list[str]:
    client = context.resources.databricks_client.workspace_client
    cluster_id = context.resources.databricks_config.cluster_id
    context.log.info(f"Starting cluster {cluster_id}")
    try:
        client.clusters.ensure_cluster_is_running(cluster_id)
    except Exception as e:
        context.log.error(f"Failed to start cluster {cluster_id}")
        raise e
    context.add_output_metadata(metadata={"cluster_id": cluster_id, "running": True})
    return [cluster_id]
example downstream asset
@asset(deps=[start_cluster])
def pyspark_asset(context: AssetExecutionContext) -> None:
    ...
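(For context, a minimal sketch of how these assets and the two resources might be wired into a Definitions object; the resource classes and their fields below are assumptions for illustration, not something shown in the thread.)

# Illustrative wiring only; DatabricksConfig and DatabricksClientResource are assumed names.
from dagster import ConfigurableResource, Definitions
from databricks.sdk import WorkspaceClient


class DatabricksConfig(ConfigurableResource):
    cluster_id: str


class DatabricksClientResource(ConfigurableResource):
    host: str
    token: str

    @property
    def workspace_client(self) -> WorkspaceClient:
        # Build a Databricks SDK client from the configured host and token.
        return WorkspaceClient(host=self.host, token=self.token)


defs = Definitions(
    assets=[start_cluster, pyspark_asset],
    resources={
        "databricks_config": DatabricksConfig(cluster_id="0222-000000-example"),
        "databricks_client": DatabricksClientResource(
            host="https://example.cloud.databricks.com", token="dapi-example"
        ),
    },
)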
Zach
02/22/2024, 7:02 PM

Kaushik Tadikonda
02/22/2024, 7:29 PM
class DatabricksIOManager(ConfigurableIOManager):
    """
    Reads and writes dataframes as a table in the Unity Catalog.
    Supports standard Spark DataFrame types.
    """

    databricks_connect: DatabricksConnectResource

    def load_input(self, context: InputContext) -> DataFrame:
        """
        Loads a dataframe from a table in the Unity Catalog.
        """
        metadata = context.upstream_output.metadata if context.upstream_output else None
        metadata = metadata or {}
        context.log.info(f"Loading metadata: {metadata}")
        self.validate_metadata(metadata)
        table_name = self._get_table_name(metadata)
        spark = self.databricks_connect.spark_session()
        return spark.read.table(table_name)

    ...
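(The write side of the IO manager is elided above; for illustration only, a handle_output in the same style might write the DataFrame back to a Unity Catalog table. The table-naming scheme and metadata key below are assumptions, not the author's code.)

from dagster import OutputContext
from pyspark.sql import DataFrame

# Illustrative sketch of the write side (would live inside DatabricksIOManager); not from the thread.
def handle_output(self, context: OutputContext, obj: DataFrame) -> None:
    """
    Writes a dataframe to a table in the Unity Catalog.
    """
    # Assumption: name the table after the asset key; the real naming scheme is elided above.
    table_name = ".".join(context.asset_key.path)
    obj.write.mode("overwrite").saveAsTable(table_name)
    context.add_output_metadata({"table_name": table_name})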
Zach
02/22/2024, 7:59 PM
you might run into some weird race conditions when you have multiple assets running in parallel that want to start up the same cluster, but that would happen if you executed the logic from each asset as well

Zach
02/22/2024, 8:00 PM
How's databricks connect working out for you? I'm assuming this is spark 3.5?

Kaushik Tadikonda
02/23/2024, 4:16 PM
ah! this logic will ensure the cluster is running using the databricks sdk, and I think we should be safe when running multiple assets in parallel.
Yes! It has worked pretty well so far. I felt it was easier to set up than using the dagster-databricks library. One quirk I noticed was that UDFs need to be defined within the asset function's scope, or else databricks connect / pyspark throws some weird serialization errors.
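(A minimal sketch of that quirk, with made-up asset, table, and column names: the UDF is defined inside the asset body rather than at module scope. DatabricksSession.builder.getOrCreate() stands in for however this project actually obtains its Databricks Connect session.)

# Illustrative only: defining the UDF inside the asset function so Databricks Connect
# can serialize it; defining it at module scope reportedly triggered serialization errors.
from databricks.connect import DatabricksSession
from dagster import AssetExecutionContext, asset
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


@asset(deps=[start_cluster])
def udf_example_asset(context: AssetExecutionContext) -> None:
    spark = DatabricksSession.builder.getOrCreate()

    @F.udf(returnType=StringType())
    def normalize(value):
        # Defined here, not at module scope, to avoid the serialization errors mentioned above.
        return value.strip().lower() if value is not None else None

    df = spark.read.table("main.example.raw_table")  # made-up table name
    df.select(normalize(F.col("name")).alias("name_normalized")).show()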
Zach
02/23/2024, 4:58 PM
Two assets running in parallel could still call client.clusters.ensure_cluster_is_running(cluster_id) at the same time and both try to start the cluster, which could result in some exception if they both see that the cluster isn't running and then both try to start it at the same time. Unless that API operation is idempotent (I don't know if that's the case)
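(If that race ever did matter in practice, one defensive option, sketched here as an assumption rather than anything from the thread, would be to retry the startup call briefly:)

import time

# Illustrative retry wrapper; only useful if concurrent calls to
# ensure_cluster_is_running turn out not to be safe to race.
def ensure_running_with_retry(client, cluster_id: str, attempts: int = 3) -> None:
    for attempt in range(attempts):
        try:
            client.clusters.ensure_cluster_is_running(cluster_id)
            return
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(10)  # back off so a concurrent caller can finish starting the cluster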