Chris Evans
09/14/2021, 5:11 PM
I can get things working with the max_concurrent_runs param, but for the life of me I can't seem to get things to work with tag_concurrency_limits. Do I have this test setup correctly?
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 2
    tag_concurrency_limits:
      - key: "test"
        limit: 1
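The YAML above looks structurally right. One hedged guess (the pipeline below is hypothetical): tag_concurrency_limits only throttles queued runs that actually carry a matching tag, so it's worth confirming the runs are tagged, e.g.:

from dagster import pipeline, solid

@solid
def do_work(_):
    return 1

# Hypothetical pipeline: only runs carrying a "test" tag count toward the
# tag_concurrency_limits entry above; untagged runs bypass the limit.
@pipeline(tags={"test": "true"})
def tagged_pipeline():
    do_work()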
Sebitien
09/14/2021, 9:45 PM
Cameron Gallivan
09/15/2021, 12:22 AM
Cameron Gallivan
09/15/2021, 12:22 AM
WARNING:root:Retrying failed database connection
(the warning above repeats 26 times in total)
2021-09-14 07:20:20 - SensorDaemon - ERROR - Sensor daemon caught an error for sensor dragen_postprocessing_sort_sqs_sensor : dagster_postgres.utils.DagsterPostgresException: too many retries for DB connection
Stack Trace:
File "/opt/venv/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 230, in execute_sensor_iteration
yield from _evaluate_sensor(
File "/opt/venv/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 114, in __exit__
self._write()
File "/opt/venv/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 85, in _write
self._instance.update_job_tick(self._tick)
File "/opt/venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1529, in update_job_tick
return self._schedule_storage.update_job_tick(tick)
File "/opt/venv/lib/python3.8/site-packages/dagster/core/storage/schedules/sql_schedule_storage.py", line 196, in update_job_tick
with self.connect() as conn:
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/opt/venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 160, in create_pg_connection
conn = retry_pg_connection_fn(engine.connect)
File "/opt/venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 127, in retry_pg_connection_fn
raise DagsterPostgresException("too many retries for DB connection") from exc
The above exception was the direct cause of the following exception:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name "docker_postgresql" to address: Temporary failure in name resolution
(Background on this error at: https://sqlalche.me/e/14/e3q8)
Stack Trace:
File "/opt/venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 116, in retry_pg_connection_fn
return fn()
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3166, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3245, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3215, in _wrap_pool_connect
Connection._handle_dbapi_exception_noconnection(
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2069, in _handle_dbapi_exception_noconnection
util.raise_(
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3212, in _wrap_pool_connect
return fn()
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 307, in connect
return _ConnectionFairy._checkout(self)
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 767, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 425, in checkout
rec = pool._do_get()
File "/opt/venv/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
return self._create_connection()
File "/opt/venv/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 114, in __exit__
self._write()
File "/opt/venv/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 85, in _write
self._instance.update_job_tick(self._tick)
File "/opt/venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1529, in update_job_tick
return self._schedule_storage.update_job_tick(tick)
File "/opt/venv/lib/python3.8/site-packages/dagster/core/storage/schedules/sql_schedule_storage.py", line 196, in update_job_tick
with self.connect() as conn:
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/opt/venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 160, in create_pg_connection
conn = retry_pg_connection_fn(engine.connect)
File "/opt/venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 127, in retry_pg_connection_fn
raise DagsterPostgresException("too many retries for DB connection") from exc
Cameron Gallivan
09/15/2021, 12:22 AM
Running docker-compose down times out as well.
This seems to only happen during periods of sustained higher load, but I'm not sure if that is the sole cause?
Devaraj Nadiger
09/15/2021, 7:26 AM
Navneet Sajwan
09/15/2021, 10:14 AM
Sebastian Napiorkowski
09/15/2021, 3:42 PM
@solid(input_defs=[InputDefinition(name="a_list")],
       output_defs=[DynamicOutputDefinition(dagster_type=dagster.Any)])
def emit_tasks_for_list(context, a_list: typing.List[typing.Any]):
    for num, item in enumerate(a_list):
        yield DynamicOutput(item, mapping_key=f'subtask_{num}')
This results in a weird gRPC error. I recently made the list generic ("[Any]"); before that it was working with "[str]". Any ideas?
Chris Evans
09/15/2021, 5:45 PM@solid
def start_after():
return
@solid(required_resource_keys={"foo"}, input_defs=[InputDefinition("start_after", Nothing),])
def solid_requires_foo(context):
return f"found {context.resources.foo}"
def test_solid_with_context():
context = build_solid_context(resources={"foo": "bar"})
assert solid_requires_foo(context, start_after=start_after()) == "found bar"
This results in a error:
E TypeError: solid_requires_foo() got an unexpected keyword argument 'start_after'
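A hedged sketch of a likely fix, assuming Nothing inputs keep their usual semantics under direct invocation (they express ordering only and are not parameters of the compute function), is to drop the keyword when calling the solid directly:

def test_solid_with_context():
    context = build_solid_context(resources={"foo": "bar"})
    # The Nothing dependency only matters inside a pipeline/graph;
    # a direct invocation takes just the context.
    assert solid_requires_foo(context) == "found bar"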
Arun Kumar
09/15/2021, 7:41 PM
Navneet Sajwan
09/15/2021, 8:09 PM
Chris Chan
09/15/2021, 9:17 PM
log_configured = json_console_logger.configured({
"log_level": "INFO"
})
my_job = my_graph.to_job(
logger_defs={'my_logger': log_configured}
)
It seems like my_logger
doesn’t do anything unless a config is explicitly provided in to_job
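If it helps, a hedged sketch of why that might happen (assuming loggers follow the usual selection rule, i.e. they stay inactive until chosen in run config), with both the per-run and the baked-in-default variants:

# Select the logger when launching a run...
result = my_job.execute_in_process(
    run_config={"loggers": {"my_logger": {}}}
)

# ...or bake a default run config into the job so it is always selected
# (assumes to_job's `config` argument accepts a default run config dict).
my_job = my_graph.to_job(
    logger_defs={"my_logger": log_configured},
    config={"loggers": {"my_logger": {}}},
)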
Nazar Lysenko
09/16/2021, 7:47 AM
Sebitien
09/16/2021, 9:18 AM
Sebastian Napiorkowski
09/16/2021, 10:30 AM
import dagster
from dagster import pipeline, solid, InputDefinition, DynamicOutputDefinition, DynamicOutput

@solid
def get_list():
    return [1, 2, "3", {"foo": "bar"}]

@solid(input_defs=[InputDefinition(name="a_list", dagster_type=dagster.List[dagster.Any])],
       output_defs=[DynamicOutputDefinition(dagster_type=dagster.Any)])
def emit_tasks_for_list(context, a_list):
    for num, item in enumerate(a_list):
        yield DynamicOutput(item, mapping_key=f'subtask_{num}')

@solid
def process_item(item):
    return str(item)

@solid
def concat_results(all):
    return " ".join(all)

@pipeline()
def dynamic_test():
    subtasks_result = emit_tasks_for_list(a_list=get_list()).map(process_item)
    concat_results(subtasks_result.collect())
But having the same code in a different context ends in the gRPC error "Failure condition: Unexpected dynamic fan in dep created". Any idea how to debug this? What does this code do: https://github.com/dagster-io/dagster/blob/8e95214331f94936dd9dbb8fc384979ed92fa34[…]2/python_modules/dagster/dagster/core/definitions/dependency.py
David Hyman
09/16/2021, 10:39 AM
propagate=False ... oops!
New logging feature works just fine; excellent, thank you 😁
(props to https://pypi.org/project/logging_tree/ for keeping me sane)
Sebastian Napiorkowski
09/16/2021, 12:33 PM
Edward Smith
09/16/2021, 3:51 PM
Jay Sharma
09/16/2021, 4:18 PM
Chris Chan
09/16/2021, 5:12 PM
I attach console to each pipeline and have a run config set the log level to INFO.
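A hedged sketch of that pattern (the pipeline body is hypothetical; colored_console_logger is dagster's built-in console logger):

from dagster import ModeDefinition, execute_pipeline, pipeline, solid
from dagster.loggers import colored_console_logger

@solid
def noop(_):
    return 1

@pipeline(mode_defs=[ModeDefinition(logger_defs={"console": colored_console_logger})])
def my_pipeline():
    noop()

# The run config then picks the level per run.
execute_pipeline(
    my_pipeline,
    run_config={"loggers": {"console": {"config": {"log_level": "INFO"}}}},
)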
David Hyman
09/16/2021, 5:45 PM
Rohan Ahire
09/16/2021, 6:31 PM
jay
09/16/2021, 7:12 PM
I'm trying to pass run_config into my pipeline as shown here: https://docs.dagster.io/concepts/configuration/config-schema
It has the following snippet of code as an example:
execute_pipeline(
    config_example_pipeline,
    run_config={"solids": {"config_example_solid": {"config": {"iterations": 1}}}},
)
Following is my execute_pipeline invocation:
execute_pipeline(
    poc,
    run_config={"solids": {"with_run_config_alias": {"config": {"s": "testStr"}}}},
)
Following is the pipeline:
@pipeline
def poc():
    with_run_config_alias = with_run_config.alias("with_run_config_alias")
    with_run_config_alias()
Following is the solid executed by the pipeline above:
@solid(config_schema={"s": str})
def with_run_config(context, s):
    print("with_run_config s: ", s)
    return to_str(s)
But I am getting this error:
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline poc
Error 1: Missing required config entry "inputs" at path root:solids:with_run_config. Sample config for missing entry: {'inputs': {'s': '<selector>'}}
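A hedged reading of the error: because the compute function takes a parameter s, Dagster treats s as a solid input in addition to the config entry, and an input with no upstream solid needs its own inputs config. If s is meant to come only from config, drop the parameter and read it from context.solid_config:

# Sketch, assuming `s` should come from config rather than an upstream output.
@solid(config_schema={"s": str})
def with_run_config(context):
    s = context.solid_config["s"]
    print("with_run_config s: ", s)
    return to_str(s)  # to_str as in the original snippet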
Yassine Marzougui
09/16/2021, 7:19 PM
Is there a way to use solid_selection in the dagit playground?
Dan Stoner
09/16/2021, 7:50 PM
I want to be able to run dagit without any additional args, so I have been trying to get workspace.yaml working, but now I'm in what seems like Python import hell. I have an existing sample that works fine with dagit -f hello_dagster.py, and all I want to do is move that into a subdirectory and have it available when running just dagit.
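A hedged sketch of a workspace.yaml for that layout (the my_project/ directory name is an assumption):

# Hypothetical layout: hello_dagster.py moved into my_project/; running plain
# `dagit` reads workspace.yaml from the current working directory.
load_from:
  - python_file:
      relative_path: my_project/hello_dagster.py
      working_directory: my_project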
Francis Niu
09/17/2021, 2:50 AM
docker run command is a solid.
zafar mahmood
09/17/2021, 1:27 PM
@solid(required_resource_keys={'foo_str', 'bar_int'})
def solid_function(context, some_json):
    var1 = context.resources.foo_str
    var2 = context.resources.bar_int

### now test
def test_solid_function():
    ### to build the context
    from dagster import build_solid_context
    context = build_solid_context(resources={"foo_str": "some string ... ",
                                             "bar_int": 344335})
    solid_function(context=context, some_json='{"name":"John", "age":30, "car":null}')
Can anyone share what I am doing wrong? I'm getting the error DagsterInvalidInvocationError: Compute function of solid 'solid_function' has context argument, but no context was provided when invoking.
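A hedged guess at the fix: direct solid invocation expects the context as the first positional argument, so passing it as the keyword context=context doesn't register as a context:

def test_solid_function():
    from dagster import build_solid_context
    context = build_solid_context(
        resources={"foo_str": "some string ... ", "bar_int": 344335}
    )
    # Pass the context positionally, not as context=context.
    solid_function(context, some_json='{"name":"John", "age":30, "car":null}')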
Deveshi
09/17/2021, 2:22 PM
Eric Cheminot
09/17/2021, 3:28 PM
Yassine Marzougui
09/19/2021, 8:36 AM
@root_input_manager(
    version=dagster.hash_return_value() ???
)
def input_loader(_):
    return get_data_from_kv_store('key')
or with type loaders:
@dagster_type_loader(
    loader_version=dagster.hash_return_value() ???
    ...
)
def foo_loader(context, config_value):
    return get_data_from_kv_store(func(config_value))
or with solids:
@solid(
    input_defs=[InputDefinition("foo", str), InputDefinition("bar", str)],
    version=[
        hash_solid_code(),
        dagster.hash_inputs() ???
    ]
)
def some_solid(context, foo, bar):
    ...
That way, in a memoized run, even if the config and the code are still the same, a solid would be executed again if one of its inputs changes.
Is it maybe possible using `MemoizableIOManager`'s has_output function? I can't figure out how to do it.
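A hedged sketch of the has_output route (the import path and the fingerprinting helper are assumptions; the idea is that has_output returning False forces re-execution, so it can compare a stored fingerprint of the source data against the current one):

import hashlib
import os
import pickle

from dagster import io_manager
from dagster.core.storage.memoizable_io_manager import MemoizableIOManager

def get_data_from_kv_store(key):
    # Hypothetical stand-in for the real KV lookup in the question.
    return os.environ.get(key, "")

class FingerprintingIOManager(MemoizableIOManager):
    # Sketch: an output counts as a memoization hit only while the source
    # data it was computed from still hashes to the same value.
    def __init__(self, base_dir="/tmp/memoized"):
        self._base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    def _path(self, context):
        return os.path.join(self._base_dir, f"{context.step_key}_{context.name}")

    def _fingerprint(self):
        raw = str(get_data_from_kv_store("key")).encode()
        return hashlib.sha256(raw).hexdigest()

    def handle_output(self, context, obj):
        # Persist the value together with the fingerprint of its source data.
        with open(self._path(context), "wb") as f:
            pickle.dump({"fingerprint": self._fingerprint(), "value": obj}, f)

    def load_input(self, context):
        with open(self._path(context.upstream_output), "rb") as f:
            return pickle.load(f)["value"]

    def has_output(self, context):
        # Returning False forces the step to re-execute, so only report a hit
        # when the stored fingerprint still matches the current source data.
        path = self._path(context)
        if not os.path.exists(path):
            return False
        with open(path, "rb") as f:
            return pickle.load(f)["fingerprint"] == self._fingerprint()

@io_manager
def fingerprinting_io_manager(_):
    return FingerprintingIOManager()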