dagster-support

    Rodrigo Baron

    01/07/2022, 1:00 PM
    Hello, I've made a POC using Dagster and just loved it. After presenting it to my team, they had some questions: • We saw the support for running jobs on top of K8s, but this creates just one pod to run all the tasks. Is that correct? If so, is there a way to run each task in a separate pod? • Also regarding K8s, is it possible to run part of a job (like model training) using multiple pods in parallel? • One more K8s question 😅 is there a way to select the target node? Say I have one high-performance node and need to select it for the job (maybe using a label filter)?
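    The (experimental at the time) k8s_job_executor from dagster-k8s covers the first two questions: each op runs in its own pod, and parallel ops become parallel pods. Node selection can be layered on per job with the dagster-k8s/config tag. A minimal sketch, assuming a Helm-based deployment and a placeholder node label node-class=high-perf:

        from dagster import job
        from dagster_k8s import k8s_job_executor

        @job(
            executor_def=k8s_job_executor,  # one pod per op; parallel ops run as parallel pods
            tags={
                "dagster-k8s/config": {
                    "pod_spec_config": {
                        # node-class=high-perf is a placeholder label on your node
                        "node_selector": {"node-class": "high-perf"},
                    },
                },
            },
        )
        def train():
            ...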

    Andy Chen

    01/07/2022, 2:26 PM
    Hey! Does dagster-pandas have the equivalent of “dtypes” in pandera? https://pandera.readthedocs.io/en/latest/dataframe_schemas.html#get-pandas-data-types I basically have a csv path that's an input to one of my ops, and I want to read that csv into a data frame. I want to explicitly define the types on its columns, but I don't want to define both the dagster-pandas schema and the dtype dictionary to pass into read_csv. In pandera, it looks like you can just define the schema once and then call .dtypes to pass into read_csv. Curious if you guys have a similar helper function or if I should just go with pandera?
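    As far as I can tell dagster-pandas doesn't expose a dtypes helper, so the pandera route is reasonable; a sketch of the define-once approach:

        import pandas as pd
        import pandera as pa

        # define the schema once...
        schema = pa.DataFrameSchema({
            "id": pa.Column(int),
            "price": pa.Column(float),
        })

        # ...then reuse its dtypes for read_csv, so nothing is declared twice
        dtypes = {name: str(dtype) for name, dtype in schema.dtypes.items()}
        df = schema.validate(pd.read_csv("data.csv", dtype=dtypes))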

    raul

    01/07/2022, 3:27 PM
    Hello all. I wanted to ask something basic: how do I list objects in a GCS bucket? Any sample code would help.
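    With the google-cloud-storage client this is a few lines; a sketch assuming application-default credentials and a placeholder bucket name:

        from google.cloud import storage

        client = storage.Client()
        for blob in client.list_blobs("my-bucket", prefix="some/path/"):
            print(blob.name)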

    Bernardo Cortez

    01/07/2022, 3:49 PM
    I want a sequence of jobs to be fired upon the arrival of a new partition of a partitioned Delta table. I have implemented it with a schedule from a partitioned job using build_schedule_from_partitioned_job. However, this approach is prone to skipping partitions, since it depends on the new partition arriving on time. In other words, if I expect the new partition to arrive daily at 10am and it arrives at 10:30am or 11am, I will lose that partition forever, since the next day's tick will only process the next day's partition. I could implement a more frequent schedule, but that comes close to the idea of a sensor. Is it possible to implement a sensor that listens to these types of events? What dagster.core.events.DagsterEventType value should I use?
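    Since the partition lands in external storage, there is no DagsterEventType to listen for; the usual pattern is a custom sensor that polls the storage layer and keeps a cursor, so late partitions are picked up instead of skipped. A sketch, where my_partitioned_job is your job and find_new_partitions is a hypothetical helper that lists Delta partitions newer than the cursor:

        from dagster import RunRequest, sensor

        @sensor(job=my_partitioned_job)
        def delta_partition_sensor(context):
            for partition_key in find_new_partitions(after=context.cursor):
                # run_key deduplicates: each partition triggers at most one run
                yield RunRequest(run_key=partition_key)
                context.update_cursor(partition_key)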

    Daniel Eduardo Portugal Revilla

    01/07/2022, 8:46 PM
    hello! :dagster-evolution: Does Dagster have an integration with an ingest tool like Airbyte or Striim?

    Raphael Krupinski

    01/07/2022, 9:25 PM
    Hi, first time user here 😉 I have a 3GB zip file containing excel files to download. I want to save the zip file in one step, then unzip it and save the excel files in another, and then load them into a database one by one. I want to split it up this way so that a problem in one step doesn't mean I need to download the file again. Can I store a large binary in an IOManager? Can I get a parent directory to save a file to, or a byte buffer to write to?
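    An IOManager can hold large binaries fine as long as it streams them to disk rather than pickling; a minimal sketch (not a built-in, all names are placeholders) that keys files by run, step, and output name:

        import os

        from dagster import IOManager, io_manager

        class LocalBinaryIOManager(IOManager):
            """Store bytes outputs as raw files under a base directory."""

            def __init__(self, base_dir):
                self.base_dir = base_dir

            def _path(self, context):
                return os.path.join(
                    self.base_dir, context.run_id, context.step_key, context.name
                )

            def handle_output(self, context, obj):
                # obj is expected to be bytes
                path = self._path(context)
                os.makedirs(os.path.dirname(path), exist_ok=True)
                with open(path, "wb") as f:
                    f.write(obj)

            def load_input(self, context):
                # read back the file written by the upstream op
                with open(self._path(context.upstream_output), "rb") as f:
                    return f.read()

        @io_manager(config_schema={"base_dir": str})
        def local_binary_io_manager(init_context):
            return LocalBinaryIOManager(init_context.resource_config["base_dir"])

    Passing file paths between ops instead of raw bytes avoids holding 3GB in memory at once; the IOManager pattern is the same either way.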

    Tim

    01/08/2022, 5:01 AM
    Hi there - just trying out the latest release (0.13.13). When I execute a run via "Launchpad > Launch Run", after the run completes (marked success), the "Terminate" button suddenly shows up in the top right. Is this intended? (Not gonna lie, maybe I just didn't notice that this was a thing in previous releases. If this is intended, my bad.) Screenie attached (ran locally)

    Loc Nguyen

    01/08/2022, 8:02 AM
    Hi everyone, I am having a dependency conflict between Dagster and the newest dbt: Dagster 0.13.13 uses Flask 1.1.4 and requires click<8, while dbt's latest version 1.0.1 requires click>8

    Will Gunadi

    01/08/2022, 5:36 PM
    Can dagster_daemon and dagit be deployed on separate servers? I am currently having an issue where Dagit hangs the whole EC2 instance, so if I could at least separate dagster_daemon onto its own server, I wouldn't have to deal with both being down when Dagit acts up.

    Navneet Sajwan

    01/08/2022, 6:40 PM
    Hi everyone, I am using the K8sRunLauncher so that pipelines run in separate pods. But these pods are missing some of the environment variables that are available to the repository pods. How do I add those variables to the pipeline run pods?
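    The global approach is the envConfigMaps / envSecrets settings on the run launcher in the Helm values; per job, you can also attach overrides with the dagster-k8s/config tag. A sketch, with a placeholder ConfigMap name:

        from dagster import job

        @job(
            tags={
                "dagster-k8s/config": {
                    "container_config": {
                        # pull every key of my-env-configmap into the run pod's env
                        "env_from": [{"config_map_ref": {"name": "my-env-configmap"}}],
                    },
                },
            },
        )
        def my_pipeline():
            ...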

    Huib Keemink

    01/08/2022, 7:11 PM
    Hi! Is it possible to log (context.log) stuff other than printable strings? For instance, it would be really useful if I could log something like a plot. I know I could use dagstermill, but at that point I feel like I could just as well use Databricks

    Huib Keemink

    01/08/2022, 7:12 PM
    My use case: I'd really like to log some info about the data I've received in my pipeline, and the descriptive stuff I can think of isn't very helpful. A plot, on the other hand, would show what's going on straight away

    Huib Keemink

    01/08/2022, 7:13 PM
    (for instance, I could log mean, std, kurtosis etc., but a histogram would be much easier)
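    context.log itself only takes strings, but structured metadata on an AssetMaterialization renders in Dagit. A sketch that saves the figure and links it by path (on 0.13.x the metadata class is named EventMetadata rather than MetadataValue; the column name is a placeholder):

        import matplotlib.pyplot as plt

        from dagster import AssetMaterialization, MetadataValue, Output, op

        @op
        def summarize(df):
            fig, ax = plt.subplots()
            df["value"].hist(ax=ax)  # "value" is a placeholder column
            path = "/tmp/value_hist.png"
            fig.savefig(path)
            # shows up in Dagit on the run's asset entry, linked by path
            yield AssetMaterialization(
                asset_key="value_histogram",
                metadata={"histogram": MetadataValue.path(path)},
            )
            yield Output(df)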

    madhurt

    01/09/2022, 8:53 AM
    Hi, I am using the dagster-dbt library for integration with dbt. According to the docs, extra keyword arguments such as my_flag_name='foo' get converted to --my-flag-name foo. But I need to use an argument called --no-version-check, which doesn't accept any value after it. So how can I pass it to context.resources.dbt.run()?
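    One thing that may be worth trying first, purely an assumption about how dagster-dbt renders kwargs (not confirmed against its source): many CLI wrappers turn a True boolean into a bare flag.

        # untested sketch: if booleans render as bare flags, this
        # becomes `dbt run --no-version-check`
        context.resources.dbt.run(no_version_check=True)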

    Sa'ar Elias

    01/09/2022, 12:40 PM
    Hey guys! I'm sure I'm just missing something - is there a way to halt a chain (not execute the downstream ops) at run time without it counting as a failure? (i.e. instead of raise Failure(), do something like yield Stop() and count it as a success)?
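    There is no Stop() event, but optional outputs give exactly this behavior: when an op with a non-required Out yields nothing, every downstream op is skipped and the run still succeeds. A sketch (should_continue is a placeholder for your own condition):

        from dagster import Out, Output, op

        @op(out=Out(is_required=False))
        def maybe_continue(value):
            if should_continue(value):
                yield Output(value)
            # yielding no Output skips all downstream ops; the run
            # is still marked a success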

    Daniel Suissa

    01/09/2022, 3:55 PM
    How do I access an OpDefinition's context argument? I'm trying to wrap an op in a decorator that uses the context to determine whether the op should be skipped (i.e. return None for a non-required Out)
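    One way to do it, sketched under the assumption that the wrapped compute function takes context as its first positional argument: apply a plain decorator beneath @op, so it intercepts the context before the body runs.

        import functools

        from dagster import Out, Output, op

        def skippable(compute_fn):
            @functools.wraps(compute_fn)
            def wrapped(context, *args, **kwargs):
                if context.op_config.get("skip", False):
                    return  # yield nothing: the non-required Out is skipped
                yield from compute_fn(context, *args, **kwargs)
            return wrapped

        @op(config_schema={"skip": bool}, out=Out(is_required=False))
        @skippable
        def my_op(context, x):
            yield Output(x + 1)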

    Yan

    01/10/2022, 12:17 AM
    Hi, is there a way to run dagit with only a specific job from a specific repository? In the dagster CLI I do -m test.repository --repository prod_repository --job my_job. Thanks

    Mykola Palamarchuk

    01/10/2022, 6:58 PM
    Hello! I'm trying to set up Dagster in a k8s cluster with separated user deployments. I have two AWS accounts with access to different buckets: one for Dagster-related operations (e.g. as intermediate storage for s3_pickle_io_manager) and another for ready assets. The manual does not cover how to set up multi-profile AWS credentials, so at the moment I'm trying to find the best way. I'd appreciate any advice. My current idea: put the credentials in a k8s Secret and mount it as a volume at ~/.aws/credentials.

    Bryan Chavez

    01/11/2022, 12:56 AM
    Is there a way to get access to resources from a RunStatusSensorContext?
    @run_status_sensor(
        pipeline_run_status=PipelineRunStatus.SUCCESS,
        pipeline_selection=pipeline_names,
    )
    def job_success_email_sensor(context: RunStatusSensorContext):
        ...
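    RunStatusSensorContext doesn't carry resources, but you can instantiate resource definitions yourself inside the sensor with build_resources; a sketch where email_resource is a placeholder for your own resource definition:

        from dagster import build_resources

        @run_status_sensor(
            pipeline_run_status=PipelineRunStatus.SUCCESS,
            pipeline_selection=pipeline_names,
        )
        def job_success_email_sensor(context: RunStatusSensorContext):
            with build_resources({"email": email_resource}) as resources:
                resources.email.send(
                    subject=f"{context.pipeline_run.pipeline_name} succeeded"
                )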

    Keshav

    01/11/2022, 5:05 AM
    Hi Team, We are using fs_io_manager. We observed that temporarily created files are not getting deleted after pipeline execution is over. Is there any option to delete them once the execution is over?

    Sa'ar Elias

    01/11/2022, 10:46 AM
    Hey guys! Is there a place that specifies the naming rules for Dagster ops/graphs/ins/outs/mapping keys? I'm assuming it's the usual Python rules with a few exceptions (like not using "context" or "output"), but it'd be nice to see the entire list of rules/exceptions

    Mohammad Nazeeruddin

    01/11/2022, 11:21 AM
    Hi Team, We are getting this error in a sensor. We are using a user code deployment and configured all required values in dagster.yaml (configmap-instance.yaml). Dagster v0.12.14
    grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Exception iterating responses: Errors whilst loading configuration for {'instance_config_map': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=@, is_required=True), 'postgres_password_secret': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=@, is_required=False), 'dagster_home': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=/opt/dagster/dagster_home, is_required=False), 'job_image': Field(<dagster.config.config_type.Noneable object at 0x7f03dec2f7f0>, default=@, is_required=False), 'image_pull_policy': Field(<dagster.config.config_type.Noneable object at 0x7f03dec2fb20>, default=None, is_required=False), 'image_pull_secrets': Field(<dagster.config.config_type.Noneable object at 0x7f03deccb9d0>, default=@, is_required=False), 'service_account_name': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3edc0>, default=@, is_required=False), 'env_config_maps': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3e070>, default=@, is_required=False), 'env_secrets': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3ecd0>, default=@, is_required=False), 'env_vars': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3e6d0>, default=@, is_required=False), 'volume_mounts': Field(<dagster.config.config_type.Array object at 0x7f03deccb790>, default=[], is_required=False), 'volumes': Field(<dagster.config.config_type.Array object at 0x7f03dec2f220>, default=[], is_required=False), 'job_namespace': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=default, is_required=False), 'load_incluster_config': Field(<dagster.config.config_type.Bool object at 0x7f040f161b50>, default=True, is_required=False), 'kubeconfig_file': Field(<dagster.config.config_type.Noneable object at 0x7f03decd50d0>, default=None, is_required=False)}.
    Error 1: Post processing at path root:job_image of original value {'env': 'DAGSTER_K8S_PIPELINE_RUN_IMAGE'} failed:
    dagster.config.errors.PostProcessingError: You have attempted to fetch the environment variable "DAGSTER_K8S_PIPELINE_RUN_IMAGE" which is not set. In order for this execution to succeed it must be set in this environment.
    
    Stack Trace:
    File "/usr/local/lib/python3.8/site-packages/dagster/config/post_process.py", line 77, in _post_process
    new_value = context.config_type.post_process(config_value)
    File "/usr/local/lib/python3.8/site-packages/dagster/config/source.py", line 42, in post_process
    return str(_ensure_env_variable(cfg))
    File "/usr/local/lib/python3.8/site-packages/dagster/config/source.py", line 16, in _ensure_env_variable
    raise PostProcessingError(
    "
    >
    
      File "/usr/local/lib/python3.7/site-packages/dagster/daemon/sensor.py", line 238, in execute_sensor_iteration
        sensor_debug_crash_flags,
      File "/usr/local/lib/python3.7/site-packages/dagster/daemon/sensor.py", line 268, in _evaluate_sensor
        job_state.job_specific_data.cursor if job_state.job_specific_data else None,
      File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/repository_location.py", line 721, in get_external_sensor_execution_data
        cursor,
      File "/usr/local/lib/python3.7/site-packages/dagster/api/snapshot_sensor.py", line 49, in sync_get_external_sensor_execution_data_grpc
        cursor=cursor,
      File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 299, in external_sensor_execution
        sensor_execution_args
      File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 118, in _streaming_query
        yield from response_stream
      File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
        return self._next()
      File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _next
        raise self

    Nick Dellosa

    01/11/2022, 2:07 PM
    Hey Dagster folks, what's the advantage of using a RootInputManager as opposed to just defining an upstream op to get a value?
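    The difference shows up when the root value comes from config or an external system rather than from computation: the same op can be fed different roots per run without a dummy upstream op, and the loading logic stays out of the graph. A sketch:

        import pandas as pd

        from dagster import In, job, op, root_input_manager

        @root_input_manager(input_config_schema={"path": str})
        def csv_loader(context):
            # the path comes from run config, not from an upstream op
            return pd.read_csv(context.config["path"])

        @op(ins={"raw": In(root_manager_key="csv_loader")})
        def clean(raw):
            return raw.dropna()

        @job(resource_defs={"csv_loader": csv_loader})
        def etl():
            clean()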

    nikki

    01/11/2022, 5:16 PM
    Hi All! Quick question regarding the EcsRunLauncher. Does the EcsRunLauncher support AWS Secrets with Key/Value type entries, i.e. MY_SECRET:SECRET_1, MY_SECRET:SECRET_2? Or does it only support single Plaintext type entries? In Terraform or the AWS Secrets UI you can make multiple entries (Key/Value) in a secret and access them via their ARN like arn:aws:secretsmanager:us-west-2:234sdfr9823:MY_SECRET:SECRET_1::. Can you use the same ARN in dagster.yaml, and if so, do the secrets get translated to ENV vars correctly? i.e. would env then have SECRET_1 and SECRET_2 in it? The EcsRunLauncher docs don't really say... nor do they describe very well how the secrets get translated into ENV vars...

    Will Gunadi

    01/11/2022, 6:15 PM
    Anybody else using PostgreSQL to store runs, etc.? The Dagit instance has been hanging the server, and the only exception I can see is this:
    Exception in thread postgres-event-watch:
    Traceback (most recent call last):
      File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/home/kfdev/dagster/v_dagster/lib/python3.8/site-packages/dagster_postgres/event_log/event_log.py", line 258, in watcher_thread
        for notif in await_pg_notifications(
      File "/home/kfdev/dagster/v_dagster/lib/python3.8/site-packages/dagster_postgres/pynotify.py", line 123, in await_pg_notifications
        conn.poll()
    psycopg2.OperationalError: SSL SYSCALL error: Connection timed out
    Has anyone seen this?

    Bryan Chavez

    01/11/2022, 6:16 PM
    Is there a way to get job metadata like start time, duration, or end time?
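    A sketch of one possibility, hedged because this API was still moving at the time: recent 0.13.x releases expose run records with start_time/end_time (in later releases PipelineRunsFilter is renamed RunsFilter), and duration is just the difference.

        from dagster import op
        from dagster.core.storage.pipeline_run import PipelineRunsFilter

        @op
        def report_timing(context):
            record = context.instance.get_run_records(
                PipelineRunsFilter(run_ids=[context.run_id])
            )[0]
            context.log.info(f"started={record.start_time} ended={record.end_time}")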

    Keith Devens

    01/11/2022, 8:19 PM
    Hi, just getting started with Dagster. Doing a proof-of-concept with some of our ETL code, which calls a bunch of command-line tools in order. If that works, we can split up the code into more fine-grained @ops, but for now I need to make a @job that just calls some CLIs in order. I'm doing:
    @graph
    def get_data():
        g = create_shell_command_op("...", name="get_data")
        g()
    
    @graph(ins={"start": In(Nothing)})
    def process_data():
        p = create_shell_command_op("...", name="process_data")
        p()
    
    @job
    def load_data():
        data = get_data()
        process_data(start=data)
    But when I run dagit -f jobs.py I get:
    UserWarning: Error loading repository location jobs.py:dagster.core.errors.DagsterInvalidDefinitionError: @graph 'process_data' decorated function does not have parameter(s) 'start', which are in provided input_defs. @graph decorated functions should only have keyword arguments that match input names and, if system information is required, a first positional parameter named 'context'.
    I’ve been having trouble just figuring out how to get Dagster to call things in order within a @job.
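    The error is because a @graph function has to declare its inputs as function parameters. For pure ordering, though, the documented pattern is a Nothing input on an op; a sketch using plain subprocess calls in place of create_shell_command_op:

        import subprocess

        from dagster import In, Nothing, job, op

        @op
        def get_data():
            subprocess.run(["bash", "-c", "echo get data"], check=True)

        @op(ins={"start": In(Nothing)})
        def process_data():
            # the Nothing input orders this op after get_data without passing
            # data; note the function does not declare a `start` parameter
            subprocess.run(["bash", "-c", "echo process data"], check=True)

        @job
        def load_data():
            process_data(start=get_data())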

    Nick Dellosa

    01/11/2022, 8:36 PM
    What's the right way to create an op with an optional input? I have the default value set explicitly in the In dict and a default value set on the parameter in the op definition, but I'm still getting "is not connected to the output of a previous node and can not be loaded from configuration, making it impossible to execute"
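    For reference, setting default_value on the In alone should be enough; a default on the Python parameter isn't picked up by Dagster. A minimal sketch:

        from dagster import In, op

        @op(ins={"threshold": In(float, default_value=0.5)})
        def apply_threshold(threshold):
            # with default_value set, the input needs no upstream connection
            # and no run config; wiring an upstream output still works too
            return threshold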

    Fikri Thauli

    01/12/2022, 12:04 AM
    [SOLVED] Hey guys, I'm new here and I'm trying to install dagster using this link https://docs.dagster.io/getting-started but after I installed dagit I can't run it from cmd and it says "'dagit' is not recognized as an internal or external command, operable program or batch file." What did I miss?

    Chris Gough

    01/12/2022, 7:39 AM
    noob question: I have this job that runs OK in the dagit launchpad when I paste in the config, and a schedule that tries but the operations all fail because context.op_config == None. I want an easy way to manage config with files, and then have the right config loaded at the right time by scheduled jobs. The workflow I imagine is saving config YAML files (as per the config editor in the dagit launchpad) somewhere and somehow having the scheduled job load them at runtime. Is there an idiomatic way to do that (or a similar but better idiom), or would I need to write my own config-reading operation that edits the context in place ahead of the ops that need it?

Roel Hogervorst

01/12/2022, 8:03 AM
I'm a noob too, but I think you can create one version of the job that you run on a schedule with a fixed configuration, and another without configuration that you can pass config to from dagit.
So the op or job wants a certain value (context.op_config), and you either supply that from the outside with dagit or in the job. https://docs.dagster.io/concepts/configuration/config-schema#providing-run-configuration

Chris Gough

01/12/2022, 8:07 AM
I see. I read that page a few times but it didn't occur to me to have two versions of the job.
Thanks. I think my real problem is that I'm working through the concepts guide column-wise rather than row-wise, so I didn't get to resources yet 🙂 Reading the examples, I think I'm abusing context.op_config instead of letting resources pull config from the environment... It's probably a bit wrong for me to want the context to interact with its environment like I'm trying to do.

daniel

01/12/2022, 3:59 PM
Hi Chris - the run config does need to be fixed at the time the actual job starts (it's persisted and used to determine the structure and order of your ops), but you could absolutely load it from a YAML file within your schedule function - that would be as simple as
with open("your_run_config_yaml_file.yaml", "r") as f:
    return yaml.safe_load(f)
and then it could change each time the schedule tick happens. Not sure if that would satisfy your requirements here exactly though. You can also source the values of individual fields from environment variables at runtime - there's a "StringSource" type that lets you determine values at runtime: https://docs.dagster.io/_apidocs/config#dagster.StringSource
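Putting that together, a complete schedule might look like this (my_job and the YAML path are placeholders):

    import yaml

    from dagster import schedule

    @schedule(cron_schedule="0 10 * * *", job=my_job)
    def yaml_config_schedule(context):
        # re-read the file on every tick, so edits apply without a redeploy
        with open("run_config.yaml") as f:
            return yaml.safe_load(f)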