# ask-community
d
Hi everyone, new to Dagster so still learning a lot. Has anyone here worked with Delta Lake and Spark Structured Streaming in Dagster? Essentially I have pipelines that process S3 file resources using Spark Structured Streaming, then transform them into bronze, silver, and gold Delta tables. The streams mostly process microbatches on a second-to-minute interval, and some only run a few times a day and then shut off. Before I start writing my own IO managers, does anyone have any resources they could point me to for Structured Streaming and Delta? Thanks!
z
streaming isn't well supported in Dagster... I don't think Spark Structured Streaming would work very well because it requires a persistent process to manage the stream state, whereas in Dagster the closest you could get is running a schedule that launches a new process every 5-10 seconds. That could very easily end up with overlapping Spark processes / duplicate data processing, especially as it'll probably take a minimum of 5-10 seconds just to get a Dagster process running and the Spark session instantiated. I usually really hesitate to recommend Databricks tools, but Delta Live Tables sounds more fitting for this use case. I've used Delta quite a bit with Dagster and it's pretty much the same as any other flat file source when it comes to writing IO managers, but I don't see a good way to use Structured Streaming in Dagster. curious if other people have other thoughts
d
TL;DR Could I still integrate these Structured Streaming jobs into Dagster without blowing things up? I could give up some of the Dagster functionality - I'd like to use it for other batch jobs I have. I'm probably going to avoid DLT; I really want to avoid Databricks proprietary solutions (though I'll most likely be running on Databricks, if not EMR). At the very least, I'd like to use Dagster as a framework, since I have a mix of Structured Streaming and batch jobs. I just want to figure out how to organize things without introducing some of the issues you mentioned.

All(?) of my streams will be using `foreachBatch` and write to target, metrics, etc. tables, on some processingTime interval of >10s (probably a bit more), or they process everything available and then shut down. 1 source readStream maps to N target table writeStreams, with each of those writeStreams writing to multiple tables. Also, Databricks clusters usually take 3-5 min to start up. Brand new to all of this, but I'm thinking the common pattern is:
• readStream is a resource
• off that shared stream, N streams are created with different op transformations (none of them write to disk yet)
• in foreachBatch the N streams do some more ops and each writes to its M respective tables; that's where I'd leverage the IO manager (roughly the shape sketched below)
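Something like this PySpark sketch is what I have in mind (no Dagster yet; the source, column names, and table paths are all made up for illustration):

```python
# Sketch of the pattern above: one shared readStream, a per-target transformation,
# and foreachBatch writing each microbatch to multiple Delta tables.
# Source format, columns, and paths are placeholders.
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

# single source readStream shared by the downstream writeStreams
source = (
    spark.readStream.format("cloudFiles")  # e.g. Databricks Auto Loader over S3
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "s3://my-bucket/_schemas/events")
    .load("s3://my-bucket/raw/events/")
)

def write_orders_batch(batch_df: DataFrame, batch_id: int) -> None:
    # each microbatch writes to a target table plus a metrics table
    batch_df.write.format("delta").mode("append").save("s3://my-bucket/silver/orders")
    batch_df.groupBy("status").count().write.format("delta").mode("append").save(
        "s3://my-bucket/metrics/orders_counts"
    )

(
    source.filter("event_type = 'order'")          # per-target transformation
    .writeStream.foreachBatch(write_orders_batch)
    .option("checkpointLocation", "s3://my-bucket/_checkpoints/orders")
    .trigger(processingTime="30 seconds")          # or availableNow=True to drain and stop
    .start()
)
```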
Thanks for answering btw. Do you have a Delta Lake IO manager you could point me to?
z
yeah I get it, I try to stay away from Databricks tools as much as possible aside from letting them spin up clusters for me. if the streams are setting up, doing their processing, and then shutting down, you might be able to get away with it. what are your latency requirements? how often are you hoping to run these streaming jobs? I think you might be able to rig something up, but the latency is not going to be great.

another thing to consider is that for any appreciable workload you'll need to spin up a cluster, and there's currently no supported RunLauncher for running an entire graph on Databricks, so you'd have to run on the expensive interactive clusters unless you're okay with spinning up a new cluster for every step (if you're trying to do streaming that's not going to be sufficient). I'm not as familiar with EMR, but it seems like you could submit each step to the same cluster there.

the thing that really puts a fork in using Dagster with Spark is that you can't pass dataframes across op boundaries without writing them to disk, unless you use the InProcessExecutor, in which case all the ops / assets will run sequentially. also, resources are re-initialized for every op, and I'm not sure if that would cause issues with the shared readStream.
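for reference, this is roughly what I mean by the in-process route - a minimal sketch with placeholder ops, using Dagster's `in_process_executor` and `mem_io_manager` so outputs stay in memory:

```python
# Minimal sketch: run every op in one process so outputs can stay in memory.
# The trade-off is that ops execute sequentially. Op bodies are placeholders.
from dagster import job, op, in_process_executor, mem_io_manager


@op
def extract(context):
    context.log.info("build the Spark DataFrame here")
    return "pretend this is a Spark DataFrame"


@op
def transform(context, df):
    context.log.info("transform without re-reading from disk")
    return df


@job(
    executor_def=in_process_executor,
    resource_defs={"io_manager": mem_io_manager},  # keep outputs in memory instead of pickling to disk
)
def single_process_job():
    transform(extract())
```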
maybe there's a workaround, but I've also tried running a whole graph on a Databricks cluster and it runs into issues when steps try to run in parallel, because it ends up with multiple Spark sessions.
here's a Delta IO manager that I use, though it's very specific to my use case
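the general shape of a path-based one is something like this (a bare-bones illustrative sketch only - it assumes dagster-pyspark's `pyspark_resource` and one Delta table per asset key, and leaves out all the case-specific logic):

```python
# Bare-bones Delta Lake IO manager sketch: one Delta table per asset key under a
# configurable base path. Assumes a "pyspark" resource (e.g. dagster_pyspark's
# pyspark_resource) is available for reading inputs back, and that outputs are
# Spark DataFrames produced by asset-keyed ops.
from dagster import IOManager, io_manager


class DeltaLakeIOManager(IOManager):
    def __init__(self, base_path: str):
        self._base_path = base_path

    def _path_for(self, asset_key) -> str:
        return f"{self._base_path}/{'/'.join(asset_key.path)}"

    def handle_output(self, context, df) -> None:
        # df is expected to be a Spark DataFrame
        df.write.format("delta").mode("overwrite").save(self._path_for(context.asset_key))

    def load_input(self, context):
        spark = context.resources.pyspark.spark_session
        return spark.read.format("delta").load(self._path_for(context.asset_key))


@io_manager(config_schema={"base_path": str}, required_resource_keys={"pyspark"})
def delta_lake_io_manager(init_context):
    return DeltaLakeIOManager(init_context.resource_config["base_path"])
```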
d
huh, I think I need to understand what's going on under the Dagster hood more before I get into that mess. Unfortunately I don't have a straight answer on latency requirements yet, but it's likely in the minutes-to-hours range for sourcing the raw data. I'll probably be leveraging the availableNow trigger a lot, have you used it? I'd like to start with the Structured Streaming model in mind in case the latency requirements get into the seconds range. For now, an interactive cluster running the entire graph would probably be worth it so I don't have to wait 5 min between jobs, but the real solution would be to get Dagster to use Databricks cluster reuse for Databricks multi-task jobs. Dagster would need to submit the entire graph as a single Databricks job and let the Databricks orchestrator handle the tasks. Not sure if that goes against some of Dagster's philosophy.

readStream resource re-initialization could be a major show-stopper. I'm pretty sure that would cause Spark to read the source multiple times. And running the ops sequentially would hurt too, because Spark has a FAIR scheduler that allows it to run writeStreams in parallel. What are the possible solutions?
• Could I pass a resource definition to vanilla Python functions without Dagster re-initializing the Spark session and readStream, and without writing to disk?
• The IO manager is then just defined for the Delta Lake ops in the foreachBatch logic. Again, not sure what issues I could have here if Dagster tries to re-initialize sessions, or anything like that.
Maybe I should scrap Dagster, but I'd love to capture at least some of the bigger logic in Dagster so I can get DAG visualization, orchestration for the larger jobs, etc.
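For context, this is the kind of thing I mean by availableNow plus parallel writeStreams via FAIR scheduler pools (PySpark sketch, made-up paths):

```python
# Sketch: drain everything currently available, run multiple writeStreams in
# parallel via FAIR scheduler pools, then shut down. Paths are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.scheduler.mode", "FAIR")  # let concurrent streaming jobs share the cluster
    .getOrCreate()
)

source = spark.readStream.format("delta").load("s3://my-bucket/bronze/events")

queries = []
for target in ["silver/orders", "silver/customers"]:
    # each writeStream gets its own scheduler pool so their jobs interleave fairly
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", target.replace("/", "_"))
    q = (
        source.writeStream.format("delta")
        .option("checkpointLocation", f"s3://my-bucket/_checkpoints/{target}")
        .trigger(availableNow=True)  # process everything available, then stop
        .start(f"s3://my-bucket/{target}")
    )
    queries.append(q)

for q in queries:
    q.awaitTermination()
```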
Thanks for the gist, gives me some ideas.
z
yeah, cluster reuse would be cool, I've thought about that too. with the current tools Dagster provides you'd basically end up with one op that just launches your Databricks job and waits for it to complete, but you lose any insight into the Databricks job structure / retry-ability at the task level (at least from Dagster), and you'd have to do quite a bit of work to get the logs streamed back to Dagster. at that point Dagster becomes just a way to launch a Databricks job and wait for it to finish, although it could still be useful if that job needs to be stitched together with other jobs into a larger DAG.

the biggest issue I've faced with Dagster + Spark (not streaming-specific, but one of the main issues you're facing) is that every op is launched in a different process (at least if you want to be able to parallelize), so you have to materialize your dataframe at the end of each op. there's no way that I'm aware of to pass a Spark DataFrame in-memory from one process to the next. the closest I can think of is materializing to an Arrow container and using something like Arrow DataFusion as your IO manager store. so the only really effective way of combining Dagster and Spark is to put all your logic for a specific materialization into a single asset or op, or else you take a huge performance hit just to be able to organize your transformation logic into separate ops / assets.

so the simplest way forward would probably be to put your whole pipeline in one op or asset / multi-asset. your streaming steps would just be vanilla Python functions that the Spark session gets passed to, and the IO manager is just used at the end of the op to write to your Delta table (rough sketch below). but I don't think this will end up running the streams in parallel - don't the writeStreams have to be submitted in different Spark apps to get recognized as separate jobs by the scheduler?

also, if your latency requirements aren't in the seconds, I'd really reconsider using Spark streaming if you can. it kinda sounds like you're going to be taking Spark streaming and running it in batches, which kinda defeats the whole purpose of using Structured Streaming. but that's kinda beside the point here. it would break up your DAG into separate platforms, but one idea would be to do the streaming part to your raw / bronze table (whatever your first table is) in DLT, source that table as a SourceAsset, and do the rest of your batch-oriented logic in Dagster. you'd still run into the issues listed above around having to materialize your dataframes between steps.
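something along these lines - a bare-bones sketch assuming dagster-pyspark's `pyspark_resource`, with made-up transforms and table paths:

```python
# Sketch: the whole pipeline lives inside a single asset. Plain Python functions
# receive the Spark session; nothing crosses an op boundary in memory.
# Assumes dagster-pyspark's pyspark_resource; transforms and paths are placeholders.
from dagster import Definitions, asset
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, SparkSession


def build_silver(spark: SparkSession) -> DataFrame:
    # vanilla function, no Dagster machinery involved
    bronze = spark.read.format("delta").load("s3://my-bucket/bronze/events")
    return bronze.filter("event_type = 'order'")


@asset(required_resource_keys={"pyspark"})
def silver_orders(context) -> None:
    spark = context.resources.pyspark.spark_session
    df = build_silver(spark)
    # write happens here (or return df and let a Delta IO manager handle it)
    df.write.format("delta").mode("overwrite").save("s3://my-bucket/silver/orders")


defs = Definitions(assets=[silver_orders], resources={"pyspark": pyspark_resource})
```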
d
Agreed, Dagster could run a subDAG in Databricks, though I'm not sure I'd use that in practice. Looking into the API, you can get the state of individual tasks in a Databricks job, cancel a job run (all tasks), and repair a job run with specific tasks to rerun. I'm not sure how close that gets Databricks multi-task jobs to being Dagster "compliant" (needing to cancel the entire job is not ideal), but it'd be pretty beneficial in cost and cluster startup time savings to integrate Databricks multi-task jobs with reused clusters into a Dagster step launcher. I'll look into creating a feature request.

That approach makes sense to me. I'm going to test it out and see if I run into any trouble. I'd use scheduler pools to parallelize my write streams. I'm going to avoid arrow-datafusion, but there are definitely a lot of really good use cases for integrating that into Dagster. Also, there's a proposal open for streaming execution support in DataFusion: https://github.com/apache/arrow-datafusion/issues/4285

You're convincing me to consider batch processing if I can get a hard data latency requirement. I've liked using Spark Structured Streaming in the past because of its tight integration with Delta, checkpointing, and the nice abstraction of swapping between processing indefinitely as a stream and running an availableNow one-time batch process. You end up treating stream and file sources the same, too.

Now this is outside the scope of Dagster, so maybe I need to research it myself, but how have you retrieved CDC from Delta without using Spark Structured Streaming (and without using the new `enableChangeDataFeed`, which doesn't work well with column mapping)? I'll have upserts and deletes, so I can't do a:
`.read.format("delta").load().filter("last_modified > last_job_run_time")`
because I wouldn't retrieve deleted rows that I need to delete downstream. Either I need to read the transaction logs myself or keep the event type info somewhere else. Unless I'm missing something.
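(For reference, the closest workaround I can think of without CDF is diffing table versions via Delta time travel - a hypothetical sketch with made-up key columns, and it gets expensive on big tables:)

```python
# Hypothetical sketch of recovering deletes without Change Data Feed: diff two
# versions of the Delta table via time travel and anti-join on the key column.
# Works, but re-reads both versions, so it's costly on large tables.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = "s3://my-bucket/silver/orders"  # placeholder table path

old = spark.read.format("delta").option("versionAsOf", 41).load(path)  # last processed version
new = spark.read.format("delta").load(path)                            # current version

# keys that existed before but are gone now -> downstream deletes
deleted = old.join(new, on="order_id", how="left_anti")

# rows that are new or changed -> downstream upserts
upserted = new.join(old, on=["order_id", "last_modified"], how="left_anti")
```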