# announcements
k
Hi Dagster folks! My team likes dagster but I've got some questions from them!

"""
The problem is that our use case sits somewhere between the workflow execution problem and the streaming data problem (we have a stream of data to execute a workflow on), so nothing solves our problem fully. If dagster doesn't work, we think we're left with writing our own orchestration using celery, or moving to a streaming approach, but we really don't like how hard to maintain/test either of those options would be. For dagster we want to use the dependency/output management, but we need to run far more pipelines/tasks than it is usually designed for (and batching things together means we lose a lot of the useful features).

Three questions that I think I need to answer:
1. What's the per-task overhead? I think we can probably deal with anything up to ~1s (airflow was giving us 5-10s of overhead per task).
2. Can we programmatically kick off pipelines on some external trigger? (Ideally within the framework, but having to write a lambda to do it wouldn't be terrible.) It looks like we can with the GraphQL API?
3. How many pipelines/tasks can run at once? Our pipeline should take ~1 minute to run. It would be nice to be able to process 100-1000 images/minute, but that would mean 1000 pipeline runs per minute, and 10-40k task runs in that time. Airflow couldn't handle that, but could dagster?

If you have insight into these, that would be appreciated!
"""
m
interesting!! as to 2) you can certainly programmatically execute pipelines, either with GraphQL or the Python APIs.
you will have to write your own trigger
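For reference, a minimal sketch (not from the thread) of wiring an external trigger to a run via the Python API max mentions, using `execute_pipeline`. The solid/pipeline/handler names are made up; the trigger itself (SQS poller, webhook, lambda, etc.) is whatever you already have:

```python
# A minimal sketch of kicking off a run programmatically with execute_pipeline.
# image_pipeline / on_new_image are hypothetical names.
from dagster import execute_pipeline, pipeline, solid


@solid
def process_image(context):
    context.log.info("processing one image")


@pipeline
def image_pipeline():
    process_image()


def on_new_image(event):
    # call this from your external trigger; it blocks until the run finishes
    result = execute_pipeline(image_pipeline)
    assert result.success
```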
though one thing we've considered/sketched is a deployment & execution story that would really leverage lambda
for 2) and 3), i think the bottleneck will be the transaction processing in the event log storage -- and postgres can certainly be tuned to handle 40k writes per minute
this would certainly stress dagster along some axes that haven't yet been tested
you would need to figure out a parallel deployment story to handle 1k pipeline runs per minute, probably using the dagster-celery workers (or again, possibly on lambda/FaaS)
and i'm sure there's frontend work this would imply -- some performance improvements and UI changes
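A rough sketch of the dagster-celery setup max refers to, under the API of the time: add the celery executor to a mode, then select it in run config so steps are distributed to celery workers. The broker URL and names are placeholders, and exact config may vary by version:

```python
# Sketch only: pipeline whose steps fan out to dagster-celery workers.
from dagster import ModeDefinition, default_executors, execute_pipeline, pipeline, solid
from dagster_celery import celery_executor

celery_mode = ModeDefinition(executor_defs=default_executors + [celery_executor])


@solid
def do_work(context):
    context.log.info("running on a celery worker")


@pipeline(mode_defs=[celery_mode])
def distributed_pipeline():
    do_work()


if __name__ == "__main__":
    execute_pipeline(
        distributed_pipeline,
        run_config={
            "execution": {"celery": {"config": {"broker": "pyamqp://guest@localhost//"}}}
        },
    )
```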
k
Thanks @max! 🙂 I'll send this back to my team! Though off the bat I'm sure I'll be interested in k8s deployments with the celery workers!
a
We have a pretty flexible system, so we can make choices for how the pipeline run is processed and then how each step in the pipeline is processed. For instance, given the constraints you mentioned, we may choose to send each pipeline to a celery worker, then process all of the steps in that pipeline in-process (to avoid process spin-up overhead). These `RunLauncher` and `Engine` components are pluggable, so we can create new ones that attempt to meet your constraints.
f
I still struggle a bit to understand the difference between these 2 and when I would write my own version of one or the other
For example, for production one thing I'm considering is using the Argo cluster that we have been using so far. It only requires a yaml file, so I guess I would need to convert the ExecutionPlan into its yaml equivalent; then each step in Argo would run in a container running an instance of dagster-graphql (I guess), so it can emit events back to the run master. Would something like this be done with a RunLauncher or an Engine?
a
Here, let me draw a quick diagram to try to help. The answer in your case is potentially both: an `Engine` would take the `ExecutionPlan` and distribute the steps to your Argo cluster. The `RunLauncher` would come into play for handling the process where the `Engine` executes, if that should be on a different box than where `dagit` is hosted.
🙏 1
So given what you said @Fran Sanchez you might end up with this
f
@alex you don't know how useful this diagram actually is!!
a
great, glad it's helpful
f
So, if I remember correctly, there is an equivalent to the `RunLauncher` at the step level, right? The `StepLauncher`?
k
i echo Fran's comment, thank you @alex this will be helpful to take back to my team! 🙂
a
ya that's very new - but it's basically a way to special-case some steps. So the `Engine` determines the default behavior, and then you can use the `StepLauncher` machinery to special-case some steps
The only one we have right now is taking `pyspark` steps and submitting them to the appropriate cluster instead of handling those steps like the others
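A sketch of the pattern alex describes, modeled on the EMR pyspark step launcher that existed around this time (`dagster_aws.emr`): the solid asks for a step-launcher resource, and steps bound to it get submitted to the cluster instead of running in place. The launcher still needs EMR config at runtime (cluster id, staging bucket, etc.), which is elided here:

```python
# Sketch: route a pyspark-heavy step through a StepLauncher resource.
from dagster import ModeDefinition, pipeline, solid
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_pyspark import pyspark_resource


@solid(required_resource_keys={"pyspark_step_launcher", "pyspark"})
def heavy_spark_step(context):
    # runs on the EMR cluster rather than in the local step process
    return context.resources.pyspark.spark_session.range(1000).count()


emr_mode = ModeDefinition(
    "emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher,
        "pyspark": pyspark_resource,
    },
)


@pipeline(mode_defs=[emr_mode])
def spark_pipeline():
    heavy_spark_step()
```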
f
So, does it mean that it will execute another instance of the `Engine`, or do you need to provide a special `StepEngine`-kind of class?
a
since it's just a single step, it should get executed directly and not go through any `Engine` (ideally, there may be some quirks currently, it is very new as i said)
f
I see, I'll have a look at the Spark example
So, in my case my `Engine` could be fairly simple, pretty much conversion to yaml and submission + monitoring of the remote workflow, I guess.
👍 1
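A very rough sketch of that "fairly simple" Engine shape. This assumes the interface roughly matches the built-in engines (a generator that takes the pipeline context and the `ExecutionPlan` and yields events); check `dagster.core.engine` in your installed version for the real base class and signature. `plan_to_argo_yaml` and `submit_and_stream_events` are hypothetical helpers:

```python
# Sketch only -- base class location and signature are assumptions.
from dagster.core.engine.engine_base import Engine


def plan_to_argo_yaml(execution_plan):
    # translate dagster's ExecutionPlan into an Argo Workflow manifest
    raise NotImplementedError


def submit_and_stream_events(pipeline_context, workflow_yaml):
    # submit the manifest to the Argo cluster, watch it, and yield the
    # events emitted by dagster-graphql running inside each pod
    raise NotImplementedError


class ArgoEngine(Engine):
    @staticmethod
    def execute(pipeline_context, execution_plan):
        workflow_yaml = plan_to_argo_yaml(execution_plan)
        for event in submit_and_stream_events(pipeline_context, workflow_yaml):
            yield event  # relay step events back to the run master / event log
```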
a
here's a diagram framed around the python code in dagster core:

https://docs.dagster.io/assets/images/apidocs/internal/execution_flow.png
f
Everything else would be done in the Argo pods running `dagster-graphql` (I think this is what I need to run in these pods, right?)
a
> fairly simple
Ya I think the hard part will be figuring out how to do the translation
f
Agree, I would need to borrow some of the conventions from the dask engine or celery to define resource limits, images, etc.
a
> I think this is what I need to run in these pods, right?
I don't know enough about Argo to say anything with confidence.
f
No, I mean from dagster's point of view. I have seen that the k8s RunLauncher command is `dagster-graphql`, so I guess this is what I need to run in every step to stick to dagster's expectations
a
we are in the middle of changing that a bit - but there will exist some roughly equivalent dagster cli command to invoke that specifies a run id and the step in that run to execute
@johann can keep you up to date as he makes progress
f
Ok, yes I was referring to this
```python
args=[
    '-p',
    'executeRunInProcess',
    '-v',
    seven.json.dumps(
        {
            'runId': run.run_id,
            'repositoryName': external_pipeline.handle.repository_name,
            'repositoryLocationName': external_pipeline.handle.location_name,
        }
    ),
],
...
command=['dagster-graphql'],
args=args,
```
One last thing, as of today, is it possible to configure the k8s job RunLauncher to execute every step in a separate k8s job?
a
We currently have the `CeleryK8sJobEngine`, which will end up submitting each step as a separate K8s job - but it does so via celery queues to provide a means for global resource constraints
we have not yet made a `K8sJobEngine` which just directly submits the jobs, but we plan to in the near future
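For context, a hypothetical run config (written as a Python dict) selecting the celery-k8s executor behind the `CeleryK8sJobEngine` alex mentions: each step becomes its own K8s Job, submitted through celery queues. The image, namespace, and broker values are placeholders, and config keys may differ between versions:

```python
# Sketch: run config opting into per-step K8s jobs via celery queues.
run_config = {
    "execution": {
        "celery-k8s": {
            "config": {
                "job_image": "my-registry/my-dagster-image:latest",
                "job_namespace": "dagster",
                "broker": "pyamqp://guest@rabbitmq:5672//",
            }
        }
    }
}
```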
f
Ok, so it would be implemented as a different `Engine` handling all the steps
a
I expect we should have it in the next few weeks*, since most of the building blocks are already available (*assuming some other refactors that take precedence go well)
f
Awesome, I think I will start with a simple RunLauncher running everything in a single k8s job with multiprocessing, then switch to this once it's ready, and in parallel evaluate the feasibility/benefits of using Argo instead
👍 1
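For the interim plan (one K8s job per run, steps parallelized in-box), the built-in multiprocess executor is selected via run config roughly like this; `max_concurrent` is an arbitrary example value:

```python
# Sketch: run config for the built-in multiprocess executor.
run_config = {"execution": {"multiprocess": {"config": {"max_concurrent": 4}}}}
```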
@alex you were extremely helpful!! Thanks
And please, add something like your diagram to the internal docs, so much easier to understand