Brian Abelson
02/10/2021, 12:24 AM
I'm on Dagster 0.10.4. Everything runs fine, except that the scheduler seems to keep shutting down after about 2-3 hours with the error pasted below, and I have to restart the daemon every time to get it going again. Is this normal? Is there a way to suppress these errors? I'm running dagster-daemon run under supervisord.
dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
  File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
    with RepositoryLocationHandle.create_from_repository_location_origin(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
    return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
    self.grpc_server_process = GrpcServerProcess(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
    self.server_process = open_server_process(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
    wait_for_grpc_server(server_process, output_file)
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
    event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
    messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
    raise DagsterIPCProtocolError(
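Read bottom-up, the trace shows the sequence on each tick. The scheduler opens a repository location handle, and that handle starts a fresh gRPC server subprocess (GrpcServerProcess → open_server_process). It then waits in wait_for_grpc_server for the child to report back. The error is raised when 15 seconds pass without any data.

The sketch below is a simplified, hypothetical model of that kind of handshake, built only on the Python standard library. It is not Dagster's code; start_child_and_wait and the READY message are invented for illustration. It shows why a child that is merely slow to start (for example, on a CPU-starved machine) surfaces as a read timeout rather than as a crash.

```python
import queue
import subprocess
import sys
import threading


def start_child_and_wait(child_code: str, timeout: float) -> subprocess.Popen:
    """Start a Python child process and wait for its first line of output.

    Hypothetical model of a "spawn a server, wait for its ready message"
    handshake: if nothing arrives on the child's stdout within `timeout`
    seconds, the child is killed and TimeoutError is raised.
    """
    proc = subprocess.Popen(
        [sys.executable, "-c", child_code],
        stdout=subprocess.PIPE,
        text=True,
    )
    lines: queue.Queue = queue.Queue()

    # readline() blocks, so read in a background thread and let the
    # parent enforce the deadline with queue.get(timeout=...).
    reader = threading.Thread(
        target=lambda: lines.put(proc.stdout.readline()), daemon=True
    )
    reader.start()

    try:
        first_line = lines.get(timeout=timeout)
    except queue.Empty:
        proc.kill()
        proc.wait()
        raise TimeoutError(
            f"read stream has not received any data in {timeout} seconds"
        ) from None

    if first_line.strip() != "READY":
        proc.kill()
        proc.wait()
        raise RuntimeError(f"unexpected handshake message: {first_line!r}")
    return proc


# A child that starts quickly passes the handshake.
fast = start_child_and_wait("print('READY', flush=True)", timeout=10)
fast.wait()

# A child that needs longer than the deadline is reported as a timeout.
try:
    start_child_and_wait("import time; time.sleep(3); print('READY', flush=True)", timeout=0.5)
except TimeoutError as exc:
    print(exc)  # read stream has not received any data in 0.5 seconds
```

In this model, raising the timeout only hides the symptom. The underlying question is why the child needs more than the deadline to come up.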
daniel
02/10/2021, 12:38 AM
Brian Abelson
02/10/2021, 12:39 AM
The Status window seems to be stuck like this:
2021-02-10 00:41:32 - SchedulerDaemon - INFO - Checking for new runs for the following schedules: dbt_run_all, mysql_drupal_to_psql_warehouse_all_else, mysql_drupal_to_psql_warehouse_commerce_fields, mysql_drupal_to_psql_warehouse_commerce_core, mysql_drupal_to_psql_warehouse_ioby_sf, mysql_drupal_to_psql_warehouse_commerce_donations, mysql_drupal_to_psql_warehouse_match_programs, mysql_drupal_to_psql_warehouse_node, mysql_drupal_to_psql_warehouse_people, mysql_drupal_to_psql_warehouse_projects, mysql_drupal_to_psql_warehouse_revisions
ioby-data | 2021-02-09 19:41:38 2021-02-10 00:41:37 - dagster - INFO - system - 5ae2890f-3653-4763-bbf2-89f4f196936e - copy_mysql_drupal_tables_to_psql_warehouse - WRITING MYSQL ioby_sf_opportunities TO tmp_ioby_data_pipelines_etl_mysql_drupal_to_psql_warehouse_1326a.ioby_sf_opportunities IN WAREHOUSE
ioby-data | 2021-02-09 19:41:48 2021-02-10 00:41:48 - SchedulerDaemon - ERROR - Scheduler failed for dbt_run_all : dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
ioby-data | 2021-02-09 19:41:48
ioby-data | 2021-02-09 19:41:48 Stack Trace:
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
ioby-data | 2021-02-09 19:41:48     with RepositoryLocationHandle.create_from_repository_location_origin(
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
ioby-data | 2021-02-09 19:41:48     return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
ioby-data | 2021-02-09 19:41:48     self.grpc_server_process = GrpcServerProcess(
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
ioby-data | 2021-02-09 19:41:48     self.server_process = open_server_process(
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
ioby-data | 2021-02-09 19:41:48     wait_for_grpc_server(server_process, output_file)
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
ioby-data | 2021-02-09 19:41:48     event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
ioby-data | 2021-02-09 19:41:48     messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
ioby-data | 2021-02-09 19:41:48     raise DagsterIPCProtocolError(
ioby-data | 2021-02-09 19:41:48
dagit + dagster-daemon, since I'm running both via supervisord
daniel
02/10/2021, 12:47 AM
Brian Abelson
02/10/2021, 12:51 AM
daniel
02/10/2021, 12:56 AM
Brian Abelson
02/10/2021, 1:04 AM
daniel
02/10/2021, 1:11 AM
Brian Abelson
02/10/2021, 1:16 AM
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
daniel
02/10/2021, 1:32 AM
Brian Abelson
02/10/2021, 2:14 PM
The OpenBLAS warning seems to be associated with OOM errors in other cases, but I can't be sure, since some things continue to run.
daniel
02/10/2021, 2:58 PM
Brian Abelson
02/10/2021, 3:13 PM
dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds?
daniel
02/10/2021, 3:23 PM
Brian Abelson
02/10/2021, 4:45 PM
QueuedRunCoordinator to ensure that there are never too many jobs running at once?
daniel
02/10/2021, 4:46 PM
Brian Abelson
02/10/2021, 5:04 PM
daniel
02/10/2021, 5:13 PM
Brian Abelson
02/10/2021, 10:23 PM
With the QueuedRunCoordinator, I still got CPU spikes and the scheduler failed as before. I'm now checking whether supervisor is the culprit. If not, I may just try the cron scheduling option...
daniel
02/10/2021, 10:25 PM
Brian Abelson
02/10/2021, 10:33 PM
Timeout: read stream has not received any data in 15 seconds
ioby-data | 2021-02-10 17:33:15
ioby-data | 2021-02-10 17:33:15 Stack Trace:
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
ioby-data | 2021-02-10 17:33:15     with RepositoryLocationHandle.create_from_repository_location_origin(
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
ioby-data | 2021-02-10 17:33:15     return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
ioby-data | 2021-02-10 17:33:15     self.grpc_server_process = GrpcServerProcess(
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
ioby-data | 2021-02-10 17:33:15     self.server_process = open_server_process(
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
ioby-data | 2021-02-10 17:33:15     wait_for_grpc_server(server_process, output_file)
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
ioby-data | 2021-02-10 17:33:15     event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
ioby-data | 2021-02-10 17:33:15     messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
ioby-data | 2021-02-10 17:33:15     raise DagsterIPCProtocolError(
ioby-data | 2021-02-10 17:33:15
daniel
02/10/2021, 10:35 PM
Brian Abelson
02/10/2021, 10:36 PM
2021-02-10 22:35:16 - dagster-daemon - ERROR - Caught error in DaemonType.QUEUED_RUN_COORDINATOR:
ioby-data | 2021-02-10 17:35:16 SerializableErrorInfo(message='dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds\n', stack=[' File "/usr/local/lib/python3.8/site-packages/dagster/daemon/controller.py", line 117, in run_iteration\n error_info = check.opt_inst(next(generator), SerializableErrorInfo)\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 172, in run_iteration\n self._dequeue_run(run, location_manager)\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 206, in _dequeue_run\n external_pipeline = location_manager.get_external_pipeline_from_run(run)\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 97, in get_external_pipeline_from_run\n ] = RepositoryLocationHandle.create_from_repository_location_origin(\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin\n return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__\n self.grpc_server_process = GrpcServerProcess(\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__\n self.server_process = open_server_process(\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process\n wait_for_grpc_server(server_process, output_file)\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server\n event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response\n messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))\n', ' File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream\n raise DagsterIPCProtocolError(\n'], cls_name='DagsterIPCProtocolError', cause=None)
daniel
02/10/2021, 10:39 PM
Brian Abelson
02/10/2021, 10:40 PM
daniel
02/10/2021, 10:41 PM
Brian Abelson
02/10/2021, 10:42 PM
daniel
02/10/2021, 10:45 PM
Brian Abelson
02/10/2021, 10:52 PM
daniel
02/10/2021, 10:55 PM
Brian Abelson
02/10/2021, 10:57 PM
htop on the node seems to indicate that all Dagster-related Python processes are running at 800% CPU.
daniel
02/11/2021, 3:33 PM
Brian Abelson
02/11/2021, 3:48 PM
supervisor and nginx
daniel
02/11/2021, 3:57 PM
Brian Abelson
02/11/2021, 3:58 PM
py-spy top --pid 12345?
daniel
02/11/2021, 4:00 PM
Brian Abelson
02/11/2021, 4:02 PM
daniel
02/11/2021, 4:04 PM
Brian Abelson
02/11/2021, 4:06 PM
daniel
02/11/2021, 4:09 PM
Brian Abelson
02/11/2021, 4:12 PM
USER   PID  %CPU  %MEM      VSZ     RSS TTY STAT START  TIME COMMAND
root     1   0.0   0.3    13540    7032 ?   Ss   16:03  0:00 /bin/bash /opt/dagster/ops/entrypoint.sh
root  1088   6.4   9.5  1301460  199884 ?   Sl   16:06  0:15 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmpasi8
root    11   0.4   1.2    34632   25512 ?   S    16:03  0:01 /usr/bin/python2 /usr/bin/supervisord -c /etc/supervisor/co
root  1162   0.1   0.6    21480   14076 ?   S    16:06  0:00 /usr/local/bin/python -c from multiprocessing.resource_trac
root  1163   107  25.8  1165684  542872 ?   Rl   16:06  4:04 /usr/local/bin/python -c from multiprocessing.spawn import
root  1261   0.2   0.2    12132    4404 ?   S    16:07  0:00 tail -F -c +0 /opt/dagster/storage/4551b2d6-bc6f-4965-b587-
root  1262   0.2   0.5    19060   11648 ?   S    16:07  0:00 /usr/local/bin/python /usr/local/lib/python3.8/site-package
root  1263   0.2   0.2    12132    4528 ?   S    16:07  0:00 tail -F -c +0 /opt/dagster/storage/4551b2d6-bc6f-4965-b587-
root  1265   0.3   0.6    19060   12624 ?   S    16:07  0:00 /usr/local/bin/python /usr/local/lib/python3.8/site-package
root    14   0.2   3.1   118892   65436 ?   S    16:03  0:00 nginx: master process /usr/sbin/nginx -g daemon off;
root    15   0.0   0.2    13540    6040 ?   S    16:03  0:00 /bin/bash ops/start-dagit.sh
root  1503   0.2   0.2    13804    5852 ?   Ss   16:07  0:00 bash
root    16   0.0   0.3    13540    6912 ?   S    16:03  0:00 /bin/bash ops/start-dagster-daemon.sh
root    17  10.1   6.0   683080  126244 ?   Sl   16:03  0:41 /usr/local/bin/python /usr/local/bin/dagit -h 0.0.0.0 -p 30
root    18   9.4   4.9   624816  104088 ?   Sl   16:03  0:38 /usr/local/bin/python /usr/local/bin/dagster-daemon run
root    19   0.0   2.2   119236   47208 ?   S    16:03  0:00 nginx: worker process
root    20   0.0   2.2   119236   47208 ?   S    16:03  0:00 nginx: worker process
root    21   0.0   2.2   119236   47208 ?   S    16:03  0:00 nginx: worker process
root    22   0.0   2.2   119236   47208 ?   S    16:03  0:00 nginx: worker process
root    23   0.0   2.2   119236   47208 ?   S    16:03  0:00 nginx: worker process
root  2351   2.3   0.3    13804    7396 ?   Ss   16:10  0:00 bash
root    24   0.0   2.2   119236   47148 ?   S    16:03  0:00 nginx: worker process
root    25   0.1   2.2   119236   47208 ?   S    16:03  0:00 nginx: worker process
root    26   0.0   2.2   119236   47144 ?   S    16:03  0:00 nginx: worker process
root  2660   0.0   0.0        0       0 ?   Z    16:10  0:19 [python] <defunct>
root  2853   0.0   9.4  1312028  198292 ?   Sl   16:11  0:12 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmpd6mm
root  2926   0.0   0.3    17444    7448 ?   R    16:11  0:00 ps aux
root    35   7.6  10.0  1407660  210488 ?   Sl   16:04  0:30 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmph9_l
root    48   0.0   0.3    13804    6944 ?   Ss   16:04  0:00 bash
root   725  14.6   0.2    13196    4676 ?   R    16:05  0:42 htop
/usr/local/bin/python -c from multiprocessing.spawn import
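The listing above isn't sorted by CPU, so it is easy to miss that PID 1163, the multiprocessing.spawn child, is the one at 107% CPU and 25.8% memory. A small stdlib-only helper, such as the hypothetical top_cpu below, parses ps aux-style text and ranks processes by %CPU. It assumes the standard 11-column ps aux layout and treats everything after TIME as the command. Run on the full paste, its top three would be PID 1163 (107%), htop itself (14.6%) and dagit (10.1%).

```python
from typing import List, NamedTuple


class Proc(NamedTuple):
    pid: int
    cpu: float
    mem: float
    command: str


def top_cpu(ps_output: str, n: int = 5) -> List[Proc]:
    """Parse `ps aux`-style text and return the n processes with the highest %CPU."""
    procs = []
    for line in ps_output.strip().splitlines():
        # USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND; maxsplit=10
        # keeps the (space-containing) COMMAND column in one piece.
        fields = line.split(None, 10)
        if len(fields) < 11 or fields[0] == "USER":
            continue  # skip the header and malformed lines
        procs.append(Proc(int(fields[1]), float(fields[2]), float(fields[3]), fields[10]))
    return sorted(procs, key=lambda p: p.cpu, reverse=True)[:n]


sample = """
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1088 6.4 9.5 1301460 199884 ? Sl 16:06 0:15 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmpasi8
root 1163 107 25.8 1165684 542872 ? Rl 16:06 4:04 /usr/local/bin/python -c from multiprocessing.spawn import
root 17 10.1 6.0 683080 126244 ? Sl 16:03 0:41 /usr/local/bin/python /usr/local/bin/dagit -h 0.0.0.0 -p 30
"""

for p in top_cpu(sample, n=2):
    print(p.pid, p.cpu, p.command)
# 1163 107.0 /usr/local/bin/python -c from multiprocessing.spawn import
# 17 10.1 /usr/local/bin/python /usr/local/bin/dagit -h 0.0.0.0 -p 30
```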
daniel
02/11/2021, 4:14 PM
Brian Abelson
02/11/2021, 4:14 PM
py-spy can't find these PIDs; it returns "Error: No such file or directory (os error 2)".
daniel
02/11/2021, 4:21 PM
Brian Abelson
02/11/2021, 4:24 PM
root 8924 46.5 11.8 861680 241936 ? Sl 16:16 2:32 /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=14, pipe_handle=16) --multiprocessing-fork
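The busy process's command line, python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=14, pipe_handle=16)" --multiprocessing-fork, is the form CPython's multiprocessing module uses to start a child under the spawn start method. The multiprocessing.resource_tracker process in the earlier listing is the helper that this start method runs alongside it. So the CPU is being used by a child that some Python code launched through multiprocessing. The thread doesn't establish which Dagster component launched it; one possibility is an executor inside the run, but that is an assumption.

The snippet below reproduces the command line with multiprocessing.spawn.get_command_line. That function exists in CPython's standard library but is not part of the documented API, so treat this as an illustration only.

```python
import multiprocessing.spawn as mp_spawn

# get_command_line() is an undocumented CPython helper that builds the argv
# used to start a child under the "spawn" start method. tracker_fd and
# pipe_handle are copied from the ps line above; in a real launch they are
# file descriptors the parent creates for the resource tracker and the pipe.
cmd = mp_spawn.get_command_line(tracker_fd=14, pipe_handle=16)
print(" ".join(cmd))
# ends with: -c from multiprocessing.spawn import spawn_main;
#            spawn_main(tracker_fd=14, pipe_handle=16) --multiprocessing-fork
```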
daniel
02/11/2021, 4:25 PM
Brian Abelson
02/11/2021, 4:30 PM
Could you please tell us the size of the temporary files that Dagster writes to disk? There is a 2 GiB limit on ephemeral disk storage within the App container. If temporary files fill it up, the App might be restarted.
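One rough way to answer that question is to total the bytes under the directories Dagster writes to inside the container. Candidates include /opt/dagster/storage (visible in the tail -F processes above) and the system temp directory. The dir_size_bytes and report helpers below are a hypothetical, stdlib-only sketch. Which paths actually count toward the 2 GiB ephemeral limit depends on how the container's storage is set up, and the thread doesn't say.

```python
import os
import tempfile

EPHEMERAL_LIMIT_BYTES = 2 * 1024**3  # the 2 GiB limit quoted above


def dir_size_bytes(root: str) -> int:
    """Total size of regular files under `root`; symlinks are skipped, missing dirs count as 0."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                if not os.path.islink(path):
                    total += os.path.getsize(path)
            except OSError:
                pass  # a temp file can vanish between listing and stat
    return total


def report(paths=("/opt/dagster/storage", tempfile.gettempdir())) -> None:
    """Print each directory's size and its share of the ephemeral-storage limit.

    The default paths are assumptions: the storage path comes from the ps output above.
    """
    for path in paths:
        size = dir_size_bytes(path)
        share = size / EPHEMERAL_LIMIT_BYTES
        print(f"{path}: {size / 1024**2:.1f} MiB ({share:.1%} of 2 GiB)")


# Example: call report() inside the container, e.g. periodically while a run executes.
```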
INFO and print out a lot of information.
daniel
02/11/2021, 5:35 PM
Brian Abelson
02/11/2021, 5:37 PM
from dagster import pipeline, solid, Output, OutputDefinition, InputDefinition

from ioby_data import modes, solids
from ioby_data.utils import sql

PIPELINE = __name__.replace(".", "_").lower()


@solid(
    required_resource_keys={"mysql_drupal"},
    config_schema={
        "from_schema": str,
        "exclude": list,
        "include": list,
    },
    output_defs=[
        OutputDefinition(
            dagster_type=dict,
            name="table_info",
            description="Information about the mysql tables selected",
        )
    ],
)
def get_mysql_drupal_tables_to_sync(context):
    """
    Fetch the non-empty tables in a schema, optionally filtered by include/exclude LIKE patterns
    """
    # build exclude table clauses; blank patterns are dropped first so that a
    # list of only blanks falls back to "1=1" instead of an empty (invalid) clause
    exclude = [exc for exc in context.solid_config["exclude"] if exc.strip() != ""]
    if exclude:
        exclude_table_clauses = "\n        AND ".join(
            [f"table_name NOT LIKE '{exc}'" for exc in exclude]
        )
    else:
        exclude_table_clauses = "1=1"

    # build include table clauses (same blank-pattern handling)
    include = [inc for inc in context.solid_config["include"] if inc.strip() != ""]
    if include:
        include_table_clauses = "\n            OR ".join(
            [f"table_name LIKE '{inc}'" for inc in include]
        )
    else:
        include_table_clauses = "1=1"

    # named `query` rather than `sql` so it doesn't shadow the imported sql helpers
    query = f"""
    SELECT
        table_name,
        table_rows
    FROM
        information_schema.tables
    WHERE
        table_rows > 0
        AND table_schema='{context.solid_config['from_schema']}'
        AND {exclude_table_clauses}
        AND (
            {include_table_clauses}
        )
    ORDER BY RAND()
    """
    context.log.info(f"Running query: {query}")
    df = context.resources.mysql_drupal.df_from_query(query)
    mysql_tables = {
        row.table_name: {"num_rows": row.table_rows, "part": (idx % 10) + 1}
        for idx, row in df.iterrows()
    }
    context.log.info(f"RETRIEVED {len(mysql_tables)} MYSQL TABLES")
    table_info = {
        "from_schema": context.solid_config["from_schema"],
        "tables": mysql_tables,
    }
    yield Output(table_info, "table_info")


@solid(
    required_resource_keys={"psql_warehouse", "mysql_drupal"},
    config_schema={
        "limit": int,
    },
    input_defs=[
        InputDefinition(
            dagster_type=dict,
            name="table_info",
            description="Information about the mysql tables selected",
        )
    ],
    output_defs=[
        OutputDefinition(
            dagster_type=list,
            name="tables",
            description="A list of temp tables created in the warehouse",
        )
    ],
)
def copy_mysql_drupal_tables_to_psql_warehouse(context, table_info):
    # setup warehouse access
    wh = context.resources.psql_warehouse
    mysql = context.resources.mysql_drupal

    # setup schema to copy the table to
    from_schema = table_info["from_schema"]
    input_tables = table_info["tables"]
    tmp_schema = sql.gen_temp_schema(PIPELINE)
    context.log.info(f"CREATING TMP SCHEMA: {tmp_schema}")
    wh.create_schema(tmp_schema)

    # copy the tables (iterate over names only, so the loop no longer
    # shadows the `table_info` input)
    output_tables = []
    for table in input_tables:
        context.log.info(f"FETCHING SCHEMA FOR {table}")
        column_schema = mysql.get_table_column_schema(table)
        create_table_stmt = wh.create_table(table, tmp_schema, column_schema)
        context.log.info(f"SUCCESSFULLY RAN IN WAREHOUSE:\n{create_table_stmt}")
        context.log.info(f"LOADING MYSQL TABLE {table} INTO WAREHOUSE")

        # read the mysql table, optionally capped by the `limit` config
        limit_stmt = ""
        if context.solid_config["limit"] > 0:
            limit_stmt = f"LIMIT {context.solid_config['limit']}"
        rows = mysql.execute(
            f"""
            SELECT * FROM {table} {limit_stmt}
            """
        )
        context.log.info(f"WRITING MYSQL {table} TO {tmp_schema}.{table} IN WAREHOUSE")
        wh.insert_rows_to_table(rows, table, tmp_schema)
        context.log.info(f"FINISHED LOADING TABLE {table}")
        output_tables.append(f"{tmp_schema}.{table}")

    yield Output(output_tables, "tables")


@solid(
    required_resource_keys={"psql_warehouse"},
    config_schema={
        "to_schema": str,
    },
    input_defs=[
        InputDefinition(
            dagster_type=list,
            name="tables",
            description="A list of temp tables created in the warehouse",
        )
    ],
)
def replace_existing_tables_with_new_tables(context, tables):
    # setup warehouse access
    wh = context.resources.psql_warehouse
    dest_schema = context.solid_config["to_schema"]
    for src_table in tables:
        # format src/dest table names and schema
        src_schema, table_name = src_table.split(".")
        dest_table = f"{dest_schema}.{table_name}"
        context.log.info(f"Replacing {dest_table} with {src_table}")
        # swap table in a single transaction
        wh.swap_table(src_table, dest_table)

    # drop the temp schema(s) only after every table has been swapped; all
    # tables from one run share the same temp schema
    for src_schema in {src_table.split(".")[0] for src_table in tables}:
        wh.drop_schema(src_schema)


@pipeline(mode_defs=[modes.DEFAULT])
def mysql_drupal_to_psql_warehouse():
    table_info = get_mysql_drupal_tables_to_sync()
    tables = copy_mysql_drupal_tables_to_psql_warehouse(table_info)
    replace_existing_tables_with_new_tables(tables)
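In copy_mysql_drupal_tables_to_psql_warehouse, each table is read with a single SELECT *, and the result is handed to wh.insert_rows_to_table. Whether that holds a whole table in memory depends on the mysql_drupal and psql_warehouse resources, whose code isn't shown. If it does, large tables would show up as memory and CPU spikes in the run process. A common alternative is to stream rows in fixed-size batches with the DB-API cursor method fetchmany.

The sketch below uses in-memory sqlite3 databases as stand-ins for MySQL and Postgres. copy_in_batches, write_to_dst and batch_size are hypothetical names, not part of the pipeline above.

```python
import sqlite3
from typing import Callable, Sequence


def copy_in_batches(
    src_conn,  # any DB-API 2.0 connection
    query: str,
    write_batch: Callable[[Sequence[tuple]], None],
    batch_size: int = 10_000,
) -> int:
    """Run `query` on `src_conn` and pass the rows to `write_batch` in chunks.

    At most `batch_size` rows are held in Python at a time (assuming the
    driver's cursor doesn't buffer the full result itself). Returns the
    number of rows copied.
    """
    cur = src_conn.cursor()
    try:
        cur.execute(query)
        copied = 0
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            write_batch(rows)
            copied += len(rows)
        return copied
    finally:
        cur.close()


# Demo: in-memory SQLite databases stand in for the MySQL source and Postgres warehouse.
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
src.executemany("INSERT INTO t VALUES (?, ?)", [(i, f"row{i}") for i in range(25)])

dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")


def write_to_dst(rows):
    dst.executemany("INSERT INTO t VALUES (?, ?)", rows)
    dst.commit()


print(copy_in_batches(src, "SELECT id, name FROM t ORDER BY id", write_to_dst, batch_size=10))  # 25
```

With MySQL specifically, common drivers buffer the entire result set on the client by default. Batching with fetchmany then only saves memory when paired with an unbuffered, server-side cursor, for example PyMySQL's SSCursor, if PyMySQL is the driver the resource uses.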
dagster.yaml
# ==================================================================================================
# Run Storage
# ==================================================================================================
# Controls how the history of runs is persisted. Can be set to SqliteRunStorage (default) or
# PostgresRunStorage.
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username:
        env: IOBY_DAGSTER_DB_USERNAME
      password:
        env: IOBY_DAGSTER_DB_PASSWORD
      hostname:
        env: IOBY_DAGSTER_DB_HOST
      db_name:
        env: IOBY_DAGSTER_DB_NAME
      port:
        env: IOBY_DAGSTER_DB_PORT
# ==================================================================================================
# Event Log Storage
# ==================================================================================================
# Controls how the structured event logs produced by each run are persisted. Can be set to
# SqliteEventLogStorage (default) or PostgresEventLogStorage.
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username:
        env: IOBY_DAGSTER_DB_USERNAME
      password:
        env: IOBY_DAGSTER_DB_PASSWORD
      hostname:
        env: IOBY_DAGSTER_DB_HOST
      db_name:
        env: IOBY_DAGSTER_DB_NAME
      port:
        env: IOBY_DAGSTER_DB_PORT
# ==================================================================================================
# Scheduler
# ==================================================================================================
# Provides an optional scheduler which controls execution of pipeline runs at regular intervals.
# We recommend using the default DagsterDaemonScheduler - SystemCronScheduler and K8sScheduler are
# also available but are deprecated.
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
# ==================================================================================================
# Schedule Storage
# ==================================================================================================
# Controls the backing storage used by the scheduler to manage the state of schedules and persist
# records of attempts.
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username:
        env: IOBY_DAGSTER_DB_USERNAME
      password:
        env: IOBY_DAGSTER_DB_PASSWORD
      hostname:
        env: IOBY_DAGSTER_DB_HOST
      db_name:
        env: IOBY_DAGSTER_DB_NAME
      port:
        env: IOBY_DAGSTER_DB_PORT
# ==================================================================================================
# Run Launcher
# ==================================================================================================
# Component that determines where runs are executed.
run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher
# ==================================================================================================
# Run Coordinator
# ==================================================================================================
# Determines the policy used to determine the prioritization rules and concurrency limits for runs.
# Can be set to DefaultRunCoordinator (default) or QueuedRunCoordinator when you want to maintain
# limits on the number of runs that can be executing at once.
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 2
telemetry:
  enabled: false
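max_concurrent_runs: 2 caps how many runs are in progress at once, but it does not cap the other processes the daemons start. The 10:36 PM traceback shows this: the QUEUED_RUN_COORDINATOR daemon itself started a gRPC server process (via create_from_repository_location_origin) while dequeuing a run, and hit the same 15-second timeout.

The toy model below illustrates only the dequeue rule: queued runs are launched, in FIFO order, while fewer than the limit are in progress. It is a hypothetical sketch, not Dagster's implementation, and QueueModel and its methods are invented names.

```python
from collections import deque
from typing import Deque, List, Set


class QueueModel:
    """Toy model of a queued run coordinator's concurrency limit (hypothetical)."""

    def __init__(self, max_concurrent_runs: int):
        self.max_concurrent_runs = max_concurrent_runs
        self.queued: Deque[str] = deque()
        self.in_progress: Set[str] = set()

    def submit(self, run_id: str) -> None:
        # Submitted runs always wait in the queue first.
        self.queued.append(run_id)

    def finish(self, run_id: str) -> None:
        self.in_progress.discard(run_id)

    def dequeue_iteration(self) -> List[str]:
        """Launch queued runs (FIFO) until the concurrency limit is reached."""
        launched = []
        while self.queued and len(self.in_progress) < self.max_concurrent_runs:
            run_id = self.queued.popleft()
            self.in_progress.add(run_id)
            launched.append(run_id)
        return launched


q = QueueModel(max_concurrent_runs=2)
for run_id in ["run-a", "run-b", "run-c"]:
    q.submit(run_id)
print(q.dequeue_iteration())  # ['run-a', 'run-b']
print(q.dequeue_iteration())  # [] (limit reached, run-c stays queued)
q.finish("run-a")
print(q.dequeue_iteration())  # ['run-c']
```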