benyuel
07/15/2020, 7:20 PM
The docs say "Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME", but make_airflow_dag accepts a DagsterInstance:
https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-airflow/dagster_airflow/factory.py#L240-L245
Is there prior art or recommendations here? Would you advise against supplying a DagsterInstance to make_airflow_dag? Roughly what I have in mind is sketched below.
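(A minimal sketch of the call I mean; module and pipeline names are hypothetical, and I'm assuming the keyword arguments visible in the factory code linked above:)

from dagster import DagsterInstance
from dagster_airflow.factory import make_airflow_dag

# Build an Airflow DAG from a dagster pipeline, supplying an explicit
# DagsterInstance rather than letting the factory construct one.
dag, tasks = make_airflow_dag(
    module_name='my_project.pipelines',  # hypothetical module
    pipeline_name='my_pipeline',         # hypothetical pipeline
    instance=DagsterInstance.get(),      # the instance in question
)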

Ryan Lewis
07/16/2020, 3:46 PM

Danny
07/16/2020, 7:41 PM
It looks like if a fan-in solid receives an is_required=False input from a solid that is itself skipped (due to it not getting an upstream solid's optional output), then dagster still skips the fan-in solid. It seems this is what's happening; still debugging though. Can anyone confirm? (Note: other inputs to the fan-in solid were supplied, so it's not skipping because all inputs were missing.) A rough repro of the shape I mean is below.
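(Hypothetical minimal repro; the solid names are made up, and I'm assuming the 0.8-style fan-in syntax of passing a list of outputs:)

from dagster import InputDefinition, Int, List, Output, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(Int, 'result', is_required=False)])
def maybe_emit(_):
    # A generator that never yields: the optional output is skipped.
    if False:
        yield Output(1, 'result')

@solid
def always_emit(_):
    return 1

@solid(input_defs=[InputDefinition('nums', List[Int])])
def fan_in(_, nums):
    # One element of nums was supplied, the other skipped upstream;
    # the question is whether this solid runs at all.
    return sum(nums)

@pipeline
def repro_pipeline():
    fan_in([maybe_emit(), always_emit()])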

user
07/16/2020, 8:03 PM

daniel
07/16/2020, 8:06 PM

Danny
07/16/2020, 9:14 PM
Most of the open connections to the dagster db are running the query LISTEN "run_events". I think 100 is the max clients setting for the postgres docker image I'm running, so it might be opening as many connections as it can get. Is that normal? (Roughly how I'm checking is sketched below.)
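(How I'm counting the listeners; psycopg2 and the connection string are placeholders for my setup:)

import psycopg2

conn = psycopg2.connect('postgresql://user:password@localhost:5432/dagster')  # placeholder DSN
with conn.cursor() as cur:
    # pg_stat_activity has one row per client; these are all sitting in LISTEN
    cur.execute("""SELECT count(*) FROM pg_stat_activity WHERE query = 'LISTEN "run_events"'""")
    print(cur.fetchone()[0])
conn.close()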

aqm
07/17/2020, 6:25 PM
"This is not the "blessed" file format nor is it our recommendation for doing things."
Curious from the dagster team why declaring dependencies in this way isn't recommended. Thanks!

Simon Späti
07/20/2020, 10:24 AM
0.7.x.
I'm having two small things for now:
1. When I run from pyspark.sql.functions import explode I get:
    from pyspark.sql.functions import explode
    File "<frozen importlib._bootstrap>", line 971, in _find_and_load
    File "<frozen importlib._bootstrap>", line 945, in _find_and_load_unlocked
    KeyError: 'pyspark.sql'
Am I doing something wrong? I need explode in my pyspark code. Full StackTrace in thread.
2. I used some special Druid and Delta types. I used the dict_with_fields helper that is used for S3Coordinate here. I guess that's not the way to go, as I also had to manually import the module that contains dict_with_fields. What would be the best way to define a Dict type like the following? (How I consume it is sketched after the definition.)
# (imports shown for completeness; dict_with_fields is the internal dagster
# helper used for S3Coordinate, so its import path may vary by version)
from dagster import Field, String

DeltaCoordinate = dict_with_fields(
    'DeltaCoordinate',
    fields={
        'database': Field(String, description='database or schema or delta table'),
        'table_name': Field(String, description='table name of the delta table'),
        's3_coordinate_bucket': Field(String, description='s3 bucket'),
        's3_coordinate_key': Field(String, description='s3 delta table path'),
    },
)
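(And roughly how I consume it in a solid; the solid itself is hypothetical:)

from dagster import InputDefinition, solid

@solid(input_defs=[InputDefinition('delta_coordinate', DeltaCoordinate)])
def read_delta_table(context, delta_coordinate):
    # The coordinate arrives as a dict with the fields declared above.
    context.log.info(delta_coordinate['database'] + '.' + delta_coordinate['table_name'])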
Paul Wyatt
07/20/2020, 9:34 PM

alir
07/21/2020, 3:56 PM
I launch pipeline runs with a POST to the /graphql endpoint. I'm on 0.8.1 now, and it looks like if I issue multiple POST requests at the same time (around 5 or so), the server goes into some deadlock-ish state and refuses to accept any more connections. Dagit becomes completely unusable. But if I issue pipeline launch requests using websockets to the same endpoint, I have no issues. Does this sound like something that's unique to my setup, or does it sound like there's a problem somewhere? (Repro sketch below.)
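(Hypothetical repro of what I'm doing; the endpoint is dagit's, and the query body is a placeholder for the launch mutation I actually send:)

import threading
import requests

URL = 'http://localhost:3000/graphql'  # dagit's default address
QUERY = '{ __typename }'               # placeholder; mine is a pipeline launch mutation

def fire():
    resp = requests.post(URL, json={'query': QUERY})
    print(resp.status_code)

# Around 5 simultaneous POSTs is enough to wedge the server for me.
threads = [threading.Thread(target=fire) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()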

aqm
07/21/2020, 9:16 PM
Excited about the solid_selection arg introduced to execute_pipeline() in 0.8! A question on how to use it:
If I pass a selection query of solid_in_middle_of_dag+, it should execute solid_in_middle_of_dag and its immediate children. But solid_in_middle_of_dag depends on the output of parent solids; how can I specify the parent_run_id that should be used for these upstream dependencies? Is the default behavior to use the previous successful run's outputs?
reexecute_pipeline() explicitly takes a parent_run_id, but doesn't use the selection query language (I believe you instead provide a list of solid.compute steps). The two call shapes I'm comparing are sketched below.
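(For concreteness; the pipeline is hypothetical and the signatures are as I understand them in 0.8:)

from dagster import DagsterInstance, execute_pipeline, pipeline, reexecute_pipeline, solid

@solid
def solid_in_middle_of_dag(_):
    return 1

@pipeline
def my_pipeline():
    solid_in_middle_of_dag()

instance = DagsterInstance.get()

# Selection query: the middle solid plus its immediate children.
result = execute_pipeline(
    my_pipeline,
    solid_selection=['solid_in_middle_of_dag+'],
    instance=instance,
)

# Re-execution: an explicit parent run, but steps instead of a selection query.
reexecute_pipeline(
    my_pipeline,
    parent_run_id=result.run_id,  # or some previous run's id
    step_keys_to_execute=['solid_in_middle_of_dag.compute'],
    instance=instance,
)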

Gaetan DELBART
07/22/2020, 9:07 AM
"message": null, "serializable_error_info": {"__class__": "SerializableErrorInfo", "cause": null, "cls_name": "DagsterInvalidConfigError"}, with message:
dagster.core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for <dagster.config.field_utils.Selector object at 0x7fa311c0c790>.
    Error 1: Post processing at path root:postgres_db:password of original value {'env': 'DAGSTER_PG_PASSWORD'} failed:
    (PostProcessingError) - dagster.config.errors.PostProcessingError: You have attempted to fetch the environment variable "DAGSTER_PG_PASSWORD" which is not set. In order for this execution to succeed it must be set in this environment.

Stack Trace:
  File "/usr/local/lib/python3.7/site-packages/dagster/config/post_process.py", line 72, in _post_process
    new_value = context.config_type.post_process(config_value)
  File "/usr/local/lib/python3.7/site-packages/dagster/config/source.py", line 42, in post_process
    return str(_ensure_env_variable(cfg))
  File "/usr/local/lib/python3.7/site-packages/dagster/config/source.py", line 23, in _ensure_env_variable
    ).format(var=var)

Outer stack:
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/ipc.py", line 116, in ipc_write_stream
    yield FileBasedWriteStream(file_path)
  File "/usr/local/lib/python3.7/site-packages/dagster/cli/api.py", line 434, in launch_scheduled_execution
    instance = DagsterInstance.get()
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 255, in get
    return DagsterInstance.from_config(_dagster_home())
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 277, in from_config
    return DagsterInstance.from_ref(instance_ref)
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 285, in from_ref
    run_storage=instance_ref.run_storage,
  File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/ref.py", line 185, in run_storage
    return self.run_storage_data.rehydrate()
  File "/usr/local/lib/python3.7/site-packages/dagster/serdes/__init__.py", line 378, in rehydrate
    config_dict,
When I launch the sh script manually, it works great (my user has access to the env variables injected by k8s).
So I suppose it has to do with the shell used by cron.
Has someone got any lead on this, or managed to run scheduled pipelines successfully on a k8s pod?

szalai1
07/22/2020, 1:57 PM
schedule_storage: postgres
My problem:
• scheduled jobs don't start when they're supposed to
• when I run dagster schedule up --location 'Business Intelligence' I get this every time:
Errors Resolved:
Schedule daily_answer_position_pipeline is set to be running, but the scheduler is not running the schedule.
Schedule daily_events_rounds_pipeline is set to be running, but the scheduler is not running the schedule
I renamed the repo several times; now when I run dagster schedule debug I can see schedules from those repos with status: RUNNING.
What can be the problem?

sephi
07/22/2020, 6:16 PM
materialization.
We are having difficulty finding the location of the Assets in postgresql.
In the db we can see the following tables: snapshots, runs, run_tags, event_logs. But we could not locate the assets in any of the tables.
Any assistance would be appreciated.

Kaushik Visvanathan
07/22/2020, 7:02 PM
I have solids that declare an OutputDefinition(Optional[float], is_required=False) and emit it based on a condition. As I understand it, if the condition fails, all downstream solids dependent on this solid's output are skipped and the pipeline runs to completion. The issue arises when calling output_values on the enclosing composite's execution result: it seems to throw an error complaining that the output does not exist for some of these conditional solids, even if the outputs are marked as optional. Is this expected behavior? The outputs of the composite are marked as optional too. (Repro sketch below.)

Fran Sanchez
07/22/2020, 11:39 PM
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 970, in handle_one_response
self.run_application()
File "/usr/local/lib/python3.7/site-packages/geventwebsocket/handler.py", line 82, in run_application
self.process_result()
File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 904, in process_result
self.write(data)
File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 751, in write
self._write_with_headers(data)
File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 772, in _write_with_headers
self._write(data)
File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 734, in _write
self._sendall(data)
File "/usr/local/lib/python3.7/site-packages/gevent/pywsgi.py", line 708, in _sendall
self.socket.sendall(data)
File "/usr/local/lib/python3.7/site-packages/gevent/_socket3.py", line 533, in sendall
data_memory = _get_memory(data)
File "src/gevent/_greenlet_primitives.py", line 98, in gevent._gevent_c_greenlet_primitives.get_memory
File "src/gevent/_greenlet_primitives.py", line 121, in gevent._gevent_c_greenlet_primitives.get_memory
File "src/gevent/_greenlet_primitives.py", line 109, in gevent._gevent_c_greenlet_primitives.get_memory
TypeError: memoryview: a bytes-like object is required, not 'str'
I think it might have something to do with the websockets and the router, but I'm not sure... On the browser console I can see: failed: Error during WebSocket handshake: Unexpected response code: 402. Also in the proxy logs I can see: upstream prematurely closed connection while reading upstream.
Ofer Caspi
07/23/2020, 9:35 AM
With simplejson installed, the celery worker fails, and I can't seem to be able to extract meaningful information from the trace. It reproduces on several (MacOS Catalina) machines with both python 3.8 and 3.6, and with only dagster, dagster-celery, dagit and simplejson installed.
I couldn't find any reference online, so I suspect it isn't a known issue - is it?
I think it has to do with some config ser/de being miswritten/read by the worker - I'll really appreciate your input.
Here's the worker trace:
[2020-07-22 17:36:50,361: INFO/MainProcess] Received task: execute_plan[a833e3b9-3e70-4733-bc44-07b2eb611f2c]
[2020-07-22 17:36:50,425: ERROR/ForkPoolWorker-2] Task execute_plan[a833e3b9-3e70-4733-bc44-07b2eb611f2c] raised unexpected: CheckError("Failure condition: Couldn't import module module_name when attempting to rehydrate the configurable class module_name.class_name")
Traceback (most recent call last):
File "/private/tmp/venv/lib/python3.8/site-packages/dagster/serdes/__init__.py", line 343, in rehydrate
module = importlib.import_module(self.module_name)
File "/Users/ofer/.pyenv/versions/3.8.3/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'module_name'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/private/tmp/venv/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
R = retval = fun(*args, **kwargs)
File "/private/tmp/venv/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
return self.run(*args, **kwargs)
File "/private/tmp/venv/lib/python3.8/site-packages/dagster_celery/tasks.py", line 23, in _execute_plan
instance = DagsterInstance.from_ref(instance_ref)
File "/private/tmp/venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 284, in from_ref
local_artifact_storage=instance_ref.local_artifact_storage,
File "/private/tmp/venv/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 181, in local_artifact_storage
return self.local_artifact_storage_data.rehydrate()
File "/private/tmp/venv/lib/python3.8/site-packages/dagster/serdes/__init__.py", line 345, in rehydrate
check.failed(
File "/private/tmp/venv/lib/python3.8/site-packages/dagster/check/__init__.py", line 109, in failed
raise_with_traceback(CheckError('Failure condition: {desc}'.format(desc=desc)))
File "/private/tmp/venv/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "/private/tmp/venv/lib/python3.8/site-packages/dagster/serdes/__init__.py", line 343, in rehydrate
module = importlib.import_module(self.module_name)
File "/Users/ofer/.pyenv/versions/3.8.3/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
dagster.check.CheckError: Failure condition: Couldn't import module module_name when attempting to rehydrate the configurable class module_name.class_name
Fran Sanchez
07/23/2020, 5:59 PM
I've set up the run_launcher section in my dagster.yaml, but I don't see any error or anything...

yuhan
07/24/2020, 1:37 AM

aqm
07/24/2020, 3:04 AM

Cris
07/24/2020, 3:37 PM

Danny
07/24/2020, 4:43 PM
$ pip install dagster-prometheus==0.8.9
ERROR: Could not find a version that satisfies the requirement dagster-prometheus==0.8.9
Release glitch?

Ryan Lewis
07/24/2020, 7:22 PM

Ken
07/27/2020, 10:57 PM

Titouan
07/28/2020, 9:03 AM
FROM python:3.7.5-slim-stretch
ADD . .
# Set timezone
ENV TZ=Europe/Paris
ENV CRON_TZ=Europe/Paris
RUN apt-get update -yqq && apt-get install -yqq cron gcc git tzdata
You can see we change the timezone in the container to match our timezone (so scheduled pipelines are executed at the right hour).
It works great, and the timezone is applied container-wide:
$ docker exec -it data_pipelines_dagit_1 /bin/bash
root@523fbfca03d1:/tmp# date
Tue Jul 28 10:31:26 CEST 2020
In the UI (dagit), in the run list, overview, and scheduled runs, it displays the datetime in the correct timezone (no offset) 👌
But in the run logs there is the UTC offset (the date is displayed in UTC). Seems to be a known issue -> https://github.com/dagster-io/dagster/issues/1750
On top of that, it would be nice to be able to see the current date in Dagit (in the footer or in instance settings).
Let me know if you need more feedback on this (perhaps I could help contribute a How-to section of the documentation explaining how to configure the timezone for dagster) 👌
Just sharing some ❤️ - great job BTW, love Dagster and use it with my team in production!

Richard Brady
07/29/2020, 8:01 AM
Here is my notebook:
import dagstermill as dm

sum_num = 2 + 2
print(sum_num)
dm.yield_result(sum_num, 'sum_num')
And below is my script to create and execute a reconstructable pipeline.
# (imports added for completeness)
import dagstermill as dm
from dagster import OutputDefinition, PipelineDefinition, execute_pipeline, reconstructable
from dagster.utils import script_relative_path

hello = dm.define_dagstermill_solid(
    'Hello', script_relative_path('hello_world.ipynb'), output_defs=[OutputDefinition(name="sum_num")]
)
solid_defs = [hello]

def make_backward_pipeline():
    print(solid_defs)
    return PipelineDefinition(name="_pipeline", solid_defs=solid_defs)

execute_pipeline(reconstructable(make_backward_pipeline))
Exception below:
dagster.check.CheckError: Failure condition: Can not produce an instance reference for <dagster.core.instance.DagsterInstance object at 0x7f4d0e01d2d0>
I can't find much in the documentation, so am hoping someone can help. I have tested the solid using dagit, and changing the reconstructable pipeline to a decorated @pipeline function seems to run OK, so I am sure it is a problem with my PipelineDefinition. Any ideas?

wbonelli
07/29/2020, 4:39 PM
It seems reconstructable can only be applied to module-level functions with no arguments. Writing/reading a temp file works, but I'd rather not if there's another way (and definitely looking forward to 0.9.0 if it addresses dynamic DAGs!). A sketch of the restriction as I understand it is below.
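(Illustrating the restriction; names are hypothetical:)

from dagster import PipelineDefinition, reconstructable, solid

@solid
def noop(_):
    return 1

def make_pipeline():
    # Module-level and argument-free: reconstructable can re-import this by path.
    return PipelineDefinition(name='dyn', solid_defs=[noop])

recon = reconstructable(make_pipeline)

# By contrast, a lambda or a closure built at runtime has no importable path,
# so something like reconstructable(lambda: make_pipeline()) raises.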

QianXiao
07/30/2020, 1:31 PM

user
07/31/2020, 12:04 AM

sashank
07/31/2020, 12:07 AM