# announcements
p
Hi Dagsters! I am very happy to see this project appear. There has long been a need for a sane development platform for data pipelines, and I think Dagster looks like a very promising solution! I am currently re-implementing a pipeline, currently written in Snakemake, in Dagster. Some tasks in this pipeline are very computationally heavy and the DAG is highly parallel, so I was thinking that Dask would be the ideal execution substrate. However, some of these tasks are extremely memory hungry, so we would like to put resource constraints on them, which should allow Dask to avoid scheduling too many of them at once. As far as I can tell, this is how one would do that in Dask: https://distributed.dask.org/en/latest/resources.html So my question is: is there any way to define these constraints in Dagster and make sure they are propagated down to Dask? Or am I going about this the wrong way?
👀 2
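For readers following along, here is a minimal sketch of the Dask mechanism linked above. The `MEMORY` label and the sizes are illustrative assumptions, not enforced OS limits: a worker advertises an abstract resource budget, and each task claims a share of it, which caps how many such tasks run concurrently.

```python
# Sketch of Dask's abstract "resources" (per the linked docs).
# The MEMORY label and numbers are illustrative only.
from dask.distributed import Client, LocalCluster

def heavy_task(x):
    return x * 2  # stand-in for a memory-hungry step

def run():
    # One in-process worker advertising 64 "units" of an abstract
    # MEMORY resource.
    cluster = LocalCluster(n_workers=1, threads_per_worker=4,
                           processes=False, resources={"MEMORY": 64})
    client = Client(cluster)
    # Each task claims 32 units, so at most two run concurrently on
    # this worker even though it has four threads.
    futures = [client.submit(heavy_task, i, resources={"MEMORY": 32})
               for i in range(4)]
    results = client.gather(futures)
    client.close()
    cluster.close()
    return results

if __name__ == "__main__":
    print(run())
```

The same constraint can be expressed at worker-launch time with `dask-worker --resources "MEMORY=64"`, which is what the question above is asking Dagster to propagate.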
n
Hey Philip, sounds great! I do think it’d be possible to use Dask’s resources system here to put resource constraints in place, although we haven’t wired that up yet. In your use case, do you need to specify resource constraints at the per-task granularity, or would it be sufficient to have a way to express “all tasks in pipeline X should execute with Y resource constraints”?
👍 1
p
Hi Nate, thanks for the reply. I believe it would be best to specify the constraints per task, since only one or two tasks in a long pipeline have high memory usage. Putting the constraints on the entire pipeline would tie up a lot of resources in the Dask cluster while they're not actually needed. Also, I have another question that I hope you don't mind helping me with. The pipeline I am implementing is for satellite imagery processing. We will have a set of images, for each day of some time period. For each of these images, the pipeline is exactly the same; this is where the parallelism comes from, since we can execute all of these pipelines in parallel. Each pipeline in itself doesn't have much parallelism, though. Would it be a better approach to merge all of these into one super-pipeline, or should this instead be handled by a scheduler and scheduled as n independent pipelines? I suspect the latter, but I'm having a hard time picturing how it would work in practice; perhaps this is something that should be handled with Airflow?
n
On the resource management, totally makes sense. Will give some thought to how best to expose that in the system. At first glance, the problem you’re describing seems like it might be better handled at the physical execution engine layer vs. the orchestration layer. So for example, suppose you have a transformation
processImage(img: ImagePath): ProcessedImage
that you want to execute on every image; I think it’d be a good fit to have a Spark job that loads all the images and maps that function over them, like:
loadImages("s3://our-images/prod/2019/08/10")
  .map(processImage)
  .map(saveImageToS3)
Then, the scheduler / Dagster would run that Spark job for each day as part of a larger pipeline (which might have other jobs, notebooks, etc. as part of the overall DAG)
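Stripped of Spark, the shape of that inner per-day job is just a map over one day's images. A minimal stand-in using only the Python standard library (the paths and the `process_image` body are hypothetical placeholders for the real transformation):

```python
# Spark-free sketch of the per-day job: map one transformation over
# that day's images. process_image and the paths are placeholders.
from concurrent.futures import ThreadPoolExecutor

def process_image(path):
    # stand-in for the real ImagePath -> ProcessedImage transformation
    return path.replace(".tif", ".processed.tif")

def run_day(image_paths):
    # parallel map over the "spatial" dimension for a single day
    with ThreadPoolExecutor() as pool:
        return list(pool.map(process_image, image_paths))

print(run_day(["2019-08-10/a.tif", "2019-08-10/b.tif"]))
```

The orchestrator then only has to iterate the "time" dimension, invoking one such job per day, which is the separation of concerns being proposed here.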
p
Sounds good. Resources seem like they should be a concern only for the underlying execution engine, so if you think Dagster should be agnostic about resources, you could simply let users specify some kind of `dask_kwargs` or similar, either in the solid/pipeline definitions or in the config. Just a thought. Interesting solution. So basically: move the mapping over the spatial dimension to the execution layer, while handling the time dimension as part of the scheduling. That does seem to be a good way to separate concerns. Thank you for the help and the quick, positive response to my feedback. It will be very interesting to follow the work you are doing on Dagster!
c
Sorry to raise this thread, but resource management (or at least declaration) seems like a pretty general requirement for ML pipelines in particular. To share a bit of experience, I'm currently on a project where we needed to constrain resources per task and per configuration/input set. For context, the problem involves a pipeline acting on different markets. Market sizes differ, so you don't want to apply the same resource constraints in the USA and Uruguay. The stack involves a bunch of libraries that would make Spark a poor fit, and the added complexity was also not worth it. I solved the issue by using Airflow with a customized Dask Distributed scheduler, but that's a bunch of custom code I'm trying to get rid of 🙂
❤️ 2
n
Hey Cloves—thanks for sharing thoughts here, this is helpful! Let me ping you in a DM, I’m interested to hear more about your use case as we think about how best to support this type of thing in Dagster. Although in my experience resource management was often handled in a layer below the orchestration layer (e.g. YARN or EMR/Dataproc ephemeral clusters), we always needed task-level resource management. I do expect we’ll want to provide the same in Dagster/Dask, will give some thought to how best we can support that. Transparently, it might be a little while til we get to this, but will keep you both posted!
👍 1
@Cloves Almeida @Philip Graae hey folks! we just published Dagster 0.5.7 yesterday which includes a small change I made to support Dask resources. We’ll probably do something more sophisticated in the future, but wanted to unblock in the short term—you can now supply solid-level Dask resource requirements as follows:
@solid(
    ...
    step_metadata_fn=lambda _: {'dagster-dask/resource_requirements': {'CPU': 1}},
)
def foo(context):
    pass
And this will pass along those resource requirements to Dask, assuming you launched your Dask worker with e.g.
dask-worker --resources "CPU=1" scheduler-address.internal:8786
p
Hi Nate, just got back from a long vacation, so only seeing this now. Very nice and exactly what we were looking for. Thank you!