dagster-support
  • w

    William

    12/05/2022, 4:29 PM
    I’m trying to materialize a single asset within an asset group (by shift+clicking the materialize button); why do I see multiple `op`s in the launchpad?
    o
    • 2
    • 1
  • д

    Даниил Конев

    12/05/2022, 5:39 PM
    Hi all! I'm trying to run Dagster using dagster-postgres; the twist is that the DB schema is changed (set via search_path):
    storage:
      postgres:
        postgres_db:
          db_name: db
          hostname: localhost
          params: {
            options: "-csearch_path=snbx_dagster"
          }
          scheme: postgresql+psycopg2
          username: postgres
          password: ********
          port: 5432
    I manage to start the daemon, and the initialization in the database takes place (I can see it), but when I start dagit, the service does not start because it cannot bind to the schema and throws an error:
    dagster> dagit -p 3030
    2022-12-05 20:31:01 +0300 - dagit - INFO - Serving dagit on <http://127.0.0.1:3030> in process 10768
    Traceback (most recent call last):
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1900, in _execute_context
        self.dialect.do_execute(
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\default.py", line 736, in do_execute
        cursor.execute(statement, parameters)
    psycopg2.errors.UndefinedTable: ERROR: relation "instance_info" does not exist
    LINE 2: FROM instance_info
                 ^
    
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "C:\Python310\lib\runpy.py", line 196, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "C:\Python310\lib\runpy.py", line 86, in _run_code
        exec(code, run_globals)
      File "C:\dagster\venv\Scripts\dagit.exe\__main__.py", line 7, in <module>
      File "C:\dagster\venv\lib\site-packages\dagit\cli.py", line 181, in main
        cli(auto_envvar_prefix="DAGIT")  # pylint:disable=E1120
      File "C:\dagster\venv\lib\site-packages\click\core.py", line 1130, in __call__
        return self.main(*args, **kwargs)
      File "C:\dagster\venv\lib\site-packages\click\core.py", line 1055, in main
        rv = self.invoke(ctx)
      File "C:\dagster\venv\lib\site-packages\click\core.py", line 1404, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "C:\dagster\venv\lib\site-packages\click\core.py", line 760, in invoke
        return __callback(*args, **kwargs)
      File "C:\dagster\venv\lib\site-packages\dagit\cli.py", line 133, in dagit
        host_dagit_ui_with_workspace_process_context(
      File "C:\dagster\venv\lib\site-packages\dagit\cli.py", line 166, in host_dagit_ui_with_workspace_process_context
        log_action(workspace_process_context.instance, START_DAGIT_WEBSERVER)
      File "C:\dagster\venv\lib\site-packages\dagster\_core\telemetry.py", line 494, in log_action
        (dagster_telemetry_enabled, instance_id, run_storage_id) = _get_instance_telemetry_info(
      File "C:\dagster\venv\lib\site-packages\dagster\_core\telemetry.py", line 333, in _get_instance_telemetry_info
        run_storage_id = instance.run_storage.get_run_storage_id()
      File "C:\dagster\venv\lib\site-packages\dagster\_core\storage\runs\sql_run_storage.py", line 778, in get_run_storage_id
        row = self.fetchone(query)
      File "C:\dagster\venv\lib\site-packages\dagster\_core\storage\runs\sql_run_storage.py", line 96, in fetchone
        result_proxy = conn.execute(query)
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1380, in execute
        return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 333, in _execute_on_connection
        return connection._execute_clauseelement(
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1572, in _execute_clauseelement
        ret = self._execute_context(
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1943, in _execute_context
        self._handle_dbapi_exception(
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2124, in _handle_dbapi_exception
        util.raise_(
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\util\compat.py", line 208, in raise_
        raise exception
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1900, in _execute_context
        self.dialect.do_execute(
      File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\default.py", line 736, in do_execute
        cursor.execute(statement, parameters)
    sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) ERROR: relation "instance_info" does not exist
    LINE 2: FROM instance_info
                 ^
    
    [SQL: SELECT instance_info.run_storage_id
    FROM instance_info]
    Has anyone already experienced this?
    j
    a
    • 3
    • 6
  • d

    Daniel Mosesson

    12/05/2022, 6:22 PM
    Is there a way of specifying an IO manager for assets only? I frequently want to do that, and it's a bit of a pain to manage.
    o
    • 2
    • 8
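    A minimal sketch of one way to scope an IO manager to assets, assuming the built-in fs_io_manager stands in for the real one: bind it under a dedicated key and reference that key from assets only.
    from dagster import asset, fs_io_manager, with_resources

    @asset(io_manager_key="asset_io_manager")
    def my_asset():
        # stored via whatever is bound to "asset_io_manager"; ops elsewhere
        # keep using the default "io_manager" key
        return 42

    assets_with_io = with_resources(
        [my_asset],
        resource_defs={"asset_io_manager": fs_io_manager},
    )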
  • a

    Abhinav Ayalur

    12/05/2022, 6:50 PM
    Hey there, I was wondering if there are ways to "yield" outputs instead of outputting all at once? So, single input, many outputs, where the many outputs can be processed individually by downstream tasks.
    o
    • 2
    • 6
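    A sketch of Dagster's dynamic outputs, which match the single-input, many-outputs pattern described above; op and job names here are illustrative.
    from dagster import DynamicOut, DynamicOutput, job, op

    @op(out=DynamicOut())
    def fan_out():
        # yield outputs one at a time; each needs a unique mapping key
        for i in range(3):
            yield DynamicOutput(i, mapping_key=str(i))

    @op
    def process(item: int) -> int:
        # runs once per dynamic output, so chunks are handled individually
        return item * 2

    @job
    def fan_out_job():
        fan_out().map(process)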
  • a

    Abhinav Ayalur

    12/05/2022, 7:18 PM
    Is there a way to order inputs or outputs? For example, if I had a bunch of inputs, would there be a way to order them via some sort of reduce operation before passing them to a future layer?
    o
    • 2
    • 5
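    For the ordering/reduce half, a sketch that fans dynamic outputs back in with .collect() and imposes an order before the reduce; setup assumed as in the previous sketch.
    from dagster import DynamicOut, DynamicOutput, job, op

    @op(out=DynamicOut())
    def fan_out():
        for i in (3, 1, 2):
            yield DynamicOutput(i, mapping_key=str(i))

    @op
    def combine(items) -> int:
        # items arrives as a list; sort it to impose a deterministic order
        return sum(sorted(items))

    @job
    def fan_in_job():
        combine(fan_out().collect())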
  • h

    Huy Dao

    12/05/2022, 7:27 PM
    Hello devs, in this example
    @run_status_sensor(
        run_status=DagsterRunStatus.SUCCESS,
        request_job=status_reporting_job,
    )
    def report_status_sensor(context):
        # this condition prevents the sensor from triggering status_reporting_job again after it succeeds
        if context.dagster_run.job_name != status_reporting_job.name:
            run_config = {
                "ops": {
                    "status_report": {"config": {"job_name": context.dagster_run.job_name}}
                }
            }
            return RunRequest(run_key=None, run_config=run_config)
        else:
            return SkipReason("Don't report status of status_reporting_job")
    is there any way I can get the run config of the job from context.dagster_run? I want to check the previous run_config before I trigger another job. Many thanks
    o
    • 2
    • 1
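    A sketch of reading the finished run's config, assuming DagsterRun exposes it as run_config (worth confirming on your version):
    import logging

    from dagster import DagsterRunStatus, run_status_sensor

    @run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
    def inspect_previous_config(context):
        # the run record carries the config the run was launched with
        previous_config = context.dagster_run.run_config
        logging.info("previous run used config: %s", previous_config)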
  • b

    Byron Murillo

    12/05/2022, 8:59 PM
    Guys, does anyone have this documentation for the latest versions? When I click through to the recent documentation it throws me a 404 error: https://docs.dagster.io/0.15.7/guides/dagster/graph_job_op
    o
    • 2
    • 2
  • y

    Yang

    12/05/2022, 9:22 PM
    Hi! I'm trying to test an asset that uses a MultiPartitionKey, but I'm getting errors. Is this how I should build the context?
    context = build_op_context(
                config={"exec_path": ""},
                partition_key=MultiPartitionKey({"fiscal_year": "2021", "dataset": "idealratings"}))
    o
    • 2
    • 5
  • a

    Alex Prykhodko

    12/06/2022, 12:21 AM
    Having trouble running a non-partitioned asset job that has upstream partitioned input when using
    s3_pickle_io_manager
    . Works as expected when using
    fs_io_manager
    (the upstream input argument is a dict keyed by partition). Code:
    @asset(partitions_def=StaticPartitionsDefinition(get_partitions()))
    def sa_metrics_normalized(context: OpExecutionContext, sa_metrics_raw):
        ...
    
    @asset
    def sa_metrics_data_frame(context: OpExecutionContext, sa_metrics_normalized):
        ...
    Error:
    dagster._check.CheckError: Failure condition: Tried to access partition key for input 'sa_metrics_normalized' of step 'sa_metrics_data_frame', but the step input has a partition range: '2014-01' to '2016-12'.
      File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
        yield
      File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/inputs.py", line 856, in _load_input_with_input_manager
        value = input_manager.load_input(context)
      File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 72, in load_input
        key = self._get_path(context)
      File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 33, in _get_path
        path = context.get_asset_identifier()
      File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 409, in get_asset_identifier
        return [*self.asset_key.path, self.asset_partition_key]
      File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 324, in asset_partition_key
        return self.step_context.asset_partition_key_for_input(self.name)
      File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/system.py", line 915, in asset_partition_key_for_input
        check.failed(
      File "/usr/local/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1642, in failed
        raise CheckError(f"Failure condition: {desc}")
    o
    • 2
    • 4
  • a

    Abhinav Ayalur

    12/06/2022, 12:56 AM
    Is Dagster suitable for stream processing? Are there resources for this?
    o
    • 2
    • 10
  • m

    Mycchaka Kleinbort

    12/06/2022, 12:33 PM
    Is there a way to configure the definition of stale? For example, I have a software-defined asset
    mostActiveUserId
    that runs on a daily schedule (it pulls the latest user activity data from Snowflake and returns the userId of the most active user). Most days the most active user is the same as yesterday. In this specific scenario, I only want to mark the downstream tasks as stale if the
    mostActiveUserId
    asset value has changed.
    o
    • 2
    • 1
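    A sketch of one workaround, assuming your version supports conditional materialization via output_required=False; both helpers are hypothetical stand-ins:
    from dagster import Output, asset

    def compute_most_active_user():
        ...  # hypothetical: query Snowflake for today's most active user

    def load_previous_value():
        ...  # hypothetical: read the last materialized value

    @asset(output_required=False)
    def mostActiveUserId():
        new_value = compute_most_active_user()
        # only emit (and mark downstream stale) when the value has changed
        if new_value != load_previous_value():
            yield Output(new_value)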
  • t

    Tamas Juhasz

    12/06/2022, 1:05 PM
    👋 Hello, team! I am having trouble executing my deep learning model training pipeline using the k8s executor. The pipeline itself has 24 training ops executed simultaneously. These ops are executed on designated k8s nodes selected by a NodeSelector label, and the nodes are scaled up and down by the cluster autoscaler and an AWS autoscaling group depending on CPU utilization. The problem is that after some time during training, the pipeline gets terminated with this error:
    Execution of run for "" failed. Execution was interrupted unexpectedly. No user initiated termination request was found, treating as failure.
    I have no clue where this termination comes from.
    a
    p
    • 3
    • 3
  • d

    Damian

    12/06/2022, 1:08 PM
    Hi guys, we're getting a lot of GraphQL/MySQL connection errors. The Dagster instance works, and our devops team says the database server is fine, but the errors show up randomly while browsing dagit. Could someone point me toward what the problem might be?
    p
    • 2
    • 3
  • c

    Casper Weiss Bang

    12/06/2022, 1:36 PM
    I'm having issues with running jobs as docker containers as per https://docs.dagster.io/concepts/repositories-workspaces/workspaces#specifying-a-docker-image-in-your-grpc-server in a docker-compose setup:
    docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/containers/cb639ffa1ed57b0cd6581e81de37bb4395bfb4cb7f398d03b2b15a9eabbcea37/start>: Not Found ("network dagster_network not found")
    I can see docker-compose gives it another name,
    username_dagster_network
    , i.e. the name of my current user plus the network. You might want to document that somewhere. Or am I doing something wrong?
    • 1
    • 1
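    A sketch of one way around the prefixing described above, assuming Compose file format 3.5+, where a network can pin its name explicitly:
    # docker-compose.yml fragment: pin the network name so Compose does not
    # prefix it with the project (or user) name
    networks:
      dagster_network:
        name: dagster_network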
  • m

    Mehdi Hasanvandy

    12/06/2022, 2:54 PM
    Hello all, please help me understand why this error occurs:
  • m

    Mehdi Hasanvandy

    12/06/2022, 2:59 PM
    r
    • 2
    • 5
  • k

    Kirk Stennett

    12/06/2022, 4:14 PM
    Hey all, is there a way with dagster_dbt to have it materialize assets that match a dbt tag? From what I've seen, you have to use the pre-defined asset name that's given (with some customization, like a prefix). Or, if I want to do something like this, am I better off building something custom that runs the job from the CLI resource and determines which assets were materialized? The goal is to be able to do arbitrary dbt runs and have Dagster show the corresponding assets as materialized.
    • 1
    • 1
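    A sketch of scoping dbt assets by tag, assuming load_assets_from_dbt_project's select parameter accepts dbt selection syntax on your version; paths and the tag name are placeholders:
    from dagster_dbt import load_assets_from_dbt_project

    dbt_assets = load_assets_from_dbt_project(
        project_dir="path/to/dbt_project",
        profiles_dir="path/to/profiles",
        select="tag:my_tag",  # hypothetical tag name
    )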
  • d

    Derek Truong

    12/06/2022, 5:33 PM
    I looked through the docs and couldn't find an answer, but is there a first-class integration to push failure events to a messaging queue like Pub/Sub for another service to ingest? The intention here is to build an alerting service on any kind of event that occurs within Dagster.
    a
    • 2
    • 2
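    The thread doesn't show a built-in queue integration, but a sketch of wiring one up with a run failure sensor; publish_to_queue is a hypothetical stand-in for a Pub/Sub client:
    from dagster import RunFailureSensorContext, run_failure_sensor

    def publish_to_queue(topic: str, payload: dict) -> None:
        ...  # hypothetical: hand off to Pub/Sub or another broker

    @run_failure_sensor
    def publish_failures(context: RunFailureSensorContext):
        # forward the failed run's identity and error message downstream
        publish_to_queue(
            topic="dagster-failures",
            payload={
                "job": context.dagster_run.job_name,
                "error": context.failure_event.message,
            },
        )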
  • z

    Zachary Bluhm

    12/06/2022, 7:54 PM
    Hi all - are there any docs on what the possible args are to pass into
    dagsterApiGrpcArgs
    ?
    r
    • 2
    • 3
  • n

    nickvazz

    12/06/2022, 9:00 PM
    Can I not use dynamic partitions with
    assets
    ?
    import glob
    import os
    from dagster import asset, DynamicPartitionsDefinition
    
    def get_partitions(_):
        return map(os.path.basename, glob.glob("/some/path/*"))
    
    
    @asset(
        group_name='test_group',
        # partitions_def=DynamicPartitionsDefinition(get_partitions), # this line makes it fail
    )
    def partitioned_asset(context):
        context.log.info()
    :dagster-bot-resolve: 1
    o
    • 2
    • 16
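    A sketch of a workaround, under the assumption that the function-style DynamicPartitionsDefinition is rejected on assets in this version: compute the partition list once, at definition time, with a static definition.
    import glob
    import os

    from dagster import StaticPartitionsDefinition, asset

    # evaluated when the definitions load, not on every run
    partitions = StaticPartitionsDefinition(
        [os.path.basename(p) for p in glob.glob("/some/path/*")]
    )

    @asset(group_name="test_group", partitions_def=partitions)
    def partitioned_asset(context):
        context.log.info(context.partition_key)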
  • b

    Binh Pham

    12/06/2022, 9:33 PM
    How do I handle dbt model assets with an
    alias
    config? For a dbt model with the file name
    my_schema___my__table.sql
    {{ config(alias="my_table") }}
    
    SELECT 1
    the asset key will be
    my_schema / my_schema___my__table
    For a downstream asset using the dagster-snowflake IO manager
    @asset(
        ins={"my_schema___my__table": AssetIn(key_prefix=["my_schema"])},
        ...
    )
    def my_asset(my_schema___my__table: DataFrame):
        ...
    the IO manager would do a
    select * from my_schema.my_schema___my__table
    , which doesn't exist.
    o
    • 2
    • 3
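    A sketch of keying assets off the dbt alias, assuming load_assets_from_dbt_project accepts a node_info_to_asset_key callable and that the manifest node carries schema/alias/name fields:
    from dagster import AssetKey
    from dagster_dbt import load_assets_from_dbt_project

    def node_info_to_asset_key(node_info: dict) -> AssetKey:
        # prefer the alias, so the key matches the table the model writes to
        name = node_info.get("alias") or node_info["name"]
        return AssetKey([node_info["schema"], name])

    dbt_assets = load_assets_from_dbt_project(
        project_dir="path/to/dbt_project",
        node_info_to_asset_key=node_info_to_asset_key,
    )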
  • c

    Chris Anderson

    12/07/2022, 12:16 AM
    Is there a way to call
    .collect()
    on a dynamic op that's been turned into an asset, from within another asset that has it as a dependency? For example, I have
    @op(out=DynamicOut())
    def observations(context) -> DynamicOutput[gpd.GeoDataFrame]:
        # yield observations in chunks
    This op gets turned into an asset with its own group later with
    AssetsDefinition.from_op
    that's used across many jobs. In some jobs this dynamic behavior is beneficial and sought after, but in others I'd like to collect all the information before proceeding, like below:
    @asset(
        ins={'observations': AssetIn('observations')},
        outs={'collective_analysis': AssetOut()}
    )
    def collective_analysis(context, observations):
        collected_obs = observations.collect()
        # do analysis with all observations collected into one dataframe
    Running this code right now throws the following error and tries to split this
    collective_analysis
    asset materialization dynamically according to the original dynamic op:
    dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "collective_analysis"::AttributeError: 'GeoDataFrame' object has no attribute 'collect'
    o
    • 2
    • 4
  • g

    Gatsby Lee

    12/07/2022, 3:46 AM
    Hi, when I use the output of an op as an input to an op generator, I get this error:
    dagster._core.errors.DagsterInvalidDefinitionError: "log__<dagster._core.definitions.composition.InvokedSolidOutputHandle object at 0x1088f1f90>" is not a valid name in Dagster. Names must be in regex ^[A-Za-z0-9_]+$.
    Here is the sample code.
    @op(out={"step_name": Out()})
    def op__get_config():
        step_name = "hello-dagster"
        return step_name
    
    
    def generate_op(step_name: str):
        @op(name=f"log__{step_name}")
        def func():
            print(f"hello-{step_name}")
    
        return func
    
    
    @job
    def job() -> None:
    
        step_name = op__get_config()
        log_op = generate_op(step_name)
  • g

    Gatsby Lee

    12/07/2022, 3:47 AM
    I am using 1.0.13
  • g

    Gatsby Lee

    12/07/2022, 4:45 AM
    Maybe this is not possible, since the actual value is not available at planning time. Can anyone confirm this?
    o
    • 2
    • 4
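    The suspicion above matches how Dagster works: op outputs only exist at run time, so they can't name ops at definition time. A sketch of the usual workaround, fixing the generated op's name up front and passing the runtime value as an input instead:
    from dagster import job, op

    @op
    def op__get_config() -> str:
        return "hello-dagster"

    def generate_op(suffix: str):
        # suffix must be a plain string available at definition time
        @op(name=f"log__{suffix}")
        def func(step_name: str):
            print(f"hello-{step_name}")
        return func

    log_op = generate_op("configured")  # name chosen at definition time

    @job
    def my_job():
        log_op(op__get_config())  # runtime value flows as an input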
  • w

    William

    12/07/2022, 7:10 AM
    After upgrading to dagster 1.1.5 I can no longer select multiple assets using
    shift+click
    on the asset graph page.
    o
    • 2
    • 1
  • w

    William

    12/07/2022, 7:25 AM
    How could I make a DailyPartitionsDefinition output only business days (Mon. -> Fri.)? Should I inherit and override the
    get_partitions
    method?
    j
    • 2
    • 10
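    A sketch of a weekday-only partitions definition via a cron schedule, as an alternative to subclassing and overriding get_partitions; the start date is a placeholder:
    from dagster import TimeWindowPartitionsDefinition

    business_days = TimeWindowPartitionsDefinition(
        cron_schedule="0 0 * * 1-5",  # midnight, Monday through Friday
        start="2022-01-03",
        fmt="%Y-%m-%d",
    )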
  • s

    Sơn Lê

    12/07/2022, 7:54 AM
    Hi all, I followed the poor man's data lake tutorial but encountered this error/bug when running
    dagit
    :
    ImportError: cannot import name 'introspection_query' from 'graphql' (/workspace/.pyenv_mirror/user/current/lib/python3.8/site-packages/graphql/__init__.py).
    According to the tutorial, Dagster should run properly. Is there a fix?
    o
    • 2
    • 6
  • w

    William

    12/07/2022, 8:05 AM
    Is it possible to view per-partition asset stale status?
    o
    • 2
    • 2
  • v

    Vrushank Kenkre

    12/07/2022, 10:53 AM
    Hello, I am trying to set up a data pipeline on an AWS EMR cluster using Dagster. I am using with_pyspark_emr to do this. But when I launch dagit and run the job, I run into module import errors:
    Traceback (most recent call last):
    File "/home/hadoop/.local/lib/python3.7/site-packages/dagster/_core/code_pointer.py", line 138, in load_python_module
    return importlib.import_module(module_name)
    File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
    File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
    File "<frozen importlib._bootstrap>", line 983, in _find_and_load
    File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
    ModuleNotFoundError: No module named 'with_pyspark_emr'
    `dagster._core.errors.DagsterImportError: Encountered ImportError:
    No module named 'with_pyspark_emr'
    while importing module with_pyspark_emr. Local modules were resolved using the working directory
    /home/ec2-user/dagster/my-dagster-project
    . If another working directory should be used, please explicitly specify the appropriate path using the
    -d
    or
    --working-directory
    for CLI based targets or the
    working_directory
    configuration option for workspace targets.` I have set up Dagster on a dev EC2 machine and am trying to run the job on EMR. The module with_pyspark_emr is present in
    /home/ec2-user/dagster/my-dagster-project
    , yet I am not able to figure out what the issue is. Can someone please help?
    :dagster-bot-responded-by-community: 1
    • 1
    • 1
    This fixed it: https://dagster.slack.com/archives/C01U954MEER/p1666968770375629?thread_ts=1666967497.771219&cid=C01U954MEER
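    For reference, a sketch of pinning the working directory in workspace.yaml, which is what the error message suggests; the module name and path echo the ones above:
    # workspace.yaml fragment
    load_from:
      - python_module:
          module_name: with_pyspark_emr
          working_directory: /home/ec2-user/dagster/my-dagster-project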