dagster-support
  • a

    Andrew Parsons

    06/24/2021, 5:50 PM
    Hi all, I have a question about authenticating boto3 from within Dockerized Dagster. I am sure the answer is simple, but I can't seem to figure out what is wrong.
    1. [Screenshot 4] I have created a multi-container Dagster deployment configuration based on the documentation.
    2. [Screenshot 4] I don't think it is relevant, but I'm also using the Django ORM side by side with Dagster to manage parts of this ETL/ML pipeline.
    3. [Screenshot 4] Using a Docker volume, I pass AWS credentials from the host to the container.
    4. [Screenshot 4] As I understand it, only the pipeline container (not the daemon or dagit) needs the AWS credentials volume. In my efforts to debug, I've given it to all three, however.
    5. [Screenshot 2] If I docker exec -it <container_id> /bin/bash into the Dagster containers, I can see that each one has the correct AWS credentials file and that it is fully populated.
    6. [Screenshot 3] If I install the awscli Python package in a container, I can query the configuration and see that all of the credentials are accessible.
    7. [Screenshot 1] I have a Dagster resource through which I try to instantiate an S3 bucket.
    8. [Screenshot 5] And then, to my surprise, the pipeline (or at least the resource) appears to be executing in a different file system!? It cannot find /root/.aws/credentials
    9. [No screenshot] ...so the pipeline fails with botocore.exceptions.NoCredentialsError: Unable to locate credentials
    10. [No screenshot] I am sure that my Dagster resource works, since I had successful runs prior to Dockerization.
    11. [Screenshot 6] I believe my confusion stems from how the daemon works. Is it spinning up new containers exclusively for a single pipeline run? If so, can I also give this container my AWS credentials volume?
    For what it is worth, everywhere I've used /root, I've also tried ~/ to the same effect. Otherwise, congratulations to the Dagster/Elementl team. Dagster has been fun to learn so far, and while the documentation still needs improvement, a lot of the basic examples are sufficient to help new users get started quickly. Sorry that the screenshots are out of order; Slack doesn't provide any facilities to reorder them, to my knowledge.
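    On point 11: with the DockerRunLauncher from the docs' multi-container example, each run does get its own freshly launched container, and that container only sees whatever the run launcher's config gives it, so a volume mounted only into the dagit/daemon/user-code containers won't be visible there. A minimal sketch of one workaround, assuming credentials are supplied as environment variables to the run containers rather than as a mounted credentials file (the resource and bucket names here are illustrative, not from the original post):
    import os

    import boto3
    from dagster import resource

    @resource(config_schema={"bucket_name": str})
    def s3_bucket_resource(init_context):
        # boto3 falls back to AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY /
        # AWS_DEFAULT_REGION from the environment, so no /root/.aws/credentials
        # file is needed inside whichever container the run executes in.
        session = boto3.session.Session(
            aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
            region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
        )
        return session.resource("s3").Bucket(init_context.resource_config["bucket_name"])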
    d
    • 2
    • 3
  • s

    Somasundaram Sekar

    06/24/2021, 6:13 PM
    Hello, I have a problem understanding dagster-daemon run. I have a pipeline with a schedule
    @schedule(cron_schedule="* * * * *", pipeline_name="my_pipeline", execution_timezone="XXXXX", mode="local")
    and I run dagit and dagster-daemon run, but even after the schedule is turned off and dagit is killed, the scheduler still executes the pipeline once a minute. I'm honestly at a loss for a logical explanation of what is happening.
    d
    • 2
    • 19
  • s

    Serge Smertin

    06/24/2021, 7:35 PM
    Hello, i cannot get schedules to work. No matter what i try, the daily schedule gets skipped.
    import datetime

    from dagster import ModeDefinition, daily_schedule, fs_io_manager, multiprocess_executor, pipeline, solid

    @solid
    def just_run(_):
        _.log.info(f'Just run....')
    
    @pipeline(mode_defs=[ModeDefinition(
        resource_defs={'io_manager': fs_io_manager}, 
        executor_defs=[multiprocess_executor])])
    def dummy():
        just_run()
    
    @daily_schedule(
        pipeline_name=dummy.__name__,
        start_date=datetime.datetime(2021, 6, 24),
        execution_time=datetime.time(21, 32),
        should_execute=lambda _: True,
        # should_execute=lambda _: datetime.today().weekday() < 5,
        execution_timezone='Europe/Amsterdam')
    def daily(date):
        print(f'Daily stuff... {date}')
        return {}
    the latest tick is always skipped, and the daemon reports:
    2021-06-24 21:32:29 - SchedulerDaemon - INFO - Evaluating schedule `daily` at 2021-06-24 21:32:00+0200
    2021-06-24 21:32:29 - SchedulerDaemon - INFO - No run requests returned for daily, skipping
    what is wrong?...
    ✅ 1
    d
    • 2
    • 3
  • s

    Sarah Gruskin

    06/24/2021, 9:16 PM
    Hello, I'm trying to set up Slack hooks with my pipeline but I'm a little lost in the documentation. https://docs.dagster.io/concepts/solids-pipelines/solid-hooks#defining-a-solid-hook Currently I have an alerting file like this:
    import traceback

    from dagster import HookContext, failure_hook
    
    @failure_hook
    def my_failure_hook(context: HookContext):
        solid_exception: BaseException = context.solid_exception
        # print stack trace of exception
        traceback.print_tb(solid_exception.__traceback__)
    
    
    @failure_hook(required_resource_keys={"slack"})
    def slack_message_on_failure(context: HookContext):
        message = f"Solid {context.solid.name} failed"
        context.resources.slack.chat.post_message(channel="#de-alerts", text=message)
    and my init file looks like this:
    ...
    
    @pipeline(
        mode_defs=[
            ModeDefinition(
                name="prod",
                resource_defs={
                    "redacted": redacted,
                    "api_call": api_call,
                    "slack": slack_resource
                }
            ),
            ModeDefinition(
                name="unit_test",
                resource_defs={
                    "redacted": redacted,
                    "api_call": aapi_call_test,
                    "slack": ResourceDefinition.hardcoded_resource(
                        slack_resource_mock,
                        "do not send messages in dev"
                    )
                }
            )
        ],
        tags={
            'dagster-k8s/config': {
                'pod_template_spec_metadata': {
                    'labels': {'my_labels': 'redacted'}
                }
            }
        }
    )
    def my_pipeline():
        solid_one(
            solid_two(
                solid_three(
                        fetch_from_redacted()
                )
            )
        )
    
    @daily_schedule(
        pipeline_name="my_pipeline",
        start_date=datetime(2021, 4, 1),
        execution_time=time(hour=8, minute=15),
        execution_timezone="UTC",
    )
    def my_schedule(date):
        return {
            "solids": {
                "fetch_from_redacted": {
                    "config": {
                        "date": date.strftime("%Y-%m-%d")
                    }
                }
            }
        }
    
    @slack_message_on_failure
    @pipeline(mode_defs=mode_defs)
    def notif_all():
        # the hook "slack_message_on_failure" is applied on every solid instance within this pipeline
        solid_one()
        solid_two()
        fetch_from_redacted()
        solid_3()
    workspace.yaml
    # Yaml for loading our single repository from our repos.py file:
    load_from:
      - python_file: repos.py
    
    resources:
      slack:
        config:
          token: "secret"
    the resources section doesn't seem to be correct since I'm receiving the following error:
    "slack": slack_resource
    docker_example_pipelines     | NameError: name 'slack_resource' is not defined
    I'm unsure how all this ties together.
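    On the NameError specifically: slack_resource is not a Dagster built-in; it ships with the dagster-slack package and has to be imported wherever the ModeDefinitions are declared, and its token is supplied through run config rather than workspace.yaml. A sketch, assuming the dagster-slack package is installed (this is not the poster's actual file):
    # repos.py (or wherever the ModeDefinitions live) -- illustrative only
    from dagster_slack import slack_resource

    # ModeDefinition(resource_defs={"slack": slack_resource, ...}) as in the
    # pipeline above. The token then belongs in the run config of the launched
    # run, e.g. in the dict returned by my_schedule or in the Dagit playground:
    #
    # resources:
    #   slack:
    #     config:
    #       token: "secret"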
    y
    • 2
    • 2
  • d

    Daniel Michaelis

    06/25/2021, 8:52 AM
    Hi, I have a short question about the syntax used in the 0.11.15 release notes. One of the points contains the following solid decorator:
    @solid(input*defs=[InputDefinition("data", dagster_type=str, root_manager_key="my_root")])
    I haven't seen an asterisk used as part of an argument name like input*defs before, and I couldn't find what it does. In the rest of the code I've only seen it written as input_defs. Is this just a typo, or could someone clarify what function the asterisk serves here?
    j
    • 2
    • 2
  • g

    Giacomo D

    06/25/2021, 12:44 PM
    Hello! I need to execute a pipeline triggered by an event on Monday.com. Monday offers webhook integration, but with their own pre-defined GraphQL calls (here are some examples: https://support.monday.com/hc/en-us/articles/360003540679-Webhook-Integration-). Is it possible to capture that GraphQL from Dagit and use it to execute a pipeline? To me, the problem here is the mismatch between the GraphQL sent by Monday.com and the GraphQL Dagit expects. Thanks in advance!
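    One pattern that sidesteps the GraphQL mismatch is a small translation endpoint: receive Monday.com's webhook, then submit the run through Dagster's GraphQL client rather than pointing Monday.com at Dagit directly. A rough sketch, assuming the dagster-graphql client and Flask, with a hypothetical pipeline name and Dagit host/port:
    from dagster_graphql import DagsterGraphQLClient
    from flask import Flask, jsonify, request

    app = Flask(__name__)
    client = DagsterGraphQLClient("localhost", port_number=3000)  # wherever Dagit runs

    @app.route("/monday-webhook", methods=["POST"])
    def monday_webhook():
        payload = request.get_json(force=True)
        # Monday.com verifies the endpoint with a one-time "challenge" that
        # must be echoed back.
        if "challenge" in payload:
            return jsonify({"challenge": payload["challenge"]})
        run_id = client.submit_pipeline_execution(
            pipeline_name="my_pipeline",   # hypothetical pipeline name
            run_config={},                 # build this from the webhook payload
            mode="default",
        )
        return jsonify({"run_id": run_id})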
  • c

    Caio Rogério Silva dos Santos

    06/25/2021, 1:13 PM
    Hello! I'm setting up a pipeline with a daily schedule, and I need the date context variable that the schedule passes to the solids, so as to update a table properly. The problem is, I'm unsure what happens when running backfills: is context.solid_config['date'] the partition date? If not, how can I access the partition date when running backfills without breaking my code that's supposed to run daily? Thanks in advance!
    d
    • 2
    • 2
  • v

    Vince

    06/25/2021, 2:06 PM
    Hi! 👋 Is there a more in-depth guide on how to deploy Dagster & Dagit to AWS than this? I'd like to evaluate Dagster for Databricks pipeline orchestration.
    j
    • 2
    • 5
  • j

    jurelou

    06/25/2021, 2:50 PM
    Hello, has anyone tried to do conditional branching inside a "map"? It seems like Dagster does not take the is_required=False parameter into account.
    from dagster import Output, OutputDefinition, pipeline, solid

    @solid(
        output_defs=[
            OutputDefinition(name="a", is_required=False),
            OutputDefinition(name="b", is_required=False),
            OutputDefinition(name="c", is_required=False)
        ]
    )
    def generate_branches(file_path: str):
        if 1 == 1:
            yield Output(None, output_name="a")
        else:
            yield Output(None, output_name="b")
    
    @solid
    def proc_file(file_path):
        branches = generate_branches(file_path=file_path)
    
    @pipeline()
    def my_pipeline():
        input_files = gather_files()
    
        output_files = input_files.map(proc_file)
        end(output_files.collect())
    a
    • 2
    • 10
  • j

    Jenny Webster

    06/25/2021, 5:28 PM
    Good morning! In the upgrade from dagster 0.11.14 to 0.11.15, all of our composite solids broke and we are getting error messages like
    dagster.core.errors.DagsterInvalidDefinitionError: Input "df" in solid "generate_speed_solid" is not connected to the output of a previous solid and can not be loaded from configuration, creating an impossible to execute pipeline. Possible solutions are:
    E                         * add a dagster_type_loader for the type "DataFrame"
    E                         * connect "df" to the output of another solid
    where “generate_speed_solid” is one of the standard solids inside the composite solid. I have tested the individual solids and they work in a pipeline outside of the composite solid. We had been relying on type annotations in our function definitions instead of explicitly declaring InputDefinition and OutputDefinition in the solid decorators. I can't seem to find documentation about how this needs to change. Can someone point me to the right resource? Thanks!
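    For readers unfamiliar with the two styles being contrasted, a minimal illustration (hypothetical solids using a builtin type rather than the poster's DataFrame):
    from dagster import InputDefinition, OutputDefinition, solid

    # Style 1: Dagster infers input/output types from Python annotations
    @solid
    def annotated_solid(context, df: dict) -> dict:
        return df

    # Style 2: explicit InputDefinition / OutputDefinition on the decorator
    @solid(
        input_defs=[InputDefinition("df", dagster_type=dict)],
        output_defs=[OutputDefinition(dagster_type=dict)],
    )
    def explicit_solid(context, df):
        return df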
    a
    d
    +2
    • 5
    • 28
  • j

    Josh Lloyd

    06/25/2021, 6:02 PM
    Is there a way to stop or restart the gRPC server? Once the container with the run worker is up and I can see my pipelines in Dagit, I tried running
    dagster api grpc -h 0.0.0.0 -p 4000 --python-file dagster_repos.py
    but that just confuses Dagit, which constantly switches between the new pipeline file and the old one. It seems like I need to stop the first gRPC process first.
    d
    • 2
    • 7
  • v

    Vitor Baptista

    06/26/2021, 2:43 PM
    Hi all! I'm trying to create a custom IOManager using https://dataset.readthedocs.io/. In the solid, I'm yielding
    yield AssetMaterialization(asset_key="gmail_messages_ids", partition=partition_date_str)
    and the Output. How can I access the AssetMaterialization's partition field inside the IOManager? I need this field to ensure that if I run the solid twice for the same partition date, it won't create duplicated rows in the table, but will instead update the data from the last time it ran on the same partition.
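    One workaround that avoids needing the AssetMaterialization inside the IOManager at all is to key the rows on the partition date and upsert, so re-running a partition overwrites rather than duplicates. A minimal sketch, assuming the rows handed to the IOManager already carry message_id and partition_date columns (the database URL and table name are placeholders):
    import dataset
    from dagster import IOManager, io_manager

    class DatasetUpsertIOManager(IOManager):
        def __init__(self, db_url, table_name):
            self._db_url = db_url
            self._table_name = table_name

        def handle_output(self, context, obj):
            # obj is assumed to be a list of dicts, each containing a unique
            # "message_id" plus the "partition_date" it belongs to.
            db = dataset.connect(self._db_url)
            table = db[self._table_name]
            for row in obj:
                table.upsert(row, keys=["message_id", "partition_date"])

        def load_input(self, context):
            db = dataset.connect(self._db_url)
            return list(db[self._table_name].all())

    @io_manager
    def dataset_upsert_io_manager(_):
        return DatasetUpsertIOManager("sqlite:///gmail.db", "gmail_messages_ids")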
  • v

    Vitor Baptista

    06/26/2021, 2:52 PM
    Another question: is it possible to yield a variable number of Outputs? I have a solid that downloads Gmail's message IDs on a certain date. I'd like to yield each message ID separately, so each could be handled by another solid instance. However, when I try to yield multiple outputs, I get an error saying that the output name doesn't exist. The code I'm using is:
    for i, message in enumerate(messages):
        yield Output(message, output_name=f"download_messages_ids_{i}")
    and the error is:
    dagster.core.errors.DagsterInvariantViolationError: Core compute for solid "download_messages_ids_in_day" returned an output "download_messages_ids_0" that does not exist. The available outputs are ['result']
    I suppose I could create multiple optional outputs like
    download_message_ids_0
    to 100, but that seems too hackish. Is there a better way?
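    The usual alternative to N optional outputs is the dynamic output API, which fans out a variable number of outputs that downstream solids can map over. A sketch of the idea, with a hypothetical handle_message solid (note: in some older releases these classes were imported from dagster.experimental instead):
    from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid

    @solid(output_defs=[DynamicOutputDefinition()])
    def download_messages_ids_in_day(context):
        messages = ["m1", "m2", "m3"]  # stand-in for the Gmail API call
        for i, message in enumerate(messages):
            # mapping_key must be a simple alphanumeric/underscore string
            yield DynamicOutput(message, mapping_key=f"message_{i}")

    @solid
    def handle_message(context, message):
        context.log.info(f"handling {message}")

    @pipeline
    def gmail_pipeline():
        # .map() runs one handle_message per dynamic output
        download_messages_ids_in_day().map(handle_message)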
    j
    • 2
    • 2
  • j

    Jonathan Mak

    06/28/2021, 12:05 PM
    Does anyone here use dagster-dbt and read the output objects DbtResult and NodeResult? It seems like NodeResult's error, fail, skip and status fields do not reflect the dbt node's actual run result. Whether it failed, errored or succeeded, these values are all null. Can anyone confirm whether or not this is a bug?
    o
    • 2
    • 2
  • s

    Scott Peters

    06/28/2021, 4:20 PM
    I have a fairly abstract question here, but maybe someone has some thoughts to point me in the right direction. Here are the takeaway questions (context to follow):
    1. Is there any notion of a dagster client (maybe the GraphQL client could be this?) for interacting with Dagster (submit runs, get output) without having direct local access to dagit? (Currently, the GraphQL client is fairly limited: https://docs.dagster.io/concepts/dagit/graphql-client)
    2. (Related) What is the appropriate way to get output from a pipeline to other non-Dagster Python tools? (e.g. the pipeline creates an S3 bucket, and another script needs to send that S3 path back to the user)
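    On question 1, the GraphQL client can at least submit runs and poll their status from any machine that can reach Dagit; for question 2, the common pattern is to have the pipeline write its result (such as the S3 path) somewhere agreed upon that the external tool then reads, since the client does not expose solid outputs directly. A sketch under those assumptions (hypothetical names throughout):
    import time

    from dagster import PipelineRunStatus
    from dagster_graphql import DagsterGraphQLClient

    client = DagsterGraphQLClient("dagit.internal.example.com", port_number=3000)

    run_id = client.submit_pipeline_execution(
        pipeline_name="create_bucket_pipeline",  # hypothetical pipeline
        run_config={},
        mode="default",
    )

    # Poll until the run reaches a terminal status.
    terminal = {PipelineRunStatus.SUCCESS, PipelineRunStatus.FAILURE, PipelineRunStatus.CANCELED}
    while client.get_run_status(run_id) not in terminal:
        time.sleep(5)

    # The pipeline itself publishes the S3 path to a known location (a database
    # row, an S3 key derived from run_id, etc.) for this script to read.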
    r
    • 2
    • 1
  • s

    Scott Peters

    06/28/2021, 4:25 PM
    Context: We are looking at leveraging dagster for our asset management and validation pipelines, but several of our users are artists and will therefore require a more didactic front end than dagit (simple buttons and forms) for submitting jobs and getting feedback/results. It seems that once you are in the dagster ecosystem, it is fairly insular? Any perspective is appreciated.
    r
    • 2
    • 8
  • a

    Andrew Parsons

    06/28/2021, 6:32 PM
    Hi all -- I'm using spaCy with Dagster. I am confused as to why there is a five-to-six-minute delay between these two debug statements (see screenshots). Interestingly, I don't observe any delay when running this pipeline locally, but I do observe it on our AWS EC2 instance. In both cases, I run Dagster/Dagit using the multi-container Docker deployment described in the Dagster documentation. segmentizer is simply a spacy.language.Language object. Here are some relevant excerpts from my configuration. Playground YAML:
    execution:
      multiprocess:
        config:
          max_concurrent: 14
    dagster.yaml:
    run_launcher:
      module: dagster.core.launcher
      class: DefaultRunLauncher
    Any ideas? Thanks in advance!
  • s

    Sarah Gruskin

    06/28/2021, 8:52 PM
    I have a Docker instance running with a pipeline that is scheduled using the daily_schedule decorator. I'm currently running it locally and kicked off a backfill. The issue is that it's been stuck in the 'Starting...' state and I don't know how to debug why. I've downloaded the logs from the three-dots menu but am unsure what I should be looking for. What would cause a pipeline to hang like this?
    a
    d
    • 3
    • 10
  • s

    Scott Peters

    06/28/2021, 10:29 PM
    I have a general-ish question. A colleague of mine has been running dagster for the last year or so. As I started to look into it, he mentioned that it is great as long as you are doing all the execution on a single node, but that scaling to multiple compute nodes gets infinitely more complex. Can someone briefly explain how to run dagster pipelines across multiple nodes (async or otherwise)?
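    For reference, the usual multi-node options at this point are the Celery executor (dagster-celery workers on several machines pulling solid executions from a shared broker) or running on Kubernetes with the dagster-k8s / celery-k8s integrations. A rough sketch of the Celery route, assuming the pipeline's ModeDefinition includes celery_executor and that dagster-celery workers are running against the same broker (the broker URL is a placeholder, and exact config keys may vary by version):
    # run config selecting the Celery executor
    execution:
      celery:
        config:
          broker: "pyamqp://guest@rabbitmq:5672//"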
    d
    • 2
    • 1
  • s

    Scott Peters

    06/29/2021, 1:33 AM
    Hey all, I am trying the GraphQL examples from the docs and getting an error:
  • s

    Scott Peters

    06/29/2021, 1:33 AM
    https://docs.dagster.io/concepts/dagit/graphql#filters
    r
    • 2
    • 1
  • s

    Simon Späti

    06/29/2021, 8:43 AM
    A bit of a strange one: Dagit gets quite laggy when you keep some windows open for several days. We experienced that fetching the latest runs can take 3-4 seconds where it normally takes a split second. What we found in our analysis is that there were open sessions polling non-stop. It might be related to user-code updates during that period, with the open sessions still holding the old versions. Could that be? After we closed the 3-4 windows mentioned above, Dagit and Postgres were back to normal. This isn't critical, but it seems there is a bug with long-open sessions. Has anyone experienced similar issues?
    a
    • 2
    • 5
  • a

    Alex

    06/29/2021, 11:49 AM
    Hello, I have some problems running dagster-daemon from a systemd service file. I have installed everything in a virtual environment on an Ubuntu server. Inside my service file I have this: ExecStart=/bin/sh -c 'cd /home/name/project && . bin/activate && dagster-daemon run' but I get an error about the SQL database: sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) attempt to write a readonly database. Any idea? Maybe it is better to set up a relational database as the backend instead of SQLite?
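    The read-only SQLite error often comes down to the DAGSTER_HOME the systemd service actually uses (permissions or working directory), but moving off SQLite is a common fix regardless. A dagster.yaml sketch for Postgres-backed storage, assuming the dagster-postgres package is installed (the connection URL is a placeholder):
    # dagster.yaml (in DAGSTER_HOME) -- illustrative values only
    run_storage:
      module: dagster_postgres.run_storage
      class: PostgresRunStorage
      config:
        postgres_url: "postgresql://dagster:dagster@localhost:5432/dagster"

    event_log_storage:
      module: dagster_postgres.event_log
      class: PostgresEventLogStorage
      config:
        postgres_url: "postgresql://dagster:dagster@localhost:5432/dagster"

    schedule_storage:
      module: dagster_postgres.schedule_storage
      class: PostgresScheduleStorage
      config:
        postgres_url: "postgresql://dagster:dagster@localhost:5432/dagster"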
    • 1
    • 1
  • x

    Xiaosu

    06/29/2021, 1:23 PM
    Hi Dagster team! I just started to explore Dagster and ran into an issue when running it locally. The local log shows the pipeline succeeded ("finished execution of pipeline"), but on the Dagit dashboard my last solid keeps executing forever. I'm sure the solid is done. Any idea?
    • 1
    • 1
  • m

    Michel Rouly

    06/29/2021, 3:07 PM
    I just ran into a new error this morning on k8s Dagster. I'm using the default k8s Postgres DB and the S3 IO manager. I wiped them both out (the Postgres volume and the S3 prefix) and deployed fresh, and I'm still seeing the error the first time I trigger a run. It seems like it's an issue with Dagit talking to the Dagster user deployment service, but I'm not entirely sure... Error details are in the thread; the salient message:
    Message: Attempted to deserialize class "EventLogEntry" which is not in the whitelist. This error can occur due to version skew, verify processes are running expected versions.
    d
    • 2
    • 6
  • m

    Michel Rouly

    06/29/2021, 4:21 PM
    Is there a way to have a fixed fan-in solid proceed despite failed inputs? e.g., if I have solids a and b that feed into solid fan_in, and b fails, I'd still like fan_in to run with b excluded from its input array.
    a
    • 2
    • 2
  • o

    orenl

    06/29/2021, 6:24 PM
    Hey! Is there a way for a sensor (or a GraphQL call) to override the dagster-k8s/config of a pipeline? My pipeline needs to handle inputs of varying sizes, and I'm looking for a way to dynamically set the resources based on the input size.
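    One avenue worth trying (not confirmed for every version of the k8s launcher) is attaching the dagster-k8s/config tag per run on the sensor's RunRequest, since run tags are applied to the launched run. A sketch with hypothetical names and sizes:
    from dagster import RunRequest, sensor

    @sensor(pipeline_name="my_pipeline")  # hypothetical pipeline name
    def sized_run_sensor(context):
        input_size_gb = 12                # however the input size is measured
        memory = "4Gi" if input_size_gb < 10 else "16Gi"
        yield RunRequest(
            run_key=f"input_{input_size_gb}gb",
            run_config={},
            tags={
                "dagster-k8s/config": {
                    "container_config": {
                        "resources": {
                            "requests": {"memory": memory},
                            "limits": {"memory": memory},
                        }
                    }
                }
            },
        )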
    d
    • 2
    • 2
  • m

    mrdavidlaing

    06/29/2021, 6:31 PM
    When testing a solid using execute_solid() I'm finding that resources required by the solid are initialised twice - once during the execute_solid() call and then again when I grab the result of the execution with result.output_value(). Is this expected behaviour? If yes, can I get the output_value without triggering a re-init of the resources? Details & logs in the 🧵
    c
    a
    • 3
    • 14
  • o

    orenl

    06/29/2021, 7:08 PM
    K8sRunLauncher doesn't seem to identify out-of-memory failures, and shows the status of the job as "Started" until I manually terminate it. I created a simple solid that eats up all memory:
    @solid(config_schema={})
    def consume_all_mem(context):
        a = []
        while True:
            a.append(' ' * 10**6)
    When I run it in a pipeline, the underlying k8s job shows a BackoffLimitExceeded warning, and the pod's status is "OOMKilled with exit code 137". Ideally, I'd like to see that the job failed because it ran out of memory, and create a sensor that re-submits it with a higher resource allocation.
  • m

    Michel Rouly

    06/29/2021, 7:27 PM
    What is (or is there) a recommendation for dealing with pipelines where a solid might fail halfway through, and retrying it to success would require a slight reparameterization (a change to the run_config)? e.g., if I tweak one parameter, it won't impact the other solids, but it would turn this halfway solid from failure to success on retry?
    a
    • 2
    • 5
m

Michel Rouly

06/29/2021, 7:27 PM
"just retry the whole pipeline with the new config" is my current strategy
but it would be nice if I could make use of the first few successful solids, given that the new config won't interfere with them
a

adamd

06/29/2021, 7:38 PM
Sounds like maybe you could use versioned outputs and memoization to make retrying the whole pipeline run efficiently. https://docs.dagster.io/guides/dagster/memoization#versioning-and-memoization
m

Michel Rouly

06/29/2021, 7:41 PM
Ooh. Interesting.
We're already playing around with versioning, but haven't looked at memoization yet.