Adam Bloom
10/19/2022, 6:33 PM
daniel
10/19/2022, 6:34 PM
Adam Bloom
10/19/2022, 6:36 PM
daniel
10/19/2022, 6:39 PM
Adam Bloom
10/19/2022, 6:40 PM
daniel
10/19/2022, 6:41 PM
Adam Bloom
10/19/2022, 6:43 PM
daniel
10/19/2022, 6:43 PM
Adam Bloom
10/19/2022, 6:44 PM
daniel
10/19/2022, 6:44 PM
Adam Bloom
10/19/2022, 6:45 PM
daniel
10/19/2022, 6:45 PM
Adam Bloom
10/19/2022, 6:46 PM
daniel
10/19/2022, 6:47 PM
sensors:
  # Whether to evaluate sensors using an asynchronous thread pool. Defaults to false.
  useThreads: false
  # The max number of worker threads to use when asynchronously evaluating sensors. Will use the
  # default value used by Python, which depends on the number of cores available.
  numWorkers: ~
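As a rough illustration of the "default value used by Python" mentioned in the numWorkers comment: assuming the daemon's pool is a standard `concurrent.futures.ThreadPoolExecutor` (an assumption; the config above does not say which pool is used), leaving the size unset lets Python pick it from the core count. The helper name `default_max_workers` is hypothetical; the formulas are the ones documented for CPython 3.5 to 3.7 and 3.8 to 3.12.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def default_max_workers(python_version, cpu_count):
    """Worker count ThreadPoolExecutor(max_workers=None) picks, per the CPython docs."""
    cpus = cpu_count or 1
    if python_version >= (3, 8):
        return min(32, cpus + 4)  # documented default for 3.8 through 3.12
    return cpus * 5  # documented default for 3.5 through 3.7


# numWorkers: ~ (YAML null) would correspond to max_workers=None in this sketch.
with ThreadPoolExecutor(max_workers=None) as pool:
    results = list(pool.map(lambda n: n * n, range(4)))

print(results)
print(default_max_workers((3, 7), os.cpu_count()))
```

On a 4-core machine this gives 20 threads under Python 3.7 and 8 under Python 3.8+, so an explicit numWorkers may be worth setting if the default is larger than the database or code server can comfortably serve.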
Adam Bloom
10/19/2022, 6:48 PM
daniel
10/19/2022, 6:50 PM
Adam Bloom
10/19/2022, 6:51 PM
daniel
10/19/2022, 6:51 PM
Adam Bloom
10/19/2022, 6:52 PM
daniel
10/19/2022, 6:52 PM
Adam Bloom
10/19/2022, 6:53 PM
daniel
10/19/2022, 6:53 PM
Adam Bloom
10/19/2022, 6:54 PM
daniel
10/19/2022, 6:54 PM
Adam Bloom
10/19/2022, 6:55 PM
daniel
10/19/2022, 6:56 PM
Adam Bloom
10/19/2022, 6:56 PM
[dagster-daemon-8959d56bb-lq8s5 dagster] 2022-10-19 19:00:59 +0000 - dagster.daemon.SensorDaemon - INFO - Checking for new runs for sensor: slack_on_run_failure
[dagster-daemon-8959d56bb-lq8s5 dagster] 2022-10-19 19:01:00 +0000 - dagster.daemon.SensorDaemon - INFO - Sensor slack_on_run_failure skipped: Sensor function returned an empty result
daniel
10/19/2022, 7:03 PM
Adam Bloom
10/19/2022, 7:05 PM
daniel
10/19/2022, 7:10 PM
Adam Bloom
10/19/2022, 7:11 PM
daniel
10/19/2022, 7:12 PM
[dagster-daemon-8959d56bb-lq8s5 dagster] res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
line in the stack trace?
Adam Bloom
10/19/2022, 7:13 PM
daniel
10/19/2022, 7:13 PM
Adam Bloom
10/19/2022, 7:14 PM
[dagster-daemon-8959d56bb-lq8s5 dagster] 2022-10-19 19:12:49 +0000 - dagster.daemon.SensorDaemon - ERROR - Sensor daemon caught an error for sensor dbt_test_and_doc_vault : dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server
[dagster-daemon-8959d56bb-lq8s5 dagster]
[dagster-daemon-8959d56bb-lq8s5 dagster] Stack Trace:
[dagster-daemon-8959d56bb-lq8s5 dagster] File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 481, in _process_tick_generator
[dagster-daemon-8959d56bb-lq8s5 dagster] sensor_debug_crash_flags,
[dagster-daemon-8959d56bb-lq8s5 dagster] File "/usr/local/lib/python3.7/site-packages/dagster/_daemon/sensor.py", line 546, in _evaluate_sensor
[dagster-daemon-8959d56bb-lq8s5 dagster] instigator_data.cursor if instigator_data else None,
[dagster-daemon-8959d56bb-lq8s5 dagster] File "/usr/local/lib/python3.7/site-packages/dagster/_core/host_representation/repository_location.py", line 817, in get_external_sensor_execution_data
[dagster-daemon-8959d56bb-lq8s5 dagster] cursor,
[dagster-daemon-8959d56bb-lq8s5 dagster] File "/usr/local/lib/python3.7/site-packages/dagster/_api/snapshot_sensor.py", line 61, in sync_get_external_sensor_execution_data_grpc
[dagster-daemon-8959d56bb-lq8s5 dagster] cursor=cursor,
[dagster-daemon-8959d56bb-lq8s5 dagster] File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 375, in external_sensor_execution
[dagster-daemon-8959d56bb-lq8s5 dagster] sensor_execution_args
[dagster-daemon-8959d56bb-lq8s5 dagster] File "/usr/local/lib/python3.7/site-packages/dagster/_grpc/client.py", line 166, in _streaming_query
[dagster-daemon-8959d56bb-lq8s5 dagster] raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
[dagster-daemon-8959d56bb-lq8s5 dagster]
[dagster-daemon-8959d56bb-lq8s5 dagster] The above exception was caused by the following exception:
[dagster-daemon-8959d56bb-lq8s5 dagster] grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
[dagster-daemon-8959d56bb-lq8s5 dagster] status = StatusCode.DEADLINE_EXCEEDED
[dagster-daemon-8959d56bb-lq8s5 dagster] details = "Deadline Exceeded"
[dagster-daemon-8959d56bb-lq8s5 dagster] debug_error_string = "{"created":"@1666206767.859389389","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4}"
[dagster-daemon-8959d56bb-lq8s5 dagster] >
daniel
10/19/2022, 7:14 PM
Adam Bloom
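10/19/2022, 7:15 PM
from dagster import DefaultSensorStatus, RunRequest, SkipReason, multi_asset_sensor

@multi_asset_sensor(
    asset_keys=asset_keys,
    job=dbt_test_doc_job_factory(config, environment, schema_cfg),
    name="dbt_test_and_doc_vault" if config.is_vault else f"dbt_test_and_doc_{config.schema}",
    default_status=DefaultSensorStatus.RUNNING,
)
def dbt_test_docs_sensor(context):
    # Maps each monitored asset key to its latest new materialization record, or None.
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        # Every monitored asset has a new materialization: consume them and request a run.
        context.advance_all_cursors()
        yield RunRequest()
    else:
        # yield, not return: this function is a generator, so a returned value is discarded.
        yield SkipReason("Waiting for new dbt assets")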
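One Python detail is relevant to generator-style sensor bodies like the one above: once a function contains `yield`, a value passed to `return` is not emitted to whoever iterates over it. The sketch below uses plain strings as hypothetical stand-ins for `RunRequest` and `SkipReason` and does not import Dagster. It only shows the language behavior, which would be consistent with a "Sensor function returned an empty result" message, though the thread does not confirm that this is the cause.

```python
def sensor_with_return(has_new_events):
    # Hypothetical stand-in for a sensor body; strings replace RunRequest / SkipReason.
    if has_new_events:
        yield "RunRequest"
    else:
        return "SkipReason"  # ends up on StopIteration.value; iteration never sees it


def sensor_with_yield(has_new_events):
    if has_new_events:
        yield "RunRequest"
    else:
        yield "SkipReason"


print(list(sensor_with_return(False)))  # []
print(list(sensor_with_yield(False)))   # ['SkipReason']
```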
daniel
10/19/2022, 7:15 PM
Adam Bloom
10/19/2022, 7:16 PM
daniel
10/19/2022, 7:19 PM
Adam Bloom
10/19/2022, 7:19 PM
alex
10/19/2022, 7:20 PM
Adam Bloom
10/19/2022, 7:21 PM
[dagster-daemon-559fb67f7f-4rhv8 dagster] 2022-10-19 19:26:52 +0000 - dagster.daemon.SensorDaemon - INFO - Checking for new runs for sensor: dbt_test_and_doc_vault
[dagster-daemon-559fb67f7f-4rhv8 dagster] 2022-10-19 19:26:53 +0000 - dagster.daemon.SensorDaemon - INFO - Sensor dbt_test_and_doc_vault skipped: Sensor function returned an empty result
daniel
10/19/2022, 7:28 PM
Adam Bloom
10/19/2022, 7:33 PM
daniel
10/19/2022, 7:33 PM
Adam Bloom
10/19/2022, 7:34 PM
minimum_interval_seconds at defaults - I could try setting those higher. but these queries should also not be taking this long
daniel
10/19/2022, 7:34 PM
Adam Bloom
10/19/2022, 7:35 PM
Limit (cost=0.44..7277.46 rows=1 width=595) (actual time=101759.561..101759.562 rows=0 loops=1)
  -> Index Scan Backward using event_logs_pkey on event_logs (cost=0.44..2910810.03 rows=400 width=595) (actual time=101759.560..101759.560 rows=0 loops=1)
        Filter: ((dagster_event_type = 'ASSET_MATERIALIZATION'::text) AND ((asset_key = '["<key_part_1>", "<key_part_2>"]'::text) OR (asset_key = '<key>'::text)))
        Rows Removed by Filter: 23635163
Planning Time: 0.102 ms
Execution Time: 101759.581 ms
daniel
10/19/2022, 7:35 PM
Adam Bloom
10/19/2022, 7:35 PM
alex
10/19/2022, 7:35 PM
dagster instance migrate / dagster instance reindex?
sandy
10/19/2022, 7:36 PM
build_asset_reconciliation_sensor?
Adam Bloom
10/19/2022, 7:36 PM
SELECT event_logs.id, event_logs.event
FROM event_logs
WHERE event_logs.dagster_event_type = 'ASSET_MATERIALIZATION'
  AND (event_logs.asset_key = '["<key_part_1>", "<key_part_2>"]' OR event_logs.asset_key = '<key_part_1>.<key_part_2>')
ORDER BY event_logs.id DESC
LIMIT 1;
daniel
10/19/2022, 7:36 PM
Adam Bloom
10/19/2022, 7:36 PM
dagster_event_type and event id - curious. I bet we can fix this with a new index. kinda tempted to add one just to see what happens if I add an asset_key index that is restricted to ASSET_MATERIALIZATION event types
alex
10/19/2022, 7:46 PM
create index concurrently test_events_by_asset_idx on event_logs (asset_key, id) where asset_key is not null;
sandy
10/19/2022, 7:46 PM
alex
10/19/2022, 7:48 PM
id should be in the asset key index to be able to do ordered cursored lookups
Adam Bloom
10/19/2022, 7:48 PM
CREATE INDEX CONCURRENTLY idx_asset_key_materializations ON event_logs (asset_key) WHERE dagster_event_type='ASSET_MATERIALIZATION' AND asset_key IS NOT NULL;
now, but that doesn't have the ID
alex
10/19/2022, 8:04 PM
\d event_logs look like currently (assuming psql)?
Adam Bloom
10/19/2022, 8:19 PM
dagster=> \d event_logs
                                        Table "public.event_logs"
       Column       |            Type             | Collation | Nullable |                Default
--------------------+-----------------------------+-----------+----------+----------------------------------------
 id                 | integer                     |           | not null | nextval('event_logs_id_seq'::regclass)
 run_id             | character varying(255)      |           |          |
 event              | text                        |           | not null |
 dagster_event_type | text                        |           |          |
 timestamp          | timestamp without time zone |           |          |
 step_key           | text                        |           |          |
 asset_key          | text                        |           |          |
 partition          | text                        |           |          |
Indexes:
    "event_logs_pkey" PRIMARY KEY, btree (id)
    "idx_asset_key" btree (asset_key)
    "idx_asset_key_materializations" btree (id, asset_key) WHERE dagster_event_type = 'ASSET_MATERIALIZATION'::text AND asset_key IS NOT NULL INVALID
    "idx_asset_partition" btree (asset_key, partition)
    "idx_event_type" btree (dagster_event_type, id)
    "idx_run_id" btree (run_id)
    "idx_step_key" btree (step_key)
alex
10/19/2022, 8:21 PM
reindex to repair I think. Also I believe having id first vs second may also be a factor so may want to try (asset_key, id) if what you have doesn’t work.
Adam Bloom
10/19/2022, 8:25 PM
rebuild locks, and dagster is still running, so I went with drop/re-create. trying that now. And back at all of you - super appreciate the support for an open source product!
alex
10/19/2022, 8:44 PM
explain analyze?
Adam Bloom
10/19/2022, 8:45 PM
Limit (cost=0.43..2172.80 rows=1 width=595) (actual time=4019.545..4019.546 rows=0 loops=1)
  -> Index Scan Backward using idx_asset_key_materializations on event_logs (cost=0.43..864605.62 rows=398 width=595) (actual time=4019.544..4019.545 rows=0 loops=1)
        Filter: ((asset_key = '["<key_part_1>", "<key_part_2>"]'::text) OR (asset_key = '<key_part_1>.<key_part_2>'::text))
        Rows Removed by Filter: 3439164
Planning Time: 0.485 ms
Execution Time: 4019.567 ms
alex
10/19/2022, 8:47 PM
Adam Bloom
10/19/2022, 8:48 PM
Limit (cost=1612.63..1612.63 rows=1 width=595) (actual time=0.029..0.030 rows=0 loops=1)
  -> Sort (cost=1612.63..1613.62 rows=398 width=595) (actual time=0.029..0.029 rows=0 loops=1)
        Sort Key: id DESC
        Sort Method: quicksort Memory: 25kB
        -> Bitmap Heap Scan on event_logs (cost=28.29..1610.64 rows=398 width=595) (actual time=0.025..0.025 rows=0 loops=1)
              Recheck Cond: (((asset_key = '["<key_part_1>", "<key_part_2>"]'::text) AND (dagster_event_type = 'ASSET_MATERIALIZATION'::text)) OR ((asset_key = '<key_part_1>.<key_part_2>'::text) AND (dagster_event_type = 'ASSET_MATERIALIZATION'::text)))
              -> BitmapOr (cost=28.29..28.29 rows=398 width=0) (actual time=0.023..0.023 rows=0 loops=1)
                    -> Bitmap Index Scan on idx_asset_key_materializations_2 (cost=0.00..14.05 rows=199 width=0) (actual time=0.018..0.018 rows=0 loops=1)
                          Index Cond: (asset_key = '["<key_part_1>", "<key_part_2>"]'::text)
                    -> Bitmap Index Scan on idx_asset_key_materializations_2 (cost=0.00..14.05 rows=199 width=0) (actual time=0.005..0.005 rows=0 loops=1)
                          Index Cond: (asset_key = '<key_part_1>.<key_part_2>'::text)
Planning Time: 0.163 ms
Execution Time: 0.053 ms
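To put the three EXPLAIN ANALYZE results in this thread side by side, the small sketch below computes the speedup relative to the original primary-key scan. The execution times and "Rows Removed by Filter" counts are copied from the plans posted above; the helper name `speedup` and the dictionary layout are only for illustration, and the last plan reports no filtered-row count, so it is left as None.

```python
# (execution time in ms, rows removed by filter) copied from the plans in this thread.
plans = {
    "event_logs_pkey": (101759.581, 23635163),
    "idx_asset_key_materializations": (4019.567, 3439164),
    "idx_asset_key_materializations_2": (0.053, None),  # plan shows no filter step
}


def speedup(baseline_ms, new_ms):
    """How many times faster new_ms is than baseline_ms."""
    return baseline_ms / new_ms


baseline_ms = plans["event_logs_pkey"][0]
for index_name, (ms, rows_removed) in plans.items():
    removed = "n/a" if rows_removed is None else f"{rows_removed:,}"
    print(f"{index_name}: {ms:.3f} ms, rows filtered: {removed}, "
          f"speedup: {speedup(baseline_ms, ms):,.1f}x")
```

The first two plans still walk an id-ordered index and discard millions of rows, while the last one looks the asset key up directly, which is where most of the difference comes from.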
alex
10/19/2022, 8:53 PM
Adam Bloom
10/19/2022, 8:56 PM
CREATE INDEX CONCURRENTLY idx_asset_key_materializations_2 ON event_logs (asset_key, id) WHERE dagster_event_type='ASSET_MATERIALIZATION' AND asset_key IS NOT NULL;
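For anyone who wants to experiment with the shape of this index without touching a production database, here is a minimal sketch using Python's built-in sqlite3 in place of Postgres (a deliberate swap: SQLite also supports partial indexes but has no CONCURRENTLY, and its planner differs). The table is a cut-down, hypothetical version of event_logs and the rows are made up. The query uses a single asset_key equality rather than the OR from the thread, to keep the example simple.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_logs ("
    "id INTEGER PRIMARY KEY, event TEXT NOT NULL, dagster_event_type TEXT, asset_key TEXT)"
)
# Same column order and predicate as the index above, minus CONCURRENTLY.
conn.execute(
    "CREATE INDEX idx_asset_key_materializations_2 ON event_logs (asset_key, id) "
    "WHERE dagster_event_type = 'ASSET_MATERIALIZATION' AND asset_key IS NOT NULL"
)
rows = [  # made-up sample events
    ("e1", "ASSET_MATERIALIZATION", '["a", "b"]'),
    ("e2", "STEP_SUCCESS", None),
    ("e3", "ASSET_MATERIALIZATION", '["a", "b"]'),
    ("e4", "ASSET_MATERIALIZATION", '["c"]'),
    ("e5", "ASSET_OBSERVATION", '["a", "b"]'),
]
conn.executemany(
    "INSERT INTO event_logs (event, dagster_event_type, asset_key) VALUES (?, ?, ?)", rows
)

# Latest materialization for one asset key, mirroring the ORDER BY id DESC LIMIT 1 lookup.
query = (
    "SELECT id, event FROM event_logs "
    "WHERE dagster_event_type = 'ASSET_MATERIALIZATION' AND asset_key = ? "
    "ORDER BY id DESC LIMIT 1"
)
latest = conn.execute(query, ('["a", "b"]',)).fetchone()
print(latest)  # (3, 'e3')

# Planner output varies by SQLite version, so it is printed rather than relied on.
for plan_row in conn.execute("EXPLAIN QUERY PLAN " + query, ('["a", "b"]',)):
    print(plan_row)
```

Putting asset_key first lets the index narrow to one key and then read ids in order, which matches the suggestion earlier in the thread that (asset_key, id) is the ordering to try.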