Hi guys! Thanks for your work on dagster, everythi...
# announcements
d
Hi guys! Thanks for your work on dagster, everything looks really cool 👍 Me and @matas are trying to deploy dagster at bestplace.ai to facilitate pipeline creation and execution for our analysts. Analysts don't like to write python code, they want to create pipelines described in yaml configuration files 😁 So we want to build system around dagster to make this possible. We deployed dagster using docker and use such containers: - dagster-master with dagit process; - several dagster-worker containers with dagster-celery executor; - rabbitmq container for communication between master and workers; - postgres and minio (s3) containers for persistent storage. Dagster-master and dagster workers use such directory structure: solids/ pipelines/ repository.py repository.yaml solids - directory with solids library, i.e. solids written in python (in future we will add jupyter solids); pipelines - directory with yaml pipelines, that use solids from solids library; repository.py contains define_repo() function that reads all yaml files and creates pipelines from them (I used code similar to this: https://github.com/dagster-io/dagster/blob/master/examples/dagster_examples/dep_dsl/pipeline.py); repository.yaml refers to define_repo() in repository.py. Everything works mostly fine but we encountered several problems and have questions that we would like to discuss with you. 1. We want users to add their own yaml pipelines to dagit. Then everyone should be able to explore and execute these pipelines. So what is best way to reload yaml pipelines to dagit? As I understand pipelines are reloaded when user clicks reload button in dagit UI. Is this the best way possible? What is happenning after click on reload button? Will define_repo() function be called as it is called at dagit start? 2. What happens to already running pipelines when user clicks reload button in dagit UI? What if user starts execution of the pipeline and then immediately clicks reload? I tried to do this and it seemed that pipeline is stuck in infinitely running state. I attached screenshots with this problem. 3. Is it possible to inform user about error in his pipeline.yaml through dagit interface? I can catch exceptions that are raised during yaml pipelines creation but how can I show them in dagit? Thanks
infinite running pipeline
s
re: 1) Yes the repository should be recreated when dagit is reloaded or restarted. 2) Yes this is a problem that we are working on. Pretty terrible experience thanks for bearing with us. This is a function of how you configure your run launcher. Given that you are using celery how are you launching your runs? 3) Not possible currently, but we should be able to handle this much more elegantly post our 0.8.0 release. Tracking issue here: https://github.com/dagster-io/dagster/issues/2464
d
Thanks, Nick! 2) I used config like this:
Copy code
execution:
  celery:
    config:
      broker: <amqp://guest>:guest@cube-rabbitmq:5672//
resources:
  s3:
    config:
      endpoint_url: <http://cube-storage:9000>
storage:
  s3:
    config:
      s3_bucket: dagster-intermediates
solids:
  ...
Is it possible to configure execution so that a restarted dagit correctly picks up running jobs?
m
@nate ^^
m
we've thought of a funny workaround for 3): if we put the whole PipelineDefinition creation into a try\except block we could catch errors and substitute broken pipelines with a single-solid pipeline holding traceback in it's description 😃 but it's a kludge of course
n
@Dmitry Krylov for (2), by default the parent orchestration for celery execution is happening in the Dagit process. We’ve built a “run launcher” for k8s here which relocates that to a separate container for driving execution. We don’t have a run launcher implemented yet which does this more generally outside of a k8s context, but it should be straightforward to put together a run launcher which does this
m
@nate correct me if I'm wrong but as I understood it we could just: 1. implement a simple daemon-process RunLauncher (where should I put it? here? https://github.com/dagster-io/dagster/tree/master/python_modules/dagster/dagster/core/launcher) 2. run our master dagit process under pm2 or any other manager which can watch filesystem for changes and restart dagit if a new pipeline arrives 3. configure dagit to run pipelines in the same container but in separate processes with this RunLauncher would it be enough?
🚀 1
n
yes, I think that should work!
m