dagster-support

    jonvet

    02/01/2022, 3:35 PM
    I have a graph which sometimes I want to run by itself, and sometimes as part of a different graph. It was all working fine by itself: I decorated the function that defines the graph with `@graph`, created a job via `to_job`, and imported the job in my `repo.py`. In order to run it as part of another graph, I added `ins` and `out` to the `@graph` decorator. That also runs fine. However, since I added the `ins` and `out` to the `@graph`, I can't launch dagit anymore. It's the job described above that's causing the problem: it either complains about `Missing required config entry "inputs" at the root`, or, when I add `inputs` to my yaml, it complains that `Error 1: Received unexpected config entry "inputs" at the root.` Question: do I have to define 2 separate graphs in this case?
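    One pattern that may resolve this (a hedged sketch, not necessarily the recommended answer): keep the inputs on the reusable graph, and for standalone runs wrap it in a second graph whose source op supplies the input, so the standalone job needs no root `inputs` config. All names here are hypothetical.

        from dagster import graph, op

        @op
        def emit_start_value() -> str:
            # hypothetical source op supplying the input for standalone runs
            return "start"

        @op
        def process(value: str) -> str:
            return value.upper()

        @graph
        def inner(value: str):
            # the reusable graph keeps its input so other graphs can feed it
            return process(value)

        @graph
        def standalone():
            # the wrapper satisfies inner's input, so no root "inputs" config is needed
            inner(emit_start_value())

        standalone_job = standalone.to_job()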

    Dylan Hunt

    02/01/2022, 4:01 PM
    Hi everyone, I'm using the `build_reconstructable_pipeline` method to make my pipeline reconstructable for Pythonic execution. Below is the code snippet, and for it I'm getting a warning:
    UserWarning: Module builder was resolved using the working directory. The ability to load uninstalled modules from the working directory is deprecated and will be removed in a future release. Please use the python-file based load arguments or install builder to your python environment.

        reconstructable_pipeline = build_reconstructable_pipeline(
            'builder',
            'reconstruct_pipeline',
            reconstructable_args=(pipe,),
        )

    Is there a way to pass a module name here, since it expects the first 2 arguments as strings?
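    The warning indicates the `builder` module is being resolved from the working directory rather than from the installed environment. One hedged fix is to package and install the module (e.g. `pip install -e .`) so its fully qualified name resolves from `sys.path`; a minimal sketch, assuming the code lives in a package also named `builder`:

        # setup.py - minimal sketch; after `pip install -e .`, the string
        # 'builder' resolves from the environment, not the working directory
        from setuptools import find_packages, setup

        setup(
            name="builder",
            version="0.1.0",
            packages=find_packages(),
            install_requires=["dagster"],
        )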

    Dylan Hunt

    02/01/2022, 4:09 PM
    Hi team, I used to capture the pipeline/graph output from the `execute_pipeline` method using `output_capture`. Below is the code snippet.

        output = execute_pipeline(
            pipeline=reconstructable_pipeline,
            run_config=conf,
            instance=DagsterInstance.local_temp(),
        ).output_capture

    Now, in multiprocess execution mode with `fs` as the IO manager, it is not returning any outputs. I believe this is because the I/O operations write outputs to the filesystem under multiprocessing. Is there a way to capture the output?
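    One approach that may work (a sketch assuming the 0.13-era API): read the outputs back through the execution result, which loads them via the configured IO manager rather than relying on in-memory capture. `my_solid` is a placeholder name.

        from dagster import DagsterInstance, execute_pipeline

        result = execute_pipeline(
            pipeline=reconstructable_pipeline,
            run_config=conf,
            instance=DagsterInstance.local_temp(),
        )

        # result_for_solid loads the persisted output back through the IO
        # manager, which also works under the multiprocess executor
        value = result.result_for_solid("my_solid").output_value()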

    won

    02/01/2022, 6:12 PM
    Hello! I have a repo with 2 jobs and 2 different envs, so I specified 2 run configs, but I think it's too complicated. Any ideas to simplify my code? My code is inside the thread.
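    Without seeing the thread, one common simplification (a hedged sketch with hypothetical names) is a small factory that derives each run config from one template, so per-environment differences live in a single dict:

        from dagster import graph, op

        @op(config_schema={"bucket": str})
        def load(context):
            context.log.info(f"reading from {context.op_config['bucket']}")

        @graph
        def my_graph():
            load()

        def make_run_config(env: str) -> dict:
            # hypothetical per-environment settings
            buckets = {"dev": "my-dev-bucket", "prod": "my-prod-bucket"}
            return {"ops": {"load": {"config": {"bucket": buckets[env]}}}}

        dev_job = my_graph.to_job(name="my_job_dev", config=make_run_config("dev"))
        prod_job = my_graph.to_job(name="my_job_prod", config=make_run_config("prod"))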

    Francis

    02/02/2022, 3:08 AM
    Hello, when will dagit support Flask v2? We're having problems resolving dependencies. Thanks!

    Stefan Adelbert

    02/02/2022, 4:22 AM
    Theming dagit
    Are there any options for theming `dagit`, e.g. changing the logo or theme colours?

    sourabh upadhye

    02/02/2022, 8:12 AM
    Hey Dagster Team! Is there a way to access the input provided to a particular op/solid with the help of hook_context?

    Roel Hogervorst

    02/02/2022, 9:05 AM
    Good day y'all :daggy-cowboy:. I work in a very restricted Kubernetes cluster, but I've got dagit, a daemon, a user code server, and a Postgres database running in the same namespace. Everything works really well, but every day I get a GraphQL error in the dagit workspace (in the browser): `Operation name: RootWorkspaceQuery. psycopg2.OperationalError: server closed the connection unexpectedly.` There is more in this error, but maybe this is enough? Is this something I can fix?

    Srini R

    02/02/2022, 9:58 AM
    Hi, we are currently running Airflow and looking to migrate to Dagster; we're evaluating options right now. What's the best route to migrate Airflow DAGs to Dagster? Is there a tool available? Is there any documentation on best practices to follow?
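    The `dagster-airflow` package has shipped conversion helpers; a hedged sketch (0.13-era API, worth verifying against the docs for your version):

        # Sketch: generate a Dagster repository from a directory of Airflow DAGs
        from dagster_airflow.dagster_pipeline_factory import (
            make_dagster_repo_from_airflow_dags_path,
        )

        migrated_repo = make_dagster_repo_from_airflow_dags_path(
            "/path/to/airflow/dags",   # hypothetical path to existing DAG files
            "airflow_migration_repo",  # name for the generated repository
        )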

    Jay Sharma

    02/02/2022, 3:08 PM
    Hi all, I am trying to define a schedule in Dagster. However, I'm getting this error: `NameError: name 'my_schedule' is not defined`. This is my code:

        @repository
        def ingestion_pipeline_repository():
            return get_pipelines() + [my_schedule]


        @schedule(cron_schedule="0 22 * * *", pipeline_name="my_pipeline", execution_timezone="US/Eastern")
        def my_schedule(context):
            return {}

    I'm following this tutorial for scheduling: https://docs.dagster.io/tutorial/advanced-tutorial/scheduling. My setup is Dagster 0.13.1, running through a Docker image. However, I was previously on Dagster 0.11.15, and my code is currently legacy code (pipelines, solids, etc.). Not sure if that has anything to do with the error. Can someone tell me why I'm getting this NameError? Thanks for the help.
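    A likely cause (hedged): `@repository` invokes the decorated function when the module is imported, so `my_schedule` must already be defined at that point. Moving the schedule above the repository should resolve the NameError:

        # Define the schedule before the repository that references it;
        # @repository evaluates its body at import time.
        @schedule(
            cron_schedule="0 22 * * *",
            pipeline_name="my_pipeline",
            execution_timezone="US/Eastern",
        )
        def my_schedule(context):
            return {}


        @repository
        def ingestion_pipeline_repository():
            return get_pipelines() + [my_schedule]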

    Zach

    02/02/2022, 3:48 PM
    Hello all - when using `build_op_context()` I'm receiving the following exception:
    ../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py:459: in build_op_context
        return build_solid_context(
    ../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py:512: in build_solid_context
        return UnboundSolidExecutionContext(
    ../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/context/invocation.py:68: in __init__
        self._instance = self._instance_cm.__enter__()  # pylint: disable=no-member
    /Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py:119: in __enter__
        return next(self.gen)
    ../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/execution/api.py:333: in ephemeral_instance_if_missing
        with DagsterInstance.ephemeral() as ephemeral_instance:
    ../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/instance/__init__.py:363: in ephemeral
        tempdir = DagsterInstance.temp_storage()
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
        @staticmethod
        def temp_storage() -> str:
    >       from dagster.core.test_utils import environ
    E       ImportError: cannot import name 'environ' from 'dagster.core.test_utils' (/Users/zachary.romer/Documents/empirico/etxpipelines/dagster_0.13.17/lib/python3.9/site-packages/dagster/core/test_utils.py)
    
    ../../dagster_0.13.17/lib/python3.9/site-packages/dagster/core/instance/__init__.py:488: ImportError
    I'm using dagster 0.13.17, just upgraded from 0.13.14. This is running locally within a unit test. When I inspect the dagster.core.test_utils module it does appear to have a top-level "environ" attribute, so I'm not quite sure what's going on here. Any help is greatly appreciated! Triggering code example here:
    def test_nominal_case(self, mock_file):
            context = build_op_context(op_config={'bucket': 'test-bucket',
                                                  'key': 'test-key'})
    Sorry, turns out I was patching something low-level on the test and didn't realize it was interfering with the dagster internals!

    Javier Llorente Mañas

    02/02/2022, 4:15 PM
    Hello! I am trying to launch Dagster jobs from the GraphQL Python SDK. I am wondering how I could pass parameters to the job that are later used in the different ops. Is there any parameter I could use, maybe the `run_config`? Also, I am trying to find some tutorials in this area apart from this one.
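    `run_config` is the usual vehicle for per-run parameters; a hedged sketch with the `dagster-graphql` client (0.13-era method name; the host, job name, and config keys are assumptions):

        from dagster_graphql import DagsterGraphQLClient

        client = DagsterGraphQLClient("localhost", port_number=3000)

        # Per-run parameters travel in run_config; ops read them via
        # context.op_config
        run_id = client.submit_pipeline_execution(
            "my_job",
            run_config={"ops": {"my_op": {"config": {"param": "value"}}}},
        )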

    Chris Nogradi

    02/02/2022, 7:23 PM
    I am trying to understand how to pass a set of keys/IDs from `IOManager.handle_output()` to `IOManager.load_input()` when `handle_output()` creates the key/ID from the external resource (in my case, MongoDB's ObjectIds). I see this done in the in-memory IO manager by using a local variable, but that won't work across processes. Are there any examples I can look at?
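    One hedged pattern: persist the generated ID in the external store itself, keyed by an address both sides can compute deterministically from their contexts. A sketch with hypothetical pymongo collections:

        from dagster import IOManager, io_manager
        from pymongo import MongoClient

        class MongoIOManager(IOManager):
            def __init__(self, collection, index):
                self._collection = collection  # documents written by ops
                self._index = index            # maps step addresses to ObjectIds

            def handle_output(self, context, obj):
                object_id = self._collection.insert_one(obj).inserted_id
                address = f"{context.run_id}/{context.step_key}/{context.name}"
                self._index.replace_one(
                    {"_id": address},
                    {"_id": address, "object_id": object_id},
                    upsert=True,
                )

            def load_input(self, context):
                # upstream_output is the OutputContext of the producing step,
                # so the same address can be recomputed here, in any process
                up = context.upstream_output
                address = f"{up.run_id}/{up.step_key}/{up.name}"
                record = self._index.find_one({"_id": address})
                return self._collection.find_one({"_id": record["object_id"]})

        @io_manager
        def mongo_io_manager(_):
            db = MongoClient()["dagster"]  # hypothetical connection
            return MongoIOManager(db["outputs"], db["output_index"])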

    Johnny Bravo

    02/02/2022, 9:11 PM
    Hey guys, maybe I'm missing something, but I don't understand how lazy loading works, or at least how to start dagit without running my task. Here is how lazy loading is documented: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/core/definitions/decorators/repository.py#L181-L202. I also saw other examples using lambdas. However, this code is failing:

        @repository
        def my_repo():
            return {"jobs": {"my_job": lambda: my_job}}

    And by reading the source, I don't see why it would not fail: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/core/definitions/repository_definition.py#L546-L549. Obviously `my_job` is an instance of `JobDefinition`, not a lambda, by my understanding. Without the wrapping, the repo loads as expected, but it executes my job's code for some reason.
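    A hedged reading of the lazy-dict form: the values should be zero-argument callables that construct the definition on first request, which keeps construction work out of module import. A sketch:

        from dagster import job, op, repository

        @op
        def my_op(context):
            context.log.info("running")

        def define_my_job():
            # construction is deferred until the repository needs the definition
            @job
            def my_job():
                my_op()

            return my_job

        @repository
        def my_repo():
            # the value is a callable that *returns* the JobDefinition
            return {"jobs": {"my_job": define_my_job}}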

    Mykola Palamarchuk

    02/02/2022, 9:29 PM
    Hi team! We are trying to design a Dagster system that runs across different AWS accounts, but we have restrictions on cross-account database access. We are thinking about implementing our own RunStorage that stores metadata on S3 to work around this restriction. What do you think about that?

    Mykola Palamarchuk

    02/02/2022, 9:39 PM
    And a related question: is it possible to run some jobs with the Celery executor but others with the K8s executor in the context of the same Dagster instance?
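    In the job API, each job can carry its own executor, so mixing them in one instance appears possible (a hedged sketch; executor names from `dagster_celery` and `dagster_k8s`, worth verifying for your version):

        from dagster import job, op
        from dagster_celery import celery_executor
        from dagster_k8s import k8s_job_executor

        @op
        def work(context):
            context.log.info("working")

        # executor_def is set per job
        @job(executor_def=celery_executor)
        def celery_job():
            work()

        @job(executor_def=k8s_job_executor)
        def k8s_job():
            work()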

    Chris Nogradi

    02/02/2022, 11:05 PM
    In this example: https://docs.dagster.io/concepts/ops-jobs-graphs/ops#op-factory, how are the inputs provided to the `_x_ops()` function? I see how they are used in the decorator. The example mentions the `arg` argument but does not use it, so it is not clear how it might be used.
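    A hedged illustration of the factory pattern: arguments passed to the factory are captured in the closure of the generated op, so each generated op bakes in its own behavior. Names here are illustrative, not from the docs example:

        from dagster import op

        def x_op_factory(name: str, multiplier: int):
            # `multiplier` is closed over by the inner op body
            @op(name=name)
            def _x_op(context, value: int) -> int:
                context.log.info(f"{name}: scaling by {multiplier}")
                return value * multiplier

            return _x_op

        double_op = x_op_factory("double", 2)
        triple_op = x_op_factory("triple", 3)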

    Stefan Adelbert

    02/03/2022, 4:05 AM
    Getting a resource's dependent resource by name
    I'm using a factory function to create a resource, something like:

        def storage_bucket(dependent_resource_name):
            @resource(
                required_resource_keys={dependent_resource_name},
            )
            def _storage_bucket(init_context):
                credentials = init_context.resources[dependent_resource_name]
                return StorageBucket(credentials)
            return _storage_bucket

    The problem is the line `credentials = init_context.resources[dependent_resource_name]`, which gives me the error `TypeError: tuple indices must be integers or slices, not str`. I'm aware that `init_context.resources` is a `_ScopedResources`. I also see that `_ScopedResources` implements `__getitem__`, which is why I assumed this would work. I know that I could call `_ScopedResources._asdict()` and then get the resource by name. Can anyone suggest the :dagster: way to do this?
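    A hedged workaround: resources are exposed attribute-style on the context, so a dynamic (string) resource name can be looked up with `getattr` instead of subscripting:

        from dagster import resource

        def storage_bucket(dependent_resource_name):
            @resource(required_resource_keys={dependent_resource_name})
            def _storage_bucket(init_context):
                # getattr handles the dynamic resource name;
                # StorageBucket is the class from the question
                credentials = getattr(init_context.resources, dependent_resource_name)
                return StorageBucket(credentials)

            return _storage_bucket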

    Alex Service

    02/03/2022, 4:39 AM
    Hey folks, I have a GCPFileManager issue. I'm trying to use the dagster-gcp `GCPFileManager.read_data` to retrieve a compressed tarfile (`blah.tar.gz`) from a bucket. It seems to retrieve the file but truncates the last few hundred bytes, so it fails to decompress. I verified it's not using a bad cached version, and if I download the file through other means (gsutil, the GCP UI, etc.), it works as expected. Is this a known issue, or am I just missing something?

    Scobie Smith

    02/03/2022, 5:07 AM
    When I install dagster, following the Getting Started page, using either conda or pip on Windows 10, it does not create the dagit tool or the .dagster folder. So when I proceed to run dagit, it says 'dagit' is not recognized, etc. I thought maybe I needed to add something to my PATH environment variable, but there is actually nothing installed to point to. I'm not sure what is going wrong with the install, because it runs through successfully.

    Roel Hogervorst

    02/03/2022, 10:10 AM
    Good morning! I'm going to bump my question again: how can I solve the GraphQL error `Operation name: RootWorkspaceQuery. psycopg2.OperationalError: server closed the connection unexpectedly.`? https://dagster.slack.com/archives/C01U954MEER/p1643792705600429

    Dominik Liebler

    02/03/2022, 2:19 PM
    Hi everyone! I wonder what the best way is to integrate Dagster with ETLs written in languages other than Python (targeting the JVM here)? I know that it uses gRPC to communicate with user code, but the service description is not well documented, and I doubt I could just use Java to create a repository with jobs and all.

    Carlos Sanoja

    02/03/2022, 5:24 PM
    Hello! I have a question about defining a resource that updates a database. If two jobs use the same resource, does Dagster lock the resource and know when to share it between the jobs, or should I verify in some external way that the resource is not being used by another job? I hope you can help me, thanks!
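    To my knowledge Dagster does not lock resources across runs; concurrent jobs each build their own resource instance. One hedged option is limiting concurrency at the run level via the queued run coordinator's tag limits (a dagster.yaml sketch; the tag key and value are assumptions, and jobs would need to set the matching tag):

        run_coordinator:
          module: dagster.core.run_coordinator
          class: QueuedRunCoordinator
          config:
            tag_concurrency_limits:
              - key: "database"
                value: "users"
                limit: 1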

    George Pearse

    02/03/2022, 5:30 PM
    I have my Dagster service running on docker-compose with a volume mount to the pipeline code, so that a container restart is sufficient for an update. There have been a few occasions where it looks as if the wrong code was executed while I was viewing another branch on that server. The only explanation I can think of is that the pipeline code can update without a container restart. Is that possible?

    Chris Nogradi

    02/03/2022, 5:38 PM
    Sorry for the newbie question: is it expected that dagit is significantly slower than running from Python all in one process? There seem to be long delays between each op's execution, and I see `execute_windows_tail` timeouts in the log (Timed out waiting for process to start). The trivial pipeline with 4 ops I am using takes 5 minutes to run in dagit and 30 seconds in straight Python. Is this an operator error?
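    The gap is plausibly multiprocess overhead: runs launched from dagit default to the multiprocess executor, which spawns a subprocess per op (expensive on Windows in particular). A hedged way to compare like for like is forcing the in-process executor:

        from dagster import in_process_executor, job, op

        @op
        def step_one():
            return 1

        # runs every op in a single process, like direct python execution
        @job(executor_def=in_process_executor)
        def trivial_job():
            step_one()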

    Igor

    02/03/2022, 5:44 PM
    Help:

        @op(ins={'front': In(str)}, out=Out(None))
        def start(context, front):
            context.log.info(f'{front}')

            yield Output(None)


        @job
        def data_parser():
            start(1)
            start(1)

        job_data_parser_schedule = ScheduleDefinition(cron_schedule='00 00 * * *', job=data_parser,
                                                      execution_timezone='Europe/London')


        @repository
        def repository_parser():
            return [job_data_parser_schedule]

    Error:

        dagster.core.errors.DagsterInvalidDefinitionError: In @job data_parser, received invalid type <class 'str'> for input "front" (at position 0) in op invocation "start". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
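    A hedged fix: literal values can't be passed as inputs while composing a job; they can instead travel through op config (or come from an upstream op). A sketch, with an alias since the op is invoked twice:

        from dagster import ScheduleDefinition, job, op, repository

        @op(config_schema={"front": str})
        def start(context):
            context.log.info(context.op_config["front"])

        @job(config={
            "ops": {
                "start": {"config": {"front": "1"}},
                "start_2": {"config": {"front": "1"}},
            }
        })
        def data_parser():
            start()
            start.alias("start_2")()

        job_data_parser_schedule = ScheduleDefinition(
            cron_schedule="00 00 * * *",
            job=data_parser,
            execution_timezone="Europe/London",
        )

        @repository
        def repository_parser():
            return [job_data_parser_schedule]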

    Igor

    02/03/2022, 7:02 PM
    How do I make a volume/mount directory visible in a running task? In docker-compose.yml:

        volumes:
              - ./files:/work

    does not work.
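    If runs are launched in fresh containers (e.g. via `DockerRunLauncher`), compose-file volumes only apply to the long-running services, not to the containers created per run. A hedged dagster.yaml sketch passing the mount through (config key per the 0.13-era `dagster-docker` docs, worth verifying; the host path is hypothetical and should be absolute):

        run_launcher:
          module: dagster_docker
          class: DockerRunLauncher
          config:
            container_kwargs:
              volumes:
                - /abs/path/to/files:/work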

    Elizabeth

    02/03/2022, 7:29 PM
    Is there a way to `get_runs` based on `run_key` in a sensor? It doesn't appear to be an option for `PipelineRunsFilter`.
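    One possibility, assuming (unverified) that sensor-launched runs record their run key in the `dagster/run_key` tag, is filtering on tags from within the sensor:

        from dagster import PipelineRunsFilter

        # Assumption: the run key is stored as the "dagster/run_key" tag;
        # check the tags on an existing sensor-launched run to confirm
        runs = context.instance.get_runs(
            filters=PipelineRunsFilter(tags={"dagster/run_key": "my_run_key"})
        )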

    Evan Arnold

    02/03/2022, 7:51 PM
    Hi all - I hope this is an appropriate question to be writing here. I am curious about any established best practices around structuring `job` and `op` imports. Currently, we have a hand-rolled ETL that is super object-oriented, so we have things like the following:

        class FolderExtractor:
            def __init__(
                self, path_to_input: str
            ) -> None:
               # some code

            def run(self) -> list[str]:
               # other code

        class DataFrameBuilder:
            def __init__(self, files: list[str]) -> None:
               # code

            def run(self) -> pd.DataFrame:
               # code

    which allows us to stack the objects:

        DataFrameBuilder(FolderExtractor(inp).run()).run()

    We chose this pattern because we have pretty complicated internal logic that we wanted to pack away into private methods. How would one convert this to `op`s? I am thinking you would just create something like:

        @op
        def folder_extractor(): ...

        @op
        def data_frame_builder(): ...

    But then where do the private methods live? In the same file? Or do you create an object that you hide inside of the `op`? Or...? Very welcome to any and all feedback. Perhaps it's also helpful to say: I've got a decade-plus of coding experience, but I am still fairly new to Python.
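    One common answer (a hedged sketch, not official guidance): keep the classes as plain Python in their own module and make each op a thin wrapper, so the complicated logic stays testable outside Dagster. `myetl.core` is a hypothetical module holding the existing classes:

        import pandas as pd
        from dagster import job, op

        from myetl.core import DataFrameBuilder, FolderExtractor

        @op(config_schema={"path_to_input": str})
        def folder_extractor(context) -> list[str]:
            # thin wrapper: the private-method logic stays in the class
            return FolderExtractor(context.op_config["path_to_input"]).run()

        @op
        def data_frame_builder(files: list[str]) -> pd.DataFrame:
            return DataFrameBuilder(files).run()

        @job
        def etl():
            data_frame_builder(folder_extractor())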

    Evan Arnold

    02/03/2022, 9:38 PM
    One additional question: is there a Dagster equivalent of the dask `ProgressBar`? We have a pretty slow map_partitions/compute thing going on, and the progress bar has been pretty key to debugging (unfortunately).
    Interestingly, it works really nicely with the `dagit` logs, which might be enough for my needs.