# ask-community
k
Hi all! I'm new to dagster and would like some guidance! I have some pyspark based assets that need to communicate with a Databricks cluster. Before these assets get materialized, I need to ensure my Databricks cluster is started, or else they will fail to materialize. I like the declarative framework of using assets to define dependencies. Would making the `start_cluster` operation an asset and defining dependencies using `deps` in downstream pyspark based assets be the correct way to achieve this? Relevant code in 🧵
The `start_cluster` "asset":
```python
from dagster import asset, AssetExecutionContext


@asset(
    required_resource_keys={"databricks_config", "databricks_client"},
    description="Starts a Databricks cluster and returns its ID",
    compute_kind="databricks"
)
def start_cluster(context: AssetExecutionContext) -> list[str]:
    client = context.resources.databricks_client.workspace_client
    cluster_id = context.resources.databricks_config.cluster_id

    context.log.info(f"Starting Cluster {cluster_id}")
    try:
        client.clusters.ensure_cluster_is_running(cluster_id)
    except Exception as e:
        context.log.error(f"Failed to start cluster {cluster_id}: {e}")
        raise

    context.add_output_metadata(metadata={"cluster_id": cluster_id, "running": True})

    return [cluster_id]
```
Example downstream asset:
```python
@asset(deps=[start_cluster])
def pyspark_asset(context: AssetExecutionContext) -> None:
    ...
```
z
This feels like a bit of an abuse of the asset concept, and it will require that you always materialize this asset before materializing any assets that need the cluster. I'd just put the code to start the Databricks cluster in a basic Python function that checks if the cluster is already running and starts it if not, and call it at the beginning of any asset that requires the Databricks cluster.
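Something like this, for example (a sketch only; it just reuses the `ensure_cluster_is_running` call and the resources from your `start_cluster` asset above, and the helper name is made up):

```python
from dagster import AssetExecutionContext, asset


def ensure_databricks_cluster_running(client, cluster_id: str) -> None:
    """Start the cluster if it isn't already running; no-op otherwise."""
    # Same Databricks SDK call used in the start_cluster asset above.
    client.clusters.ensure_cluster_is_running(cluster_id)


@asset(required_resource_keys={"databricks_config", "databricks_client"})
def pyspark_asset(context: AssetExecutionContext) -> None:
    # Resources mirror the ones in the original start_cluster asset.
    client = context.resources.databricks_client.workspace_client
    cluster_id = context.resources.databricks_config.cluster_id

    ensure_databricks_cluster_running(client, cluster_id)
    # ... pyspark work that needs the cluster ...
```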
k
I agree. It definitely felt weird when writing it. I have an IO manager for Databricks pyspark based assets. Do you think moving this logic there makes sense? For context:
```python
from dagster import ConfigurableIOManager, InputContext
from pyspark.sql import DataFrame


class DatabricksIOManager(ConfigurableIOManager):
    """
    Reads and writes dataframes as a table in the Unity Catalog.
    Supports standard Spark DataFrame types.
    """
    databricks_connect: DatabricksConnectResource

    def load_input(self, context: InputContext) -> DataFrame:
        """
        Loads a dataframe from a table in the Unity Catalog.
        """
        metadata = context.upstream_output.metadata if context.upstream_output else None
        metadata = metadata or {}
        context.log.info(f"Loading metadata: {metadata}")
        self.validate_metadata(metadata)
        table_name = self._get_table_name(metadata)

        spark = self.databricks_connect.spark_session()
        return spark.read.table(table_name)

...
```
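Concretely, I was thinking of something like this (sketch only; the `cluster_id` config field and the bare `WorkspaceClient()`, which relies on ambient Databricks auth, are assumptions for illustration, while `DatabricksConnectResource` and `_get_table_name` come from the class above):

```python
from databricks.sdk import WorkspaceClient
from dagster import ConfigurableIOManager, InputContext
from pyspark.sql import DataFrame


class DatabricksIOManager(ConfigurableIOManager):
    databricks_connect: DatabricksConnectResource
    cluster_id: str  # hypothetical: wherever the cluster ID would come from

    def load_input(self, context: InputContext) -> DataFrame:
        # Make sure the cluster is up before asking Spark for the table.
        WorkspaceClient().clusters.ensure_cluster_is_running(self.cluster_id)

        metadata = (context.upstream_output.metadata if context.upstream_output else None) or {}
        table_name = self._get_table_name(metadata)

        spark = self.databricks_connect.spark_session()
        return spark.read.table(table_name)
```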
z
Seems like it could work. You might run into some weird race conditions when you have multiple assets running in parallel that want to start up the same cluster, but that would happen if you executed the logic from each asset as well. Seems like it'd be worth a shot.
How's databricks connect working out for you? I'm assuming this is spark 3.5? I've used the old one and the experience wasn't super great, but I'm eager to try the new version once I can get my stuff upgraded to spark 3.5
k
> you might run into some weird race conditions when you have multiple assets running in parallel that want to start up the same cluster, but that would happen if you executed the logic from each asset as well
Ah! This logic ensures the cluster is running using the Databricks SDK, so I think we should be safe when running multiple assets in parallel.
> How's databricks connect working out for you? I'm assuming this is spark 3.5?
Yes! It has worked pretty well so far. I felt it was easier to set up than using the `dagster-databricks` library. One quirk I noticed: UDFs have to be defined within the asset function's scope, or else Databricks Connect / pyspark throws some weird serialization errors.
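Roughly like this (a sketch; the resource key, table, and column names are made up, but this is the shape that works, whereas defining the UDF at module scope blew up):

```python
from dagster import AssetExecutionContext, asset
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


@asset(required_resource_keys={"databricks_connect"})
def cleaned_names(context: AssetExecutionContext) -> None:
    spark = context.resources.databricks_connect.spark_session()

    # Defining the UDF here, inside the asset body, works; defining it at
    # module scope was what triggered the serialization errors for me.
    def clean_name(value: str) -> str:
        return value.strip().lower() if value else value

    clean_name_udf = F.udf(clean_name, StringType())

    df = spark.read.table("some_catalog.some_schema.raw_names")
    df.withColumn("name_clean", clean_name_udf(F.col("name"))) \
      .write.mode("overwrite").saveAsTable("some_catalog.some_schema.clean_names")
```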
z
It's a bit minor, but I believe there's a race condition in that two assets might call `client.clusters.ensure_cluster_is_running(cluster_id)` at the same time: if they both see that the cluster isn't running and both try to start it, that could result in some exception. Unless that API operation is idempotent (I don't know if that's the case)
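If it isn't, a cheap guard would be to retry the call after a short delay when two runs collide, rather than trying to lock anything. Rough sketch (the broad `except Exception` is a placeholder; you'd want to check what the SDK actually raises there):

```python
import time

from databricks.sdk import WorkspaceClient


def ensure_cluster_running_with_retry(
    client: WorkspaceClient, cluster_id: str, attempts: int = 3, delay_seconds: float = 15.0
) -> None:
    """Best-effort guard against two runs racing to start the same cluster."""
    for attempt in range(1, attempts + 1):
        try:
            # Same call as in the asset above; if another run already kicked off
            # the start, a later attempt should find the cluster starting/running.
            client.clusters.ensure_cluster_is_running(cluster_id)
            return
        except Exception:  # placeholder: check which exception the SDK raises here
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)
```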
👍 1