# random
z
has anyone ever tried running databricks-connect in their user-code to run spark code within dagster ops on a remote databricks cluster?
h
I have. It works, but you lose some nice properties like expectations on the spark dataframes. I think whether this solution makes sense really depends on what your compute looks like - if you just need to do some filtering/aggregation in spark to produce a pandas (or similar) dataframe, and that dataframe is small enough to fit in memory on the dagster workers, it works quite nicely.
For me the machines that dagster lives on are too small to do our data processing, so I ended up doing everything on databricks
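For reference, the shape of that flow is roughly this (just a sketch - the table name and filter are placeholders, and it assumes databricks-connect is already configured on the dagster workers):
from dagster import op
from pyspark.sql import SparkSession


@op
def summarize_events(context):
    # with databricks-connect configured, this session points at the remote
    # databricks cluster instead of a local spark install
    spark = SparkSession.builder.getOrCreate()

    # heavy lifting happens on the cluster; only the small aggregate comes back
    df = spark.table("my_catalog.events").where("event_date >= '2022-01-01'")
    pandas_df = df.groupBy("event_type").count().toPandas()

    context.log.info(f"pulled {len(pandas_df)} rows back to the dagster worker")
    return pandas_df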
z
hey, thanks for your feedback. I'm in a similar boat - I don't plan on deploying dagster on spark clusters of the size I'd need, but I would love to have at least some of the data processing logic surfaced in ops (even if it's running through databricks-connect), or even just be able to use /dbfs/ so I can do stuff like gather delta table metadata to report as AssetMaterializations/Observations. Sounds like it's worth giving a shot
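something like this sketch is what I have in mind (purely illustrative - the table name and metadata fields are placeholders, and it assumes a databricks-connect-backed spark session):
from dagster import AssetObservation, Output, op
from pyspark.sql import SparkSession


@op
def observe_delta_table(context):
    spark = SparkSession.builder.getOrCreate()

    # DESCRIBE DETAIL returns a single row of delta table metadata
    # (numFiles, sizeInBytes, location, lastModified, ...)
    detail = spark.sql("DESCRIBE DETAIL my_schema.my_table").first()

    yield AssetObservation(
        asset_key=["my_schema", "my_table"],
        metadata={
            "num_files": detail["numFiles"],
            "size_bytes": detail["sizeInBytes"],
        },
    )
    yield Output(None)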
h
Yeah, both options are quite nice IMO. With the dagster-databricks integration you get the ability to chain ops that live entirely within databricks (which makes working with larger datasets super nice, because you can just pass a spark dataframe around). With the databricks-connect flow your setup is a bit simpler, and you don’t risk spinning up a 250GB RAM cluster for a simple pandas operation 🙂
z
To clarify the dagster-databricks integration: are you using the databricks_pyspark_step_launcher?
and this still doesn't let you pass DataFrames between ops, does it?
h
Correct, I use the databricks_pyspark_step_launcher - together with an IO manager that persists dataframes as parquet files
it’s a bit of a leaky abstraction, because you lose some information in the process, but if you’re aware of this conversion between ops it’s not so bad
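In outline it looks something like this (heavily simplified - the IO manager here is a hand-rolled example rather than anything shipped with dagster-databricks, and the step launcher still needs its host/token/cluster config supplied via run config):
from dagster import IOManager, io_manager, job, op
from dagster_databricks import databricks_pyspark_step_launcher


class ParquetIOManager(IOManager):
    # persists each op output as parquet so the next op can reload it
    # as a spark dataframe
    def _path(self, context):
        return f"s3a://my-bucket/dagster-storage/{context.step_key}"

    def handle_output(self, context, df):
        df.write.mode("overwrite").parquet(self._path(context))

    def load_input(self, context):
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._path(context.upstream_output))


@io_manager
def parquet_io_manager(_):
    return ParquetIOManager()


@op(required_resource_keys={"pyspark_step_launcher"})
def load_raw(context):
    from pyspark.sql import SparkSession

    # this body executes on the databricks cluster via the step launcher
    return SparkSession.builder.getOrCreate().table("raw.events")


@op(required_resource_keys={"pyspark_step_launcher"})
def deduplicate(context, df):
    return df.dropDuplicates()


@job(
    resource_defs={
        # host/token/cluster spec/staging location supplied via run config
        "pyspark_step_launcher": databricks_pyspark_step_launcher,
        "io_manager": parquet_io_manager,
    }
)
def databricks_spark_job():
    deduplicate(load_raw())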
z
very cool, I'll have to try that out. I suppose you can't have different ops in the same job use different launchers can you? run launchers seem to be global to each user code deployment
h
oh no you can
@op(
    required_resource_keys={
        "schedule",
        "databricks"
    }
)
def write_datalake(context, df):
    pass
for instance, this would run on databricks (given that the databricks resource is a configured databricks_pyspark_step_launcher)
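so within one job you can mix the two - roughly like this (names are placeholders, and the handoff between the two ops would also need an IO manager both environments can reach):
from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher


@op(required_resource_keys={"databricks"})
def heavy_spark_op(context):
    # launched as a databricks run by the step launcher
    ...


@op
def notify(context, upstream):
    # no step launcher required, so this runs wherever the job itself runs
    context.log.info("databricks step finished")


@job(resource_defs={"databricks": databricks_pyspark_step_launcher})
def mixed_job():
    notify(heavy_spark_op())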
z
ahh wunderbar!
h
so my flow is: dagster k8s -> job k8s -> databricks job
dagster kicks off a kubernetes job, which basically zips itself and uploads the code to databricks; databricks runs the code, and the k8s job then pulls the result back from databricks and surfaces it in the dagit UI
for me the biggest benefit is that I don’t need databricks for everything - sending a slack notification is 100% fine within k8s
however, keep in mind that if you plan to share data between databricks ops and k8s ops, both need to be able to access it. So an IO manager that stores data in dbfs doesn’t really work - S3 would.
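e.g. something like this, assuming dagster-aws is available (bucket and prefix are placeholders):
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

# drop these into a job's resource_defs so both the databricks steps and the
# k8s steps read/write outputs from the same bucket
shared_io_resources = {
    "s3": s3_resource,
    "io_manager": s3_pickle_io_manager.configured(
        {"s3_bucket": "my-dagster-bucket", "s3_prefix": "dagster-io"}
    ),
}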
z
yeah, that's basically where I'm trying to get. my flow is starting to look very similar: dagster ECS -> job ECS -> databricks job, although we've been using a custom op that calls the jobs/run-now endpoint to launch predefined notebook jobs, since my teammates are still pretty reliant on notebooks for getting logging info from the jobs (and most of our pipeline code lives in notebooks right now 😿). it looks like the databricks_pyspark_step_launcher tries to solve this by reading the logs from S3 after the run has completed - do you find that to be pretty reliable?
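our custom op is roughly this shape (simplified sketch - host/token are placeholders and the polling is deliberately naive):
import time

import requests
from dagster import Failure, op


@op(config_schema={"job_id": int})
def run_notebook_job(context):
    host = "https://<workspace>.cloud.databricks.com"  # placeholder
    headers = {"Authorization": "Bearer <token>"}  # placeholder

    # trigger the predefined databricks job
    resp = requests.post(
        f"{host}/api/2.1/jobs/run-now",
        headers=headers,
        json={"job_id": context.op_config["job_id"]},
    )
    resp.raise_for_status()
    run_id = resp.json()["run_id"]
    context.log.info(f"started databricks run {run_id}")

    # poll until the run reaches a terminal state
    while True:
        state = requests.get(
            f"{host}/api/2.1/jobs/runs/get",
            headers=headers,
            params={"run_id": run_id},
        ).json()["state"]
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            break
        time.sleep(30)

    if state.get("result_state") != "SUCCESS":
        raise Failure(f"databricks run {run_id} did not succeed: {state}")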
h
Getting the logs has been underwhelming in my experience. I typically end up with a generic “databricks error” when something went wrong:
dagster_databricks.databricks.DatabricksError: Run 246292 failed with result state: DatabricksRunResultState.Failed. Message:
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/api.py", line 775, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/executor/in_process.py", line 38, in execute
    yield from iter(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/api.py", line 869, in __iter__
    yield from self.iterator(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 86, in inner_plan_execution_iterator
    for step_event in check.generator(_dagster_event_sequence_for_step(step_context)):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 321, in _dagster_event_sequence_for_step
    raise unexpected_exception
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 222, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.8/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 180, in launch_step
    yield from self.step_events_iterator(step_context, step_key, databricks_run_id)
  File "/usr/local/lib/python3.8/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 222, in step_events_iterator
    done = poll_run_state(
  File "/usr/local/lib/python3.8/site-packages/dagster_databricks/databricks.py", line 271, in poll_run_state
    raise DatabricksError(error_message)
^ this is an example of a failed job from today, so I really have to go to the databricks logs to figure out what went wrong
For the happy path (when stuff works as it should) it does work rather nicely, but I find I’m not too interested in that 😉
but it could be that I’m just missing a configuration option, and this is supported somehow