dagster-support

    Adrien

    09/09/2022, 9:13 AM
    👋 Hi everyone! We're looking at integrating Dagster Cloud Serverless data for our organisation for analysis; we're interested in all the numbers we can get, including historical runs, assets, failed materializations, any billing information, etc. I was looking at the website for any API that exposes this kind of organisation data (it sounds like dagster-daemon history-type data, only in the Cloud Serverless environment), but I can't find anything. Is anyone aware of such an API?
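    One hedged way to pull this kind of run history is the GraphQL API that Dagit itself uses, which Dagster Cloud also serves per deployment; billing data is not exposed there as far as I know. A rough sketch, assuming the runsOrError query and the Dagster-Cloud-Api-Token header (verify both against your deployment's GraphQL schema), with a hypothetical org/deployment URL:
    # Rough sketch: query recent run records from a Dagster Cloud deployment's GraphQL endpoint.
    import os
    import requests

    GRAPHQL_URL = "https://my-org.dagster.cloud/prod/graphql"  # hypothetical org/deployment
    QUERY = """
    query RecentRuns($limit: Int) {
      runsOrError(limit: $limit) {
        ... on Runs {
          results { runId status startTime endTime }
        }
      }
    }
    """

    resp = requests.post(
        GRAPHQL_URL,
        json={"query": QUERY, "variables": {"limit": 50}},
        headers={"Dagster-Cloud-Api-Token": os.environ["DAGSTER_CLOUD_API_TOKEN"]},
    )
    resp.raise_for_status()
    for run in resp.json()["data"]["runsOrError"]["results"]:
        print(run["runId"], run["status"])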

    Todd

    09/09/2022, 3:00 PM
    If I wanted to use a Dagster asset in a web-based application, say from Django, is there a recommended way?
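    One hedged pattern is to keep the asset definition importable and call it from the web app, either executing it in process with materialize() or simply reading whatever storage the asset's IO manager writes to. A minimal sketch, assuming dagster.materialize is available in your version; the asset and the Django view are hypothetical:
    # Minimal sketch: materialize an asset in-process from a Django view and return its value.
    from dagster import asset, materialize
    from django.http import JsonResponse

    @asset
    def my_metrics():
        return {"rows": 42}  # hypothetical computation

    def metrics_view(request):
        result = materialize([my_metrics])            # runs the asset in-process
        value = result.output_for_node("my_metrics")  # fetch the computed value
        return JsonResponse(value)
    In practice you would more likely read the already-materialized value from storage (or trigger runs via GraphQL) rather than recompute it inside a request.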

    Alan Dao

    09/09/2022, 3:47 PM
    Hi, another question:

    Alan Dao

    09/09/2022, 3:48 PM
    Can we define an upstream dependency for a dbt asset from outside dbt (without ref or source)?

    Alan Dao

    09/09/2022, 3:48 PM
    For example, I want a dbt model to run after a Python script has finished.
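    One workaround, staying at the op level rather than dbt assets, is to give a dbt step a Nothing-typed input so it only starts after the Python step finishes. A rough sketch; run_my_script and the dbt project path are hypothetical, and the dbt step just shells out to the CLI:
    # Rough sketch: run dbt only after a Python op completes, using a Nothing input
    # as a pure ordering dependency (no data is passed between the ops).
    import subprocess
    from dagster import In, Nothing, job, op

    @op
    def run_my_script():
        print("python work done")  # hypothetical stand-in for the script

    @op(ins={"start_after": In(Nothing)})
    def run_dbt_models():
        subprocess.run(["dbt", "run", "--project-dir", "my_dbt_project"], check=True)

    @job
    def python_then_dbt():
        run_dbt_models(start_after=run_my_script())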

    Charlie Bini

    09/09/2022, 4:15 PM
    Is there a way to rename subgraphs? When I do this, the resync graph contains nodes named compose_queries, compose_queries_2, etc.
    @graph
    def execute_query(dynamic_query):
        # trunk-ignore(flake8/F841)
        data = extract(dynamic_query=dynamic_query)
    
    
    @graph
    def resync():
        # log
        dynamic_query = compose_queries()
        dynamic_query.map(execute_query)
    
        # attendance
        dynamic_query = compose_queries()
        dynamic_query.map(execute_query)
    
        # storedgrades
        dynamic_query = compose_queries()
        dynamic_query.map(execute_query)
    
        # pgfinalgrades
        dynamic_query = compose_queries()
        dynamic_query.map(execute_query)
    
        # assignmentscore
        dynamic_query = compose_queries()
        dynamic_query.map(execute_query)
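    If the goal is just distinct node names inside resync, aliasing each invocation should help; both ops and graphs can be invoked through .alias(). A sketch against the code above, with made-up alias names (worth verifying that .map() accepts an aliased graph in your version):
    # Sketch: give each compose_queries / execute_query invocation its own node name
    # so the resync graph shows e.g. compose_log instead of compose_queries_2.
    from dagster import graph

    @graph
    def resync():
        log_queries = compose_queries.alias("compose_log")()
        log_queries.map(execute_query.alias("execute_log"))

        attendance_queries = compose_queries.alias("compose_attendance")()
        attendance_queries.map(execute_query.alias("execute_attendance"))

        # ...repeat for storedgrades, pgfinalgrades, assignmentscore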

    Yevhen Samoilenko

    09/09/2022, 4:19 PM
    Hi!
    [dagit] The launchpad tab is no longer shown for Asset jobs. Asset jobs can be launched via the "Materialize All" button shown on the Overview tab. To provide optional configuration, hold shift when clicking "Materialize".
    This feature doesn't work for me in either Dagster Cloud or open-source Dagster.

    Saul Burgos

    09/09/2022, 4:31 PM
    Hi, I am using this as a guide: https://github.com/dagster-io/dagster/blob/1.0.8/examples/deploy_docker/docker-compose.yml. I am trying to integrate Airbyte, so I took the docker-compose.yaml from the Airbyte GitHub repo (https://github.com/airbytehq/airbyte/blob/v0.40.3/docker-compose.yaml) and put it in my folder next to my other compose file. At this point I have two compose files, "docker-compose.yml" and "docker-compose.airbyte.yml", and I run: docker-compose -f docker-compose.yml -f docker-compose.airbyte.yml up --build. Everything is built and created correctly. I am using the "airbyte_resource" from "dagster_airbyte", and in my env variables I pass the Airbyte client host like this: AIRBYTE_CLIENT_HOST=airbyte-webapp, AIRBYTE_CLIENT_PORT=8000. I have a sensor that calls a job with a single op: @job(resource_defs={"airbyte": airbyte_resource}) def main(): airbyte_sync_op(). The problem is that when the sensor triggers my job, I cannot reach the Airbyte API. Things I have tried:
    - joining all the services from Dagster and Airbyte in a single network
    - exposing port 8000 on the webapp service
    - using "airbyte-webapp", "webapp", and "airbyte-server" as the host
    - using ports 8000 and 8001
    Does anyone have any idea what is happening? Which service is supposed to have access to the Airbyte API: "docker_example_user_code", "docker_example_dagit", or "docker_example_daemon"? I assume I have a problem with the Docker configuration, but I have tried everything.
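    For what it's worth, with the deploy_docker example the op that calls Airbyte executes in the run container that the DockerRunLauncher starts from the user-code image, so that container (and the Docker network it is attached to) is the one that has to resolve and reach the Airbyte host, not Dagit or the daemon. A hedged sketch of wiring the resource from the env vars, assuming dagster_airbyte's host/port config keys:
    # Hedged sketch: configure the Airbyte resource from env vars; the container that
    # executes airbyte_sync_op must be able to resolve AIRBYTE_CLIENT_HOST on a shared network.
    from dagster import job
    from dagster_airbyte import airbyte_resource, airbyte_sync_op

    configured_airbyte = airbyte_resource.configured(
        {
            "host": {"env": "AIRBYTE_CLIENT_HOST"},
            "port": {"env": "AIRBYTE_CLIENT_PORT"},
        }
    )

    @job(resource_defs={"airbyte": configured_airbyte})
    def main():
        airbyte_sync_op()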

    Anthony Reksoatmodjo

    09/09/2022, 4:39 PM
    Good morning! Does anyone know how to force a gRPC repository to reload? I add new jobs dynamically following this example, but the new jobs don't show up unless I restart the code server (reloading via Dagit does nothing).

    Michal Malyska

    09/09/2022, 5:01 PM
    Hi everyone, is there a reasonable way to have a job that is scheduled at a set time but can also be triggered manually, with the constraint that only one instance of the job can be running at the same time? (i.e. if the job is scheduled for 00:02 and I trigger it manually at 00:01 so it's still running at 00:02, only one job executes that day)

    Scott Hood

    09/09/2022, 5:20 PM
    Hey all, I am looking to produce an asset as the very last step of a Dagster job. Is this possible, or is my understanding of assets flawed?
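    If the job is op-based, one common pattern is to have the last op write the data and log an AssetMaterialization so the result shows up in the asset catalog; alternatively the whole pipeline can be modeled as software-defined assets and run with an asset job. A minimal sketch of the first option (the asset key and upstream op are hypothetical):
    # Minimal sketch: the final op of a job records an AssetMaterialization event
    # so the produced table/file appears as an asset in Dagit.
    from dagster import AssetKey, AssetMaterialization, MetadataValue, job, op

    @op
    def build_report():
        return [1, 2, 3]  # hypothetical upstream computation

    @op
    def publish_report(context, rows):
        # ... write `rows` somewhere durable here ...
        context.log_event(
            AssetMaterialization(
                asset_key=AssetKey("final_report"),
                metadata={"row_count": MetadataValue.int(len(rows))},
            )
        )

    @job
    def report_job():
        publish_report(build_report())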

    jose

    09/09/2022, 7:48 PM
    Hi all, I have defined the config_schema of an @op to be an Enum, as documented here. What should I put in the @job config? I want to show all of the Enum options in Dagit, but right now I am only able to get a default value.
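    For reference, a minimal sketch of an Enum config field on an op, with the matching run config; whether the launchpad renders every option as a dropdown may depend on the Dagit version, but the declared values are what it has to offer:
    # Minimal sketch: an op whose config_schema uses a Dagster Enum; run config
    # (or the launchpad) then selects one of the declared values by name.
    from dagster import Enum, EnumValue, Field, job, op

    ColorEnum = Enum("Color", [EnumValue("RED"), EnumValue("GREEN"), EnumValue("BLUE")])

    @op(config_schema={"color": Field(ColorEnum, default_value="RED")})
    def paint(context):
        context.log.info(f"color: {context.op_config['color']}")

    @job
    def paint_job():
        paint()

    # Launchpad / run config equivalent:
    # ops:
    #   paint:
    #     config:
    #       color: GREEN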

    Tom Reilly

    09/09/2022, 10:48 PM
    Trying to use dagster-celery and getting this error:
    operation queue.declare caused a channel exception precondition_failed: inequivalent arg 'x-max-priority' for queue 'dagster' in vhost '/': received the value '10' of type 'signedint' but current is none
    Any ideas?

    Jiamin Chen

    09/10/2022, 1:17 AM
    Hi all, I have a dbt project that includes models that should be partitioned at different time granularities (hourly, daily, weekly), and I want to load them as partitioned assets in the same Dagster repo with corresponding schedules. However, I was not able to have jobs/schedules that use a different time-granularity partition than the assets, and got a "non-matching partitions_def" error. Is there any workaround so that I can load the dbt project and respect the partition dependencies for time-window-partitioned assets (i.e. daily jobs wait on all hourly partitions to finish, etc.) as well as the dependencies defined within dbt?

    geoHeil

    09/10/2022, 6:44 AM
    FYI: OpenMetadata supports Dagster now: https://docs.open-metadata.org/openmetadata/connectors/pipeline/dagster Has anyone already played with this and perhaps gotten it to work with Dagster Cloud?

    Alan Dao

    09/11/2022, 6:08 AM
    Can we define a non-argument dependency for @ops?

    Alan Dao

    09/11/2022, 6:08 AM
    @op

    Alan Dao

    09/11/2022, 6:08 AM
    I can't quite find it in the docs; the only way I see is to pass data.
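    For the record, the usual way to express an ordering-only dependency between ops, without passing any data, is a Nothing-typed input. A minimal sketch:
    # Minimal sketch: second_op waits for first_op via In(Nothing) but receives no value from it.
    from dagster import In, Nothing, job, op

    @op
    def first_op():
        print("runs first")

    @op(ins={"start": In(Nothing)})
    def second_op():
        print("runs after first_op, with no data passed")

    @job
    def ordered_job():
        second_op(start=first_op())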

    Baris Cekic

    09/11/2022, 10:57 AM
    Hey folks, another PySpark on Kubernetes question. As you may know, PySpark on Kubernetes with multiple executors requires a headless service (docs here). I wonder if there is any out-of-the-box functionality to create a headless service that points to the driver job/pod when the job is executed. Otherwise the Spark executor pods cannot communicate with the Dagster job/pod (which is the Spark driver).

    Saad Anwar

    09/11/2022, 1:17 PM
    Hello, quick question about executing Python scripts within Dagster. I have a Python script that runs some logic, and I want to integrate that script into my job/pipeline. I understand the basics of software-defined assets, but can anyone point me to some example code on how to load the Python script into Dagster and execute it as an op? Thanks!
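    Two hedged options: import the script's entry point and call it inside an op, or shell out to the file with subprocess if it has to stay a standalone script. A minimal sketch (my_script and its main() are hypothetical):
    # Minimal sketch: two ways to run an existing Python script from Dagster ops.
    import subprocess
    from dagster import job, op

    @op
    def run_script_as_function():
        from my_script import main  # hypothetical module exposing a callable
        return main()

    @op
    def run_script_as_subprocess():
        subprocess.run(["python", "my_script.py"], check=True)  # run the file as-is

    @job
    def script_job():
        run_script_as_function()
        run_script_as_subprocess()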

    Tamas Foldi

    09/11/2022, 6:59 PM
    It seems the dbt asset load's key_prefix does not work with seeds and snapshots. Is there any easy way to prefix all dbt outputs, not just models?

    Alexander Whillas

    09/12/2022, 4:24 AM
    How do environment variables get added to the new ECS task when a job is run? I have a situation where not all of the daemon/dagit/user-code env vars are getting added to the new task/container.

    Frank Dekervel

    09/12/2022, 7:58 AM
    Hello, I'm upgrading from dagster 0.14.19 to 1.0.8 and my sensors don't work anymore. In the GUI they are listed as started, but the SensorDaemon says "dagster.daemon.SensorDaemon - INFO - Not checking for any runs since no sensors have been started."

    teodorpaius

    09/12/2022, 12:27 PM
    Hello, I am encountering this issue:
    dagster._core.errors.DagsterLaunchFailedError: Error during RPC setup for executing run: dagster._check.CheckError: Failure condition: Couldn't import module dagster_postgres.run_storage when attempting to load the configurable class dagster_postgres.run_storage.PostgresRunStorage
      File "/usr/local/lib/python3.9/site-packages/dagster/_core/instance/__init__.py", line 1747, in launch_run
        self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
      File "/usr/local/lib/python3.9/site-packages/dagster/_core/launcher/default_run_launcher.py", line 118, in launch_run
        DefaultRunLauncher.launch_run_from_grpc_client(
      File "/usr/local/lib/python3.9/site-packages/dagster/_core/launcher/default_run_launcher.py", line 86, in launch_run_from_grpc_client
        raise (
    This was working on 1.0.7 but seems to have broken with 1.0.8. Were any changes included that break existing implementations? This is the workspace config I use:
    run_storage:
      module: dagster_postgres.run_storage
      class: PostgresRunStorage
      config:
        postgres_db:
          hostname:
            env: DAGSTER_POSTGRES_HOST
          username:
            env: DAGSTER_POSTGRES_USER
          password:
            env: DAGSTER_POSTGRES_PASSWORD
          db_name:
            env: DAGSTER_POSTGRES_DB
          port: 5432

    Gowtham Manne

    09/12/2022, 1:38 PM
    Hello all, is there a way to extract all logs from Dagster and insert them into MongoDB (Python pymongo)?
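    One hedged approach is a custom logging handler that writes each record to MongoDB, attached through a custom logger definition (or through managed Python logging in dagster.yaml). A rough sketch; the connection string, database, and collection names are made up:
    # Rough sketch: a logging.Handler that inserts each Dagster log record into MongoDB,
    # wrapped in a @logger definition so ops' context.log calls flow through it.
    import logging
    from pymongo import MongoClient
    from dagster import logger

    class MongoLogHandler(logging.Handler):
        def __init__(self, uri="mongodb://localhost:27017", db="dagster", collection="logs"):
            super().__init__()
            self._coll = MongoClient(uri)[db][collection]

        def emit(self, record):
            self._coll.insert_one(
                {
                    "message": self.format(record),
                    "level": record.levelname,
                    "logger": record.name,
                    "created": record.created,
                }
            )

    @logger(description="Logger that mirrors records into MongoDB")
    def mongo_logger(init_context):
        log = logging.getLogger("mongo_dagster_logger")
        log.setLevel(logging.INFO)
        log.addHandler(MongoLogHandler())
        return log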

    Leo Qin

    09/12/2022, 2:30 PM
    Hello, I am running AWS Athena on Dagster Serverless (via athena-dbt, which uses pyathena) and getting the following error when running any query:
    botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: The S3 location provided to save your query results is invalid. Please check your S3 location is correct and is in the same region and try again. If you continue to see the issue, contact customer support for further assistance.
    If I run the image locally this error doesn't happen. Any ideas what is happening?

    Gowtham Manne

    09/12/2022, 3:05 PM
    Hi all, I am trying to format my logs using:
    from dagster import op, job, repository, In,Out, OpExecutionContext, logger
    from pydantic import Field
    import logging
    import json
    
    
    
    
    @logger(
        {
            "log_level": Field(str, is_required=False, default_value="INFO"),
            "name": Field(str, is_required=False, default_value="dagster"),
        },
        description="A JSON-formatted console logger",
    )
    def json_console_logger(context : OpExecutionContext):
        level = context.logger_config["log_level"]
        name = context.logger_config["name"]
    
        klass = logging.getLoggerClass()
        logger_ = klass(name, level=level)
    
        handler = logging.StreamHandler()
    
        class JsonFormatter(logging.Formatter):
            def format(self, record):
                return json.dumps(record.__dict__)
    
        handler.setFormatter(JsonFormatter())
        logger_.addHandler(handler)
    
        return logger_
    
    
    
    
    @op(
        name="MyCOperationInput",
        out={"result_str":Out(dagster_type=str)},
        config_schema={"name": str}
    )
    def my_C_op_input(context:OpExecutionContext):
        name = context.op_config["name"]
        context.log.info(f"My name is {name} in MyCOperationInput")
        return name
    
    
    @op(
        name="MyCOperation",
        ins={"result_str":In()},
        out={"result":Out(dagster_type=str)}
    )
    def my_C_op(context:OpExecutionContext,result_str : str):
        print(context)
        print(result_str)
        context.log.info(f"My name is {result_str} in MyCOperation")
        return result_str
    
    @job
    def my_C_job():
        print("job started")
        my_C_op(my_C_op_input())
    
    @repository(default_logger_defs={"json_logger": json_console_logger})
    def my_C_repo():
        return [my_C_job]
    but I am getting this error:
    dagster.core.errors.DagsterInvalidConfigDefinitionError: Error defining config. Original value passed: {'log_level': FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'INFO'}), 'name': FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'dagster'})}. Error at stack path :log_level. FieldInfo(default=<class 'str'>, extra={'is_required': False, 'default_value': 'INFO'}) cannot be resolved. This value can be a:
    - Field
    - Python primitive types that resolve to dagster config types - int, float, bool, str, list.
    - A dagster config type: Int, Float, Bool, Array, Optional, Selector, Shape, Permissive, Map
    - A bare python dictionary, which is wrapped in Field(Shape(...)). Any values in the dictionary get resolved by the same rules, recursively.
    - A python list with a single entry that can resolve to a type, e.g. [int]
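    For what it's worth, the traceback is consistent with Field coming from pydantic rather than Dagster: the logger's config dict then holds pydantic FieldInfo objects, which Dagster's config system cannot resolve. Importing Field from dagster instead should let the config schema resolve; a sketch of just the import and decorator (the logger body stays as above):
    # Sketch: use Dagster's Field (not pydantic's) for the logger config schema.
    from dagster import Field, logger

    @logger(
        {
            "log_level": Field(str, is_required=False, default_value="INFO"),
            "name": Field(str, is_required=False, default_value="dagster"),
        },
        description="A JSON-formatted console logger",
    )
    def json_console_logger(init_context):
        ...  # unchanged body from the snippet above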

    sar

    09/12/2022, 3:23 PM
    Still struggling to build the following, which sounds like it should be simple, yet I can't seem to get the right pieces working. I have several Airbyte connection IDs along with associated dbt models, each in its own subfolder, and a YAML file with the correct mappings of connection IDs to dbt model folder paths. What I'm looking to do is generate a separate pipeline for each connection ID so that I can trigger a sync and run the dbt models whenever I choose. I have been able to write a graph that creates a job that reads the YAML file and runs the syncs and dbt models for all the mappings in it, but that's not what I'm after; I want individual, separate pipelines created from that template.
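    One hedged pattern is a plain job factory: read the YAML once at definition time and emit one job per mapping, so every connection gets its own independently launchable pipeline. A rough sketch, assuming airbyte_sync_op's connection_id config and a dbt step that shells out to the CLI; the YAML shape and names are made up:
    # Rough sketch: build one job per Airbyte-connection/dbt-folder mapping so each
    # pipeline can be launched (and scheduled) on its own.
    import subprocess
    import yaml
    from dagster import job, op, repository
    from dagster_airbyte import airbyte_resource, airbyte_sync_op

    def build_pipeline(name, connection_id, dbt_dir):
        sync = airbyte_sync_op.configured({"connection_id": connection_id}, name=f"sync_{name}")

        @op(name=f"dbt_run_{name}")
        def run_dbt(start):
            # placeholder dbt step; swap in your existing dbt op/resource
            subprocess.run(["dbt", "run", "--project-dir", dbt_dir], check=True)

        @job(name=f"{name}_pipeline", resource_defs={"airbyte": airbyte_resource})
        def pipeline():
            run_dbt(sync())

        return pipeline

    @repository
    def pipelines_repo():
        with open("connections.yaml") as f:  # hypothetical mapping file
            mappings = yaml.safe_load(f)
        return [build_pipeline(m["name"], m["connection_id"], m["dbt_dir"]) for m in mappings]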

    Olivier Doisneau

    09/12/2022, 3:40 PM
    I was looking into putting the repo containers in ECS behind an ALB with gRPC. Has anyone done this successfully?

Vinnie

09/12/2022, 3:49 PM
Why do you want to put them behind an ALB? I have mine behind Cloud Map with service discovery; that way they're not exposed outside the VPC.

Bianca Rosa

09/12/2022, 4:11 PM
also doing service discovery here

Olivier Doisneau

09/12/2022, 4:21 PM
I tried to avoid sidecars, which I think are required with service discovery.

Bianca Rosa

09/12/2022, 4:23 PM
I don't think they are, or I missed something. It's just a config I had to Terraform for the ECS service.

Vinnie

09/12/2022, 4:23 PM
She's correct, no sidecars in my config either.

Olivier Doisneau

09/12/2022, 4:24 PM
So the ALB would be private. I can also have non-Fargate containers with different ports, because we are avoiding Fargate as a company standard, and I believe Fargate keeps it as a single port (4000) when using service discovery.
Oh good, but are both of you using Fargate?
If having one service using Fargate lets me do all of this without sidecars, I might be able to convince others...

Bianca Rosa

09/12/2022, 4:25 PM
yep!
i’m using fargate

Olivier Doisneau

09/12/2022, 4:26 PM
OK, let me look into Fargate, then using service discovery and the Cloud Map endpoint without a sidecar.
Thank you very much.