dagster-support
  • j

    Jacob Marcil

    03/23/2023, 9:30 PM
    When using MultiPartitionDefinition with a date dimension and a static dimension, if we try to access the run's partitions with
    asset_partition_key_range
    we end up with
    PartitionKeyRange(start='2020-12-18|Something_Static', end='2023-03-22|Something_else_static')
    Since one dimension is a list of static elements, there's no way to get all the
    static
    elements of that run, right? To be precise, I'm trying to run multiple partitions in a
    Single Run
    c
    • 2
    • 3
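    A minimal sketch of one way to recover every static-dimension key in such a run, assuming the dimensions are named date and static and that the asset's partitions definition can expand the run's key range (names are illustrative):
    from dagster import MultiPartitionKey, asset

    @asset
    def my_multi_partitioned_asset(context):
        # expand the run's key range back into individual multi-partition keys
        partitions_def = context.asset_partitions_def_for_output()
        key_range = context.asset_partition_key_range
        for key in partitions_def.get_partition_keys_in_range(key_range):
            if isinstance(key, MultiPartitionKey):
                dims = key.keys_by_dimension  # e.g. {"date": "2023-03-22", "static": "Something_Static"}
                context.log.info(f"date={dims['date']} static={dims['static']}")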
  • y

    Yuan Cheng

    03/24/2023, 1:12 AM
    Hi guys, we are facing a performance issue where loading the lineage of any asset is really slow. What might be causing this? We are on dagster==1.1.21. Thanks for the help!
    j
    • 2
    • 3
  • a

    Akhil

    03/24/2023, 5:20 AM
    Hello, I am getting this error when using celery_k8s_job_executor to execute Jupyter notebooks as ops via the Dagstermill op definition.
    dagster._core.errors.DagsterSubprocessError: During celery execution errors occurred in workers:
    [test_dict]: dagster._serdes.errors.DeserializationError: Deserialized object was not expected type <class 'dagster._core.events.DagsterEvent'>, got <class 'dict'>
    j
    m
    • 3
    • 9
  • a

    Aaron T

    03/24/2023, 5:36 AM
    How can a job utilize an asset? For instance, my asset being
    data_in
    and my job being
    insert_data
    . I don't need
    insert_data
    to return anything, so it can't be defined as an asset. I just need to be able to run
    insert_data
    on demand (currently) to insert data that has been consumed.
    j
    • 2
    • 3
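    One common pattern is to model the insert step itself as a (non-returning) asset downstream of data_in and run it on demand with an asset job - a rough sketch, where insert_rows is a hypothetical helper:
    from dagster import asset, define_asset_job

    @asset
    def data_in():
        return [{"id": 1}, {"id": 2}]  # stand-in for the consumed data

    @asset
    def insert_data(data_in):
        insert_rows(data_in)  # hypothetical helper doing the actual DB insert; returning None is fine

    # an on-demand job that materializes just insert_data (data_in is loaded via its IO manager)
    insert_data_job = define_asset_job("insert_data_job", selection="insert_data")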
  • j

    Johannes Müller

    03/24/2023, 7:27 AM
    Hi, is there a repository with a docker compose setup so I can get started quickly just by downloading it and running
    docker-compose up
    ?
    :dagster-bot-resolve: 1
    • 1
    • 1
  • i

    Ignas Kizelevičius

    03/24/2023, 7:47 AM
    Hi, is it possible to pass an
    op
    that returns
    None
    to another
    op
    ?
    j
    • 2
    • 4
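    If the upstream op returns None and only ordering matters, a Nothing dependency is the usual approach - a minimal sketch:
    from dagster import In, Nothing, job, op

    @op
    def first_op():
        pass  # works purely by side effect, returns None

    @op(ins={"start": In(Nothing)})
    def second_op():
        pass  # runs only after first_op, but receives no value from it

    @job
    def my_job():
        second_op(start=first_op())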
  • s

    Sanidhya Singh

    03/24/2023, 11:29 AM
    Hi All! I’m running into a PostgreSQL timeout in Dagit for a job with a large amount of logs. I’ve gone through custom Dagster loggers, but as I understand it, they apply to what gets logged on the terminal and not Dagit? Additionally, is there a simpler way to just increase the PostgreSQL timeout?
    a
    • 2
    • 1
  • s

    Spencer Nelson

    03/24/2023, 1:46 PM
    I have a run (k8s run coordinator, k8s run launcher, multiprocess executor) which is in a broken state and dagster can’t seem to recover. What should I do with a run that has been in “CANCELING” state for over 9 hours? At 14:54:59 yesterday, the last log line is “Multiprocess executor: received termination signal - forwarding to active child processes”. I believe this is because the node hit a resource limit in ephemeral storage, so the kubelet started evicting pods, and evicted this run. At 21:42:24 yesterday, I finally canceled the run in dagit. I see
    Sending run termination request.
    and
    [K8sRunLauncher] Run was terminated successfully.
    90 milliseconds later. But the run says “Cancelling” for its status, and it is part of a backfill which still says “In progress” (all other runs in the backfill are complete). Now it is 6:45 the next day, and it is still “Cancelling”. What do I do?
    d
    • 2
    • 18
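    For a run stuck in CANCELING, one workaround people use is to mark it canceled directly against the instance - a sketch, assuming DAGSTER_HOME points at the same instance Dagit uses and that the run id is copied from the run page:
    from dagster import DagsterInstance

    instance = DagsterInstance.get()
    run = instance.get_run_by_id("RUN_ID_HERE")  # placeholder run id
    if run:
        instance.report_run_canceled(run)  # forces the status to CANCELED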
  • d

    Dario De Stefano

    03/24/2023, 1:51 PM
    Quick question: if I deploy an asset-based pipeline with a schedule, do I need to have a daemon process running at all times - let's say on a VM?
    j
    • 2
    • 2
  • v

    vengeance

    03/24/2023, 1:51 PM
    dynamic_partitioned_config
  • m

    Mark Fickett

    03/24/2023, 2:08 PM
    Can you dynamically define asset dependencies / names based on data being processed? I have a db with primary keys for different experiments being run. Experiments have a raw-data gathering op, and a result-computing op. Most experiments only depend on their own raw-data output, but some read from other related experiments -- something I can figure out by reading the db. Right now, I just do all the raw-data steps, and then all the result-computing steps. Could I read some db state, and then dynamically generate assets for all the raw-data and result-computing steps, with appropriate dependencies, and then let Dagster figure out the right order to run the ops in? What I saw in the asset docs only showed asset names declared statically.
    j
    • 2
    • 3
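    Dependencies do have to be known at definition-load time, but the definitions themselves can be generated in a loop from the database - a rough factory sketch, where load_experiment_rows, fetch_raw_data, and compute_result are hypothetical helpers:
    from dagster import AssetIn, asset

    def make_experiment_assets(exp_id, upstream_exp_ids):
        @asset(name=f"{exp_id}_raw")
        def raw_data():
            return fetch_raw_data(exp_id)

        @asset(
            name=f"{exp_id}_result",
            ins={"raw": AssetIn(f"{exp_id}_raw")},
            # ordering-only deps on related experiments; declare them as AssetIns
            # instead if their values need to be loaded
            non_argument_deps={f"{dep}_raw" for dep in upstream_exp_ids},
        )
        def result(raw):
            return compute_result(exp_id, raw)

        return [raw_data, result]

    experiment_assets = []
    for row in load_experiment_rows():  # read experiment ids + cross-reads from the db
        experiment_assets += make_experiment_assets(row["id"], row["reads_from"])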
  • m

    Max Rostron

    03/24/2023, 2:30 PM
    Hi - I'm new to dagster, so easy question; I'm just finding the docs hard to understand. I have a dbt model which currently produces a BigQuery table each day, '`daily_status_table`'. Does anybody have a rough code snippet for an op which: • Triggers when
    daily_status_table
    is run successfully. • Takes the first row of data. • Posts this as a slack message to a channel. I can't find many docs around dagster-slack at the moment. I just need a nudge in the right direction. Thank you! 🙂
    a
    j
    • 3
    • 3
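    A rough sketch of one approach, assuming the dbt model shows up as the asset key daily_status_table and using the dagster-slack resource (get_first_row is a hypothetical BigQuery helper):
    from dagster import AssetKey, RunRequest, asset_sensor, job, op
    from dagster_slack import slack_resource

    @op(required_resource_keys={"slack"})
    def post_first_row(context):
        row = get_first_row("daily_status_table")  # hypothetical BigQuery read
        context.resources.slack.chat_postMessage(channel="#data-alerts", text=str(row))

    @job(resource_defs={"slack": slack_resource.configured({"slack_token": {"env": "SLACK_BOT_TOKEN"}})})
    def notify_job():
        post_first_row()

    # fires each time daily_status_table is materialized successfully
    @asset_sensor(asset_key=AssetKey("daily_status_table"), job=notify_job)
    def daily_status_sensor(context, asset_event):
        return RunRequest(run_key=context.cursor)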
  • a

    AJ Floersch

    03/24/2023, 2:59 PM
    I'm working through various CI/CD methods for deploying Dagster through Docker. At some point in the process, I envision the containers will have to be rebuilt using the
    docker compose
    command. My question is - what would prevent this from essentially killing any actively running jobs/materializations when the containers are restarted? How can I make sure the CI/CD action waits until all actively running Dagster processes have stopped? Just trying to think through how I can prevent a user from pushing a code change that might break a long-running job or corrupt some asset because the container is stopped mid-process.
    d
    • 2
    • 2
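    One piece of this is a pre-deploy gate that blocks until no runs are in flight - a sketch that polls the instance, assuming the run queue and schedules are paused first so nothing new starts:
    import time
    from dagster import DagsterInstance, DagsterRunStatus, RunsFilter

    IN_FLIGHT = [DagsterRunStatus.QUEUED, DagsterRunStatus.STARTING, DagsterRunStatus.STARTED]

    def wait_for_runs_to_drain(poll_seconds=30):
        # blocks (e.g. in a CI step) until no runs are queued or running
        instance = DagsterInstance.get()
        while instance.get_runs_count(RunsFilter(statuses=IN_FLIGHT)) > 0:
            time.sleep(poll_seconds)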
  • c

    clay

    03/24/2023, 3:16 PM
    What is the most simple way to have a job that materializes assets call an op that does some cleanup work once everything else is completed/materialized?
    j
    • 2
    • 2
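    One simple pattern is a final "cleanup" asset that depends on everything else but loads nothing, so it always runs last - a sketch with illustrative asset names (remove_temp_files stands in for whatever the cleanup does):
    from dagster import asset, define_asset_job

    @asset
    def table_a():
        return "a"

    @asset
    def table_b():
        return "b"

    @asset(non_argument_deps={"table_a", "table_b"})
    def cleanup():
        remove_temp_files()  # hypothetical cleanup work

    # "*cleanup" selects cleanup plus everything upstream of it
    everything_job = define_asset_job("materialize_and_cleanup", selection="*cleanup")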
  • s

    Simon Frid

    03/24/2023, 3:19 PM
    In our latest runs, I've noticed some unusual behavior - the majority of our assets never enter the preparing phase and are unaccounted for. Basically, many assets without any upstream dependencies never get scheduled. I also see assets being "prepared" whose upstream dependencies have failed, but those dependencies just hang in preparing. A week ago this was operating as expected - I'm wondering what could have caused a regression.
    • 1
    • 3
  • c

    Christoph Leinemann

    03/24/2023, 4:17 PM
    Hi there, when I'm importing all my fivetran assets and I'm importing all my dbt assets - is dagster expected to create the lineage between my connector asset and the downstream dbt models or do I need to create that lineage in the code?
    j
    • 2
    • 8
  • d

    Dario De Stefano

    03/24/2023, 4:38 PM
    Hi, I have another silly question. I am a bit confused about ops, jobs, assets, pipelines and solids. My understanding is that assets have ops running under the hood anyway and offer more functionality. Jobs are used in combination with schedules to materialise the assets when needed (I'm not sure how I should decide whether to use pipelines or jobs). So far I have structured my pipeline as assets with dependencies, and then there is a job and a schedule to materialise them. Now, if I want to change a date filter based on the scheduled execution time, what would the best approach be? I am considering splitting the "transform" asset into 3 "parallel" assets (with the same dependencies) and labelling them differently (maybe using groups?) so that the schedule knows what to run at what time (or maybe setting up more than one schedule). But maybe it could be done using different configurations? How would you tackle this? Attached is how my pipeline looks at the moment, with the highlighted asset that I would like to behave differently based on the schedule:
    j
    • 2
    • 4
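    Rather than splitting the transform asset in three, one option is a schedule that passes the date filter through run config derived from the tick time - a sketch, assuming the transform asset declares config_schema={"date_filter": str}:
    from dagster import RunRequest, define_asset_job, schedule

    transform_job = define_asset_job("transform_job", selection="transform")

    @schedule(cron_schedule="0 6 * * *", job=transform_job)
    def daily_transform_schedule(context):
        # derive the date filter from the tick's scheduled time
        run_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
        return RunRequest(
            run_key=run_date,
            run_config={"ops": {"transform": {"config": {"date_filter": run_date}}}},
        )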
  • j

    Jordan

    03/24/2023, 5:06 PM
    Hi! We retrieve data daily via an API that we transform before writing it to a DWH. It happens sometimes that a change has been made on the API which implies that we need a complete history (over a time range of 10 years for example). Some APIs are limited and we cannot run the 10 * 365 executions in parallel, the rebuilding time is much too long using daily partitions. That's why we define a second job that uses monthly partitions to rebuild the history (i.e. 10*12 executions) and it works quite well. The constraint of this usage is that we have to double all our assets. So we have a lot of assets in the same code location and I don't find it ideal to have several assets that materialize the same data but at different levels of granularity. Is there any other alternative to rebuild a history that would not require duplicate assets?
  • e

    Eric Loreaux

    03/24/2023, 6:09 PM
    How can I get logs from non-ops to show up in dagit? I import a class in my pipeline which sets
    logger = logging.getLogger(__file__)
    and
    logger.setLevel("INFO")
    . It then proceeds to log things within its methods using
    logger.info()
    . However, when I call these methods from my ops, I don't see these logs showing up in the UI
    :dagster-bot-responded-by-community: 1
    d
    z
    • 3
    • 3
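    One way to surface those logs is the instance-level managed loggers setting in dagster.yaml - a sketch, which also assumes giving the logger a stable name like logging.getLogger("my_package") rather than __file__ so the name below matches:
    # dagster.yaml
    python_logs:
      managed_python_loggers:
        - my_package          # the name passed to logging.getLogger(...)
      python_log_level: INFO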
  • d

    Drew You

    03/24/2023, 6:16 PM
    I'm writing an IOManager - is there a good guide on how to deal with parallelism/multiprocessing here? Specifically, for time-based partitions my IO manager is not parallelizable, but for static partitions it is. Ideally, I'd be able to specify this in a way that also works with MultiPartitions.
    j
    • 2
    • 6
  • a

    Aaron T

    03/24/2023, 7:16 PM
    How can a partitioned job use a partitioned asset?
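    A minimal sketch - give the job the same partitions definition as the asset, so each run targets one partition (names are illustrative):
    from dagster import DailyPartitionsDefinition, asset, define_asset_job

    daily = DailyPartitionsDefinition(start_date="2023-01-01")

    @asset(partitions_def=daily)
    def events(context):
        return load_events_for(context.partition_key)  # hypothetical loader

    events_job = define_asset_job("events_job", selection="events", partitions_def=daily)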
  • n

    Nick Centola

    03/24/2023, 7:18 PM
    Is it possible to use the asset reconciliation sensor when you have cross code location asset dependencies? We have two locations (say location A & location B). Location A produces raw data assets, location B runs dbt transformations. The reconciliation sensor is defined in location B:
    build_asset_reconciliation_sensor(
        name = "freshness_sensor",
        asset_selection = AssetSelection.all()
    )
    Some assets in location B are downstream of assets from location A. The sensor is not able to reconcile the upstream assets from location A and fails with traceback:
    KeyError: AssetKey(['location_a', 'some_asset'])
      File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 206, in user_code_error_boundary
        yield
      ....
      File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/asset_graph.py", line 172, in get_parents
        return self._asset_dep_graph["upstream"][asset_key]
    I have tried manually defining the assets from location A as `SourceAsset`s in location B but it doesn't resolve the issue. Is it possible to resolve these cross location assets?
    :next-level-daggy: 1
    j
    • 2
    • 2
  • p

    Pablo Beltran

    03/24/2023, 7:37 PM
    I want to create an individual job for every asset in an asset group. How would I iterate over all the assets in an asset group and make a job for each of them?
    j
    • 2
    • 4
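    A rough sketch, assuming the group's assets are loaded from a module (my_assets is a placeholder module):
    from dagster import AssetSelection, Definitions, define_asset_job, load_assets_from_modules

    group_assets = load_assets_from_modules([my_assets], group_name="my_group")

    jobs = [
        define_asset_job(
            name="_".join(asset_def.key.path) + "_job",
            # .key assumes each AssetsDefinition holds a single asset
            selection=AssetSelection.keys(asset_def.key),
        )
        for asset_def in group_assets
    ]

    defs = Definitions(assets=group_assets, jobs=jobs)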
  • t

    Terry Lines

    03/24/2023, 7:55 PM
    Hi, I'm running dagster locally with sqlite storage. It crashed out and every time I try to start dagit or dagster-daemon I get the following sqlite3 error. Has anyone else experienced this? Is there an easy way to recover, or am I better off deleting the database (it's a testing one so no big deal)? Thanks...
    File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1901, in _execute_context
        cursor, statement, parameters, context
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
        cursor.execute(statement, parameters)
    sqlite3.OperationalError: disk I/O error
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/terry/.pyenv/versions/etl_new_env/bin/dagster-daemon", line 8, in <module>
        sys.exit(main())
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_daemon/cli/__init__.py", line 143, in main
        cli(obj={})  # pylint:disable=E1123
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/click/core.py", line 1130, in __call__
        return self.main(*args, **kwargs)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/click/core.py", line 1055, in main
        rv = self.invoke(ctx)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/click/core.py", line 1657, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/click/core.py", line 1404, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/click/core.py", line 760, in invoke
        return __callback(*args, **kwargs)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_daemon/cli/__init__.py", line 55, in run_command
        ) if instance_ref else DagsterInstance.get() as instance:
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/instance/__init__.py", line 479, in get
        return DagsterInstance.from_config(dagster_home_path)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/instance/__init__.py", line 495, in from_config
        return DagsterInstance.from_ref(instance_ref)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/instance/__init__.py", line 507, in from_ref
        unified_storage = instance_ref.storage
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/instance/ref.py", line 463, in storage
        return self.storage_data.rehydrate() if self.storage_data else None
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_serdes/config_class.py", line 99, in rehydrate
        return klass.from_config_value(self, check.not_none(result.value))
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/storage/sqlite_storage.py", line 81, in from_config_value
        return DagsterSqliteStorage.from_local(inst_data=inst_data, **config_value)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/storage/sqlite_storage.py", line 87, in from_local
        return cls(base_dir, inst_data=inst_data)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/storage/sqlite_storage.py", line 63, in __init__
        self._run_storage = SqliteRunStorage.from_local(_runs_directory(base_dir))
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/storage/runs/sqlite/sqlite_run_storage.py", line 91, in from_local
        db_revision, head_revision = check_alembic_revision(alembic_config, connection)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/dagster/_core/storage/sql.py", line 77, in check_alembic_revision
        db_revision = migration_context.get_current_revision()
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/alembic/runtime/migration.py", line 487, in get_current_revision
        heads = self.get_current_heads()
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/alembic/runtime/migration.py", line 534, in get_current_heads
        if not self._has_version_table():
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/alembic/runtime/migration.py", line 552, in _has_version_table
        self.connection, self.version_table, self.version_table_schema
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/alembic/util/sqla_compat.py", line 244, in _connectable_has_table
        return inspect(connectable).has_table(tablename, schemaname)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/reflection.py", line 283, in has_table
        return self.dialect.has_table(conn, table_name, schema)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/dialects/sqlite/base.py", line 2020, in has_table
        connection, "table_info", table_name, schema=schema
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/dialects/sqlite/base.py", line 2604, in _get_table_pragma
        cursor = connection.exec_driver_sql(statement)
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1765, in exec_driver_sql
        future=True,
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1676, in _exec_driver_sql
        distilled_parameters,
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1944, in _execute_context
        e, statement, parameters, cursor, context
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2125, in _handle_dbapi_exception
        sqlalchemy_exception, with_traceback=exc_info[2], from_=e
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
        raise exception
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1901, in _execute_context
        cursor, statement, parameters, context
      File "/home/terry/.pyenv/versions/3.7.11/envs/etl_new_env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
        cursor.execute(statement, parameters)
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) disk I/O error
    [SQL: PRAGMA main.table_info("alembic_version")]
    :dagster-bot-resolve: 1
    • 1
    • 2
  • c

    clay

    03/24/2023, 8:16 PM
    I'm trying to get a multi-container Docker deploy running via WSL2 on Windows (😬) to help a colleague and Docker seems to be refusing the connection when trying to launch a run container? Anybody know what's up here or who could point me in the right direction to debug myself? 99.9% certain that my
    .env
    files are in the proper place since this is a stripped down version of another deployment I have that works just fine with the same setup.
    dagster_daemon      | Stack Trace:
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py", line 333, in _dequeue_run
    dagster_daemon      |     instance.run_launcher.launch_run(LaunchRunContext(dagster_run=run, workspace=workspace))
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/dagster_docker/docker_run_launcher.py", line 161, in launch_run
    dagster_daemon      |     self._launch_container_with_command(run, docker_image, command)
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/dagster_docker/docker_run_launcher.py", line 104, in _launch_container_with_command
    dagster_daemon      |     client = self._get_client(container_context)
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/dagster_docker/docker_run_launcher.py", line 78, in _get_client
    dagster_daemon      |     client = docker.client.from_env()
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/docker/client.py", line 101, in from_env
    dagster_daemon      |     **kwargs_from_env(**kwargs)
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/docker/client.py", line 45, in __init__
    dagster_daemon      |     self.api = APIClient(*args, **kwargs)
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/docker/api/client.py", line 197, in __init__
    dagster_daemon      |     self._version = self._retrieve_server_version()
    dagster_daemon      |   File "/usr/local/lib/python3.7/site-packages/docker/api/client.py", line 222, in _retrieve_server_version
    dagster_daemon      |     f'Error while fetching server API version: {e}'
    dagster_daemon      |
    dagster_daemon      | The above exception occurred during handling of the following exception:
    dagster_daemon      | requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionRefusedError(111, 'Connection refused'))
    d
    • 2
    • 6
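    docker.client.from_env() failing with connection refused usually means the daemon container can't reach the Docker engine; in the reference docker-compose deployment that is handled by mounting the host's Docker socket into the daemon - a fragment to compare against (service name is illustrative):
    # docker-compose.yml
    services:
      dagster_daemon:
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock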
  • d

    Danny Steffy

    03/24/2023, 8:25 PM
    Just want to be sure before I continue down this path... is the
    step_key
    in the OutputContext a unique value per run per step?
    j
    • 2
    • 1
  • f

    fahad

    03/24/2023, 8:37 PM
    Hi all! Has anyone ever found a pattern for creating an op that can take in any arbitrary
    AssetKey
    via op configuration?
    l
    • 2
    • 4
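    A minimal sketch - take the key path as a list of strings in op config and build the AssetKey from it:
    from dagster import AssetKey, op

    @op(config_schema={"asset_key": [str]})
    def act_on_asset(context):
        # run config e.g. {"ops": {"act_on_asset": {"config": {"asset_key": ["my_prefix", "my_asset"]}}}}
        key = AssetKey(context.op_config["asset_key"])
        context.log.info(f"operating on {key.to_user_string()}")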
  • l

    Leo Qin

    03/24/2023, 8:55 PM
    I know that for observable source assets, it's possible to define an asset job that observes them, but is it possible to create a sensor that does the observation?
    ➕ 1
    j
    • 2
    • 2
  • a

    Aaron T

    03/24/2023, 10:45 PM
    Is there a way to set the number of partitions saved? For instance I only want the last 7 days of partitions
  • a

    Abhishek Roul

    03/25/2023, 8:29 AM
    Hello, I'm trying to deploy Dagster on AKS using the provided Helm charts. I'm able to deploy the code location and the Dagster components separately and get them running, but I'm confused by the server definition in the Helm chart. I'm unsure what I should put in the host field, as I don't deploy the code location as part of the default Dagster Helm chart:
    servers:
      - host: "test"  # Should this be the service URL?
        port: 3030
        name: "test"
    Can something like the service URL be put there as the host?
    m
    • 2
    • 3
m

MUKUL GARG

03/25/2023, 1:01 PM
@Abhishek Roul servers is the configuration of the gRPC server(s), which will be deployed as a configmap.
a

Abhishek Roul

03/25/2023, 1:29 PM
The configuration being in a configmap is fine. I'm more concerned with what should be put in that configuration. Should I set the host to the service URL of the code location, or something else?
m

MUKUL GARG

03/25/2023, 1:36 PM
Yes! Use the service URL as the host.
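For reference, a sketch of what that servers entry might look like with the host pointed at the code location's Kubernetes Service DNS name (names are illustrative):
servers:
  - host: "my-code-location-svc.my-namespace.svc.cluster.local"
    port: 3030
    name: "my-code-location"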