dagster-support

    Qwame

    12/22/2021, 10:20 PM
    Hi, I got the following error in one of my scheduled jobs:
    dagster.core.executor.child_process_executor.ChildProcessCrashException
      File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 163, in execute
        event_or_none = next(step_iter)
      File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 268, in execute_step_out_of_process
        for ret in execute_child_process_command(command):
      File "C:\dagster\.venv\lib\site-packages\dagster\core\executor\child_process_executor.py", line 157, in execute_child_process_command
        raise ChildProcessCrashException(exit_code=process.exitcode)
    I don't quite understand what happened. Any help?

    Rasheed Elsaleh

    12/22/2021, 11:00 PM
    Hi, I'm trying to orchestrate a pipeline in kubernetes that runs a series of docker containers created from different images. Each container will read files as input and write files as output. Example: containerA -> fileA, fileA -> containerB -> fileB, fileB -> containerC -> fileC. I'm new to dagster and I'm wondering if this is something that can be done using it?
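    A minimal sketch of that dependency shape, assuming each op launches its container (Docker SDK, a Kubernetes Job, etc.) and hands the produced file path to the next op; run_container is a hypothetical helper, not a Dagster API:
    from dagster import job, op

    def run_container(image, inputs):
        # hypothetical: launch the container and return the path of the file it wrote
        ...

    @op
    def run_container_a():
        return run_container("imageA", inputs=[])        # -> fileA

    @op
    def run_container_b(file_a):
        return run_container("imageB", inputs=[file_a])  # -> fileB

    @op
    def run_container_c(file_b):
        return run_container("imageC", inputs=[file_b])  # -> fileC

    @job
    def container_pipeline():
        run_container_c(run_container_b(run_container_a()))
    For larger files it is usually better to pass paths (or object-store URIs) between ops, as above, rather than the file contents themselves.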

    paul.q

    12/23/2021, 12:28 AM
    Hi, I'm trying to migrate to 0.13.12 from 0.12.12 and having some trouble achieving with a partitioned job what we used to be able to do with `PartitionSetDefinition`, `create_schedule_definition` and `pipeline`. We used to have a `PartitionSetDefinition` with a `partition_fn` that created a list of `Partition` for a date range, but only Mon-Fri. We were able to use this to create different `PartitionSet` objects for different pipelines, as well as create a schedule using `create_schedule_definition` on a `PartitionSet` object. Now, I have a `Job` for my `Graph` and its config is coming from a `daily_partitioned_config` - which doesn't exclude weekends as I would like. Should I be messing with `dynamic_partition_config` to achieve this? I also use `build_schedule_from_partitioned_job` to achieve having a schedule for the job. I was able to get the partitions looking the way I wanted using `dynamic_partition_config`, but `build_schedule_from_partitioned_job` returned this error, where `get_effective_dates1` returns a set of date strings of the form `%Y-%m-%d`:
    dagster.check.CheckError: Object DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) is not a TimeWindowPartitionsDefinition. Got DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) with type <class 'dagster.core.definitions.partition.DynamicPartitionsDefinition'>.
    Thanks, Paul
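    One hedged workaround, if the goal is simply Mon-Fri runs: skip `build_schedule_from_partitioned_job` (which only accepts time-window partitions, per the error) and attach a plain schedule with a weekday cron, building each run's config yourself. A minimal sketch, where `my_job` and `run_config_for_date` are placeholders:
    from datetime import timedelta
    from dagster import schedule

    @schedule(cron_schedule="0 0 * * 2-6", job=my_job)  # Tue-Sat, covering Mon-Fri data
    def weekday_schedule(context):
        run_date = context.scheduled_execution_time.date() - timedelta(days=1)
        return run_config_for_date(run_date)  # hypothetical: builds the run config dict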

    Bryan Chavez

    12/23/2021, 1:45 AM
    Is there a way to make certain op or resource configs ("graph_configs" below) that are sensitive come from environment variables, so they are not visible in Dagit? I thought I saw something that uses 'env' in the configs: {'database': 'DEV_DEMO', 'schema': {'env': 'secret-test'}}, but this produced an error: 'Received unexpected config entry "env" at path root:'. I basically have logic generating graphs dynamically from config files, and then separate logic that generates "graph_configs" to assign to the graph.
    sample_graph = sample_graph.to_job(
        executor_def=in_process_executor,
        resource_defs=resource_defs_map,
        config=graph_configs,
    )
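    A minimal sketch, assuming the fix is to declare the sensitive field with `StringSource` (a plain `str` field rejects the `{'env': ...}` form with exactly that error); `my_op` and the variable name are placeholders:
    from dagster import StringSource, op

    @op(config_schema={"database": str, "schema": StringSource})
    def my_op(context):
        # "schema" is resolved from the environment variable at run time,
        # so the secret value itself never appears in the config shown in Dagit
        context.log.info(context.op_config["database"])

    graph_configs = {
        "ops": {
            "my_op": {
                "config": {
                    "database": "DEV_DEMO",
                    "schema": {"env": "SECRET_TEST"},  # env var name, not the value
                }
            }
        }
    }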

    Nitin Madhavan

    12/23/2021, 4:59 AM
    Hi all, I want to use Dagster to orchestrate my data receivers. These may include Kafka, a CSV file, or any other source. The active sources are listed in a config file.
    My planned deployment strategy: suppose the config file has one Kafka source and one file-folder source. My schedule reads the config file and generates two jobs - one for reading from Kafka and one for reading from files. If a job is already running, it does not initiate a new run (using context.instance.get_run_records).
    The problem I am facing is with the files. When the schedule runs, it sees the "file" source in the config and initiates a job run to read the files. In the op, it loops through all the files and processes them. However, the next time the job runs, it reads all the same files again.
    One solution from the docs is using the filename as the run_key when creating a job run. But in that case, the loop reading the filenames has to live outside the job run (in a sensor), as sketched below. I don't know how to achieve this, since I am generating the job runs from a schedule that loops through all active sources. This would require the schedule to generate a normal job run if the source is Kafka, and to generate/activate a sensor (which generates a job run per file) if the source is a file. I know this sounds complicated. I would be grateful if anyone could give some pointers.
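    A minimal sketch of the run_key-per-file pattern, assuming a sensor owns the file loop; `file_job`, the directory, and the op name are placeholders:
    import os
    from dagster import RunRequest, sensor

    @sensor(job=file_job)
    def new_file_sensor(context):
        for filename in os.listdir("/data/incoming"):
            yield RunRequest(
                run_key=filename,  # Dagster skips run_keys it has already seen
                run_config={"ops": {"process_file": {"config": {"path": filename}}}},
            )
    The sensor itself can read the same config file and simply do nothing when the file source is inactive, which avoids having the schedule manage sensors.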

    Rahul Sharma

    12/23/2021, 9:06 AM
    Hello all, I am working with the dagster-celery executor. I am able to run a pipeline with the celery executor on dagster==0.13, but the same process is not working for me on dagster==0.12.14. Please let me know if anyone has any ideas. I am following these docs: https://docs.dagster.io/0.12.14/deployment/guides/celery

    Florian Giroud

    12/23/2021, 10:54 AM
    Hi, we have started to implement a few jobs. Both during development and in production, we find the Dagster log a little noisy. Is there a global setting we can activate to disable all logs of type "ENGINE_EVENT", "STEP_START" and so on - basically all internal Dagster logs - so we see only the logs emitted by us? Thanks

    Jazzy

    12/23/2021, 5:59 PM
    Hi, I think my understanding of how static partitions work and the config they generate isn't correct. I'm trying to pass a static partition to only one op that is part of a graph.
    CONTINENTS = ["Africa", "Antarctica", "Asia", "Europe", "North America", "Oceania", "South America"]
    
    @static_partitioned_config(partition_keys=CONTINENTS)
    def continent_config(partition_key: str):
        return {"config": {"continent_name": partition_key}}
    
    @op(config_schema={"continent_name": str})
    def continent_op(context):
        context.log.info(context.op_config["continent_name"])
    
    normal_config = {"ops": {
                "validate_date_input_variables": {"config": {"date_begin": '2021-12-01', "date_end": '2021-12-23'}},
                "set_order_type": {"config": {"order_type": 'NORMAL'}},
                "continent_op": continent_config
                }
            }
    For which I get the following error:
    Error 1: Value at path root:continent_op must be dict. Expected: "{ config: { continent_name: String } outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }".
    Is there a different way I should be doing this? I think it might relate to a previous thread, which led me to multi-dimensional partitions... The end goal is to have many different jobs that run hourly for a few different combinations of parameters.
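    A minimal sketch of the usual pattern, assuming the partitioned-config function is meant to produce the *entire* run config - a config function can't be embedded as a value inside a plain config dict, which is what the error is complaining about; `my_graph` is a placeholder:
    from dagster import static_partitioned_config

    @static_partitioned_config(partition_keys=CONTINENTS)
    def continent_config(partition_key: str):
        return {
            "ops": {
                "validate_date_input_variables": {"config": {"date_begin": "2021-12-01", "date_end": "2021-12-23"}},
                "set_order_type": {"config": {"order_type": "NORMAL"}},
                "continent_op": {"config": {"continent_name": partition_key}},
            }
        }

    continent_job = my_graph.to_job(config=continent_config)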

    Quy

    12/23/2021, 6:36 PM
    Hi, I am facing a problem with `dagster.core.errors.PartitionExecutionError`. I'm not sure I have a proper config, and the error doesn't give more details. Can anyone help me?
    dagster.core.errors.PartitionExecutionError: Error occurred during the evaluation of the `run_config_for_partition` function for partition set download_firebase_data_local_partition_set 
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 292, in get_partition_config
        return ExternalPartitionConfigData(name=partition.name, run_config=run_config)
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/contextlib.py", line 137, in __exit__
        self.gen.throw(typ, value, traceback)
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 192, in user_code_error_boundary
        raise error_cls(
    The above exception was caused by the following exception:
    
    TypeError: daily_download_config() takes 1 positional argument but 2 were given 
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/errors.py", line 185, in user_code_error_boundary
        yield
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/grpc/impl.py", line 291, in get_partition_config
        run_config = partition_set_def.run_config_for_partition(partition)
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/partition.py", line 441, in run_config_for_partition
        return copy.deepcopy(self._user_defined_run_config_fn_for_partition(partition))
      File "/Users/bryan/miniconda3/envs/dagster-injector/lib/python3.9/site-packages/dagster/core/definitions/time_window_partitions.py", line 192, in <lambda>
        run_config_for_partition_fn=lambda partition: fn(
    https://stackoverflow.com/questions/70465752/what-is-proper-partition-configs-for-dagster-job
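    A hedged reading of the TypeError above: a function decorated with `daily_partitioned_config` is called with *two* datetimes (the start and end of the partition's time window), so a one-argument `daily_download_config` would fail in exactly this way. A minimal sketch (the op name is a placeholder):
    from dagster import daily_partitioned_config

    @daily_partitioned_config(start_date="2021-12-01")
    def daily_download_config(start, _end):
        return {"ops": {"download": {"config": {"date": start.strftime("%Y-%m-%d")}}}}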

    schrockn

    12/23/2021, 11:38 PM
    set the channel topic: Note: Core team on break until Jan 3! 🎅🎅🎅 🙋 Ask your question after checking out our documentation! https://docs.dagster.io/

    Christian Bay

    12/24/2021, 6:47 AM
    Is it possible to configure the format of date and time shown in Dagit?

    Martin Carlsson

    12/24/2021, 9:36 AM
    Using multiple GitHub repositories with Dagster. I'm doing a Dagster POC for a financial company. One important requirement is that teams are independent and can develop code independently, which means each team would keep its Dagster code in its own GitHub repository, with a central Dagster GitHub repository controlling the overall setup. In dbt I would create packages. The Workspaces documentation describes this in broad terms, but I don't know how to implement it when the code lives in different GitHub repositories. What strategy should I use here? Are there any code examples I could get inspiration from? Is there a documentation page I have missed?
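    A minimal sketch of one common setup, assuming each team's repository is built and deployed as its own gRPC user-code server, with the central repository owning only the workspace file (hosts and names are placeholders):
    # workspace.yaml in the central repository
    load_from:
      - grpc_server:
          host: team-a-user-code
          port: 4000
          location_name: team_a
      - grpc_server:
          host: team-b-user-code
          port: 4000
          location_name: team_b
    Each team's CI can then build and deploy its own code-location image independently; the central deployment only needs to know the server addresses.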

    George Pearse

    12/24/2021, 10:32 AM
    Most of my company's analytics-related data lives in Mongo, and I need it in a relational database in order to use a drag-and-drop graphing tool like Superset. At the minute I'm just planning on running hourly sync jobs that retrieve all the data added to Mongo that isn't in the relational DB (this isn't particularly clean, because some of the Mongo values can be updated, so I have to check multiple timestamps for a single record). Will anything like Meltano or Airbyte help me out here? Bit terrified this is going to bury me in maintenance tasks.

    张强

    12/25/2021, 8:08 AM
    Does op or job have a base class for inheritance, so that retries, timeouts, error handling, etc. can be set up uniformly?
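    As far as the docs go there is no base class to subclass, but a minimal sketch of a common substitute is a small factory that stamps uniform policies onto every op (names and values are illustrative):
    from dagster import RetryPolicy, op

    DEFAULT_RETRY = RetryPolicy(max_retries=3, delay=10)

    def company_op(**kwargs):
        # apply the company-wide default unless the caller overrides it
        kwargs.setdefault("retry_policy", DEFAULT_RETRY)
        return op(**kwargs)

    @company_op()
    def my_op():
        ...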

    geoHeil

    12/25/2021, 9:11 AM
    How can I specify a custom type definition not only for op outputs but also for an asset? I.e. something like:
    @asset(out=Out(MyCustomTypedDataFrame))

    Jahid Hasan

    12/26/2021, 2:53 AM
    Hi all, I need a little help. I read the single-job creation documentation, where the example works with CSV files. I have a trained model saved as a .pt file. Any suggestions or resources on how I can use a different file format (my .pt file) in place of the CSV file? Thank you
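    A minimal sketch, assuming the point is just that nothing in Dagster is CSV-specific - an op can read any format; the op name and config key are placeholders:
    import torch
    from dagster import op

    @op(config_schema={"model_path": str})
    def load_model(context):
        model = torch.load(context.op_config["model_path"])  # e.g. model.pt
        context.log.info(f"loaded {type(model).__name__}")
        return model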

    Or Asher

    12/26/2021, 12:14 PM
    Hey, I was wondering if there is a way to set the executing image on a per-job basis. Even better if this is available at job execution time.
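    A hedged sketch, assuming a Kubernetes deployment: the documented `dagster-k8s/config` tag lets a job override pieces of its pod spec, including the container image (`my_graph` and the image name are placeholders):
    my_job = my_graph.to_job(
        tags={
            "dagster-k8s/config": {
                "container_config": {"image": "my-registry/my-image:v2"},
            }
        }
    )
    Tags can also be edited per run in the launchpad, which gets close to choosing the image at execution time.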

    rhl

    12/26/2021, 2:36 PM
    Hi, a question regarding daily-partitioned jobs. Imagine you want to fit a model every day on the last N days of `input_data` (partitioned by day). Can you have a `model_fit` job that is also partitioned by day, where day D depends on the prior [D-N, D) days of history of the `input_data` job?

    Manny Schneck

    12/26/2021, 11:10 PM
    I'm getting set up to run/work on Dagster on a fresh Arch install, and I thought I'd share what I've figured out: http://blog.shemanigans.soy/dagster-on-arch-linux

    daniel blinick

    12/27/2021, 10:21 AM
    Hey! Having an issue running op tests. I'm using `build_op_context` to create the context for the op, and the created context has a `run_id` set to "ephemeral". The problem is that a resource I'm trying to use in the test also makes use of the `run_id`, but the `run_id` in the resource context is `None`. This seems to be caused by the fact that it tries to grab the `run_id` from the pipeline run object, which in this case does not exist. Is there a way this can be fixed so that the "ephemeral" run_id is propagated throughout, including to the resources? Thanks!

    geoHeil

    12/27/2021, 12:09 PM
    How can I combine steps in the experimental software-defined assets API with regular ops? I.e. have a foreign (source) asset as the input, perhaps some dynamic graph/ops in between, and then an asset per final output of the dynamic graph (or a fan-in operation and a single asset)?

    Bryan Chavez

    12/27/2021, 2:19 PM
    I'm trying to set up Dagster with Docker, but I'm getting this error when triggering jobs. Is there something wrong with the config I have below for the run_launcher?
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
        exec(code, run_globals)
      File "/usr/local/lib/python3.8/site-packages/dagster/__main__.py", line 3, in <module>
        main()
      File "/usr/local/lib/python3.8/site-packages/dagster/cli/__init__.py", line 50, in main
        cli(auto_envvar_prefix=ENV_PREFIX)  # pylint:disable=E1123
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/dagster/cli/api.py", line 48, in execute_run_command
        DagsterInstance.from_ref(args.instance_ref)
      File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 423, in from_ref
        run_launcher=instance_ref.run_launcher,
      File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 264, in run_launcher
        return self.run_launcher_data.rehydrate() if self.run_launcher_data else None
      File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 56, in rehydrate
        module = importlib.import_module(self.module_name)
      File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
      File "<frozen importlib._bootstrap>", line 991, in _find_and_load
      File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
      File "<frozen importlib._bootstrap_external>", line 848, in exec_module
      File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
      File "/usr/local/lib/python3.8/site-packages/dagster_docker/__init__.py", line 4, in <module>
        from .docker_run_launcher import DockerRunLauncher
      File "/usr/local/lib/python3.8/site-packages/dagster_docker/docker_run_launcher.py", line 5, in <module>
        from dagster.core.launcher.base import (
    ImportError: cannot import name 'ResumeRunContext' from 'dagster.core.launcher.base' (/usr/local/lib/python3.8/site-packages/dagster/core/launcher/base.py)
    run_launcher:
    #  module: dagster.core.launcher
    #  class: DefaultRunLauncher
      module: dagster_docker
      class: DockerRunLauncher
      config:
        env_vars:
          - DAGSTER_POSTGRES_USER
          - DAGSTER_POSTGRES_PASSWORD
          - DAGSTER_POSTGRES_DB
        network: docker_khde_network
        container_kwargs:
          volumes:
            - repository.py:/opt/dagster/app/kh_dagster/
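    A hedged reading of the ImportError: `ResumeRunContext` exists only in newer dagster core releases, so a `dagster_docker` build that imports it alongside an older `dagster` suggests mismatched package versions inside the image. A sketch of the usual fix (version numbers illustrative) is to pin both packages to the same release line:
    # requirements.txt
    dagster==0.13.12
    dagster-docker==0.13.12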

    Daniel Suissa

    12/27/2021, 2:25 PM
    Does anyone have experience attaching a remote PyCharm debugger to Dagster user code? I'm having a hard time understanding which part of the code should run `pydevd_pycharm.settrace`.
    Edit: for some reason settrace jams the job/graph execution, but when I put it inside an op it works fine and I'm able to attach to the PyCharm debugger. It's a bit annoying that I have to declare the debugger in every op (although I can wrap a decorator around @op). I guess the problem arises when the user-code server parses the Python code to create the dependency wiring... If anyone knows a way to run settrace once for the whole job, I'd appreciate a quick description of what you did.

    Qumber Ali

    12/27/2021, 2:34 PM
    Hi all, I have a few questions about adding and getting records from AssetMaterialization:
    • When I read a record from AssetMaterialization, is it removed from AssetMaterialization, or do we need to remove it manually?
    • How can I clear all the records from AssetMaterialization?

    张强

    12/28/2021, 5:41 AM
    Is there a hook that runs before an op starts running?
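    Dagster's built-in `@success_hook` / `@failure_hook` fire after an op finishes; for before-start logic, a minimal sketch is to wrap the compute function yourself (names are placeholders, not a Dagster API):
    from functools import wraps
    from dagster import op

    def with_pre_hook(fn):
        @wraps(fn)
        def wrapped(context, *args, **kwargs):
            context.log.info(f"starting {fn.__name__}")  # runs before the op body
            return fn(context, *args, **kwargs)
        return wrapped

    @op
    @with_pre_hook
    def my_op(context):
        ...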

    Cezary Pukownik

    12/28/2021, 8:35 AM
    Hi everyone! Is there an option to configure a `dbt_resource` in the job config, e.g. in Dagit? In the documentation I've only seen in-code configuration using the `dbt_cli_resource.configured` method, but this requires setting some configs inside the code. I want to set `project_dir` and `dbt_executable` outside of the code, to have an option to switch between dev and prod envs without changing code.
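    A minimal sketch, assuming the job binds `dbt_cli_resource` under the key `dbt` *without* pre-configuring it - an unconfigured resource exposes its config in the launchpad, so values can be supplied as run config (field names as in the dagster-dbt docs; paths are placeholders):
    # run config entered in Dagit's launchpad
    resources:
      dbt:
        config:
          project_dir: /opt/dbt/my_project
          profiles_dir: /opt/dbt/profiles
          dbt_executable: /usr/local/bin/dbt
    Alternatively, if the fields are env-sourceable, something like `dbt_cli_resource.configured({"project_dir": {"env": "DBT_PROJECT_DIR"}})` keeps the values out of code while still configuring in code.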

    Harris Hoke

    12/28/2021, 2:03 PM
    I’m trying to get the dagster pytests set up properly. I’ve been able to work my way through https://docs.dagster.io/community/contributing, but can’t actually get the tests to work. I’ve tried a few things below, such as using
    --import-mode=importlib
    , but nothing has worked so far:
    (dagster37) ➜  dagster git:(master) ✗ pwd
    /Users/harris.hoke/personal_projects/dagster/dagster
    (dagster37) ➜  dagster git:(master) ✗ git pull
    Already up to date.
    (dagster37) ➜  dagster git:(master) ✗ python3 --version                      
    Python 3.7.4
    (dagster37) ➜  dagster git:(master) ✗ python3 -m pytest python_modules/dagster/dagster_tests
    ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
    python_modules/dagster/dagster_tests/conftest.py:14: in <module>
        from dagster_test.dagster_core_docker_buildkite import (
    E   ModuleNotFoundError: No module named 'dagster_test'
    (dagster37) ➜  dagster git:(master) ✗ python3 -m pytest --import-mode=importlib python_modules/dagster/dagster_tests
    ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
    python_modules/dagster/dagster_tests/conftest.py:10: in <module>
        from dagster.core.errors import DagsterUserCodeUnreachableError
    E   ImportError: cannot import name 'DagsterUserCodeUnreachableError' from 'dagster.core.errors' (/Users/harris.hoke/.pyenv/versions/dagster37/lib/python3.7/site-packages/dagster/core/errors.py)
    (dagster37) ➜  dagster git:(master) ✗ ls python_modules/dagster/dagster_tests
    __init__.py        cli_tests          daemon_tests       execution_tests    workspace.yaml
    __pycache__        conftest.py        docker-compose.yml general_tests
    api_tests          core_tests         environments       scheduler_tests
    (dagster37) ➜  dagster git:(master) ✗ rm python_modules/dagster/dagster_tests/__init__.py 
    (dagster37) ➜  dagster git:(master) ✗ python3 -m pytest --import-mode=importlib python_modules/dagster/dagster_tests
    ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
    python_modules/dagster/dagster_tests/conftest.py:10: in <module>
        from dagster.core.errors import DagsterUserCodeUnreachableError
    E   ImportError: cannot import name 'DagsterUserCodeUnreachableError' from 'dagster.core.errors' (/Users/harris.hoke/.pyenv/versions/dagster37/lib/python3.7/site-packages/dagster/core/errors.py)
    (dagster37) ➜  dagster git:(master) ✗ python3 -m pytest python_modules/dagster/dagster_tests                        
    ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
    python_modules/dagster/dagster_tests/conftest.py:10: in <module>
        from dagster.core.errors import DagsterUserCodeUnreachableError
    E   ImportError: cannot import name 'DagsterUserCodeUnreachableError' from 'dagster.core.errors' (/Users/harris.hoke/.pyenv/versions/dagster37/lib/python3.7/site-packages/dagster/core/errors.py)
    (dagster37) ➜  dagster git:(master) ✗ pip uninstall dagster
    [...]
      Successfully uninstalled dagster-0.13.12
    (dagster37) ➜  dagster git:(master) ✗ python3 -m pytest python_modules/dagster/dagster_tests
    ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
    python_modules/dagster/dagster_tests/conftest.py:9: in <module>
        from dagster import check, seven
    E   ModuleNotFoundError: No module named 'dagster'
    (dagster37) ➜  dagster git:(master) ✗ python3 -m pytest --import-mode=importlib python_modules/dagster/dagster_tests
    ImportError while loading conftest '/Users/harris.hoke/personal_projects/dagster/dagster/python_modules/dagster/dagster_tests/conftest.py'.
    python_modules/dagster/dagster_tests/conftest.py:9: in <module>
        from dagster import check, seven
    E   ModuleNotFoundError: No module named 'dagster'
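    A hedged reading of the transcript: the `cannot import name ... from ... site-packages` errors suggest pytest is importing the released dagster wheel rather than the checkout. The contributing guide's `make dev_install` performs editable installs; a minimal subset might be:
    pip install -e python_modules/dagster
    pip install -e python_modules/dagster-test  # provides the dagster_test module conftest.py imports
    python -m pytest python_modules/dagster/dagster_tests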

    Chris Retford

    12/28/2021, 9:31 PM
    Hello. I've been very impressed with Dagster so far. I've managed to resolve all my issues, but now I'm stuck. I'd like to use the daily partition feature; it would really be helpful. I have a job that runs fine from the launchpad with a supplied date, but if I use the backfill launcher I get the following error:
    2021-12-28 14:30:51 - BackfillDaemon - ERROR - Backfill failed for vrixduce: grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    	status = StatusCode.UNKNOWN
    	details = "Exception iterating responses: Object of type type is not JSON serializable"
    	debug_error_string = "{"created":"@1640727051.106427000","description":"Error received from peer unix:/var/folders/95/hh0q96hs3dddfgp8ty7x8fvd4p202x/T/tmpovml57vv","file":"src/core/lib/surface/call.cc","file_line":1075,"grpc_message":"Exception iterating responses: Object of type type is not JSON serializable","grpc_status":2}"
    >
    
    Stack Trace:
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/daemon/backfill.py", line 91, in execute_backfill_iteration
        for _run_id in submit_backfill_runs(
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/core/execution/backfill.py", line 147, in submit_backfill_runs
        result = repo_location.get_external_partition_set_execution_param_data(
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/core/host_representation/repository_location.py", line 742, in get_external_partition_set_execution_param_data
        return sync_get_external_partition_set_execution_param_data_grpc(
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/api/snapshot_partition.py", line 110, in sync_get_external_partition_set_execution_param_data_grpc
        api_client.external_partition_set_execution_params(
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/grpc/client.py", line 206, in external_partition_set_execution_params
        chunks = list(
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/dagster/grpc/client.py", line 118, in _streaming_query
        yield from response_stream
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
        return self._next()
      File "/opt/anaconda3/envs/etl38/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
        raise self

    Manny Schneck

    12/28/2021, 9:42 PM
    If anybody is running into issues with the grpc tests (`python_modules/dagster/dagster_tests/general_tests/grpc_tests`) and they think it might be DNS, you can try `export GRPC_DNS_RESOLVER=native; pytest ...`, and if that fixes it, then you know that it's DNS.

    Arun Kumar

    12/29/2021, 12:48 AM
    Hi team, I'm trying to trigger the `LaunchPipelineExecution` GraphQL API from our Kotlin backend using a Kotlin client, and I am passing `runConfigData` as a valid JSON string. I'm seeing the following error on dagster 0.12.12. Any thoughts on how I can fix this?
    PythonError(message=dagster.check.ParameterCheckError: Param "run_config" is not one of ['dict', 'frozendict']. Got '{"ops": {"load_analysis": {"config": {"analysis_id": b3e61740-ed8d-482b-943d-8ce354b11632}}}}' which is type <class 'str'>.

daniel

12/29/2021, 2:04 AM
Hey Arun - could the client be JSON-encoding it twice? The server is acting as though it received a double-encoded string.

Arun Kumar

12/29/2021, 2:18 AM
Thanks, Daniel, for the response. I am pretty new to this client; let me check. What makes you say it is double-encoded? The JSON string in the log above looks fine to me.

daniel

12/29/2021, 2:31 AM
Like this:
>>> my_dict = {"foo": "bar"}
>>> json.dumps(my_dict)
'{"foo": "bar"}'
>>> json.dumps(json.dumps(my_dict))
'"{\\"foo\\": \\"bar\\"}"'
If you were passing the second string in (instead of the first), the server would JSON-decode it once, but instead of being a dict as expected, it would now be a (still JSON-encoded) string.

Arun Kumar

12/29/2021, 3:37 AM
I see. I tried to debug the serialization logic, and the output of the serializer looked good to me:
{
  "variables": {
    "params": {
      "mode": "default",
      "runConfigData": "{\"ops\": {\"load_analysis\": {\"config\": {\"analysis_id\": \"b3e61740-ed8d-482b-943d-8ce354b11632\"}}}}",
      "selector": {
        "pipelineName": "analyses_exposures_loader",
        "repositoryLocationName": "metrics-repo",
        "repositoryName": "metrics-repo"
      }
    }
  },
.
>>> json.loads("{\"ops\": {\"load_analysis\": {\"config\": {\"analysis_id\": \"b3e61740-ed8d-482b-943d-8ce354b11632\"}}}}")
{'ops': {'load_analysis': {'config': {'analysis_id': 'b3e61740-ed8d-482b-943d-8ce354b11632'}}}}
>>>
Does the Dagit server try to parse the run config as a JSON string? It looks like it expects the run config to be a dict and does not do a string-to-dict conversion. Even when I try to trigger the API from the playground, strings are not accepted.

Keshav

12/29/2021, 4:27 AM
Hi @Arun Kumar. runConfigData must be a Python dict here.

daniel

12/29/2021, 4:33 AM
Yeah, runConfigData should be a dict (much like “selector” below). The double serialization likely happens when the whole “variables” dict is serialized to JSON to be sent over the network

Arun Kumar

12/29/2021, 4:59 AM
Since `RunConfigData` is typed as a `scalar` in GraphQL, the client on our side assumes it to be a String by default. Even if the double serialization is not happening, I assume the run config will be sent as a string to the server, and I'm not sure I understand where the str-to-dict conversion will happen. Am I missing something?

Keshav

12/29/2021, 5:58 AM
I am not familiar with Kotlin, but here is something similar in Python. If you are using DagsterGraphQLClient in your client, then you can pass variables as below:
client = DagsterGraphQLClient("url", port)
query_string = YOUR_QUERY_STRING
variable_dict = YOUR_VARIABLES_DICT
response = client._execute(query_string, variables=variable_dict)
If you are using something similar to requests, then:
query_string = YOUR_QUERY_STRING_INCLUDING_YOUR_VARIABLES
response = requests.post("url", json={"query": query_string})

daniel

12/29/2021, 6:01 AM
Ah, got it. Yeah, it looks like the GraphQL spec doesn't currently have consistent support for an 'arbitrary JSON object' type across all languages. We're using this GenericScalar type from graphene/python, which allows you to pass in an Object rather than a String: https://github.com/graphql-python/graphene/blob/master/graphene/types/generic.py - but it doesn't seem to be consistently applied as part of the core spec (I see this other similar thing in JS: https://www.npmjs.com/package/graphql-type-json). It's possible your Kotlin GraphQL client doesn't have this as an option.
It's true that the run config is sent in as a string in the sense that the whole request is JSON-serialized - but the server isn't expecting the runConfigData field to still be JSON-serialized once that transport-level JSON deserialization happens. For example, the serializer output you sent earlier had a "selector" key with a dict value - if you changed that value to a JSON-serialized string instead of a dict, you'd get a similar error. (But selector has a specific constrained GraphQL Object type, so the client knows how to send it correctly.)
We have a bunch of GraphQL experts on the team (particularly Nick, who co-created it 🙂). I may need to check with them when we're all back early next week to see how we should handle this for non-Python clients.
(but if your Kotlin client does have some way to let you pass in a dict or object representation of the run config rather than a JSON-serialized string, that would be the easiest way to fix this without needing any changes on our side)
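A minimal sketch of the shape the server expects, using a plain Python HTTP client for illustration (the query constant and URL are placeholders) - the point being that runConfigData is a nested object, not a json.dumps(...) string:
import requests

payload = {
    "query": LAUNCH_PIPELINE_EXECUTION_QUERY,  # hypothetical: the mutation text
    "variables": {
        "params": {
            "mode": "default",
            "runConfigData": {  # a dict, not a JSON-encoded string
                "ops": {"load_analysis": {"config": {"analysis_id": "b3e61740-ed8d-482b-943d-8ce354b11632"}}}
            },
            "selector": {
                "pipelineName": "analyses_exposures_loader",
                "repositoryLocationName": "metrics-repo",
                "repositoryName": "metrics-repo",
            },
        }
    },
}
resp = requests.post("http://localhost:3000/graphql", json=payload)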

Arun Kumar

12/29/2021, 9:39 AM
Thanks, Daniel. I think I have figured out a way to set it as an object instead of a JSON string, and it is working now. However, it was a lot of work: I had to create data classes for the run config structure and do the conversions for that particular scalar type.
Not sure if there is a much easier way. I also think this approach won't scale well for complex and more dynamic run configs. I would still prefer passing it as a valid JSON string that can be deserialized and validated on the Dagit server side. Let me know what you think.

daniel

12/30/2021, 3:43 AM
Yeah, that looks like a pain - you shouldn't need to specify all the keys like that just to be able to send it over GraphQL. I think it's reasonable to make runConfigData also accept a string; we should be able to have an answer for you next week when everybody's back. I also saw that the Kotlin GraphQL docs linked to this project, which does seem to have an object scalar type for Java/Kotlin: https://github.com/graphql-java/graphql-java-extended-scalars#object--json-scalars. But that might be a pain to integrate.
(Found that here: https://opensource.expediagroup.com/graphql-kotlin/docs/schema-generator/writing-schemas/scalars/)

Arun Kumar

12/30/2021, 4:10 AM
Thanks, Daniel. I followed the above approach from the same doc, but I did not use `graphql-java-extended-scalars`, as it seems to require a change in the GraphQL schema itself (correct me if I am wrong). Even with that, I might still have to create objects on my side to set on the run config scalar, which would require similar code.
This approach unblocks me for now. I will reach out next week again about alternatives. Thanks for the help, Daniel 🙏

daniel

01/03/2022, 11:19 PM
Just to follow up - we're discussing some options to support this on https://github.com/dagster-io/dagster/pull/6071
fix landed, will go live next Thursday