announcements
  • sashank

    06/26/2020, 1:19 AM
    0_8_5_Changelog
  • matas

    06/26/2020, 12:29 PM
    @schrockn @nate @John Helewa @Cris @Fran Sanchez @Rafal I've made a boilerplate repo of our Dagster setup at bestplace.ai to share with the community: https://github.com/bestplace/cube. We run it containerized on multiple servers, using the celery-docker executor for pipeline isolation and self-hosted S3 + Postgres as persistent storage. It includes a nice Ansible role, safe hot reloading on code deployments, and a YAML DSL for building new pipelines from existing solids really fast. It is also aimed at jupyter-dagstermill solids, and its worker container is a fully functional JupyterLab image. We'll add .ipynb solids support soon. Hope it gives you some inspiration / a starting point for your deployments.
    :blob_woah: 3
    :partydagster: 8
  • Cris

    06/26/2020, 9:25 PM
    Hi! I'm trying to configure the celery executor with Amazon SQS, but it seems that Celery does not support a results backend with SQS. Does this mean the backend is not necessary? Could someone point me to a configuration that might work with dagster-celery and SQS?
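    A minimal sketch of a run config that might work, assuming the dagster-celery executor's broker/backend keys and substituting a database result backend, since Celery has no SQS result store (connection strings are placeholders):
    # SQS as the Celery broker; results go to a database backend instead.
    execution:
      celery:
        config:
          broker: "sqs://"  # AWS credentials/region are picked up from the environment
          backend: "db+postgresql://user:password@host:5432/celery_results"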
  • King Chung Huang

    06/27/2020, 7:52 PM
    How do I declare a field with more than one value type? For example, I'm trying to write a path field that can take either a single string or a list of strings (corresponding to the path argument in dask.dataframe.read_parquet). Separately, the two value types can be written as:
    Field(String, …)
    Field([String], …)
    Is there something like a union type that can combine the two?
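    One possible answer, sketched on the assumption that ScalarUnion (a config type that accepts either a scalar or a non-scalar schema) covers this case:
    # Hedged sketch: the field accepts either a bare string or a list of strings.
    from dagster import Field, ScalarUnion, String

    path_field = Field(
        ScalarUnion(scalar_type=String, non_scalar_schema=[String]),
        description="A single path or a list of paths.",
    )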
  • Binh Pham

    06/27/2020, 10:48 PM
    Hi, I'm trying to use s3_plus_default_storage_defs, but if my solid has a nested function or lambda it fails to pickle:
    @solid
    def test_solid(context):
        objs = map(lambda x: x, [1, 2, 3])
        return objs
    AttributeError: Can't pickle local object 'test_solid.<locals>.<lambda>'
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_plan.py", line 153, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 272, in core_dagster_event_sequence_for_step
    for evt in _create_step_events_for_output(step_context, user_event):
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 302, in _create_step_events_for_output
    for evt in _set_intermediates(step_context, step_output, step_output_handle, output):
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/execution/plan/execute_step.py", line 314, in _set_intermediates
    value=output.value,
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/storage/intermediates_manager.py", line 130, in set_intermediate
    paths=self._get_paths(step_output_handle),
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/storage/intermediate_store.py", line 89, in set_value
    return self.set_object(obj, context, dagster_type, paths)
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/storage/intermediate_store.py", line 42, in set_object
    key, obj, serialization_strategy=dagster_type.serialization_strategy
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster_aws/s3/object_store.py", line 42, in set_object
    serialization_strategy.serialize(obj, bytes_io)
    File "/home/binhpham/.local/share/virtualenvs/limebi-flow-DASaHltL/lib/python3.6/site-packages/dagster/core/types/marshal.py", line 70, in serialize
    pickle.dump(value, write_file_obj, PICKLE_PROTOCOL)
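    For context, the S3 intermediate store pickles solid outputs, and a lazy map object holds a reference to the lambda, which pickle cannot serialize. A minimal workaround sketch is to materialize the value into a plain, picklable structure before returning it:
    # Hedged sketch: return a concrete list instead of a lazy map object.
    from dagster import solid

    @solid
    def test_solid(context):
        objs = [x for x in [1, 2, 3]]  # a plain list pickles cleanly
        return objs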
  • Shaun Ryan

    06/28/2020, 10:40 AM
    Hi folks. I'm trying to set up a Celery worker. I have RabbitMQ with a hostname of rabbitmq, and I've set the executor broker URL in $DAGSTER_HOME/pipeline_run.yaml and celery_config.yml as follows:
    config:
      broker: 'pyamqp://guest:guest@rabbitmq:5672//'
    Starting the worker:
    dagster-celery worker start -y /opt/dagster/dagster_home/celery_config.yaml
    However, my worker is still looking at localhost and failing to connect 🤔
    dagster_worker1_1 | [2020-06-28 10:37:24,256: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
    Am I missing something somewhere?
  • Rafal

    06/28/2020, 2:59 PM
    Can the Dagster UI (Dagit) be interfaced with CloudWatch somehow?
  • King Chung Huang

    06/28/2020, 4:56 PM
    Is it possible to specify an @input_hydration_config with required_resource_keys that are dependent on the config? For example, in the following config, I want to say that a glue resource is required if the hydration is specified to be glue. Otherwise, it's not required.
    @input_hydration_config(
        Selector(
            {
                "glue": Permissive({
                    "database": Field(String, is_required=True, description="Name of the Glue Data Catalog database."),
                    "table": Field(String, is_required=True, description="Name of the table in the database."),
                }),
                "csv": Permissive({
                    "path": Field(ReadPathType, is_required=True, description="Path to read from."),
                }),
                "parquet": Permissive({
                    "path": Field(ReadPathType, is_required=True, description="Path to read from."),
                    "columns": Field([String], is_required=False, description="Fields names to read in as columns."),
    
                }),
            }
        ),
        required_resource_keys = {"glue"}  # But, only if "glue" is specified in the input hydration config
    )
  • King Chung Huang

    06/28/2020, 6:51 PM
    Related to the above, given a config like the following, I'd like to express that either path is required, or glue_database and glue_table are required, but not both. columns is optional in either case. I've been looking at Selector for (path or (glue_database and glue_table)), but I can't think of a way to set this up. Is such a thing possible?
    @input_hydration_config(
        Selector(
            {
                "parquet": Permissive({
                    "path": Field(ReadPathType, is_required=True, description="Path to read from."),
                    "glue_database": Field(String, is_required=True, description="Glue database to read from."),
                    "glue_table": Field(String, is_required=True, description="Glue table to read from."),
                    "columns": Field([String], is_required=False, description="Fields names to read in as columns."),
                }),
            }
        )
    )
  • Shaun Ryan

    06/28/2020, 8:44 PM
    Hi. I'm trying to process documents through an NLP AI pipeline. Are there any examples of a parallel-loop workflow? Each document is a block of text that will hit a dependent pipeline of ML service solids, each returning a payload. I don't want to process the documents sequentially. I figure I need a solid for each ML service (3 in total, each with a REST endpoint), and the pipeline will iterate the documents in parallel through the pipeline. The ML models are A, B, and C, where B & C are dependent on the output of A. I have a small dev setup with RabbitMQ, dagster_celery, Dagit, Dagster & Postgres. I will possibly need to scale it out to a bigger compute platform later. I've been messing about and so far I can't get Dagit to execute the workload out to the workers. I have it set up with docker-compose here -> https://github.com/shaunryan/docker-compose/tree/master/dagster I'm using the dagster-celery CLI, e.g.
    dagster-celery worker start --config-yaml celery_config.yaml
  • narom

    06/29/2020, 9:04 AM
    Hello! I have an existing Dagster project using v0.7.13 and I'm currently trying to update it to v0.8.5. I'm encountering an error where the scheduler for v0.8.5 can't find my pipelines directory. For context, here's how v0.7.13 was structured:
    dagster
    - solids
        - test.py
    - pipelines
        - test.py
    - scheduler.py
    - repository.py
    - repository.yaml
    - dagster.yaml
    And here's how I structured v0.8.5:
    dagster
    - solids
        - test.py
    - pipelines
        - test.py
    - repository.py
    - dagster.yaml
    - workspace.yaml
    Everything's working fine except for the scheduled run, which is raising ModuleNotFoundError: No module named 'pipelines'. It can access the pipelines folder when run manually (using dagster run and by running the generated script) but not when scheduled. I've added a simplified version of my repository.py here: https://gist.github.com/santosnarom/7f42ba94d29fb8395f06ae2b12b8cb85
  • nate

    06/29/2020, 5:15 PM
    0.9.0 Release Plans
    Hi everyone, 0.8.0 was a massive release requiring substantial restructuring of the whole system. We’ve decided to release our next major version, 0.9.0, in late July. This is a shorter timeframe than our typical major release, and it will focus on polish, bug fixes, documentation, and building new features that have been made possible by the architecture changes in 0.8.0. With the upcoming release, we wanted to share an update on our plans to give you all visibility into what we’ll be working on over the next month:
    • Containerization: In 0.8.0, we decoupled our tools such as Dagit from user code in a separate process. In 0.9.0, we’ll push this further by working towards supporting packaging user-defined repositories in containers.
    • Documentation: Completely revamping our documentation, including a collection of scenario-driven examples, new overview sections, and cleaner organization.
    • Kubernetes Improvements:
      • Based on the containerization work, separating user code from the Dagit Deployment.
      • Adding support for run cancellation and retries.
      • Adding resource limits for run coordinators.
      • Threading all logs back through to Dagit.
      • Exploring publishing Helm chart to simplify usage.
      • Removing cron scheduler from the Dagit Deployment.
    • Hooks: Adding support for triggering actions based on events like success or failure. We want to support alerting, notifying other systems like PagerDuty or Slack about solid execution results, etc.
    • APIs: Cleaning up rough edges in Dagster’s APIs:
      • Fixes for the compute log manager.
      • Improvements to Dagster’s subselection semantics and for backfilling solid subsets.
      • Refining the new workspace YAML.
      • Moving storage configuration from run config to the instance.
    • Dagit: Various UI improvements and polish to clarify information hierarchy: instance vs. repo vs. pipeline scope. Updates to longitudinal views, including showing step-level success / failure.
    • gRPC: Adopting gRPC for internal communication between components in Dagster deployments. After this change user environments will no longer have to include the Python GraphQL stack.
    • Great Expectations: Adding a new Dagster integration with Great Expectations.
    💯 3
    🚀 4
    :partydagster: 9
    👏 9
    ❤️ 2
  • Danny

    06/29/2020, 7:41 PM
    Hey guys, just wanted to point out that the changes in https://github.com/dagster-io/dagster/pull/2640 broke my setup. We run dagster via docker compose for hacking locally:
    dagster:
      image: company/project
      build: .
      command: sh -c "cp pipelines/dagster.yaml tmp/dagster/ && dagit -h 0.0.0.0 -p 9090 -w pipelines/workspace.yaml"
      environment:
        DAGSTER_HOME: tmp/dagster
      ports:
        - 9090:9090
      volumes:
        - ./tmp/dagster:/src/tmp/dagster
    This worked fine on 0.7.13, but today I upgraded to 0.8.5 and I'm getting this error:
    dagster.core.errors.DagsterInvariantViolationError: DAGSTER_HOME must be absolute path: tmp/dagster
    Changing the path to be absolute in the docker compose file is a no go, it'll break on another dev's machine. Any non-hacky suggestions?
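    One way to read the error: DAGSTER_HOME names a path inside the container, so an absolute container path does not depend on any developer's host layout. A hedged sketch of the compose service with that change (paths are illustrative):
    # Keep the host side of the volume relative; point DAGSTER_HOME at an
    # absolute path inside the container.
    dagster:
      image: company/project
      build: .
      command: sh -c "cp pipelines/dagster.yaml /opt/dagster/dagster_home/ && dagit -h 0.0.0.0 -p 9090 -w pipelines/workspace.yaml"
      environment:
        DAGSTER_HOME: /opt/dagster/dagster_home
      ports:
        - 9090:9090
      volumes:
        - ./tmp/dagster:/opt/dagster/dagster_home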
  • Danny

    06/29/2020, 9:00 PM
    Got a 0.7.13 -> 0.8.5 upgrade issue I'm scratching my head over. I'm getting a 2m hang and then this error when I try to execute a pipeline from Dagit's web UI:
    dagster.core.errors.DagsterLaunchFailedError: Host <http://127.0.0.1:9090> failed sanity check. It is not a dagit server.
      File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/utils.py", line 14, in _fn
        return fn(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 19, in launch_pipeline_execution
        return _launch_pipeline_execution(graphene_info, execution_params)
      File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 49, in _launch_pipeline_execution
        run = do_launch(graphene_info, execution_params, is_reexecuted)
      File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 40, in do_launch
        pipeline_run.run_id, external_pipeline=external_pipeline
      File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 964, in launch_run
        return self._run_launcher.launch_run(self, run, external_pipeline=external_pipeline)
      File "/usr/local/lib/python3.7/site-packages/dagster_graphql/launcher/__init__.py", line 80, in launch_run
        self.validate()
      File "/usr/local/lib/python3.7/site-packages/dagster_graphql/launcher/__init__.py", line 74, in validate
        host=self._address
    My dagster.yaml config:
    run_launcher:
      module: dagster_graphql.launcher
      class: RemoteDagitRunLauncher
      config:
        address: "<http://127.0.0.1:9090>"
    
    dagit:
      execution_manager:
        disabled: False
        max_concurrent_runs: 4
    This worked fine before on 0.7.13.
  • wbonelli

    06/30/2020, 2:09 AM
    Hi all! Are there any plans to add Slurm support to the Dask executor? Seems like it would just involve adding a condition here, unless I'm missing something? Thanks much
  • Danny

    06/30/2020, 3:27 PM
    I'm calling the graphql endpoint over HTTP with a launchPipelineExecution mutation to start a pipeline. Looking to have it start in process on the dagit instance. I can't figure out the new 0.8 selector I need to specify. Here's what I got so far:
    launchPipelineExecution(
    	executionParams: {
    		selector: {
    			repositoryLocationName: "<<in_process>>",
    			repositoryName: "my_repository",
    			pipelineName: "my_pipeline",
    		},
    		mode: "default",
    	}
    )
    Getting this error:
    RESPONSE >>> {'data': {'launchPipelineExecution': {'__typename': 'PipelineNotFoundError', 'message': 'Could not find Pipeline <<in_process>>.my_repository.my_pipeline', 'pipelineName': 'my_pipeline'}}}
    I'm trying to copy how the dagster_graphql test suite does it but not succeeding. On a side note, might be useful to incorporate "execute pipelines over graphql" machinery into the main dagster_graphql API, because it's very useful for use cases like mine where I have pipelines launching other pipelines, and it's a shame all that useful client code is buried in tests.
  • King Chung Huang

    06/30/2020, 7:03 PM
    I have an embarrassingly newbie question, but I just cannot figure out where in the docs to find the answer. 😕 What's the method for renaming a solid in a pipeline? I thought it was alias, as in df_test().alias("another_name"), but that doesn't seem to be correct. df_test is a solid here.
    @pipeline(
        mode_defs = [
            ModeDefinition("default", resource_defs={'glue': glue_resource})
        ]
    )
    def scratch_pipeline():
        result1 = df_test()
        result2 = df_test(result1)
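    For reference, alias is applied to the solid definition before it is invoked inside the pipeline, not to the value returned by the invocation. A minimal sketch reusing the definitions above (the alias name is arbitrary):
    @pipeline(
        mode_defs = [
            ModeDefinition("default", resource_defs={'glue': glue_resource})
        ]
    )
    def scratch_pipeline():
        result1 = df_test()
        result2 = df_test.alias("df_test_again")(result1)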
  • Kevin

    06/30/2020, 7:19 PM
    The data science with notebooks (dagstermill) documentation has mysteriously disappeared? Is it being updated? 😄
  • Patrick Merlot

    06/30/2020, 10:20 PM
    Hi! How would you pass a list of strings in the YAML file which configures the solids of a pipeline? I tried it like this...
    solids:
      get_list_tables:
        inputs:
          table_list:
            value:
              - "table1"
              - "table2"
    but Dagster doesn't like it.
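    The YAML list itself looks right; a hedged sketch of a solid signature that would accept it, assuming the input is typed as a list of strings so the config schema allows a YAML list (the body is hypothetical):
    from typing import List

    from dagster import solid

    @solid
    def get_list_tables(context, table_list: List[str]):
        for table in table_list:
            context.log.info(f"found table {table}")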
  • Cam Marquis

    07/01/2020, 2:06 PM
    Hey all, we've been experimenting with Dagster over the past few weeks and really like it! We are very interested in deploying Dagit as a service and being able to trigger pipelines via REST calls rather than scheduling with cron. Any guidance on the best way to go about implementing something like this? As far as I know there is no REST interface for Dagit, nor do any of the current extensions offer what we need... but knowing me it is very possible I just missed something. Thanks in advance for any guidance!
    👍 1
    :graphql: 1
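    Not a REST API as such, but Dagit serves a GraphQL endpoint at /graphql, so a run can be launched with a plain HTTP POST. A hedged sketch (repository, pipeline, and port values are placeholders):
    # POST a launchPipelineExecution mutation to Dagit's /graphql endpoint.
    import requests

    LAUNCH_MUTATION = """
    mutation {
      launchPipelineExecution(executionParams: {
        selector: {
          repositoryLocationName: "my_location"
          repositoryName: "my_repository"
          pipelineName: "my_pipeline"
        }
        mode: "default"
      }) {
        __typename
      }
    }
    """

    resp = requests.post("http://localhost:3000/graphql", json={"query": LAUNCH_MUTATION})
    print(resp.json())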
  • Danny

    07/01/2020, 4:12 PM
    The 0.8.5 docs are referencing the obsolete repository.yaml: https://docs.dagster.io/docs/deploying/celery#4-dagster-code
  • Binh Pham

    07/01/2020, 9:26 PM
    Is it possible to create a Permissive dict with a dynamic number of keys but Enum values? I'm trying to create a dtype config, for example:
    solids:
      solid_a:
         config:
           dtype:
             col_a: type_option_a
      solid_b:
         config:
            dtype:
              col_b: type_option_a
              col_c: type_option_b
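    If a schema with arbitrary keys but enum-constrained values isn't directly expressible, one hedged workaround sketch is to accept a Permissive dict and validate the values inside the solid against an allowed set (names are hypothetical, and the config_schema argument name is assumed here):
    from dagster import Failure, Field, Permissive, solid

    ALLOWED_DTYPES = {"type_option_a", "type_option_b"}

    @solid(config_schema={"dtype": Field(Permissive())})
    def solid_a(context):
        dtype = context.solid_config["dtype"]
        # Permissive() allows arbitrary column names; the value set is checked by hand.
        bad = {col: t for col, t in dtype.items() if t not in ALLOWED_DTYPES}
        if bad:
            raise Failure(description=f"unsupported dtypes: {bad}")
        return dtype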
  • Cris

    07/02/2020, 3:53 PM
    Hi! I was wondering if there's a way to limit the maximum number of processes running solids in Dagster when using the multiprocess executor. I noticed that when running multiple pipelines with schedules, you can end up with many more processes than CPUs. For us this is particularly an issue because we end up overloading our DB with multiple heavy queries that wait forever and never finish.
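    For a single run, the multiprocess executor does accept a concurrency cap in the run config; a minimal sketch (the value 4 is arbitrary, and this limits processes per run rather than across concurrently scheduled runs):
    execution:
      multiprocess:
        config:
          max_concurrent: 4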
  • Danny

    07/02/2020, 7:18 PM
    Let's say I'm gathering a list of things in one solid A, and want each thing found to be processed by another solid B. Some arbitrary number (N) of things can be found, and it's OK to gather all of them in A into one List output. But because Dagster expects no cycles in the compute graph, currently B is forced to process the entire list as opposed to running N times. I want B to run once per thing because then, when B is run via Celery, I can properly control concurrency etc. Do I need to upgrade B to be a pipeline to accomplish what I want? That would greatly decrease the usability of my Dagit Runs page, which could have 10,000 entries for things per each run of A... is there a different way to do this?
    :dagster: 1
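    At this point Dagster had no dynamic fan-out of a runtime-sized list into per-item solid runs, so one workaround, sketched here with hypothetical names and assuming a fan-out width that can be fixed up front, is a static fan-out over aliased copies of B:
    from dagster import Output, OutputDefinition, pipeline, solid

    FAN_OUT = 4  # fixed width chosen up front

    @solid(output_defs=[OutputDefinition(name=f"chunk_{i}") for i in range(FAN_OUT)])
    def split_things(context, things):
        # Emit one chunk of the gathered list per output.
        for i in range(FAN_OUT):
            yield Output(things[i::FAN_OUT], f"chunk_{i}")

    @pipeline
    def fan_out_pipeline():
        chunks = split_things(solid_a())
        for i, chunk in enumerate(chunks):
            solid_b.alias(f"process_chunk_{i}")(chunk)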
  • Binh Pham

    07/02/2020, 8:32 PM
    Hi, I want to use dagster_databricks but also use Spark DataFrame serialization for intermediate storage, which is currently available in dagster_pyspark. Would I have to rewrite the serialization to work with dagster_databricks?
  • Muthu

    07/02/2020, 9:11 PM
    Hi… how can I raise a custom error?
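    A minimal sketch of one way to do this, assuming dagster.Failure is what's wanted (raising it fails the step with a user-facing description):
    from dagster import Failure, solid

    @solid
    def validate_value(context, value: int) -> int:
        if value < 0:
            raise Failure(description="value must be non-negative")
        return value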
  • Fran Sanchez

    07/02/2020, 9:33 PM
    Hi, I just got an unexpected error...
    Message: Cannot query field "isOptional" on type "ConfigTypeField".
    
    Path: 
    
    Locations: [{"line":106,"column":7}]
    It happened after upgrading from 0.8.3 to 0.8.5 in a testing environment.
  • Fran Sanchez

    07/02/2020, 9:57 PM
    I'm experimenting with the optional inputs/outputs of solids, but I've run into something I don't understand. In this example:
    @solid(output_defs=[OutputDefinition(name="num", dagster_type=int, is_required=False)])
    def optional_input_output(_, num: Optional[int] = None):
        if num:
            yield Output(num, "num")
    
    
    @solid
    def takes_optional(_, num: Optional[int] = 333) -> int:
        return num*2
    
    
    @pipeline
    def optional_pipeline():
        takes_optional(optional_input_output())
    If I don't provide any input to optional_input_output, then takes_optional is always skipped, regardless of the input being marked Optional and having a default value... how can I achieve this then?
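    For what it's worth, a downstream solid is skipped whenever the upstream output it depends on is not yielded; the downstream default only applies when the input isn't wired to a dependency at all. One hedged workaround sketch, reusing the definitions above, is to always yield the output and apply the fallback upstream:
    @solid(output_defs=[OutputDefinition(name="num", dagster_type=int)])
    def optional_input_output(_, num: Optional[int] = None):
        # Always emit the output, falling back to a default, so downstream never skips.
        yield Output(num if num is not None else 333, "num")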
  • user

    07/02/2020, 10:38 PM
    Johann Miller just published a new version: 0.8.6.
    :congadagster: 2
  • johann

    07/02/2020, 10:45 PM
    0_8_6_Changelog
    🎉 11
    👏 2
    🚀 2
    😛artydagster: 1
    f
    k
    • 3
    • 4
    Fran Sanchez
    07/02/2020, 11:06 PM
    There are a few issues with the format of the Changelog in Slack...

    johann
    07/02/2020, 11:50 PM
    Fixed, thanks for catching that @Fran Sanchez

    Ken
    07/03/2020, 12:52 AM
    Thanks for releasing!! This is great 🚀