dagster-support

    Sanidhya Singh

    05/31/2022, 5:53 AM
    Hello, what’s the correct way to pass a 2d-list as an op output?
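    For reference, ops can return any Python object, so a 2-D list needs nothing special; a minimal sketch, with optional annotations if you want Dagster to type-check the value:

    from typing import List

    from dagster import job, op

    @op
    def make_grid() -> List[List[int]]:
        # a 2-D list is an ordinary Python value to Dagster; the annotation
        # just adds output type checking
        return [[1, 2], [3, 4]]

    @op
    def consume_grid(grid: List[List[int]]) -> None:
        for row in grid:
            print(row)

    @job
    def grid_job():
        consume_grid(make_grid())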

    Abednego Santoso

    05/31/2022, 9:43 AM
    hi everyone, do you know what this is and how do I get rid of this (highlighted in red)? As far as I know, these are jitter and backoff from the with_retry_policy function. I have already removed these but the gap still appears. Thank you!
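    For context, jitter and backoff are attributes of a RetryPolicy, whether it is attached in the @op decorator or via with_retry_policy; a minimal sketch (op name and values hypothetical):

    from dagster import Backoff, Jitter, RetryPolicy, op

    @op(
        retry_policy=RetryPolicy(
            max_retries=3,
            delay=2,  # seconds between attempts, before backoff/jitter
            backoff=Backoff.EXPONENTIAL,
            jitter=Jitter.FULL,
        )
    )
    def flaky_op():
        ...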

    Thierry Hue

    05/31/2022, 9:47 AM
    Hello, I'm facing an issue while trying to upgrade Dagster to 0.14.17. When I define my container image in my values.yaml the following way, all is good:
    dagster:
      dagster-user-deployments:
        enabled: true
        deployments:
          - name: "my-data-pipelines"
            image:
              repository: "myrepo/mycontainer"
              tag: local
    The image is good:
    image: "myrepo:local"
    But when I try to use my version instead of local like 20220531.1:
    dagster:
      dagster-user-deployments:
        enabled: true
        deployments:
          - name: "my-data-pipelines"
            image:
              repository: "myrepo/mycontainer"
              tag: "20220531.1"
    The image is wrong
    image: "myrepo/mycontainer:\"20220531.1\""
    and the image cannot be found in the repo because the image should be
    image: "myrepo/mycontainer:20220531.1"
    This was working as expected in version 0.14.3. What should I do instead?

    Vlad Efanov

    05/31/2022, 10:23 AM
    Hi everyone. I want to create a job with multiple ops where a few ops start another job and wait until the job is completed. Is there a built-in function in Dagster, or do I just have to programmatically submit the job from an op and wait until the status of the submitted job changes to a success/failed/canceled status?
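    Absent a built-in primitive, one common workaround is to submit the run over GraphQL and poll its status from inside the op. A minimal sketch, assuming a Dagit host reachable at dagit:3000 and a job named downstream_job (both hypothetical):

    import time

    from dagster import PipelineRunStatus, op
    from dagster_graphql import DagsterGraphQLClient

    @op
    def trigger_and_wait(context):
        client = DagsterGraphQLClient("dagit", port_number=3000)  # hypothetical host/port
        run_id = client.submit_job_execution("downstream_job")  # hypothetical job name
        terminal = {PipelineRunStatus.SUCCESS, PipelineRunStatus.FAILURE, PipelineRunStatus.CANCELED}
        status = client.get_run_status(run_id)
        while status not in terminal:
            time.sleep(10)
            status = client.get_run_status(run_id)
        if status is not PipelineRunStatus.SUCCESS:
            raise Exception(f"run {run_id} ended with status {status}")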

    Daniel Mosesson

    05/31/2022, 10:46 AM
    We currently use database cursors for Postgres as a resource, and it works, but it leads to us having to pass the cursor around between functions, and there is a split between the functions that take context and can do the logging, etc., and the utility functions that don't take a context and can't log the same way (I suppose I could pass the context object around as well, but that is also not ideal).
    1. Is this a problem that writing an IO manager would help with? Most of what all of these functions are doing is helping store or retrieve data.
    a. Most cases would be get/update a table, but there are some cases where I need to execute a custom query. Would that be possible?
    2. How difficult is it to create an IO manager that does this? I looked at the code for fs_io_manager (https://github.com/dagster-io/dagster/blob/8e8ee8537146aad35d3dd75e181f998fd989325f/python_modules/dagster/dagster/core/storage/fs_io_manager.py) and it looks straightforward, but is there something that makes my use case different?
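    For what it's worth, the IO manager surface area is small: handle_output stores a value, load_input reads it back. A sketch of a table-per-output Postgres IO manager, assuming a SQLAlchemy engine supplied by a hypothetical pg_engine resource:

    import pandas as pd

    from dagster import IOManager, io_manager

    class PostgresTableIOManager(IOManager):
        def __init__(self, engine):
            self._engine = engine

        def handle_output(self, context, obj: pd.DataFrame):
            # context.name is the output's name, used here as the table name
            obj.to_sql(context.name, self._engine, if_exists="replace", index=False)

        def load_input(self, context):
            # the upstream output's name determines which table is read back
            return pd.read_sql_table(context.upstream_output.name, self._engine)

    @io_manager(required_resource_keys={"pg_engine"})  # hypothetical resource key
    def postgres_table_io_manager(init_context):
        return PostgresTableIOManager(init_context.resources.pg_engine)

    Custom one-off queries don't map cleanly onto the handle_output/load_input pair, so those usually stay in a resource.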

    Mykola Palamarchuk

    05/31/2022, 11:30 AM
    Hi! I'd like to contribute some changes to the Dagster Helm chart, but the contribution guide does not cover Helm chart testing. What I'm trying to do: add parameters to the user-deployments Helm chart so that RBAC can be set up for the launcher if it runs in a separate namespace.

    Kobroli

    05/31/2022, 11:52 AM
    Hey all, I was wondering about 3 issues:
    1. Is there a way to disable daemons selectively? In my case, I only need the run queue daemon.
    2. By default, Dagster will place all kinds of storages into the DAGSTER_HOME directory. Is there a way to place all of these storages into a custom directory?
    3. Similar to 2, but is it even possible to explicitly ask for an ephemeral instance in dagster.yaml (such that run concurrencies can be defined, but no storages are persisted to disk)? Thanks a lot in advance!

    Kayvan Shah

    05/31/2022, 1:09 PM
    In the UI, only the first repo shows up?
    from dagster import repository
    
    from demo.jobs.say_hello import say_hello_job
    from demo.schedules.my_hourly_schedule import my_hourly_schedule, my_schedule
    from demo.sensors.my_sensor import my_sensor, mysensor
    
    from demo.jobs.cereal_diamond import diamond
    
    
    @repository
    def demo():
        """
        The repository definition for this demo Dagster repository.
    
        For hints on building your Dagster repository, see our documentation overview on Repositories:
        https://docs.dagster.io/overview/repositories-workspaces/repositories
        """
        jobs = [say_hello_job]
        schedules = [my_hourly_schedule]
        sensors = [my_sensor]
    
        return jobs + schedules + sensors
    
    
    @repository
    def cereals():
        jobs = [diamond]
        schedules = [my_schedule]
        sensors = [mysensor]
    
        return jobs + schedules + sensors

    Martin Remy

    05/31/2022, 1:37 PM
    Hello Dagster users and team, we are currently trying to onboard onto Dagster with our new PySpark pipelines running on AWS EMR. I am currently doing a PoC of your solution and trying to demonstrate that we can execute an op on our EMR cluster. I use the emr_pyspark_step_launcher demonstrated in your documentation here. I strictly copied this code and simply replaced values to match my current EMR cluster. Then I created a test case using pytest to try and launch it. My current version is:
    import pytest
    
    from dagster import execute_pipeline, reconstructable
    from pipeline.data.jobs.pyspark_dagster import make_and_filter_data_emr
    
    def test_emr():
        result = execute_pipeline(reconstructable(make_and_filter_data_emr))
        assert result.success
    I've tried both execute_pipeline and execute_in_process, and I always get different errors raised by Dagster. Is that documentation up to date? Is it supposed to work out of the gate? Do you have other documentation I could look into, or a solution?

    Samuel Stütz

    05/31/2022, 1:40 PM
    How do I set a list of default values?
    Field(
        Noneable(list),
        default_value=["GIT_HASH", "DAGSTER_VERSION"],
        is_required=False,
        description="Li.....",
    ),
    I tried many different versions … all say invalid configuration
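    A sketch of one shape that should validate, assuming the goal is an optional list of strings: default_value has to match the declared config type exactly, so typing the field as Array(str) rather than a bare Noneable(list) is the usual fix:

    from dagster import Array, Field

    tags_field = Field(
        Array(str),
        default_value=["GIT_HASH", "DAGSTER_VERSION"],
        is_required=False,
        description="List of tags to attach.",
    )
    # if None must also be accepted, Noneable(Array(str)) keeps the element
    # typing while still validating the list default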

    Liezl Puzon

    05/31/2022, 2:37 PM
    why is RunsFilter.job_name giving me 'field "jobName": Unknown field.' in GraphQL?

    Liezl Puzon

    05/31/2022, 2:53 PM
    edit: my bad, used the wrong filter. GraphQL “limit” not respecting higher limit — I always get 250 items back.

    Jordan

    05/31/2022, 3:36 PM
    Hi! I have a job that has dynamic ops, and I would like each op to return an asset_event. I'm using an asset_sensor for this, but unfortunately only the last op that finishes makes the request. I have the impression that the context is overwritten each time an op executes. Is it possible to have several AssetMaterializations in the same context, to make sure that all the requests are executed and not just one?
    from dagster import (
        AssetKey,
        AssetMaterialization,
        DynamicOut,
        DynamicOutput,
        asset_sensor,
        build_assets_job,
        job,
        op,
    )

    @op(out=DynamicOut())  # dynamic outputs must be declared with DynamicOut
    def my_op_0(context):
        for date in ["2022-05-29", "2022-05-30", "2022-05-31"]:
            yield DynamicOutput(value=date, mapping_key=date.replace("-", ""))

    @op
    def my_op_1(context, date: str):
        context.log_event(
            AssetMaterialization(asset_key="my_asset_0", partition=date)
        )

    @job
    def my_job():
        my_op_0().map(my_op_1)

    my_asset_job_0 = build_assets_job("my_asset_job_0", assets=[my_asset_0])
    my_asset_job_1 = build_assets_job("my_asset_job_1", assets=[my_asset_1])

    @asset_sensor(asset_key=AssetKey("my_asset_0"), job=my_asset_job_1)
    def my_asset_sensor(context, asset_event):
        partition = asset_event.dagster_event.event_specific_data.materialization.partition
        request = my_asset_job_1.run_request_for_partition(partition_key=partition, run_key=context.cursor)
        yield request

    Hebo Yang

    05/31/2022, 4:28 PM
    Hi team, hope you have had a great long weekend! I am wondering if there is a way (GraphQL?) to get Dagit action audit events (i.e. sensors turned on/off, jobs started/stopped, etc.), please?

    Shriram Holla

    05/31/2022, 5:19 PM
    Hi! Is it a good practice to use os.system within an op instead of a shell_op? My use case didn’t really fit well with shell_op.
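    If shell_op doesn't fit, wrapping subprocess in a plain op is a common pattern, and subprocess.run is generally preferable to os.system because it surfaces the exit code and output. A minimal sketch (command hypothetical):

    import subprocess

    from dagster import op

    @op
    def run_my_script(context):
        # check=True raises, and thus fails the op, on a non-zero exit code
        result = subprocess.run(
            ["bash", "my_script.sh"],  # hypothetical command
            capture_output=True,
            text=True,
            check=True,
        )
        context.log.info(result.stdout)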

    Shriram Holla

    05/31/2022, 6:55 PM
    Hi! Is there a way to run a job in batches? For example, if I have 100000 records that need to be processed, is there a way to pass 100 records at a time through a job? And is it possible to do this at scale?
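    One way to express this is dynamic outputs: an op chunks the records and fans out one downstream invocation per batch, which the executor can run in parallel. A sketch, with the batch size and record loader hypothetical:

    from dagster import DynamicOut, DynamicOutput, job, op

    BATCH_SIZE = 100  # hypothetical

    @op(out=DynamicOut())
    def split_into_batches():
        records = load_records()  # hypothetical loader returning a list
        for i in range(0, len(records), BATCH_SIZE):
            yield DynamicOutput(records[i : i + BATCH_SIZE], mapping_key=str(i))

    @op
    def process_batch(batch):
        ...

    @job
    def batched_job():
        split_into_batches().map(process_batch)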

    Aaron Bailey

    05/31/2022, 7:35 PM
    Think this will be a pretty easy one. I have an S3 sensor set to monitor for a new file being dropped. Every time I turn the daemon on, it starts its corresponding job 8 times. I'm just confused where to set the limit. I mean, I understand if by turning on the daemon it sees the file as new and starts it once, but I can't figure out why it's requesting 8 times.
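    For deduplication, the usual lever is RunRequest's run_key: the daemon skips any request whose run_key it has already seen. A sketch assuming the S3 object key serves as the run key (bucket and job names hypothetical):

    from dagster import RunRequest, sensor
    from dagster_aws.s3.sensor import get_s3_keys

    @sensor(job=my_job)  # hypothetical job
    def s3_file_sensor(context):
        new_keys = get_s3_keys("my-bucket", since_key=context.cursor)  # hypothetical bucket
        for key in new_keys:
            # requests with an already-seen run_key are skipped, not re-run
            yield RunRequest(run_key=key)
        if new_keys:
            context.update_cursor(new_keys[-1])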

    ibrahem

    05/31/2022, 7:40 PM
    Hi everyone, I want to run dbt using dagster_dbt, but I'm having an issue with accessing dbt's Python venv. Dagster and dbt are running on the same server, but each tool has its own venv. How can I run dbt through a Dagster job using dbt's Python virtual environment? Is it possible to just point to the dbt venv path, similarly to what we do with profiles_dir and project_dir under dbt_cli_resource.configured? Appreciate your help.
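    If your dagster-dbt version exposes a dbt_executable config field on dbt_cli_resource (worth checking against your installed release), pointing it at the venv's dbt binary may be all that's needed; a sketch with hypothetical paths:

    from dagster_dbt import dbt_cli_resource

    my_dbt = dbt_cli_resource.configured(
        {
            "project_dir": "/opt/dbt/my_project",  # hypothetical
            "profiles_dir": "/opt/dbt/profiles",  # hypothetical
            "dbt_executable": "/opt/venvs/dbt/bin/dbt",  # hypothetical venv path
        }
    )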

    Gowtham Manne

    05/31/2022, 8:11 PM
    Hi everyone, can I know if we can start dagster and dagit programmatically?

    Daniel Mosesson

    05/31/2022, 9:13 PM
    I want to pass a list of CustomType via config. I tried annotating CustomType with @usable_as_dagster_type and get the same error message:
    Input <foo> in op 'myOp' is not connected to the output of a previous node and can not be loaded from configuration, making it impossible to execute. Possible solutions are:
    • add a dagster_type_loader for type <CustomType>
    • connect <foo> to the output of another node
    I am trying to set this from a sensor, if that makes sense
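    As the error suggests, the type needs a loader so inputs can be built from config. A minimal sketch of wiring a dagster_type_loader into @usable_as_dagster_type (CustomType's shape and config schema are hypothetical):

    from dagster import dagster_type_loader, usable_as_dagster_type

    @dagster_type_loader(config_schema={"value": str})  # hypothetical schema
    def custom_type_loader(context, config):
        return CustomType(config["value"])

    @usable_as_dagster_type(loader=custom_type_loader)
    class CustomType:
        def __init__(self, value: str):
            self.value = value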

    Zach

    05/31/2022, 9:14 PM
    I'm having an issue getting my Dagit container restarted. Not sure exactly what changed; I hadn't done anything to change the Dockerfile:
    dagster_docker_dagit  | Traceback (most recent call last):
    dagster_docker_dagit  |   File "/usr/local/bin/dagit", line 5, in <module>
    dagster_docker_dagit  |     from dagit.cli import main
    dagster_docker_dagit  |   File "/usr/local/lib/python3.9/site-packages/dagit/cli.py", line 21, in <module>
    dagster_docker_dagit  |     from .app import create_app_from_workspace_process_context
    dagster_docker_dagit  |   File "/usr/local/lib/python3.9/site-packages/dagit/app.py", line 11, in <module>
    dagster_docker_dagit  |     from .webserver import DagitWebserver
    dagster_docker_dagit  |   File "/usr/local/lib/python3.9/site-packages/dagit/webserver.py", line 7, in <module>
    dagster_docker_dagit  |     import nbformat
    dagster_docker_dagit  |   File "/usr/local/lib/python3.9/site-packages/nbformat/__init__.py", line 9, in <module>
    dagster_docker_dagit  |     from traitlets.log import get_logger
    dagster_docker_dagit  |   File "/usr/local/lib/python3.9/site-packages/traitlets/__init__.py", line 5, in <module>
    dagster_docker_dagit  |     from ._version import __version__, version_info
    dagster_docker_dagit  |   File "/usr/local/lib/python3.9/site-packages/traitlets/_version.py", line 7, in <module>
    dagster_docker_dagit  |     assert __version__ == (
    the docker compose logs seem to cut off there

    Eric Larson

    05/31/2022, 9:38 PM
    I’m using software-defined assets and the S3 pickle IO manager for storage. If I materialize assets together top to bottom in one go, the run succeeds. But if I, say, materialize the first asset in a run, then materialize the second asset after that has finished, I get a boto3 key-not-found failure. It looks like the S3 asset storage makes a subdirectory per run and stores the assets in there, but when a new run kicks off it doesn’t look in the directory of the original run that produced the upstream asset.

    Spencer Guy

    05/31/2022, 10:31 PM
    Hello Dagster team, looking for some help with a multi-container deployment. I'm using docker-compose to build and run jobs following the example here: https://github.com/dagster-io/dagster/tree/0.14.17/examples/deploy_docker. What I would like to figure out is how to set up an additional user code container and use it when launching specific jobs. Following the comments in the docker-compose setup it seems possible, but I'm unsure how to launch a job to a specific user code container. Thanks in advance!

    Son Giang

    06/01/2022, 4:20 AM
    Hi there, is there any way to treat a backfill run as a single unit? So when all the jobs in the backfill run are finished, we can trigger a sensor/job after that; and if a job in the backfill run fails, the whole backfill run counts as failed and the sensor/job is not triggered.

    Caleb Fornari

    06/01/2022, 7:37 AM
    Hey all, we have been evaluating Dagster for our data warehouse project, and so far we have been really impressed. We ran into one issue, though, that's currently preventing us from bringing this to prod. I opened a PR with the fix; it's simple, so I'm really hoping it can get into the next release. The PR is here: https://github.com/dagster-io/dagster/pull/8142. If this needs further work or anything else from me, please let me know. We can of course use the forked Helm chart, but would rather do this the right way if possible. Istio is a mandatory requirement for us since it handles our end-to-end encryption.

    Kayvan Shah

    06/01/2022, 9:18 AM
    Can there be a job creating and triggering other jobs? Some kind of mapping based upon inputs?

    Kayvan Shah

    06/01/2022, 9:58 AM
    /home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py:93: UserWarning: While in @job context 'ticker_single_pipeline', received an uninvoked op 'get_ticker_data'.
      warnings.warn(warning_message.strip())
    /home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/context.py:554: UserWarning: Error loading repository location demo:dagster.core.errors.DagsterInvalidDefinitionError: In @job ticker_single_pipeline, received invalid type <class 'str'> for input "start_date" (passed by keyword) in op invocation "get_ticker_data". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
    
    Stack Trace:
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 224, in __init__
        self._loaded_repositories = LoadedRepositories(
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 98, in __init__
        loadable_targets = get_loadable_targets(
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/utils.py", line 53, in get_loadable_targets
        else loadable_targets_from_python_package(package_name, working_directory)
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 48, in loadable_targets_from_python_package
        module = load_python_module(
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 136, in load_python_module
        return importlib.import_module(module_name)
      File "/usr/lib/python3.9/importlib/__init__.py", line 127, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
      File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
      File "<frozen importlib._bootstrap_external>", line 850, in exec_module
      File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
      File "/home/kayvan/projects/dagster-demo/demo/demo/__init__.py", line 1, in <module>
        from .repository import demo, yahoo_finance
      File "/home/kayvan/projects/dagster-demo/demo/demo/repository.py", line 4, in <module>
        from demo.jobs.yahoofin.multi_ticker import ticker_mutli_pipeline
      File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/multi_ticker.py", line 2, in <module>
        from demo.jobs.yahoofin.single_ticker import ticker_single_pipeline
      File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 11, in <module>
        def ticker_single_pipeline(ticker_dict: dict):
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 204, in job
        return _Job()(name)
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 75, in __call__
        ) = do_composition(
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 1012, in do_composition
        output = fn(**kwargs)
      File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 12, in ticker_single_pipeline
        data = get_ticker_data(ticker_dict, start_date="2022-05-01")
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/solid_definition.py", line 146, in __call__
        return super(SolidDefinition, self).__call__(*args, **kwargs)
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/node_definition.py", line 162, in __call__
        return PendingNodeInvocation(
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 387, in __call__
        self._process_argument_node(
      File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 510, in _process_argument_node
        raise DagsterInvalidDefinitionError(
    
      warnings.warn(
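    The traceback's complaint is that a Python literal is being passed to an op inside a @job body, where only outputs of other node invocations (or inputs to the composition) are allowed. One common fix is to move the literal into the op's config; a sketch (the op name follows the trace, the body is hypothetical):

    from dagster import op

    @op(config_schema={"start_date": str})
    def get_ticker_data(context, ticker_dict: dict):
        # the literal moves out of the @job body and into op config
        start_date = context.op_config["start_date"]
        ...

    # supplied at launch time, e.g.:
    # run_config={"ops": {"get_ticker_data": {"config": {"start_date": "2022-05-01"}}}}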

    LP

    06/01/2022, 10:02 AM
    Hi all, I am facing a problem while trying an API call from inside a Dagster solid; it fails with DagsterExecutionInterruptedError at time.sleep(5). If the API response time is fast it works properly, but when the API response time is slow it gives me the error below. Can anyone suggest what I am doing wrong? Deployed on k8s and using 0.11.16.
    dagster.core.errors.DagsterExecutionInterruptedError
      File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
        for step_event in check.generator(step_events):
      File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 308, in core_dagster_event_sequence_for_step
        for user_event in check.generator(
      File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 63, in _step_output_error_checked_user_event_sequence
        for user_event in user_event_sequence:
      File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 138, in execute_core_compute
        for step_output in _yield_compute_results(step_context, inputs, compute_fn):
      File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 111, in _yield_compute_results
        for event in iterate_with_context(
      File "/usr/local/lib/python3.8/site-packages/dagster/utils/__init__.py", line 382, in iterate_with_context
        next_output = next(iterator)
      File "/dp/dp/pipeline/steps/ips/save_to.py", line 67, in save_to
        ingestion_data(ingest_data, file)
      File "/dp/dp/core/utils/task_ingestion.py", line 58, in ingestion_data
        task_status = tm.post_task(spec)
      File "/usr/local/lib/python3.8/site-packages/mypackage/data_access/task_manager.py", line 103, in post_task
        task_status = self.check_for_status(res['task'])
      File "/usr/local/lib/python3.8/site-packages/mypackage/data_access/task_manager.py", line 126, in check_for_status
        time.sleep(5)
      File "/usr/local/lib/python3.8/site-packages/dagster/utils/interrupts.py", line 78, in _new_signal_handler
        raise error_cls()

    Eric Larson

    06/01/2022, 2:32 PM
    With software-defined assets, is there a way for an asset to access its own previous value/materialization? Two use cases would be to record metadata about what has changed since the last materialization, and the other as an option to just return the previous version if it is determined no update is needed, in the case of an expensive operation.
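    For the metadata use case, one approach is to query the instance's event log for the asset's latest materialization from inside the op or asset body. A sketch against the 0.14-era API, with the asset key hypothetical:

    from dagster import AssetKey, DagsterEventType, EventRecordsFilter, op

    @op
    def compare_to_previous(context):
        records = context.instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=AssetKey("my_asset"),  # hypothetical asset key
            ),
            limit=1,
            ascending=False,  # newest first
        )
        if records:
            last = records[0].event_log_entry.dagster_event.event_specific_data.materialization
            context.log.info(f"previous metadata: {last.metadata_entries}")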

Charlie Bini

06/01/2022, 4:03 PM
is there a recommended method of providing context.log access to a resource?
so far, I've provided the Op context as a param to a method, but wondering if there's a built-in way using the @resource decorator

owen

06/01/2022, 4:13 PM
@resource(...)
def my_resource(context):
    return MyClass(..., logger=context.log)
should work!
you can also from dagster import get_dagster_logger, and inside the body of the resource call get_dagster_logger().info("foo")

Charlie Bini

06/01/2022, 4:14 PM
awesome, the first example is what I was looking for. wasn't sure if that worked
so that context param on the resource def is the same one that contains config_schema, right?

owen

06/01/2022, 4:17 PM
the context object is of type InitResourceContext, so it'll have a resource_config property (this is the resolved config value)
the config_schema is the schema for that config blob, and will be inside the argument to the @resource decorator

Charlie Bini

06/01/2022, 4:18 PM
gotcha, thanks for the reference
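Filled out, the pattern above might look like this (class and config names hypothetical):

from dagster import resource

class MyClient:
    def __init__(self, url: str, logger):
        self._url = url
        self._log = logger

    def ping(self):
        self._log.info(f"pinging {self._url}")

@resource(config_schema={"url": str})
def my_client_resource(init_context):
    # init_context is an InitResourceContext: resolved config lives on
    # .resource_config, and a logger on .log
    return MyClient(init_context.resource_config["url"], logger=init_context.log)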