dagster-support

    Pablo Beltran

    10/07/2022, 1:08 AM
    Hey there, would there be any issues with using a step launcher in conjunction with dagstermill?

    peay

    10/07/2022, 10:26 AM
    Hello, I am hitting a weird issue with asset configuration. Trying https://docs.dagster.io/1.0.6/concepts/assets/software-defined-assets#asset-configuration with configuration where all fields have default values, context.op_config is None when I try to materialize an asset without providing any configuration override. However, context.op_config is correctly set if I do override at least one field from the materialize launchpad. Is this a known bug? Running Dagster 1.0.6
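
    A minimal sketch of the setup being described, with a guard against op_config arriving as None (asset and field names here are hypothetical, not from the report):
    from dagster import Field, asset

    DEFAULTS = {"limit": 10}  # hypothetical defaults mirroring the schema

    @asset(config_schema={"limit": Field(int, default_value=10)})
    def my_asset(context):
        # workaround: fall back to the schema defaults when op_config is None
        cfg = context.op_config or DEFAULTS
        return list(range(cfg["limit"]))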

    Archie Kennedy

    10/07/2022, 10:30 AM
    Hello. Is there a way to have a Permissive dict with one required field (in an op config_schema)? Example:
    from dagster import Field, Permissive, op

    @op(
        config_schema={
            "payload": Field(
                Permissive(
                    {
                        "URI": Field(str, is_required=True),
                        "Webhook": Field(str, is_required=False),
                    },
                )
            )
        }
    )
    def my_op(context):
        ...
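
    For context, Permissive validates the fields it declares (so URI stays required) while letting extra keys pass through. A hypothetical run config satisfying the schema above, assuming the my_op name used in the example:
    run_config = {
        "ops": {
            "my_op": {
                "config": {
                    "payload": {
                        "URI": "https://example.com/hook",  # required field
                        "extra_key": "accepted",  # undeclared keys pass through Permissive
                    }
                }
            }
        }
    }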

    Celio de Assis Picanco

    10/07/2022, 12:07 PM
    Hello everyone, I have a few beginner questions. 1. How do I access assets once I’ve materialized them? 2. How do I pass an asset as an argument to an op? 3. How can I change the storage destination? To give you some context, I’m trying this:
    @asset
    def raw_users():
        user_data = pd.read_csv(FILE_PATH)
        return user_data
    (I haven’t really started playing with the configuration yet, so FILE_PATH is just a hard-coded variable.) When I run it in Dagit, Dagster creates a folder called storage in the directory I’m running the pipeline from, as well as a file called raw_users in the recently created storage directory. The problem is: I can’t open the asset I’ve just created. I see that Dagster generated a file, but it seems to be a bytes file of a pandas DataFrame that I can’t manage to open. Which brings me back to my questions: how can I customise the storage destination, and how can I open the asset from a job so that I can access it from an op? Thanks in advance for any help.
    :dagster-bot-resolve: 1
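
    For reference, a rough sketch of the usual pattern (paths and names hypothetical): the default io_manager pickles return values, which is why the stored file is not human-readable; the storage location can be redirected through fs_io_manager's base_dir, and a downstream asset or op receives the value by declaring it as an input:
    import pandas as pd

    from dagster import asset, fs_io_manager, repository, with_resources

    FILE_PATH = "users.csv"  # hypothetical

    @asset
    def raw_users() -> pd.DataFrame:
        return pd.read_csv(FILE_PATH)

    @asset
    def cleaned_users(raw_users: pd.DataFrame) -> pd.DataFrame:
        # the io_manager loads the stored raw_users value and passes it in here
        return raw_users.dropna()

    @repository
    def repo():
        return with_resources(
            [raw_users, cleaned_users],
            # redirect storage from ./storage to a custom directory
            {"io_manager": fs_io_manager.configured({"base_dir": "/tmp/dagster-assets"})},
        )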

    Lucas Gabriel

    10/07/2022, 12:40 PM
    Is there a way to check whether Dagster will compile/run correctly (only compile, not job execution errors)? I need to check for this in my CI/CD pipeline. I'm currently just running my repo file like this:
    python .\repo.py
    Is there a better way?
    :dagster-bot-resolve: 2
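
    For reference, one lightweight check is to import the repository and force every job definition to resolve, which surfaces definition and config errors without executing any ops. A sketch, assuming repo.py defines a @repository named my_repository:
    # ci_check.py: exits nonzero if any definition fails to load or resolve
    from repo import my_repository  # hypothetical module / repository names

    for job in my_repository.get_all_jobs():
        print(f"resolved job: {job.name}")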

    Tom Reilly

    10/07/2022, 3:16 PM
    Is there a way for assets to execute before all inputs are resolved? We have an asset called acquisition_url_table that logs files we need to download, and a downloading service will periodically check the table and download new files. Numerous assets across different jobs can write to this table, and it's also possible to write to the table at different steps within the same job. Ideally, we'd like the acquisition_url_table asset to write records whenever any of its inputs are received, and not have to wait for all of its inputs. How would one go about this? Or should the assets which create the records also write them to the table (i.e. some_unique_identifier_acquisition_url_records, another_unique_identifier_acquisition_url_records)? The latter seems easy to implement but less desirable, since useful information regarding the underlying table would be scattered across many assets, whereas the former would allow a single asset to hold all metadata/materialization info in one place.
    :dagster-bot-resolve: 1

    Zach P

    10/07/2022, 3:58 PM
    How do I go about testing a multi asset sensor? I’m trying to build up a test case that checks to make sure the run requests are all valid, but I’m having some issues pretty early on. Essentially, I have a dummy job, asset, and repo and I’m trying to test them with my sensor, but I get an “attempted to init dagster instance, but no instance reference was provided” error. However, I can’t use an ephemeral instance either. Am I missing something? 🤔 Code in thread
    :dagster-bot-resolve: 1

    Zachary Bluhm

    10/07/2022, 4:28 PM
    Is there a way to launch a backfill for a selected asset and all of its downstreams?
    ➕ 1

    Steven Tran

    10/07/2022, 7:13 PM
    Does dagster-daemon have a health check endpoint?

    Meghan Heintz

    10/07/2022, 7:29 PM
    We just started our evaluation of Dagster and I'm running into some installation errors that seem to stem from grpcio. Below is the error when I try installing grpcio directly. Have y'all seen issues with grpcio before? I can install an earlier version (1.11.0) but neither the min nor the max version specified in the setup file.
    ERROR: Command errored out with exit status 1: /Users/meghanheintz/miniconda3/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/private/var/folders/nw/xzhc52090l114y_l297x2fr80000gn/T/pip-install-gqx_efxj/grpcio_4d5a9bdddbb34530a68d96f204966b96/setup.py'"'"'; __file__='"'"'/private/var/folders/nw/xzhc52090l114y_l297x2fr80000gn/T/pip-install-gqx_efxj/grpcio_4d5a9bdddbb34530a68d96f204966b96/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /private/var/folders/nw/xzhc52090l114y_l297x2fr80000gn/T/pip-record-uof6h8de/install-record.txt --single-version-externally-managed --compile --install-headers /Users/meghanheintz/miniconda3/include/python3.8/grpcio Check the logs for full command output.

    Sean Lindo

    10/07/2022, 7:46 PM
    Hey all, does anyone have a working Dockerfile and Docker-Compose file you’re using for local development? Mind sharing? :face_holding_back_tears:
    👀 1

    Stephen Bailey

    10/07/2022, 7:53 PM
    I updated our Helm deployment from 0.15.8 to 1.0.11 today, and while the user-cloud-agent looks to be running normally on the k8s cluster, I'm not seeing any connectivity to Dagster Cloud within a branch deployment. Does anyone have recommendations on how to troubleshoot this?
    ✅ 1

    Giovanni Paolo

    10/07/2022, 9:25 PM
    is there something like with_resources but for jobs?
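
    For reference, jobs accept resource_defs directly at construction, which plays the same role for jobs that with_resources plays for assets. A sketch with hypothetical names:
    from dagster import graph, op, resource

    @resource
    def my_client(_init_context):
        return object()  # hypothetical client

    @op(required_resource_keys={"client"})
    def use_client(context):
        context.log.info(str(context.resources.client))

    @graph
    def my_graph():
        use_client()

    # bind resources when turning the graph into a job
    my_job = my_graph.to_job(resource_defs={"client": my_client})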

    Apoorv Yadav

    10/07/2022, 9:45 PM
    Hi, I am new to Dagster. I wanted to know: let's suppose we have 5 ops and our op at the 3rd level fails; is it possible to restart from the op at the 2nd level by taking the saved metadata and then using it to restart?
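
    For reference, Dagster supports re-execution from failure, which reuses the stored outputs of the steps that already succeeded instead of recomputing them: in Dagit via the "Re-execute from failure" option, or in Python roughly as in this sketch (job name and run id are hypothetical, and the exact API can vary by version):
    from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

    from repo import my_job  # hypothetical module-scope job

    instance = DagsterInstance.get()
    result = execute_job(
        reconstructable(my_job),
        instance=instance,
        # rerun only the failed steps and everything downstream of them
        reexecution_options=ReexecutionOptions.from_failure("<failed-run-id>", instance),
    )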

    sarah

    10/08/2022, 12:00 AM
    I’m using @run_failure_sensor to send slack messages when jobs fail (Dagster 1.0.11), but I’m running into the sensor timeout issue:
    grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1665128988.955836327","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4}"
    I believe this sensor (@run_failure_sensor) handles the cursor? We do have a high volume of runs at certain times, so maybe this is causing the timeout. Or maybe there is an issue with the cursor. I do get slack messages about the same job failure repeatedly (although the sensor times out, a slack message about the one failed job arrives every 5-20 minutes, so it looks like the cursor doesn’t get updated, probably because the sensor fails). Any ideas on how to fix this? I was also unable to monitor the sensors (or turn them off) in Dagit while this was happening (I turned the sensor off after connecting to the pod running the daemon and using dagster sensor stop …, after which I was able to view sensor information again in Dagit).

    GTC

    10/08/2022, 9:01 AM
    I would like to define a sensor to launch a partitioned asset job whenever a new file is created under a given directory. I have no idea how to define the partitions definition for "1 file = 1 partition" asset jobs. Is there any example I could refer to?
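
    For reference, a limited sketch of one approach (hypothetical paths and names): derive one partition per file at definition time, then have the sensor request the partition matching each new file. The caveat is that the code location must be reloaded for new files to become partitions:
    import os

    from dagster import StaticPartitionsDefinition, asset

    WATCH_DIR = "/data/incoming"  # hypothetical watched directory

    # one partition per file present when definitions are loaded
    files_partitions = StaticPartitionsDefinition(sorted(os.listdir(WATCH_DIR)))

    @asset(partitions_def=files_partitions)
    def ingested_file(context):
        path = os.path.join(WATCH_DIR, context.partition_key)
        with open(path) as f:
            return f.read()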

    Spencer Guy

    10/08/2022, 3:16 PM
    Hi again! Looking for some guidance on how to submit run requests from outside of Dagit via a direct API call or something similar. My Dagster setup is on EC2 in a multi-container environment using docker-compose. I'm essentially looking to enable another internal application to submit runs, similar to how I would in the launchpad/playground section, but outside of the dagit/dagster containers
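
    For reference, Dagit serves a GraphQL API, and dagster-graphql ships a Python client that another service can use to submit runs over the network. A sketch with hypothetical host and job names:
    from dagster_graphql import DagsterGraphQLClient

    client = DagsterGraphQLClient("dagit", port_number=3000)  # hypothetical host/port
    run_id = client.submit_job_execution(
        "my_job",
        run_config={"ops": {}},  # hypothetical run config
    )
    print(f"launched run {run_id}")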

    Spencer Guy

    10/08/2022, 3:40 PM
    Is it possible to submit runs to the Run Coordinator from an external application?

    Sean Lindo

    10/08/2022, 7:59 PM
    Hey guys…do I need a Dockerfile for every repo? I’m running a multi-container config but I’m not seeing how to load multiple repositories in one gRPC startup command
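
    For reference, a single gRPC server can host a module that defines several repositories (e.g. dagster api grpc --python-file repos.py), so one container and one startup command can serve them all. A sketch with hypothetical names:
    # repos.py: one module, multiple repositories, one gRPC server
    from dagster import job, op, repository

    @op
    def ping():
        return "pong"

    @job
    def ping_job():
        ping()

    @repository
    def team_a():
        return [ping_job]

    @repository
    def team_b():
        return [ping_job]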

    Simon Vadée

    10/10/2022, 9:01 AM
    Hi everyone! I’m using the dagster_dbt integration with a (rather complex) dbt project with multiple dependencies. I use load_assets_from_dbt_project to generate a graph for all of my models (for my base dbt project and all of its dependencies). When I try to run a materialisation for a single node of the graph, I get
    dagster._core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
    In process 79: dagster._core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: run_dbt_<package>
    where <package> is sometimes wrong (i.e. unrelated to my node). I went digging into the code and I think the issue comes from dagster_dbt/asset_defs.py#_dbt_nodes_to_assets, in which the package name is computed from the graph of dependencies but happens to be nondeterministic, hence producing the error above. My dbt project runs fine when using dbt alone without Dagster. I also tried removing all dependencies but one, and the issue no longer appears. Can someone help plz 🤓 ?
    dagster-dbt==0.16.12
    dbt-core==1.2.2
    :helpwanted: 1
    🆘 1
    👍 1

    Archie Kennedy

    10/10/2022, 9:18 AM
    Hello all. I'm using the dagster_aws.s3.s3_pickle_io_manager for multiple jobs, using the same bucket. Do I need to use an s3_prefix for each job? I don't want collisions 🙂
    :dagster-bot-responded-by-community: 1
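
    For context, a sketch of giving each job its own prefix (bucket and prefix values hypothetical). Op outputs are additionally namespaced by run id under the prefix, while software-defined asset outputs are keyed by asset key:
    from dagster import job, op
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

    @op
    def do_work():
        return 1

    @job(
        resource_defs={
            "s3": s3_resource,
            "io_manager": s3_pickle_io_manager.configured(
                {"s3_bucket": "my-bucket", "s3_prefix": "jobs/job_a"}  # hypothetical
            ),
        }
    )
    def job_a():
        do_work()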

    Archie Kennedy

    10/10/2022, 1:20 PM
    What are the default dagster-k8s resource limits? I ask because 10 concurrent pipelines are a little slow on my EKS cluster; I would like to increase the resources if possible.
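
    For reference, per-job requests and limits can be set with the dagster-k8s/config tag rather than relying on cluster defaults; a sketch with hypothetical values:
    from dagster import job, op

    @op
    def work():
        return 1

    @job(
        tags={
            "dagster-k8s/config": {
                "container_config": {
                    "resources": {
                        "requests": {"cpu": "500m", "memory": "1Gi"},
                        "limits": {"cpu": "1", "memory": "2Gi"},
                    }
                }
            }
        }
    )
    def heavy_job():
        work()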

    Eugenio Contreras

    10/10/2022, 1:47 PM
    Hey team! I have a quick question that I couldn't find in the documentation. If I have a job with a retry policy (max retries 3), and this job is linked to an on-failure sensor, will the sensor be executed and log the error 3 times, or just the last one?

    Giovanni Paolo

    10/10/2022, 2:14 PM
    is dagster-slack not compatible with dagster newer than 1.0.5?

    Geoffrey Greenleaf

    10/10/2022, 2:26 PM
    I am running into an error where a subset of assets fail to materialize. I thought I had a similar issue before and fixed it by removing the $SPARK_HOME env variable that was set, but after making some changes to some of my io_managers I am seeing the error again. I am trying to run some of the materializations on Databricks using the databricks step launcher. I think it may be related to how my assets and required_resource_keys are configured. Maybe something locally is running before the work gets submitted to the Databricks cluster?
    py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster/_core/errors.py", line 184, in user_code_error_boundary
        yield
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster/_core/execution/resources_init.py", line 325, in single_resource_event_generator
        resource_def.resource_fn(context)
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster_pyspark/resources.py", line 54, in pyspark_resource
        return PySparkResource(init_context.resource_config["spark_conf"])
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster_pyspark/resources.py", line 21, in __init__
        self._spark_session = spark_session_from_config(spark_conf)
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster_pyspark/resources.py", line 16, in spark_session_from_config
        return builder.getOrCreate()
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/sql/session.py", line 269, in getOrCreate
        sc = SparkContext.getOrCreate(sparkConf)
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 483, in getOrCreate
        SparkContext(conf=conf or SparkConf())
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 197, in __init__
        self._do_init(
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 282, in _do_init
        self._jsc = jsc or self._initialize_context(self._conf._jconf)
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 402, in _initialize_context
        return self._jvm.JavaSparkContext(jconf)
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1585, in __call__
        return_value = get_return_value(
      File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
        raise Py4JError(
    :dagster-bot-responded-by-community: 1

    James Hale

    10/10/2022, 2:34 PM
    I've tracked this down to being an issue with how dependencies are resolved. Specifying the upstream dependency in ins resolves the issue:
    @asset(
        name="cdrs",
        key_prefix=PREFIX,
        required_resource_keys={"atscall"},
        io_manager_key="atscall_snowflake_io_manager",
        ins={"cdrs_search_start_time": AssetIn(key=AssetKey(PREFIX + ["cdrs_search_start_time"]))}
    )
    def get_recent_cdrs(context, cdrs_search_start_time):
        # ...

    Issac Loo

    10/10/2022, 2:38 PM
    This question was asked a couple of years ago, but wondering if there have been any updates on it: is there a GCS equivalent to get_s3_keys?
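
    For reference, there doesn't appear to be a built-in equivalent in dagster-gcp at the time of writing, but the same idea is a few lines with google-cloud-storage; a sketch with hypothetical names:
    from google.cloud import storage

    def get_gcs_keys(bucket_name, prefix="", since_key=None):
        # list_blobs yields objects in lexicographic name order, so returning
        # everything after since_key mirrors get_s3_keys-style cursoring
        client = storage.Client()
        names = [blob.name for blob in client.list_blobs(bucket_name, prefix=prefix)]
        if since_key and since_key in names:
            names = names[names.index(since_key) + 1 :]
        return names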

    Davi

    10/10/2022, 2:55 PM
    Hello, I've set a schedule to run a job every day at 14:00. However, I would like to run it manually whenever I want to run some tests. Is that possible? When I try to run it manually, it stays 'starting' but never runs. I suppose that's because it's waiting for the scheduled time. Thanks!

    Zach

    10/10/2022, 5:42 PM
    for tag-based job retries, do they have to be defined within the JobDefinition? or can they be added via the launchpad prior to launching a job which doesn't have the retry tags specified in the JobDefinition?
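
    For reference, run retries are driven by run tags (dagster/max_retries, dagster/retry_strategy), so in principle they can live on the JobDefinition or be added as tags in the launchpad before launching; a sketch of the job-side version, assuming run retries are enabled on the instance:
    from dagster import job, op

    @op
    def flaky():
        ...

    @job(tags={"dagster/max_retries": "3", "dagster/retry_strategy": "FROM_FAILURE"})
    def my_job():
        flaky()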

    Abhijeet Singh

    10/10/2022, 9:46 PM
    Hi folks, I was thinking of deploying Dagster-based ML pipelines on our on-premise servers, and was thinking of going the route of deploying a container for dagster-daemon and another one for running and hosting the business logic. Another container perhaps for Dagit, if required for debugging; otherwise it seems to be optional. Jobs would run with a multiprocess executor in one container itself. Is there any data/documentation on the performance of Dagster (daemon), its memory/CPU footprint, and its stability? Mine is not a cloud deployment and is used for lightweight, but sometimes long-running, jobs. Can dagster-daemon simply be restarted if it crashes? What about the jobs that are running at that time? Thank you for your time!

    I ran dagster-daemon run with an empty dagster.yaml file and the memory footprint seemed to increase by 100 MB. The increase was an extra 200 MB when I ran Dagit. Note that this is without any load or jobs running. I guess the storage the daemon needs (Postgres etc.) will increase this number. The above numbers seem to be a lot for my use case. Is there a way to run jobs with Dagster in a more lightweight manner? Is Dagit required in every deployment?

    daniel

    10/17/2022, 10:14 PM
    Hi Abhijeet - I can speak to the crash recovery part. The daemon is designed to pick right back up where it left off if it crashes and restarts - any schedules/sensors/etc. should continue just as they did before it stopped. As long as you're using any of our run launchers other than the default one, the daemon crashing should also have no effect on any running jobs - they'll be running in their own containers without any dependency on the daemon once they launch.

    Abhijeet Singh

    10/19/2022, 9:32 PM
    Thanks Daniel!