dagster-support
  • k

    Koby Kilimnik

    06/18/2022, 8:31 AM
    Has anyone managed to use a default config but still have the auto config scaffolding work in Dagit? It's a bit buggy for me: sometimes I see the values and sometimes the config is empty and it won't let me hit scaffold. Is there a way to insert part of the config in the job, then require the rest from the user, and still have the "Scaffold missing config" button work?
    d
    • 2
    • 2
  • s

    Son Giang

    06/18/2022, 11:21 AM
    Hi there, we're quite interested in assets (no longer experimental in 0.15.0) and want to move from our ops/graphs style to asset style. The question is: we had a simple pipeline like this:
    Op 1: Query the database, then write the result to a temporary file.
    Op 2: Upload the file to cloud storage, then delete the temporary file.
    What is the best way/practice to write this in asset style?
    b
    s
    • 3
    • 4
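    A minimal sketch of the shape this often takes in asset style: the query becomes the asset's computation, and the upload/cleanup side effects move into an IO manager attached to it (run_query and the IO manager key below are hypothetical stand-ins):
    from dagster import asset

    @asset(io_manager_key="cloud_storage_io_manager")  # hypothetical IO manager that uploads
    def exported_table():
        # the query result is the asset's value; persisting it is the IO manager's job
        return run_query("SELECT * FROM source_table")  # hypothetical helper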
  • y

    ygt1qa

    06/19/2022, 2:46 AM
    Hi, I have a question. Can I send any dynamic values to an IOManager’s methods? Here is an example.
    from dagster import IOManager, Out, op

    @op(out=Out(io_manager_key="obj_io_manager"))
    def op1(context):
        # 1. I define some dynamic values in ops
        some_value = "aaa"
        return something...

    class ObjIOManager(IOManager):
        def handle_output(self, context, obj):
            # 2. I want to get and use op1's some_value
            print(some_value)

        def load_input(self, context):
            # 2. I want to get and use op1's some_value
            print(some_value)
    :dagster-bot-resolve: 1
    s
    • 2
    • 2
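    One pattern that works here, as a minimal sketch: wrap the payload and the dynamic value in a small container inside the op, and unpack it in the IO manager. TaggedValue is a made-up illustrative class, not a Dagster API:
    from dataclasses import dataclass
    from typing import Any

    from dagster import IOManager, Out, io_manager, op

    @dataclass
    class TaggedValue:
        """Hypothetical wrapper: carries a dynamic value alongside the payload."""
        value: Any
        some_value: str

    @op(out=Out(io_manager_key="obj_io_manager"))
    def op1():
        some_value = "aaa"  # computed dynamically inside the op
        return TaggedValue(value="something", some_value=some_value)

    class ObjIOManager(IOManager):
        def handle_output(self, context, obj):
            # the dynamic value travels with the output object itself
            print(obj.some_value)

        def load_input(self, context):
            ...

    @io_manager
    def obj_io_manager(_):
        return ObjIOManager()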
  • b

    Barry Sun

    06/20/2022, 12:13 AM
    Hi all, I'm trying to get to grips with the Dagster instance/daemon. I have looked through the docs here (https://docs.dagster.io/deployment/dagster-daemon) and concluded that I need a Dagster instance file (
    dagster.yaml
    ) to define which daemons to run, located in the same folder as
    workspace.yaml
    . I am trying to run a backfill; which daemon do I need for this, and how do I define it in my Dagster instance file?
    :dagster-bot-resolve: 1
    👍 1
    d
    • 2
    • 5
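    For reference, the backfill daemon is included automatically when dagster-daemon runs; the piece commonly opted into via dagster.yaml is the queued run coordinator. A minimal sketch from the 0.15-era docs:
    run_coordinator:
      module: dagster.core.run_coordinator
      class: QueuedRunCoordinator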
  • b

    Barry Sun

    06/20/2022, 5:21 AM
    Hi all, I am struggling with updating my SDA code for 0.15.0 😕 I want to do two things: 1.
    load_assets_from_modules
    is not working for me - I want to be able to load all the assets from a module (e.g. raw_assets) and apply the 'group_name'. I seem to be able to run this function, but how do I input it into
    with_resources
    ? I get the error
    AttributeError: 'list' object has no attribute 'with_resources'
    . 2. Build a job from assets. I've tried using
    define_asset_job
    but I can't find clear documentation on its parameters or how it fits into the final repository definition. Looking at the API docs, I'm not sure how to input my multi-key assets. I'm not sure if it's me, but I'm having a hard time finding enough documentation around the new functions 😕
    :dagster-bot-resolve: 1
    g
    s
    • 3
    • 6
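    For the first error: with_resources is a standalone function that takes the asset list as its first argument, rather than a method on the list. A minimal sketch, assuming a version of load_assets_from_modules that accepts group_name (my_io_manager and the raw_assets module are stand-ins):
    from dagster import define_asset_job, load_assets_from_modules, repository, with_resources

    from my_project import raw_assets  # hypothetical module containing @asset definitions

    assets = with_resources(
        load_assets_from_modules([raw_assets], group_name="raw"),
        resource_defs={"io_manager": my_io_manager},  # hypothetical resource
    )

    # with no selection argument, define_asset_job targets every asset in the repo
    all_assets_job = define_asset_job("all_assets_job")

    @repository
    def my_repo():
        return [*assets, all_assets_job]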
  • o

    Oliver

    06/20/2022, 6:25 AM
    Hey, anyone used Ray to accelerate Dagster pipelines? I'm thinking that I can use Ray in my asset definitions and then use the Plasma store as an IO manager. I'm not sure if this will work though, since I think that after materialisation the process will finish, the ref counter will get decremented, and the object will be garbage collected. I am not super confident in my Ray or Dagster knowledge here, so this may just work. Otherwise maybe a Ray actor could be initialised to maintain the ref, I guess? Any thoughts about this?
    :dagster-bot-resolve: 1
  • d

    Dmitry Mogilevsky

    06/20/2022, 7:50 AM
    I've tried to use
    with_resources
    to create multiple assets from the same graph with varying configs, using the code below
    from dagster import (
        AssetsDefinition,
        graph,
        job,
        make_values_resource,
        op,
        repository,
        with_resources,
    )


    @op(required_resource_keys={"a"})
    def op1(context):
        return context.resources.a["v"] + 1
    
    
    @op
    def op2(context, op1):
        return op1 + 1
    
    
    @graph
    def mygraph():
        return op2(op1())
    
    
    @job(resource_defs={"a": make_values_resource(v=int)})
    def myjob():
        mygraph()
    
    
    VALUE_OPTIONS = [0, 1, 2]
    myassets = [
        with_resources(
            [AssetsDefinition.from_graph(mygraph)],
            resource_defs={"a": make_values_resource(v=int)},
            resource_config_by_key={"a": {"config": {"v": v}}},
        )[0]
        for v in VALUE_OPTIONS
    ]
    
    
    @repository
    def test_repo():
        return [*myassets, myjob]
    However, I get the following error when I attempt to load it
    dagster.core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'a' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
    Is it not possible to specify multiple assets from a single graph that vary only in input config? I realise this is a classic case for static partitioning, but I'm trying to do it without static partitioning, to get around the fact that I can't partition across multiple dimensions, as mentioned in a previous thread.
    :dagster-bot-resolve: 1
    s
    • 2
    • 2
  • m

    Mykola Palamarchuk

    06/20/2022, 10:46 AM
    Hi team! We have a pipeline to grab a lot of paged data, e.g.:
    @graph
    def get_and_persist_page(page_idx):
        page_data = get_page(page_idx)  # from external API
        persist_original(page_idx, page_data)  # persist to track history
        return to_dataframe(page_idx, page_data)  # convert to Pandas DataFrame

    @graph
    def my_job_graph():
        page_indexes = get_page_indexes()  # DynamicOutput, get list of pages from external API
        list_of_dfs = page_indexes.map(get_and_persist_page).collect()
        combined_df = combine(list_of_dfs)
        save_to_warehouse(combined_df)
    It works this way, and I like that it is possible to configure a retry policy for the "get_page" op. But I'm curious whether there is a more elegant way to do this. What bothers me: • I don't know how to define the type for the list of Pandas DataFrames produced by
    collect()
    • It produces some "noise" in Dagit for my 100 pages. Is there a way to group logs/charts for `DynamicOutput`s somehow?
    :dagster-bot-resolve: 1
    d
    s
    • 3
    • 11
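    On the first point: the fan-in op can annotate its input as a plain Python List of DataFrames, which Dagster picks up as the input type. A minimal sketch (combine is the op from the snippet above):
    from typing import List

    import pandas as pd
    from dagster import op

    @op
    def combine(dfs: List[pd.DataFrame]) -> pd.DataFrame:
        # receives everything gathered by .collect() as a single list
        return pd.concat(dfs, ignore_index=True)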
  • c

    Chris Histe

    06/20/2022, 3:42 PM
    Hi, I’m getting the following error:
    Exception: No docker image specified by the instance config or repository
    when running
    dagster job launch -j job -c config.yaml
    config.yaml
    execution:
      config:
        image: ghcr.io/user/image:latest
        registry:
          url: https://ghcr.io
          username: user
          password: password
    dagster.yaml
    run_launcher:
      module: dagster_docker
      class: DockerRunLauncher
    There is very little documentation and I’m struggling to fix this. What am I doing wrong? Thanks
    :dagster-bot-resolve: 1
    r
    d
    • 3
    • 54
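    The DockerRunLauncher reads its image from its own config block in dagster.yaml (or from the code location's image), not from the run's execution config. A minimal sketch:
    run_launcher:
      module: dagster_docker
      class: DockerRunLauncher
      config:
        image: ghcr.io/user/image:latest
        registry:
          url: ghcr.io
          username: user
          password: password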
  • a

    Alessandro Facchin

    06/20/2022, 6:02 PM
    Hi there, I am still getting familiar with Dagster and I cannot find much about op-specific k8s configurations. Ideally I would like to be able to use some lightweight instances for light ops and GPUs for heavy training steps. Could someone point me in the right direction please?
    :dagster-bot-resolve: 1
    r
    • 2
    • 2
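    With the k8s_job_executor, per-op Kubernetes resources can be requested through the dagster-k8s/config tag. A minimal sketch (the resource values are illustrative):
    from dagster import op

    @op(
        tags={
            "dagster-k8s/config": {
                "container_config": {
                    "resources": {
                        "requests": {"cpu": "250m", "memory": "64Mi"},
                        "limits": {"nvidia.com/gpu": "1"},
                    },
                },
            },
        },
    )
    def train_model():
        # heavy training step that should land on a GPU node
        ...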
  • c

    Charles Leung

    06/20/2022, 7:27 PM
    Hey all, if I wanted to pass in a JSON string in the Dagster launchpad, how can I do that?
    :dagster-bot-resolve: 1
    r
    • 2
    • 7
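    In the launchpad's run config, a JSON string can go in as a quoted YAML scalar. A minimal sketch, assuming an op my_op with a string config field named payload:
    ops:
      my_op:
        config:
          payload: '{"key": "value", "count": 2}'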
  • m

    Mark Atkins

    06/20/2022, 10:42 PM
    Hi all, I've got a conceptual question about Dagster resources. In an implementation I'm doing, I'm using a resource to connect to Snowflake with the
    snowflake-connector-python
    package. I'm not currently using the dagster-snowflake implementation because I'd like to use web-based browser authentication for this resource on a local machine. Setting up the resource is no trouble, but I'm noticing that if multiple ops consume this resource, the browser auth is triggered for every distinct op that uses it. Reading through the documentation, it conceptually makes sense that this happens: the code in the resource definition executes for every op. But is there any way to instantiate a connection object/cursor once at the beginning of a job within a resource, so that the object isn't recreated each time the resource is used in an op?
    :dagster-bot-resolve: 1
    s
    s
    • 3
    • 3
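    Worth noting: a resource is initialized once per process, so under the default multiprocess executor every op (each in its own process) re-triggers the auth; with the in-process executor, a context-manager style resource opens the connection once per run. A minimal sketch (connection parameters are placeholders):
    import snowflake.connector
    from dagster import resource

    @resource
    def snowflake_browser_auth(_init_context):
        conn = snowflake.connector.connect(
            account="my_account",             # hypothetical
            user="me@example.com",            # hypothetical
            authenticator="externalbrowser",  # opens the browser once per init
        )
        try:
            yield conn
        finally:
            conn.close()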
  • j

    Jeremy

    06/20/2022, 11:30 PM
    Is there an example of using ops to build a graph that creates an asset? I'm having a hard time getting it to compile. I have one op that creates input for three ops that can run in parallel, which then feed into an asset: op1 -> (op2a, op2b, op2c) -> op3
    :dagster-bot-resolve: 1
    s
    • 2
    • 43
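    A minimal sketch of that fan-out/fan-in shape as a graph-backed asset (the op bodies are placeholders):
    from dagster import AssetsDefinition, graph, op

    @op
    def op1():
        return 1

    @op
    def op2a(x):
        return x + 1

    @op
    def op2b(x):
        return x + 2

    @op
    def op2c(x):
        return x + 3

    @op
    def op3(a, b, c):
        return a + b + c

    @graph
    def my_asset_graph():
        x = op1()
        return op3(op2a(x), op2b(x), op2c(x))

    my_asset = AssetsDefinition.from_graph(my_asset_graph)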
  • j

    Jeremy

    06/21/2022, 2:41 AM
    Can you not set the schema on the dagster_snowflake.snowflake_io_manager?
    :dagster-bot-resolve: 1
    s
    • 2
    • 4
  • j

    Jeremy

    06/21/2022, 4:04 AM
    from dagster import AssetKey, Out, op

    @op(
        out={
            "ip_metadata": Out(
                asset_key=AssetKey(["schema","table"]),
                asset_partitions=context.partition_key,
                metadata={"partition_expr": "created_on"},
                io_manager_key="snowflake_io_manager",
            )
        }
    )
    how do i set the asset_partitions from the context for this op?
    :dagster-bot-resolve: 1
  • s

    Sathish

    06/21/2022, 4:18 AM
    Hi, we are trying to run a Dagster CLI command inside a sensor to run a job: os.system('dagster job execute -f <py file> -j <job_name>'). When the command is triggered from a sensor, it loads a new repository instance and loads the Dagster objects again. Is there instead a way to trigger a job on the same instance?
    :dagster-bot-resolve: 1
    d
    • 2
    • 6
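    The built-in route is for the sensor to yield a RunRequest targeting the job, rather than shelling out to the CLI; the run then executes on the same instance. A minimal sketch (my_job and the readiness check are placeholders):
    from dagster import RunRequest, sensor

    @sensor(job=my_job)  # my_job defined elsewhere in the same repository
    def my_job_sensor(context):
        if work_is_ready():  # hypothetical condition
            yield RunRequest(run_key=None, run_config={})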
  • s

    Son Giang

    06/21/2022, 7:56 AM
    How can I add asset_key_prefix when using
    AssetsDefinition.from_graph()
    ?
    :dagster-bot-resolve: 1
    s
    • 2
    • 3
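    One way, as a sketch: spell the prefixed key out explicitly with keys_by_output_name ("result" is the default output name of a single-output graph):
    from dagster import AssetKey, AssetsDefinition

    my_asset = AssetsDefinition.from_graph(
        mygraph,  # the @graph being turned into an asset
        keys_by_output_name={"result": AssetKey(["my_prefix", "mygraph"])},
    )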
  • s

    Son Giang

    06/21/2022, 8:54 AM
    How can I run dbt assets with parameters/variables?
    :dagster-bot-resolve: 1
    s
    • 2
    • 3
  • l

    Lucia Ambrogi

    06/21/2022, 10:12 AM
    Hello, I am using an
    Output
    object to attach metadata to my op’s output (based on this example). How can I access this metadata from a downstream op?
    :dagster-bot-resolve: 1
    s
    • 2
    • 2
  • m

    MO

    06/21/2022, 10:33 AM
    Hello, I work in an MLOps team that manages multiple projects spread over a number of git repositories. Is it possible to have a single Dagit UI that interfaces with these succinctly (each project with its own resource requirements, schedule, etc.)?
    :dagster-bot-resolve: 1
    d
    • 2
    • 4
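    Yes: a single workspace.yaml can load several code locations into one Dagit, one entry per project. A minimal sketch (paths and names are placeholders):
    load_from:
      - python_file:
          relative_path: project_a/repo.py
          location_name: project_a
      - python_file:
          relative_path: project_b/repo.py
          location_name: project_b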
  • d

    David Hyman

    06/21/2022, 11:21 AM
    Hi again :blob-wave: Great to see the continued development in 0.15 😁 What is the current best practice, if it's possible now, for using multiple Docker images in a deployment (e.g. for ops with different/conflicting dependencies)❓ Perhaps for some individual ops, or maybe grouped by pipeline? Maybe they share data using assets/sensors etc.? Sort of this question again (closest I could find)
    :plus1: 1
    :dagster-bot-resolve: 1
    s
    • 2
    • 1
  • r

    Rszk

    06/21/2022, 11:33 AM
    Hi, I get the following error when I try to re-run ops in a job:
    AttributeError: type object 'KnownExecutionState' has no attribute 'for_reexecution'
    :dagster-bot-resolve: 1
    s
    y
    • 3
    • 3
  • m

    Megan Beckett

    06/21/2022, 1:25 PM
    Hi there, new to Dagster. I am setting up a fairly straightforward pipeline that pulls data from an API, processes it, and inserts it into our PostgreSQL DB. I am struggling to figure out how to manage connecting to different databases, i.e. my local testing instance versus production. At the moment I have something like this, where I have a graph and two different jobs defined, each reading a different YAML file that contains either my local testing credentials or the production credentials.
    @graph
    def update_db_metadata():
        """
        Generalised job that will pull all metadata required. There is a test and prod instance of each below.
        """
        process_data(pull_data(connect_api()))
    
    # Test job - will use local testing database connection
    update_db_metadata_test_job = update_db_metadata.to_job(
        name='update_db_metadata_test_job',
        resource_defs={"database": database_connection},
        config=config_from_files(
            [file_relative_path(__file__, "../config/run_config_test.yaml")]
        ),
    )
    
    # Production job - will use production database connection
    update_db_metadata_prod_job = update_db_metadata.to_job(
        name='update_db_metadata_prod_job',
        resource_defs={"database": database_connection},
        config=config_from_files(
            [file_relative_path(__file__, "../config/run_config_prod.yaml")]
        ),
    )
    In the Dagit UI, I can then run either the testing or the production job (this isn't deployed yet either). Is this the recommended way to handle connecting to different DB environments in testing vs production? Or how else?
    :dagster-bot-resolve: 1
    d
    • 2
    • 2
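    One way to cut the duplication, sketched under the assumption that an environment variable selects the config file (DAGSTER_ENV is a made-up name; database_connection and the YAML files are from the snippet above):
    import os

    from dagster import config_from_files, file_relative_path

    env = os.getenv("DAGSTER_ENV", "test")  # hypothetical switch: "test" or "prod"

    update_db_metadata_job = update_db_metadata.to_job(
        name=f"update_db_metadata_{env}_job",
        resource_defs={"database": database_connection},
        config=config_from_files(
            [file_relative_path(__file__, f"../config/run_config_{env}.yaml")]
        ),
    )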
  • j

    Jeremy

    06/21/2022, 1:45 PM
    is this actually fixed? I think I’m bumping up against it…
    :dagster-bot-resolve: 1
    s
    • 2
    • 24
  • m

    Mykola Palamarchuk

    06/21/2022, 2:32 PM
    Hi team! Could you please explain how to use
    RetryRequested
    ? And how does it work internally? My case: an HTTP request may fail with different errors that require individual retry policies. E.g. one response means we have to wait 30 seconds and can retry only once (NotReadyYetException), but another response requires exponential backoff and can be retried more times (TemporaryServerOutageException). Is it possible to model that somehow?
    :dagster-bot-resolve: 1
    d
    • 2
    • 4
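    RetryRequested is raised from inside the op body, which makes per-exception policies straightforward. A minimal sketch (the exception types and API call are placeholders):
    from dagster import RetryRequested, op

    @op
    def fetch_resource(context):
        try:
            return call_external_api()  # hypothetical HTTP call
        except NotReadyYetException as e:  # hypothetical exception types
            raise RetryRequested(max_retries=1, seconds_to_wait=30) from e
        except TemporaryServerOutageException as e:
            # exponential backoff keyed off the current attempt number
            raise RetryRequested(
                max_retries=5, seconds_to_wait=2 ** context.retry_number
            ) from e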
  • j

    Jay Jackson

    06/21/2022, 5:59 PM
    Hi, am I missing something with my cron expression for Dagster to accept it?
    dagster.core.errors.DagsterInvalidDefinitionError: Found invalid cron schedule '* * 0 * *' for schedule
    Is
    * * 0 * *
    not a valid 5-field cron expression?
    :dagster-bot-resolve: 2
    c
    r
    • 3
    • 4
  • z

    Zach

    06/21/2022, 6:40 PM
    I'm trying to use the GraphQL API to retrieve the event logs for a given run and seem to be running into a little syntax error (not very familiar with GraphQL). Here's the error:
    {
      "error": {
        "data": null,
        "errors": [
          {
            "message": "Variable \"runId\" of type \"ID\" used in position expecting type \"ID!\".",
            "locations": [
              {
                "line": 1,
                "column": 18
              },
              {
                "line": 2,
                "column": 21
              }
            ]
          }
        ]
      }
    }
    and my query:
    query LogsForRun($runId: ID){
      logsForRun (runId: $runId) {
        __typename
        ... on EventConnection {
          events {
            __typename 
            ... on ExecutionStepStartEvent {
              runId
              timestamp
              stepKey
            }
          }
        }
      }
    }
    with this variable:
    {
      "runId": "1d71f590-e925-42eb-af6c-831d524e1945"
    }
    :dagster-bot-resolve: 1
    r
    • 2
    • 2
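    For what it's worth, the error reads as a nullability mismatch: logsForRun expects a non-null ID, so declaring the variable as ID! should satisfy it:
    query LogsForRun($runId: ID!) {
      logsForRun(runId: $runId) {
        __typename
      }
    }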
  • h

    Harpal

    06/21/2022, 7:03 PM
    Heya team! Does anyone have experience with software-defined assets on Dagster? The rest of my Dagster jobs work fine (sensors, etc.), but when I add the job that utilises software-defined assets to repo.py, it causes the error below. It also shows the page where the Dagit UI cannot see any of your repositories (see attached image).
    dagster._check.CheckError: Invariant failed. Description: Invalid asset dependencies: {AssetKey(['sector_cls__hold__train_set_csv_rig_refactor'])} specified in `internal_asset_deps` argument for multi-asset '_assets' on key 'sector_cls__hold__train_set_csv_rig_refactor'. Each specified asset key must be associated with an input to the asset or produced by this asset. Valid keys: {AssetKey(['public', 'sector_cls__hold__eval_set_csv']), AssetKey(['public', 'sector_cls__hold__isolate_all_caf']), AssetKey(['public', 'sector_cls__hold__isolate_test_caf']), AssetKey(['public', 'sector_cls__hold__isolate_test_haw']), AssetKey(['public', 'sector_cls__hold__train_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__not_test_wak']), AssetKey(['public', 'sector_cls__hold__train_gcal']), AssetKey(['public', 'sector_cls__hold__isolate_test_wak']), AssetKey(['public', 'sector_cls__hold__train_caf']), AssetKey(['public', 'sector_cls__hold__eval_set_ids']), AssetKey(['public', 'sector_cls__hold__test_set_csv']), AssetKey(['public', 'sector_cls__hold__not_test_caf']), AssetKey(['public', 'sector_cls_crunchbase_data']), AssetKey(['public', 'sector_cls__hold__not_test_set_ids']), AssetKey(['gcs', 'sector_cls__hold__train_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__train_unk']), AssetKey(['public', 'sector_cls__hold__isolate_all_unk']), AssetKey(['public', 'sector_cls__hold__eval_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__not_test_unk']), AssetKey(['public', 'sector_cls__hold__test_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__train_wak']), AssetKey(['public', 'sector_cls__hold__isolate_all_haw']), AssetKey(['public', 'sector_cls__hold__isolate_all_wak']), AssetKey(['public', 'sector_cls__hold__isolate_test_set_ids']), AssetKey(['public', 'sector_cls__hold__isolate_test_gcal']), AssetKey(['public', 'sector_cls__hold__train_set_ids']), AssetKey(['public', 'sector_cls__hold__isolate_test_unk']), AssetKey(['public', 'sector_cls__hold__not_test_gcal']), AssetKey(['public', 'sector_cls__hold__train_haw']), AssetKey(['public', 'sector_cls__hold__train_set_csv']), AssetKey(['public', 'sector_cls__hold__not_test_haw']), AssetKey(['gcs', 'sector_cls__hold__test_set_csv_rig_refactor']), AssetKey(['public', 'sector_cls__hold__isolate_all_gcal']), AssetKey(['gcs', 'sector_cls__hold__eval_set_csv_rig_refactor'])}
    
    Stream closed EOF for dagstertest/dagster-dagster-user-deployments-moonfire-dagster-repo-6c5p8z8k (dagster-user-deployments)
    Any help would be much appreciated :partydagster: Code snippets available in the comments!
    :dagster-bot-resolve: 1
    s
    • 2
    • 8
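    The error is saying that each key in internal_asset_deps must exactly match one of the asset's fully prefixed input or output keys. A minimal sketch, assuming the 0.15-era multi_asset API where outputs are declared as Out definitions:
    from dagster import AssetKey, Out, multi_asset

    @multi_asset(
        outs={
            "train_set": Out(asset_key=AssetKey(["public", "train_set"])),
            "eval_set": Out(asset_key=AssetKey(["public", "eval_set"])),
        },
        internal_asset_deps={
            # the key must carry the same prefix as the declared asset key
            "eval_set": {AssetKey(["public", "train_set"])},
        },
    )
    def split_assets():
        ...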
  • e

    Eric Larson

    06/21/2022, 7:33 PM
    Hi, I’m loading dbt assets and getting an error, because I think I have a schema name in dbt ("meta") that is prohibited in Dagster. How do I get around this?
    dagster.core.errors.DagsterInvalidDefinitionError: "meta" is not a valid name in Dagster. It conflicts with a Dagster or python reserved keyword.
    
    dbt_assets = load_assets_from_dbt_project(
      File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 364, in load_assets_from_dbt_project
        return load_assets_from_dbt_manifest(
      File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 433, in load_assets_from_dbt_manifest
        dbt_assets_def = _dbt_nodes_to_assets(
      File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster_dbt/asset_defs.py", line 272, in _dbt_nodes_to_assets
        return AssetsDefinition(
      File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster/core/asset_defs/assets.py", line 85, in __init__
        self._group_names_by_key[key] = validate_group_name(group_name)
      File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster/core/definitions/utils.py", line 122, in validate_group_name
        return check_valid_name(group_name)
      File "/root/.cache/pypoetry/virtualenvs/l2-check-pipeline-VA82Wl8V-py3.8/lib/python3.8/site-packages/dagster/core/definitions/utils.py", line 56, in check_valid_name
        raise DagsterInvalidDefinitionError(
    :dagster-bot-resolve: 1
    h
    s
    • 3
    • 5
  • j

    Jay Jackson

    06/21/2022, 7:55 PM
    Hi, sorry but I can't figure out why my schedule is not getting picked up by the
    dagster-daemon
    and I'm not seeing it in the dagit UI ..
    dagster-daemon run -w dagster-workspace.yaml
    from dagster import DagsterRunStatus, RunRequest, RunsFilter, schedule

    @schedule(
        cron_schedule="* * * * *",
        pipeline_name="my_job",
        job=my_job,
    )
    def my_job_schedule(context):
        # Find runs of the same job that are currently running
        run_records = context.instance.get_run_records(
            RunsFilter(job_name="my_job",
                       statuses=[DagsterRunStatus.STARTED]))
    
        # Kick off a run only if no other runs of the same job are running
        if len(run_records) == 0:
            yield RunRequest()
    :dagster-bot-resolve: 1
    r
    • 2
    • 7
j

Jay Jackson

06/21/2022, 7:55 PM
I'm assuming the daemon has the schedule daemon inside of it so I don't need
dagster schedule start
, that cmd is giving me
There are no schedules defined for repository
also
r

rex

06/21/2022, 8:01 PM
your schedule should be contained in your
@repository
that you specified in your workspace.yaml. Could you show your repository definition?
j

Jay Jackson

06/21/2022, 8:02 PM
dagster-workspace.yaml
->
load_from:
  - python_file:
      relative_path: data/workflow-recommendations.py
in
workflow-recommendations.py
, I only have the @job decorator; do I need the @repository as well?
like so -> https://docs.dagster.io/_apidocs/repositories#dagster.repository ?
Okay, I added @repository and I'm seeing the schedule in the UI now; I didn't know I needed it. I think I've got it. Thanks for the response
r

rex

06/21/2022, 8:12 PM
Yup, here are the docs for this: https://docs.dagster.io/concepts/repositories-workspaces/repositories.
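For the record, a minimal sketch of a repository definition that surfaces both the job and its schedule:
from dagster import repository

@repository
def my_repository():
    return [my_job, my_job_schedule]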