dagster-support
  • a

    Aharon Levine

    03/01/2022, 10:57 AM
    Hey guys! We run Dagster on k8s and just upgraded it last week to v0.13.18, looks great! We've run into a strange issue this morning though where the UI gets stuck for some pages with
    upstream request timeout
    For example the /runs, /status and /workspace pages worked every now and then after refreshing the browser multiple times, but in general they just show a timeout message like
    upstream connect error or disconnect/reset before headers. reset reason: connection failure
    . We don't see any strange error logs in kube for either our dagit or our daemon pods. The issue seems intermittent; for a few minutes everything works as normal and then the timeouts return. Any ideas how I can fix this?
    j
    • 2
    • 2
  • g

    geoHeil

    03/01/2022, 1:43 PM
    https://docs.dagster.io/deployment/run-coordinator says: For example, you can add the following to your dagster.yaml:
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator
      config:
        max_concurrent_runs: 25
        tag_concurrency_limits:
          - key: "database"
            value: "redshift"
            limit: 4
          - key: "dagster/backfill"
            limit: 10
    but this is a global configuration. Assuming I have different teams with separate pipelines and, in particular, separate repositories/workspaces, teams might contribute unique new resources for their use cases. For example one team might have an HN resource (https://github.com/dagster-io/dagster/blob/master/examples/hacker_news_assets/hacker_news_assets/resources/hn_resource.py) and another team a resource for other APIs. How can I merge the configuration from the resources of the different teams to configure the run coordinator (i.e. if some resources are available only in specific teams)? Is it possible to somehow import the resources of teams (and not hardcode them at the startup of dagster in the centralized yml configuration)?
    d
    • 2
    • 2
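A possible pattern (a sketch, not an official answer): keep the QueuedRunCoordinator config global, but have each team tag its own jobs so that a single generic tag_concurrency_limits rule applies per team rather than per hardcoded resource. The job, op, and tag values here are hypothetical:

from dagster import job, op

@op
def hn_ingest():
    ...

# Each team tags its own jobs. One global rule keyed on "team"
# (optionally with applyLimitPerUniqueValue, per the QueuedRunCoordinator
# docs) then caps each team's runs without enumerating team resources
# in the centralized yaml.
@job(tags={"team": "hacker-news", "database": "redshift"})
def hn_job():
    hn_ingest()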
  • d

    David Choy

    03/01/2022, 2:14 PM
    Hi all. I have revised my code to convert a partitioned job into a scheduled job. On the dagit console I can see a scheduled next run time, which is my expected run time, but strangely nothing gets run at that time. Can you please look into my code? Also attaching a Dagit console screenshot.
    dagster_demo_setup.txt
    d
    • 2
    • 22
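For reference, a minimal sketch of schedule wiring (job and op names are hypothetical). Dagit only displays schedules; ticks fire only if a dagster-daemon process is running and the schedule is switched on:

from dagster import job, op, schedule

@op
def say_hello(context):
    context.log.info("hello")

@job
def my_job():
    say_hello()

# Requires `dagster-daemon run` alongside dagit, plus the schedule
# being turned on in the UI, for anything to actually execute.
@schedule(cron_schedule="45 6 * * *", job=my_job, execution_timezone="US/Central")
def my_daily_schedule(context):
    return {}  # run config for each scheduled run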
  • g

    geoHeil

    03/01/2022, 2:57 PM
    How can I work with partitioned software-defined assets in a graph which combines an `op` and an `asset`? Right now I get a:
    DagsterInvalidConfigError: Error in config for job
    Error 1: Missing required config entry "dummy_asset_partitioned" at path root:ops. Sample config for missing entry: {'dummy_asset_partitioned': {'config': {'date': '...'}}}
    as Dagster is feeding the configuration per-op and somehow is only filling the partition date into one of the two steps
    s
    • 2
    • 6
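A sketch of one way to satisfy that error, assuming the job is built from the graph with a partitioned config that copies the partition date into the config of every op that needs it (both steps are modeled as plain ops here for brevity):

from dagster import daily_partitioned_config, graph, op

@op(config_schema={"date": str})
def dummy_asset_partitioned(context):
    context.log.info(context.op_config["date"])

@op(config_schema={"date": str})
def other_step(context):
    context.log.info(context.op_config["date"])

@graph
def my_graph():
    dummy_asset_partitioned()
    other_step()

# Fan the same partition date out to both steps, matching the
# "Missing required config entry ... at path root:ops" message.
@daily_partitioned_config(start_date="2020-02-01")
def partitioned_config(start, _end):
    date = start.strftime("%Y-%m-%d")
    return {"ops": {
        "dummy_asset_partitioned": {"config": {"date": date}},
        "other_step": {"config": {"date": date}},
    }}

my_job = my_graph.to_job(config=partitioned_config)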
  • m

    Max

    03/01/2022, 3:08 PM
    How is everyone storing their connection strings and passwords? YAML, env vars, or some secrets manager? Setting up Dagster on ECS, just wondering which of these is the best setup :)
    d
    z
    • 3
    • 3
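A common pattern, sketched: keep secrets out of YAML by declaring config as StringSource and referencing environment variables, which ECS can populate from Secrets Manager. The resource and variable names are made up:

from dagster import resource, StringSource

@resource(config_schema={"conn_string": StringSource})
def my_db_resource(init_context):
    # The secret value is resolved from the environment at run time,
    # so it never appears in code or in checked-in YAML.
    return init_context.resource_config["conn_string"]

# In the run config, reference the env var instead of a literal:
# resources:
#   my_db_resource:
#     config:
#       conn_string:
#         env: DB_PASSWORD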
  • a

    Anoop Sharma

    03/01/2022, 4:02 PM
    How does dagster handle in-progress runs if the Docker container restarts mid-run? 1. Does it restart the runs from the first op? 2. Does it restart the run from the op where execution was interrupted? If so, does it store the data/output from the previous op even after the container outage? 3. Does it do nothing?
    d
    • 2
    • 7
  • l

    Louis Auneau

    03/01/2022, 4:33 PM
    Hello! We are trying to set up `GOOGLE_APPLICATION_CREDENTIALS` for sensor logic (that pulls data from GCP Pub/Sub) on a Helm deployment. We are stuck on this because it seems we need to mount a volume based on a secret onto the daemon, which is not possible given the current `values.yaml` file. Do you know if this is possible, or is there another way to do this? Thank you in advance and have a nice day 🙂 !
    d
    a
    r
    • 4
    • 8
  • g

    geoHeil

    03/01/2022, 5:26 PM
    I have an asset and IO manager:
    class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
        def load_input(self, context):
            file_path = os.path.join("my_base_dir", context.step_key, context.name)
            return pd.read_csv(file_path)
    
        def handle_output(self, context, obj):
            file_path = os.path.join("my_base_dir", context.step_key, context.name)
    
            obj.to_csv(file_path, index=False)
    
        yield MetadataEntry.int(obj.shape[0], label="number of rows")
            yield MetadataEntry.float(0.1234, "some_column mean")
    
        def get_output_asset_key(self, context):
            file_path = os.path.join("my_base_dir", context.step_key, context.name)
            #return AssetKey(file_path)
            return file_path
    
        def get_output_asset_partitions(self, context):
            return set(context.config["partitions"])
    
    @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-02-01"))
    def dummy_asset_partitioned(context) -> DataFrame:
        """Creates a mini dummy asset which is partitioned"""
        partition_key = context.output_asset_partition_key
        get_dagster_logger().info(f"Partitioned asset from: {partition_key}")
        df = pd.DataFrame({'foo':[1,3,3], 'bar':['a', 'b', 'c']})
        df['partition_key'] = partition_key
    
        rand_metric_dummy_value = random.randrange(0, 101, 2)  
        yield Output(df, metadata={
            "path": EventMetadata.path('/path/to/file'),
            "value_counts": 10,
            "random_dummy_metric": rand_metric_dummy_value
        })
    How can I use/set the `asset_key` only once? This is currently very unclear to me.
    dagster.core.errors.DagsterInvariantViolationError: Both the OutputDefinition and the IOManager of output "result" on solid "dummy_asset_partitioned" associate it with an asset. Either remove the asset_key parameter on the OutputDefinition or use an IOManager that does not specify an AssetKey in its get_output_asset_key() function.
    c
    d
    s
    • 4
    • 18
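Going by the error text, a sketch of the minimal fix: since @asset already associates the output with an AssetKey, drop get_output_asset_key (and the partitions hook) from the IO manager so the key is defined in exactly one place:

import os

import pandas as pd
from dagster import IOManager, MetadataEntry

class PandasCsvIOManager(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path, index=False)
        yield MetadataEntry.int(obj.shape[0], label="number of rows")
    # No get_output_asset_key / get_output_asset_partitions: the @asset
    # decorator is now the only source of the asset key.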
  • c

    Chris Nogradi

    03/01/2022, 6:27 PM
    Is there a way to pass metadata from one op downstream to the next op? I see the metadata option in DynamicOutput(), which I am using, but I'm not sure how to access that metadata in the next op. Or is it better to just have an Out that is an object containing the needed metadata along with the data frame?
    c
    a
    • 3
    • 2
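Output/DynamicOutput metadata goes to the event log and Dagit, not to downstream ops, so the second option is the usual route. A sketch with hypothetical names: wrap the frame and its metadata in a small object and pass that along:

from dataclasses import dataclass

import pandas as pd
from dagster import op

@dataclass
class FrameWithMeta:
    df: pd.DataFrame
    source: str  # whatever metadata the downstream op needs

@op
def produce_frame() -> FrameWithMeta:
    return FrameWithMeta(pd.DataFrame({"a": [1, 2]}), source="api")

@op
def consume_frame(context, inp: FrameWithMeta):
    context.log.info(f"rows={len(inp.df)}, source={inp.source}")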
  • r

    rowan gaffney

    03/01/2022, 11:27 PM
    Is it possible to have multiple workspaces/repositories per user-code-deployment?
    c
    b
    j
    • 4
    • 18
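In principle yes: a single user-code deployment can expose several @repository definitions from one module, and each shows up separately in the workspace. A minimal sketch with hypothetical jobs:

from dagster import job, op, repository

@op
def noop():
    pass

@job
def team_a_job():
    noop()

@job
def team_b_job():
    noop()

@repository
def team_a_repo():
    return [team_a_job]

@repository
def team_b_repo():
    return [team_b_job]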
  • b

    Ben Gatewood

    03/02/2022, 5:47 AM
    Hey all. New here and evaluating some stuff. One requirement I have is the ability to run a job on an "accelerating schedule". That is, I need to provide a date in the future and, depending on how far away this date is, the job will run more or less often. Does this sound possible?
    r
    • 2
    • 2
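One way to approximate an "accelerating schedule" (a sketch under assumed requirements): tick at the finest cadence you will ever need and return a SkipReason when the target date is still far away. TARGET and the job are hypothetical:

from datetime import datetime

from dagster import SkipReason, job, op, schedule

TARGET = datetime(2022, 6, 1)

@op
def do_work():
    ...

@job
def countdown_job():
    do_work()

@schedule(cron_schedule="0 * * * *", job=countdown_job)  # hourly ticks
def accelerating_schedule(context):
    now = context.scheduled_execution_time.replace(tzinfo=None)
    days_left = (TARGET - now).days
    # Far from the target: only act on the midnight tick (daily).
    # Within a week of the target: act on every hourly tick.
    if days_left > 7 and now.hour != 0:
        return SkipReason(f"{days_left} days to target; running daily for now")
    return {}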
  • g

    geoHeil

    03/02/2022, 8:54 AM
    How are software-defined-assets meant to work in an ACID way with regards to metadata & materialization in an IO manager?
    yield Output(df, metadata={
            "path": EventMetadata.path('/path/to/file'),
            "value_counts": 10,
            "random_dummy_metric": rand_metric_dummy_value
        })
    If I understand this correctly it would: (1) store the emitted metadata, and (2) materialize the data using an IO manager. What happens if the materialization fails (for whatever reason)? Would I still get the metadata to show up in dagit?
    c
    s
    • 3
    • 7
  • g

    geoHeil

    03/02/2022, 9:48 AM
    When integrating dagster with (py-spark): https://docs.dagster.io/integrations/spark#using-dagster-with-spark explains:
    The op-decorated function accepts DataFrames as parameters and returns DataFrames when it completes. An IOManager handles writing and reading the DataFrames to and from persistent storage.
    Assuming the DF is petabytes in size, I do not necessarily want to materialize all this IO. Spark itself will create a DAG of the submitted operations - and perhaps compute additional predicate pushdowns or projections for optimization (AQE). How can I use dagster and ops to define multiple reusable building blocks but still not materialize the IO between these steps?
    z
    s
    • 3
    • 8
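A sketch of one option: run the job in a single process with the in-memory IO manager, so ops hand the lazy pyspark DataFrame along without persisting it; Spark then optimizes one plan across the op boundaries and only the final op triggers an action. Paths and op names are made up:

from dagster import in_process_executor, job, mem_io_manager, op
from dagster_pyspark import pyspark_resource

@op(required_resource_keys={"pyspark"})
def load_events(context):
    # Returns a lazy DataFrame; nothing is computed yet.
    return context.resources.pyspark.spark_session.read.parquet("s3://bucket/events")

@op
def filter_events(events):
    return events.where("country = 'US'")  # still lazy

@op
def write_report(filtered):
    filtered.write.parquet("s3://bucket/report")  # the only Spark action

# mem_io_manager passes values in memory instead of writing them out;
# in_process_executor keeps all ops in one process so that hand-off works.
@job(
    executor_def=in_process_executor,
    resource_defs={"io_manager": mem_io_manager, "pyspark": pyspark_resource},
)
def report_job():
    write_report(filter_events(load_events()))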
  • m

    Mark Fickett

    03/02/2022, 2:31 PM
    What's the right way to declare a `values_resource` with an `Optional` `List` of `Enum`s? Here's what I tried:
    from typing import List
    from typing import Optional
    
    from dagster import make_values_resource
    from dagster import Enum
    
    from common_definitions import PipelineStage, Pipe  # regular Python enums
    
    
    # Define a resource for a config that can be passed to multiple ops in a job.
    # <https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job>
    SHARED_CONFIG = make_values_resource(
        from_stage=Enum.from_python_enum(PipelineStage),  # seems to work
        pipes=Optional[List[Enum.from_python_enum(Pipe)]],  # error
    )
    The error I get is:
    /...packages/dagster/core/workspace/context.py:541: UserWarning: Error loading repository location my_pipeline.py:TypeError: Parameters to generic types must be types. Got <dagster.config.config_type.Enum object at 0x7f54f51247c0>.
    I haven't done much debugging yet, but thought I'd ask for a definitive answer while I experiment. This is my first time trying to declare a `values_resource`.
    a
    • 2
    • 2
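The typing generics (Optional, List) cannot wrap Dagster config types, which is what the "Parameters to generic types must be types" error points at. A sketch of the same intent expressed entirely in Dagster's config system, assuming the same enums:

from dagster import Array, Enum, Field, make_values_resource

from common_definitions import PipelineStage, Pipe  # as in the question

SHARED_CONFIG = make_values_resource(
    from_stage=Enum.from_python_enum(PipelineStage),
    # Dagster's Array + Field(is_required=False) instead of
    # typing's Optional[List[...]].
    pipes=Field(Array(Enum.from_python_enum(Pipe)), is_required=False),
)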
  • m

    Mark Fickett

    03/02/2022, 2:57 PM
    What's the right way to optionally skip an op in a graph? I'm trying to combine conditional branching and a fixed fan-in with order-based dependencies, but at the moment I'm getting an error that you can't have an input that's a list of `Nothing`s. Here's a trimmed-down version:
    @op(
        out={
            "do_metadata": Out(Nothing, is_required=False),
            "no_metadata": Out(Nothing, is_required=False),
        },
    )
    def _decide_whether_to_do_metadata(context):
        config = context.resources.shared_config
        if config.should_do_metadata:
            yield Output(None, "do_metadata")
        yield Output(None, "no_metadata")
    
    
    @op(
        ins={"start": In(Nothing)},
    )
    def _metadata(context):
        pass  # only do this work if the config said so
    
    
    @op(
        ins={"start": In(List[Nothing])},
    )
    def _continue_either_way(context) -> None:
        pass  # always do this work, but wait for metadata to be done
    
    
    @graph
    def _graph():
        do_metadata, no_metadata = _decide_whether_to_do_metadata()
        done_metadata = _metadata(do_metadata)
        _continue_either_way(start=[no_metadata, done_metadata])
    I could just put an early exit in the `_metadata` `@op`, but it would be nice to see at a glance in Dagit whether the `@op` was skipped or not.
    a
    m
    • 3
    • 4
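A sketch of one workaround for the List[Nothing] restriction: let the branch outputs carry a trivial real value (a bool) so the fan-in input is List[bool]; skipped optional outputs should be dropped from the fan-in list, and the graph shape stays the same:

from typing import List

from dagster import Out, Output, graph, op

@op(out={"do_metadata": Out(bool, is_required=False),
         "no_metadata": Out(bool, is_required=False)})
def decide_whether_to_do_metadata():
    should_do_metadata = True  # stand-in for reading shared config
    if should_do_metadata:
        yield Output(True, "do_metadata")
    else:
        yield Output(True, "no_metadata")

@op
def metadata_step(start: bool) -> bool:
    return True  # only runs when the "do_metadata" branch fired

@op
def continue_either_way(start: List[bool]) -> None:
    pass  # fan-in of real values is allowed, unlike List[Nothing]

@graph
def my_graph():
    do_md, no_md = decide_whether_to_do_metadata()
    continue_either_way([no_md, metadata_step(do_md)])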
  • c

    Chen Tsinovoy

    03/02/2022, 4:41 PM
    How do I connect to my s3 bucket with dagster_aws.s3.s3_resource? I want to save a file into my bucket but I don't really understand how to do it with dagster.
    z
    • 2
    • 5
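A sketch of the usual wiring: the configured s3_resource hands back a boto3 S3 client, so boto3's upload methods apply directly (bucket and key are made up):

import io

from dagster import job, op
from dagster_aws.s3 import s3_resource

@op(required_resource_keys={"s3"})
def save_file(context):
    f_obj = io.BytesIO(b"hello")
    # boto3's method is upload_fileobj(fileobj, bucket, key)
    context.resources.s3.upload_fileobj(f_obj, "my-bucket", "path/to/file.txt")

@job(resource_defs={"s3": s3_resource})
def save_file_job():
    save_file()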
  • a

    Anoop Sharma

    03/02/2022, 8:24 PM
    Can I set my DAGSTER_HOME env variable to an s3 bucket (on MinIO)? I want all the sessions, schedules and history to be stored in that bucket.
    p
    • 2
    • 1
  • d

    Daniel Katz

    03/02/2022, 9:10 PM
    Could someone help me understand this error:
  • d

    Daniel Katz

    03/02/2022, 9:10 PM
    dagster.check.CheckError: Failure condition: Can not have pending and unresolved step inputs
    a
    d
    • 3
    • 17
  • m

    Mark Fickett

    03/02/2022, 9:48 PM
    How can I capture logs from a subprocess into Dagster? I'm migrating an existing pipeline into Dagster, and it uses concurrent.futures, which spawns subprocesses. Their logs aren't captured in Dagster, even though I followed https://docs.dagster.io/concepts/logging/python-logging#python-logging to set `python_logs: managed_python_loggers: - root` in my `$DAGSTER_HOME/dagster.yaml`. (I confirmed that a logger in my main process does get picked up within Dagit, and the logs from the subprocess do still show up in my terminal where I started `dagit`.) I'm hoping to do an incremental migration where I leave some chunks using concurrent.futures for a bit, but if the answer is that I just need to get everything into Dagster, that's probably OK too.
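Managed python loggers are captured in the Dagster process; a spawned child process has its own root logger that Dagster never sees. One incremental workaround (a sketch; the script name is hypothetical) is to capture the child's output and re-emit it through the op's logger:

import subprocess

from dagster import op

@op
def run_legacy_step(context):
    proc = subprocess.run(
        ["python", "legacy_step.py"],
        capture_output=True, text=True, check=True,
    )
    # Re-emit the child's log lines so they land in Dagster's event log.
    for line in proc.stderr.splitlines():
        context.log.info(line)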
  • s

    Stephen Bailey

    03/03/2022, 2:04 AM
    I have set up a job to run meltano (a data loader) on a schedule, and now I'd like to set up a more comprehensive schedule to run all of my extractors. The basic idea is I need to create multiple schedules with varying parameters, like:
    schedules:
    - cron: 0 1 * * *
      extractor: tap-gitlab
      loader: target-snowflake
    - cron: 0 2 * * *
      extractor: tap-dbt-cloud
      loader: target-snowflake
    - cron: 0 3 * * *
      extractor: tap-salesforce
      loader: target-snowflake
    ...
    Could someone give me some pointers on how I can do this?
    d
    • 2
    • 4
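A sketch of generating those schedules in a loop from that spec (the meltano op here just logs the command it would run):

from dagster import ScheduleDefinition, job, op, repository

@op(config_schema={"extractor": str, "loader": str})
def run_meltano(context):
    cfg = context.op_config
    context.log.info(f"meltano elt {cfg['extractor']} {cfg['loader']}")

@job
def meltano_job():
    run_meltano()

SPECS = [
    ("0 1 * * *", "tap-gitlab", "target-snowflake"),
    ("0 2 * * *", "tap-dbt-cloud", "target-snowflake"),
    ("0 3 * * *", "tap-salesforce", "target-snowflake"),
]

schedules = [
    ScheduleDefinition(
        name=extractor.replace("-", "_") + "_schedule",
        cron_schedule=cron,
        job=meltano_job,
        run_config={"ops": {"run_meltano": {"config": {
            "extractor": extractor, "loader": loader,
        }}}},
    )
    for cron, extractor, loader in SPECS
]

@repository
def meltano_repo():
    return [meltano_job, *schedules]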
  • b

    Ben Gatewood

    03/03/2022, 5:04 AM
    Hey all - I feel like this should basically "just work" ?
    context.resources.s3.upload_file_obj(f_obj, "testing", filename)
  • b

    Ben Gatewood

    03/03/2022, 5:05 AM
    But I get:
    'S3' object has no attribute 'upload_file_obj'
  • b

    Ben Gatewood

    03/03/2022, 5:05 AM
    I have this on my op:
    @op(required_resource_keys={'s3'})
  • b

    Ben Gatewood

    03/03/2022, 5:06 AM
    and this in my job def:
    resource_defs={'s3': s3_resource}
  • b

    Ben Gatewood

    03/03/2022, 5:06 AM
    (it's all copied straight from the dagster-aws docs except I've changed to `upload_file_obj()` instead of list keys or whichever)
    j
    • 2
    • 2
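The configured resource is a boto3 S3 client, and boto3 spells the method upload_fileobj (no second underscore), so a one-line sketch of the fix:

# boto3: upload_fileobj, not upload_file_obj
context.resources.s3.upload_fileobj(f_obj, "testing", filename)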
  • s

    Sathish

    03/03/2022, 5:08 AM
    Hey all, I am using Dagster sensors in our project. I have activated them in my dagit UI but the sensors are still not running; I am seeing the last tick as None in the sensors tab. Any idea what's causing this?
    d
    • 2
    • 40
  • f

    Francois-DE

    03/03/2022, 5:38 AM
    Good day all. I've got my Dagit UI running with a multi-container setup. I'm using Postgres for the run, event_log and schedule storage. Some of my jobs take forever to render the ops, sometimes never rendering. A job with 48 ops renders perfectly fine, but a job with 56 ops doesn't render at all. Is there a limit? How do I get this to render for all my jobs? Could this be related to the Postgres RDS I'm running?
    a
    d
    • 3
    • 6
  • s

    Sanat Mouli

    03/03/2022, 7:36 AM
    Hi there, is it possible to schedule a software-defined asset in Dagster, in the same way we can schedule a dagster job, like with this:
    @schedule(
        cron_schedule="45 6 * * *",
        job=hello_cereal_job,
        execution_timezone="US/Central",
    )
    c
    • 2
    • 3
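At the time of this thread, one route (a sketch; build_assets_job is assumed available in this release) was to wrap the asset(s) in a job and schedule that job like any other:

from dagster import asset, build_assets_job, schedule

@asset
def my_asset():
    return 1

# Wrap the asset in a job, then schedule the job as usual.
asset_job = build_assets_job("asset_job", assets=[my_asset])

@schedule(
    cron_schedule="45 6 * * *",
    job=asset_job,
    execution_timezone="US/Central",
)
def asset_schedule(context):
    return {}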
  • m

    Martin Laurent

    03/03/2022, 3:26 PM
    OK, I fixed the fan-in issue after conditional branching. Now I have this conditional-branching + fan-in in a subgraph, and I want to apply the subgraph to all my inputs, so I'm using DynamicOutput then `.map(subgraph)`, but I get this error:
    Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
    a
    d
    • 3
    • 2
a

alex

03/03/2022, 5:50 PM
@Dagster Bot issue dynamic output dependency in fan in
d

Dagster Bot

03/03/2022, 5:50 PM
Created issue at: https://github.com/dagster-io/dagster/issues/6906