dagster-support
  • m

    Makoto

    05/13/2021, 11:38 PM
    Hi. What’s an easy way to halt the execution of a pipeline run when a solid raises
    Failure
    ? I naively thought that would stop it, but it seems like I need to use conditional branching to achieve it? Is there an example somewhere? I am having a bit of a hard time grasping how I can apply it.
    s
    • 2
    • 3
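    A minimal sketch of the behaviour being asked about, assuming ~0.11-era APIs (names below are illustrative): raising Failure fails that step, and only solids downstream of it (via a data or Nothing dependency) are skipped; solids with no dependency on the failing solid still run, which can make it look as though the run keeps going.
    from dagster import Failure, InputDefinition, Nothing, OutputDefinition, pipeline, solid

    @solid(output_defs=[OutputDefinition(Nothing)])
    def validate():
        # Failing here marks this step as failed; dependent steps are skipped.
        raise Failure(description="validation failed, halting downstream work")

    @solid(input_defs=[InputDefinition("start", Nothing)])
    def downstream():
        return "never reached"

    @pipeline
    def halting_pipeline():
        # The Nothing dependency enforces ordering without passing data.
        downstream(validate())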
  • a

    Arun Kumar

    05/13/2021, 11:44 PM
    Hi, trying to run a Dagster POC on our Kubernetes cluster using the helm chart. We maintain our secrets using Vault and want to load the Postgres DB password from Vault. Is there any example on how this can be done in the Helm chart?
    n
    h
    • 3
    • 33
  • p

    Peter B

    05/14/2021, 3:07 AM
    Hey all, quick one - if we write pickles during intermediate load steps (e.g. extract, transform, etc.) to the local file system on a K8s Helm deployment of Dagster, does that break given that any one of multiple machines may serve the pod in the back end? I.e., is it better to write these tables to shared storage, e.g. Postgres, instead? It's quite possible I've answered my own question, but I'm looking for a second opinion from someone more experienced. Cheers
    y
    • 2
    • 2
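    One common answer for K8s deployments (and the approach that shows up later in this channel, e.g. in Kirk Stennett’s pipeline) is to swap pod-local pickles for an object-store IO manager so it doesn’t matter which node runs which step. A rough sketch, assuming dagster-aws is installed and the solids are illustrative:
    from dagster import ModeDefinition, pipeline, solid
    from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

    @solid
    def extract():
        return [1, 2, 3]

    @solid
    def transform(rows):
        return [r * 2 for r in rows]

    @pipeline(
        mode_defs=[
            ModeDefinition(
                resource_defs={
                    "s3": s3_resource,
                    # Intermediates are pickled to S3 instead of the pod filesystem;
                    # the io_manager gets its s3_bucket via resource config.
                    "io_manager": s3_pickle_io_manager,
                }
            )
        ]
    )
    def etl_pipeline():
        transform(extract())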
  • a

    assaf

    05/14/2021, 3:47 PM
    I have a pipeline run that went on for a long time and generated A LOT of log events, and now when I try to open it in Dagit, I get this session timeout message. This is Dagster 0.10.4 running on Kubernetes, with the database being an RDS PostgreSQL instance (v12.5). The instance type was originally
    db.t3.micro
    , which is grossly underpowered. I recently scaled it up to
    db.m6g.large
    , restarted the Dagit deployment, yet the issue persists. I'd like some clues on how exactly this issue arises, and what I may be able to do about it.
    postgres.session-timeout-error-message.log
    j
    d
    a
    • 4
    • 6
  • c

    Cameron Gallivan

    05/14/2021, 10:14 PM
    Has anyone using PyCharm run into the TypeChecker inspection getting triggered by different decorator arguments like this:
    Expected type '_SpecialForm[bool]', got 'bool' instead
    It mostly just happens when constructing an OutputDefinition but I’ve seen it trigger in the
    @resource
    decorator as well. The only real problem it causes is that it makes it more tedious to look for inspections that actually matter.
    a
    • 2
    • 1
  • b

    Benj Smith

    05/16/2021, 2:21 AM
    Hi all, hitting a wall trying to set up a queued coordinator. Daemon running, dagit running, and ran the grpc with:
    dagster api grpc --python-file pipelines/my_repo.py --attribute my_repository --host 127.0.0.1 --port 8889
    But every pipeline fails with:
    AttributeError: module 'grpc' has no attribute 'insecure_channel'
    Seen this before?
    d
    • 2
    • 10
  • j

    john eipe

    05/16/2021, 8:00 AM
    Hi Team, is it possible to create a dynamic pipeline based on some metadata?
    eg: 
    task-group   seq
    A             1
    B             1
    C             2
    
    task   seq   task-group
    a1      1     A
    a2      1     A
    b1      1     B
    b2      2     B
    c3      1     C
    
    task-group is only a logical wrapper that helps group and control the execution flow; maybe composite solids are a fit here
    tasks are essentially solids, say
    
    @solid
    def a1():
      pass
    
    Execution flow:
    Task groups A and B start in parallel (seq=1), and tasks a1, a2 run in parallel (seq=1), but b2 runs only after b1 is complete. c3 starts after the tasks in task groups A and B are complete.
    Now, based on this metadata that resides in a file or DB - is it possible to create a dynamic pipeline?
    s
    m
    • 3
    • 4
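    There is no built-in metadata-to-pipeline feature, but a hedged sketch of one common approach is a solid factory plus a composition function built from the dependency metadata (hard-coded below for illustration; in practice it would be read from the file or DB before the pipeline is constructed):
    from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid

    def make_task_solid(name, upstream=()):
        # Each generated solid is a no-op whose ordering is driven by Nothing inputs.
        @solid(
            name=name,
            input_defs=[InputDefinition(f"after_{u}", Nothing) for u in upstream],
            output_defs=[OutputDefinition(Nothing)],
        )
        def _task(context):
            context.log.info(f"running task {name}")

        return _task

    # Hypothetical metadata: task -> tasks it must wait for, derived from seq/group.
    # The dict is listed in topological order so the loop below can wire it directly.
    DEPS = {"a1": (), "a2": (), "b1": (), "b2": ("b1",), "c3": ("a1", "a2", "b2")}
    TASK_SOLIDS = {name: make_task_solid(name, upstream) for name, upstream in DEPS.items()}

    @pipeline
    def metadata_pipeline():
        results = {}
        for name, upstream in DEPS.items():
            results[name] = TASK_SOLIDS[name](**{f"after_{u}": results[u] for u in upstream})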
  • d

    Drew Sonne

    05/17/2021, 12:29 PM
    I think I just found a bug in how custom materializers/loaders are used in
    dagster_pandas
    . I haven't done any development on dagster before, but I created a PR. I'm not sure how to go about this from here on out: https://github.com/dagster-io/dagster/pull/4179
    a
    • 2
    • 1
  • d

    Drew Sonne

    05/17/2021, 12:37 PM
    Further to this, I'm getting an error when creating a custom pandas datatype if the dataframe has a Timestamp object in it.
    An unexpected exception was thrown. Please file an issue.
    TypeError: Object of type Timestamp is not JSON serializable
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 189, in _dagster_event_sequence_for_step
        for step_event in check.generator(step_events):
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 311, in core_dagster_event_sequence_for_step
        for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 362, in _type_check_and_store_output
        for output_event in _type_check_output(step_context, step_output_handle, output, version):
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 229, in _type_check_output
        yield DagsterEvent.step_output_event(
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/events/__init__.py", line 580, in step_output_event
        return DagsterEvent.from_step(
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/events/__init__.py", line 291, in from_step
        log_step_event(step_context, event)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/events/__init__.py", line 197, in log_step_event
        log_fn(
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/log_manager.py", line 234, in debug
        return self._log(logging.DEBUG, msg, kwargs)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/log_manager.py", line 204, in _log
        logger_.log(level, message, extra=extra)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1512, in log
        self._log(level, msg, args, **kwargs)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1589, in _log
        self.handle(record)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1599, in handle
        self.callHandlers(record)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1661, in callHandlers
        hdlr.handle(record)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 954, in handle
        self.emit(record)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 110, in emit
        self._instance.handle_new_event(event)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1065, in handle_new_event
        self._event_storage.store_event(event)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/storage/event_log/sqlite/sqlite_event_log.py", line 217, in store_event
        insert_event_statement = self.prepare_insert_event(event)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 81, in prepare_insert_event
        event=serialize_dagster_namedtuple(event),
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 174, in serialize_dagster_namedtuple
        return _serialize_dagster_namedtuple(nt, whitelist_map=_WHITELIST_MAP, **json_kwargs)
      File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 180, in _serialize_dagster_namedtuple
        return seven.json.dumps(_pack_value(nt, whitelist_map), **json_kwargs)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/__init__.py", line 234, in dumps
        return cls(
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/encoder.py", line 179, in default    raise TypeError(f'Object of type {o.__class__.__name__} '
    I have just started going down the path of a custom materializer, but then thought I should ask if there's any guidance on how to handle this?
    a
    • 2
    • 1
  • d

    Drew Sonne

    05/17/2021, 12:38 PM
    I also noticed the error is actually being thrown from serdes parts of dagster, so would a custom materializer work?
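    Not an official fix, just a hedged workaround sketch while the PR/issue is sorted out: keep pd.Timestamp values out of the event metadata Dagster serializes by stringifying them in the custom type's event_metadata_fn (the "created_at" column name here is made up):
    import pandas as pd
    from dagster import EventMetadataEntry
    from dagster_pandas import PandasColumn, create_dagster_pandas_dataframe_type

    def _metadata_without_timestamps(df: pd.DataFrame):
        # min()/max() on a datetime column return pd.Timestamp, which the event-log
        # JSON serializer rejects; isoformat() turns them into plain strings.
        return [
            EventMetadataEntry.text(str(len(df)), "row_count"),
            EventMetadataEntry.text(df["created_at"].min().isoformat(), "earliest_created_at"),
        ]

    MyDataFrame = create_dagster_pandas_dataframe_type(
        name="MyDataFrame",
        columns=[PandasColumn.datetime_column("created_at")],
        event_metadata_fn=_metadata_without_timestamps,
    )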
  • s

    szalai1

    05/17/2021, 3:31 PM
    Hey, how does
    intermediate_storage_defs
    relate to pipeline level
    io_manager
    's? Our current mode_defs have intermediate storage, and in some places we use an io_manager in solids. If I set
    resources.io_manager
    , can/should I delete intermediate_storage_defs from the mode definitions?
    a
    • 2
    • 2
  • j

    jeremy

    05/17/2021, 8:23 PM
    Hey there, wondering if there is a recommended way to handle pipeline versions? For example, in a situation where the pipeline changes fairly frequently, but you also want to be able to run older versions of the pipeline if needed. One way I can think of right now is to add new solids to the gRPC server and dynamically create new pipelines with the DSL using a set of YAML files, but that could potentially get pretty messy. Would there be a better pattern for organizing this?
    e
    • 2
    • 1
  • c

    Charles Lariviere

    05/18/2021, 5:06 PM
    Hey 👋 I’ve been getting the same “framework error” on a pipeline I tried running twice. Curious if you know of ways to debug this? This is running on Kubernetes — the pod did not reach the CPU or Memory limits, and is marked as
    Status:Terminated
    and
    Reason: Completed
    .
    An exception was thrown during execution that is likely a framework error, rather than an error in user code.
    a
    • 2
    • 6
  • j

    Jamie

    05/18/2021, 6:26 PM
    Hi, is there a way to get a specific output by name from a composite solid that yields multiple outputs? We have some functions that generate composites and the returned composites can have a variable number of outputs, and it would be useful to be able to get a subset of those outputs based on a list of the output names. I see a way we could implement it by wrapping the generated composite in another composite that yields the requested outputs, but if this functionality is already part of Dagster, that would be great to know. thanks!
    a
    • 2
    • 3
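    For reference, a hedged sketch of picking outputs off by name when the output names are fixed (the dynamically generated case may still need the wrapper approach described above): invoking a multi-output composite inside a composition function returns a namedtuple-like object, so individual outputs can be selected by name.
    from dagster import Output, OutputDefinition, composite_solid, pipeline, solid

    @solid(output_defs=[OutputDefinition(int, "total"), OutputDefinition(int, "count")])
    def summarize(numbers):
        yield Output(sum(numbers), "total")
        yield Output(len(numbers), "count")

    @solid
    def load_numbers():
        return [1, 2, 3]

    @solid
    def report(value):
        return f"value={value}"

    @composite_solid(
        output_defs=[OutputDefinition(int, "total"), OutputDefinition(int, "count")]
    )
    def summarize_graph():
        total, count = summarize(load_numbers())
        return {"total": total, "count": count}

    @pipeline
    def reporting_pipeline():
        # Select a single named output from the multi-output composite.
        results = summarize_graph()
        report(results.total)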
  • m

    Makoto

    05/18/2021, 6:39 PM
    Hi. I’m getting stumped on something that seems simple, or maybe I am just getting confused. I have the following 2 dbt solids, one that only runs changed models and one that runs all models:
    state_modified_config_in_test = {"project-dir": PROJECT_DIR, "models": ["state:modified"], "target": "test"}
    run_only_changed_models_in_test = dbt_cli_run.configured(state_modified_config_in_test,
                                                            name="run_only_changed_models_in_test")
    
    run_all_config_in_test = {"project-dir": PROJECT_DIR, "target": "test"}
    run_all_models_in_test = dbt_cli_run.configured(run_all_config_in_test, name="run_all_models_in_test")
    I want to be able to conditionally run them depending on if the
    run
    directory exists under the target directory. I read up on conditional branching, but what I’m not sure about is how to conditionally invoke one of the dbt solids. I tried using a composite solid to pass in the yielded output, but since the dbt solid does not take any input argument, I get the unused-input error for the composite solid. Is there a way to work around this, or a better way to achieve it?
    a
    o
    • 3
    • 10
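    For context, a hedged sketch of the generic conditional-branching pattern (optional outputs, only one of which is yielded). Since dbt_cli_run takes no inputs, the two branch targets below are placeholders; in practice they would need to be thin wrapper solids with a dummy input that invoke dbt themselves, or the configured dbt solids would need an input added some other way:
    import os
    from dagster import Output, OutputDefinition, pipeline, solid

    @solid(
        output_defs=[
            OutputDefinition(str, "changed_only", is_required=False),
            OutputDefinition(str, "run_all", is_required=False),
        ]
    )
    def choose_branch(context):
        # Hypothetical check standing in for "does target/run exist?"
        if os.path.isdir("target/run"):
            yield Output("changed", "changed_only")
        else:
            yield Output("all", "run_all")

    @solid
    def run_changed_models(start):
        return start  # a wrapper would invoke dbt here

    @solid
    def run_all_models(start):
        return start  # a wrapper would invoke dbt here

    @pipeline
    def conditional_dbt_pipeline():
        changed_only, run_all = choose_branch()
        # Only the solid wired to the output that was actually yielded will run.
        run_changed_models(changed_only)
        run_all_models(run_all)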
  • k

    Kirk Stennett

    05/18/2021, 7:17 PM
    Hey, if I wanted to map multiple solids to a DynamicOutput, can I just chain the map statements, as in the example from https://docs.dagster.io/0.11.3/concepts/solids-pipelines/pipelines#dynamic-mapping--collect? Or would I just have each mapped solid return a DynamicOutput and map to that?
    c
    • 2
    • 4
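    A hedged sketch of the chained form, assuming ~0.11 where the dynamic APIs still lived under dagster.experimental (later releases export them from dagster directly). Each .map applies one solid per fanned-out element, so chaining runs them in sequence for every element:
    from dagster import pipeline, solid
    from dagster.experimental import DynamicOutput, DynamicOutputDefinition

    @solid(output_defs=[DynamicOutputDefinition(str)])
    def fan_out():
        for name in ["a", "b", "c"]:
            yield DynamicOutput(name, mapping_key=name)

    @solid
    def step_one(name):
        return name.upper()

    @solid
    def step_two(name):
        return f"processed {name}"

    @pipeline
    def mapped_pipeline():
        # step_one and then step_two run once for every dynamic output of fan_out.
        fan_out().map(step_one).map(step_two)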
  • j

    Jenny Webster

    05/19/2021, 3:58 PM
    I'm pretty sure I know the answer here, but I just want to verify: Is it possible to access a SolidExecutionContext from a composite solid?
    c
    a
    • 3
    • 2
  • b

    Billie Thompson

    05/19/2021, 4:35 PM
    Hey, I am trying to work out how to deploy Dagit into a Kubernetes cluster where the user deployment is in an image that requires pull secrets, but the imagePullSecrets never seem to get used. Am I missing a step or something here? I also can’t see how to pass the secret on to the run worker.
    d
    r
    • 3
    • 23
  • c

    Christian Lam

    05/19/2021, 5:18 PM
    Has anyone here successfully implemented authentication to the Graphql API endpoint with AWS load balancer and Cognito? I am looking to require authentication for hitting the Graphql API.
  • t

    Thomas

    05/19/2021, 8:12 PM
    Document processing. Hello, I have a few questions about this use case. I have a bunch of documents I want to process, and those documents have different templates. For this example, I want to extract some metadata from both types. First I will concentrate on the first document type, then I might change my pipeline to take both types. I want my pipeline to execute only the minimum: for documents already extracted, do nothing; for the others, extract. How can I do that? How can I see if an asset is already there? There are probably other questions I have not thought of yet.
    • 1
    • 2
  • k

    Kirk Stennett

    05/19/2021, 8:49 PM
    Hey, is there an easy way to pass a list in YAML to a solid? When I try to pass one, it says something like: Expected one value in list, either json, pickle or value. Or am I better off passing in a JSON list and having my solid input set as a string?
    n
    a
    • 3
    • 14
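    One hedged workaround sketch: give the input a concrete list type so Dagster’s config loader accepts a plain list, rather than the per-element json/pickle/value wrapping an Any-typed input asks for (run_config shown as a Python dict; the equivalent YAML should behave the same way):
    from typing import List

    from dagster import execute_pipeline, pipeline, solid

    @solid
    def count_items(items: List[str]) -> int:
        # With a typed List[str] input, a bare list can be supplied from config.
        return len(items)

    @pipeline
    def list_input_pipeline():
        count_items()

    if __name__ == "__main__":
        execute_pipeline(
            list_input_pipeline,
            run_config={"solids": {"count_items": {"inputs": {"items": ["a", "b", "c"]}}}},
        )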
  • k

    Kirk Stennett

    05/20/2021, 12:20 AM
    Hey, posting again - getting:
    An exception was thrown during execution that is likely a framework error, rather than an error in user code.
    dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline 'redshift_checks'. Found ['y', 'z']
    This is happening on a pipeline whose first solid outputs a DynamicOutput, which I think is somewhat related. For whatever reason it runs fine when I execute it locally with a preset and mode not listed here, but when I run it in k8s with this config it returns the above error. It loads fine in Dagit & the Playground, but not when execution is launched. I'm using version 0.11.3. Pipeline def:
    @pipeline(
        mode_defs=[
            ModeDefinition(
                name="production",
                resource_defs={
                    "redshift": redshift_resource,
                    "slack": slack_resource,
                    "s3": s3_resource,
                    "io_manager": s3_pickle_io_manager
                    },
                executor_defs=default_executors + [celery_k8s_job_executor],
            ),
        ],
        preset_defs=[
            PresetDefinition.from_files(
                name="celery_k8s",
                mode="production",
                config_files=[
                        'celery_k8s.yaml',
                    'redshift_config.yaml',  # contains the env info for the redshift resource
                    'redshift_queries.yaml',  # contains the list of dicts to run queries
                ]
            ),
        ]
    )
    def redshift_checks():
        redshift_yaml_map().map(redshift_data_check)
    Could this be related to the dynamic output with k8s processing, or is this something that updating to a newer version would solve?
    a
    • 2
    • 9
  • r

    Rubén Lopez Lozoya

    05/20/2021, 7:25 AM
    Hello team, I just got this error and cannot really understand why. I reran all the jobs and they worked.
    dagster.core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "solid_publish_contract_info_response":
    
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 190, in _dagster_event_sequence_for_step
        for step_event in check.generator(step_events):
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 317, in core_dagster_event_sequence_for_step
        for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 369, in _type_check_and_store_output
        for evt in _store_output(step_context, step_output_handle, output, input_lineage):
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 484, in _store_output
        handle_output_res = output_manager.handle_output(output_context, output.value)
      File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
        self.gen.throw(type, value, traceback)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/errors.py", line 197, in user_code_error_boundary
        raise error_cls(
    
    The above exception was caused by the following exception:
    dagster_postgres.utils.DagsterPostgresException: too many retries for DB connection
    
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/errors.py", line 187, in user_code_error_boundary
        yield
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 484, in _store_output
        handle_output_res = output_manager.handle_output(output_context, output.value)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/storage/fs_io_manager.py", line 97, in handle_output
        context.log.debug(f"Writing file at: {filepath}")
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/log_manager.py", line 234, in debug
        return self._log(logging.DEBUG, msg, kwargs)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/log_manager.py", line 204, in _log
        logger_.log(level, message, extra=extra)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 1512, in log
        self._log(level, msg, args, **kwargs)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 1589, in _log
        self.handle(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 1599, in handle
        self.callHandlers(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 1661, in callHandlers
        hdlr.handle(record)
      File "/usr/local/lib/python3.8/logging/__init__.py", line 954, in handle
        self.emit(record)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 152, in emit
        self._instance.handle_new_event(event)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1087, in handle_new_event
        self._event_storage.store_event(event)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster_postgres/event_log/event_log.py", line 139, in store_event
        with self._connect() as conn:
      File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
        return next(self.gen)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 157, in create_pg_connection
        conn = retry_pg_connection_fn(engine.connect)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 124, in retry_pg_connection_fn
        raise DagsterPostgresException("too many retries for DB connection") from exc
    a
    • 2
    • 1
  • l

    Liam Coatman

    05/20/2021, 10:50 AM
    Hi. I have a pipeline which runs once a day and produces an output which is saved to S3 and tracked using the asset catalogue, using the date to partition the asset. I attach the S3 object key of the saved output as metadata so I know where to load the object from later on. I then have a second pipeline whose input should be the output from the first pipeline for a range of partitions (i.e. dates). I think I could write a GraphQL query to get the S3 object keys for the asset for the given partitions. Does this sound like a reasonable approach, or does Dagster have any patterns / concepts that fit this use case? Thanks
    a
    v
    p
    • 4
    • 6
  • j

    Jean-Pierre M

    05/20/2021, 1:32 PM
    Hi. I have a pipeline running on K8S with the K8SRunLauncher and QueuedRunCoordinator. I'm launching runs via the GraphQL Python client. The idea is that a Python script loops over 1000 files and launches a run for each. This part seems to work. Once all the runs are submitted and the QueuedRunCoordinator is working through them, eventually the dagit pods on K8S crash and the dagit UI becomes unresponsive. The dagit pods restart themselves on K8S but continue to be unresponsive and never recover. My only way out of this crash loop is to kill all the pods and restart from scratch. Any thoughts about why this is happening? I currently have the dagster daemon pod, the dagit pod (with 2 replicas), the user deployment pod and the postgresql pod.
    d
    a
    • 3
    • 26
  • m

    Muthu

    05/20/2021, 7:16 PM
    hi… how can I set an environment variable in dagster.yaml under
    scheduler
    ?
    a
    • 2
    • 2
  • s

    szalai1

    05/21/2021, 12:35 PM
    hey team, is there any way to use type checking in a solid's
    config_schema
    (for arbitrary types)? As a workaround we can use inputs and configure them in the
    run_config
    , but I would like to do it in code. Can I codify a solid's static inputs (not in run_config)?
    a
    • 2
    • 3
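    A hedged sketch of what is config-typeable: config_schema takes Dagster config types (scalars, lists, Shape/dict, Permissive, Enum), not arbitrary Python objects, which is why arbitrary values usually come in through resources or inputs instead. Static values can be baked in with .configured, the same pattern used for the dbt solids earlier in this channel:
    from dagster import Field, Permissive, solid

    @solid(
        config_schema={
            "limit": Field(int, default_value=10),
            "tags": Field([str], is_required=False),
            "extras": Field(Permissive(), is_required=False),
        }
    )
    def fetch_rows(context):
        return context.solid_config["limit"]

    # Bake static config into a new solid definition in code, so nothing has to be
    # supplied via run_config at launch time.
    fetch_top_100 = fetch_rows.configured({"limit": 100}, name="fetch_top_100")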
  • j

    jeremy

    05/21/2021, 7:15 PM
    Is there a way to modify or create a resource during a run? For example if a new database is created as part of the pipeline, is there a way to make it available to other solids without having to pass that through every input/output?
    a
    • 2
    • 2
  • p

    Piotr Dworzyński

    05/22/2021, 6:01 PM
    Hi, Is there a way to set version (as in Versioning and Memoization) for dagstermill solids? Thank you!
    c
    • 2
    • 2
  • p

    Piotr Dworzyński

    05/23/2021, 4:19 PM
    Secondly, I noticed that an IOManager's resource dependencies are ignored when running
    MemoizableIOManager.has_output
    . For example, the below pipeline (self-contained) runs without issues when the
    MEMOIZED_RUN_TAG
    is not applied but fails when it is. This is due to `MyIOManager`'s
    has_output
    having its
    context.resources
    set to None despite it being dependent on the
    dataset
    resource. However, in
    MyIOManager.load_input
    and
    MyIOManager.handle_output
    functions
    context.resources.dataset
    is set (as you can see when running the pipeline without the
    MEMOIZED_RUN_TAG
    ). Is there any way around this issue?
    import dagstermill as dm
    from dagster import ModeDefinition, pipeline, OutputDefinition, solid, Any
    from dagster.utils import script_relative_path
    from dagster import IOManager, io_manager, resource, make_values_resource
    from dagster.core.storage.memoizable_io_manager import MemoizableIOManager
    from dagster.core.storage.tags import MEMOIZED_RUN_TAG
    import pandas as pd
    import pickle
    import os
    
    output_versions = {
        "df": "1"
    }
    
    class MyIOManager(MemoizableIOManager):
    
        def _get_obj_dir_path(self, context):
            client_id = context.resources.dataset["client_id"]
            dataset_id = context.resources.dataset["dataset_id"]
            #
            #invoice_line_df_version = context.resources.dataset["invoice_line_df_version"]          
      
            obj_dir_path = f"/tmp/{client_id}/{dataset_id}/"
            
            return obj_dir_path
        
        def _get_solid_version(self, context, obj_name):
            #This is used as dagstermill doesn't allow (yet?) for versioning of solids
            if context.version != None:
                solid_version = context.version
            else:
                if obj_name in output_versions:
                    solid_version = output_versions[obj_name]
                else:
                    solid_version = "0"
            return solid_version
            
        
        def handle_output(self, context, obj):
            # name is the name given to the OutputDefinition that we're storing for
            obj_name = context.name
            obj_dir_path = self._get_obj_dir_path(context)
            solid_version = self._get_solid_version(context,obj_name)
            
            if not os.path.exists(obj_dir_path):
                os.makedirs(obj_dir_path)
    
            with open(obj_dir_path + obj_name + f"_v{solid_version}" + ".pickle", "wb") as f:
                pickle.dump(obj, f)
    
        def load_input(self, context):
            # upstream_output.name is the name given to the OutputDefinition that we're loading for
            obj_name = context.upstream_output.name
            solid_version = self._get_solid_version(context,obj_name)
    
            file_path = self._get_obj_dir_path(context) + obj_name +  f"_v{solid_version}.pickle"
            
            with open(file_path, "rb") as f:
                return pickle.load(f)
    
        def has_output(self, context):
            print(context)
            obj_name = context.name
            obj_dir_path = self._get_obj_dir_path(context)
            solid_version = self._get_solid_version(context,obj_name)
            return os.path.exists(obj_dir_path + obj_name + f"_v{solid_version}" + ".pickle")
            
    
                
    @io_manager(required_resource_keys={"dataset"})
    def my_io_manager(_):
        return MyIOManager()
    
    @solid
    def test_solid():
        return pd.DataFrame.from_records([{"a": 1, "b": 2}, {"a": 2, "b": 3}])
    
    
    @pipeline(
        mode_defs=[
            ModeDefinition(
                resource_defs={"io_manager": my_io_manager, "dataset": make_values_resource()}
            )
        ],
        #tags={MEMOIZED_RUN_TAG: "true"}
    )
    def ingestion_pipeline():
        test_solid()
    Example config:
    resources:
      dataset:
        config:
          client_id: test-client
          dataset_id: test-dataset
    c
    • 2
    • 2
c

chris

05/24/2021, 2:20 PM
Hi Piotr, thanks for reporting this! It's an issue with how we're constructing the OutputContext during memoization. Made an issue to track this as well; should be able to get out a fix for this by next release: https://github.com/dagster-io/dagster/issues/4204
p

Piotr Dworzyński

05/24/2021, 2:24 PM
Thanks for looking at it so quickly!