# ask-community

Chris Evans

08/10/2022, 3:37 PM
Hi all, I've been experiencing Dagit GraphQL errors around statement timeouts for a few weeks now, across a few different operations (current version 1.0.1). Any suggestions on how best to handle these types of performance issues? Reindexing, perhaps?
Copy code
Operation name: ScheduleRootQuery

Message: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout

[SQL: SELECT runs.id, runs.run_body, runs.status, runs.create_timestamp, runs.update_timestamp, runs.start_time, runs.end_time 
FROM runs JOIN run_tags ON runs.run_id = run_tags.run_id 
WHERE run_tags.key = %(key_1)s AND run_tags.value = %(value_1)s OR run_tags.key = %(key_2)s AND run_tags.value = %(value_2)s GROUP BY runs.run_body, runs.id 
HAVING count(runs.run_id) = %(count_1)s ORDER BY runs.id DESC 
 LIMIT %(param_1)s]

claire

08/10/2022, 5:44 PM
Hi Chris. Where else are you getting these Dagit GraphQL errors? Would you mind pasting the error traces wherever you are receiving them?
For now, something you could try is increasing the statement timeout limit (--db-statement-timeout on the Dagit CLI).
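For reference, a sketch of what passing that flag might look like when launching Dagit (the value is in milliseconds; the host/port and the 30000 figure below are placeholders, not from this thread):
Copy code
dagit -h 0.0.0.0 -p 3000 --db-statement-timeout 30000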

Chris Evans

08/10/2022, 6:08 PM
Hi Claire, thanks for the response. That sounds like a place to start. Here's another one I just received:
Copy code
Operation name: InstanceSensorsQuery

Message: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout

[SQL: SELECT subquery.id, subquery.run_body, subquery.status, subquery.create_timestamp, subquery.update_timestamp, subquery.start_time, subquery.end_time 
FROM (SELECT runs.id AS id, runs.run_body AS run_body, runs.status AS status, runs.create_timestamp AS create_timestamp, runs.update_timestamp AS update_timestamp, runs.start_time AS start_time, runs.end_time AS end_time, rank() OVER (PARTITION BY run_tags.value ORDER BY runs.id DESC) AS rank 
FROM runs JOIN run_tags ON runs.run_id = run_tags.run_id JOIN (SELECT runs.run_id AS run_id 
FROM runs JOIN run_tags ON runs.run_id = run_tags.run_id 
WHERE run_tags.key = %(key_1)s AND run_tags.value = %(value_1)s GROUP BY runs.run_body, runs.id 
HAVING count(runs.run_id) = %(count_1)s) AS filtered_query ON runs.run_id = filtered_query.run_id 
WHERE run_tags.key = %(key_2)s AND run_tags.value IN (%(value_2_1)s, %(value_2_2)s, %(value_2_3)s, %(value_2_4)s, %(value_2_5)s, %(value_2_6)s, %(value_2_7)s, %(value_2_8)s, %(value_2_9)s, %(value_2_10)s, %(value_2_11)s, %(value_2_12)s, %(value_2_13)s, %(value_2_14)s, %(value_2_15)s, %(value_2_16)s, %(value_2_17)s, %(value_2_18)s, %(value_2_19)s, %(value_2_20)s, %(value_2_21)s, %(value_2_22)s, %(value_2_23)s, %(value_2_24)s, %(value_2_25)s, %(value_2_26)s, %(value_2_27)s, %(value_2_28)s, %(value_2_29)s, %(value_2_30)s, %(value_2_31)s, %(value_2_32)s, %(value_2_33)s, %(value_2_34)s, %(value_2_35)s, %(value_2_36)s, %(value_2_37)s, %(value_2_38)s, %(value_2_39)s, %(value_2_40)s, %(value_2_41)s, %(value_2_42)s, %(value_2_43)s, %(value_2_44)s, %(value_2_45)s, %(value_2_46)s, %(value_2_47)s, %(value_2_48)s, %(value_2_49)s, %(value_2_50)s, %(value_2_51)s, %(value_2_52)s, %(value_2_53)s)) AS subquery 
WHERE subquery.rank <= %(rank_1)s ORDER BY subquery.rank ASC]
[parameters: {OBFUSCATED}]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)

Path: ["repositoriesOrError","nodes",0,"sensors",0,"sensorState","runs"]

Locations: [{"line":143,"column":3}]
Copy code
Operation name: JobMetadataQuery

Message: (psycopg2.errors.QueryCanceled) canceling statement due to statement timeout

[SQL: SELECT runs.id, runs.run_body, runs.status, runs.create_timestamp, runs.update_timestamp, runs.start_time, runs.end_time 
FROM runs 
WHERE runs.pipeline_name = %(pipeline_name_1)s ORDER BY runs.id DESC 
 LIMIT %(param_1)s]
[parameters: {'pipeline_name_1': '...', 'param_1': 5}]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)

Path: ["pipelineRunsOrError","results"]

Locations: [{"line":12,"column":7}]
Updated to 75000 for --db-statement-timeout on Dagit and still experiencing issues. I'm not able to load the instance/overview page; the instance/{instigator} pages usually time out too. Running reindexing now; will see if that improves anything. Any other suggestions on what to try?
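As an aside for anyone reading later, the reindexing mentioned here is presumably the Dagster CLI command, run against the same instance configuration (DAGSTER_HOME) that Dagit uses:
Copy code
dagster instance reindex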

claire

08/10/2022, 8:12 PM
Question: how many runs do you currently have in your table?

Chris Evans

08/10/2022, 8:12 PM

claire

08/10/2022, 8:24 PM
Is it necessary for you to retain the run history?
If it isn't, you could create a schedule that periodically deletes old runs, which will speed up these queries
but hopefully the reindexing helps

Chris Evans

08/10/2022, 8:46 PM
I currently delete run history older than 35 days via a DAG

alex

08/10/2022, 8:56 PM
• How much CPU/memory does your PG DB have?
• Manually running an explain analyze on one of the timing-out queries would pinpoint exactly what's causing the problem.
• How does the DAG delete the runs? instance.delete_run?
• How big is your run_tags table?
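For reference, checking the table size and row counts might look something like this in psql; these are standard Postgres functions, and only the table names come from the errors above:
Copy code
-- On-disk size of the run storage tables, including indexes
SELECT pg_size_pretty(pg_total_relation_size('run_tags')) AS run_tags_size,
       pg_size_pretty(pg_total_relation_size('runs'))     AS runs_size;

-- Exact row counts (can be slow on very large tables)
SELECT count(*) AS run_tags_rows FROM run_tags;
SELECT count(*) AS runs_rows FROM runs;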

Chris Evans

08/10/2022, 9:06 PM
• Currently using a t3.medium. I don't really see any noticeable strain on the instance with the above queries.
• Will take a look at this now.
• Code for deleting runs is below.
• run_tags is 302 MB.
Copy code
import datetime

from dagster import (
    Field,
    OpExecutionContext,
    PipelineRunStatus,
    PipelineRunsFilter,
    op,
)


@op(
    config_schema={
        "max_age_days": Field(
            int,
            default_value=35,
            description=("The maximum number of days to keep a job."),
        ),
    },
)
def delete_old_runs(context: OpExecutionContext) -> None:
    """Cleans up finished old runs past a certain number of days

    Args:
        context (OpExecutionContext): _description_
    """
    created_before = datetime.datetime.utcnow() - datetime.timedelta(
        context.op_config["max_age_days"]
    )
    finished_runs = context.instance.get_runs(
        filters=PipelineRunsFilter(
            statuses=[
                PipelineRunStatus.SUCCESS,
                PipelineRunStatus.FAILURE,
                PipelineRunStatus.CANCELED,
            ],
            created_before=datetime.datetime(
                created_before.year, created_before.month, created_before.day
            ),
        )
    )

    context.log.info(
        f"Deleting {len(finished_runs)} old runs created before: {created_before}."
    )

    for run in finished_runs:
        context.instance.delete_run(run.run_id)
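Tying this back to the earlier suggestion of a cleanup schedule, a minimal sketch of wiring an op like this into a job with a daily schedule; the job/schedule names and cron string are illustrative, not from the thread:
Copy code
from dagster import ScheduleDefinition, job


@job
def delete_old_runs_job():
    # Assumes the delete_old_runs op from the snippet above is in scope
    delete_old_runs()


# Hypothetical daily cleanup schedule; adjust the cron string as needed
delete_old_runs_schedule = ScheduleDefinition(
    job=delete_old_runs_job, cron_schedule="0 4 * * *"
)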

alex

08/10/2022, 9:09 PM
How many rows is that 302 MB run_tags table?
(Looking at the source code, I see a suspicious lack of deleting run tags in delete_run; that's my current hypothesis, other than improving the indexes we create.)

Chris Evans

08/10/2022, 9:10 PM
772,231 rows in run tags

alex

08/10/2022, 9:17 PM
Ok, the deletes are done via cascade, which is why I missed them. Will be curious to see the explain analyze output.
So it looks like you have 50+ sensors; how many schedules do you have?

Chris Evans

08/10/2022, 9:21 PM
Gotcha, so the delete_old_runs op above looks good? I have about 25, I believe. Working on the explain analyze.

alex

08/10/2022, 9:23 PM
ya that looks good

Chris Evans

08/10/2022, 9:52 PM
Here are the explain analyze results. I'm noticing a lot of IO:DataFileRead wait events for this operation. I feel like I need to bump my RDS instance to something meatier.
Copy code
Sort  (cost=84557.07..84557.09 rows=6 width=1287) (actual time=961808.358..961809.925 rows=49 loops=1)
  Sort Key: subquery.rank
  Sort Method: quicksort  Memory: 123kB
  ->  Subquery Scan on subquery  (cost=84556.41..84557.00 rows=6 width=1287) (actual time=961564.169..961809.775 rows=49 loops=1)
        Filter: (subquery.rank <= '1'::bigint)
        Rows Removed by Filter: 125924
        ->  WindowAgg  (cost=84556.41..84556.77 rows=18 width=1332) (actual time=961564.166..961801.011 rows=125973 loops=1)
              ->  Sort  (cost=84556.41..84556.46 rows=18 width=1324) (actual time=961564.137..961677.291 rows=125973 loops=1)
                    Sort Key: run_tags.value, runs.id DESC
                    Sort Method: external merge  Disk: 164136kB
                    ->  Nested Loop  (cost=65585.48..84556.04 rows=18 width=1324) (actual time=127023.763..960079.329 rows=125973 loops=1)
                          ->  Hash Join  (cost=65585.06..84484.27 rows=18 width=119) (actual time=127023.721..149515.640 rows=125973 loops=1)
                                Hash Cond: ((run_tags.run_id)::text = (filtered_query.run_id)::text)
                                ->  Bitmap Heap Scan on run_tags  (cost=2338.99..21158.37 rows=21240 width=82) (actual time=34919.607..56978.735 rows=125973 loops=1)
                                      Recheck Cond: ((key = 'dagster/sensor_name'::text) AND (value = ANY ('{OBFUSCATED(53)}'::text[])))
                                      Heap Blocks: exact=16144
                                      ->  Bitmap Index Scan on idx_run_tags  (cost=0.00..2333.68 rows=21240 width=0) (actual time=34489.417..34489.417 rows=132410 loops=1)
                                            Index Cond: ((key = 'dagster/sensor_name'::text) AND (value = ANY ('{OBFUSCATED(53)}'::text[])))
                                ->  Hash  (cost=63244.59..63244.59 rows=118 width=37) (actual time=92104.029..92105.586 rows=127766 loops=1)
                                      Buckets: 65536 (originally 1024)  Batches: 4 (originally 1)  Memory Usage: 3585kB
                                      ->  Subquery Scan on filtered_query  (cost=60418.46..63244.59 rows=118 width=37) (actual time=91712.221..92050.859 rows=127766 loops=1)
                                            ->  Finalize GroupAggregate  (cost=60418.46..63243.41 rows=118 width=1276) (actual time=91712.219..92031.601 rows=127766 loops=1)
                                                  Group Key: runs_1.id
                                                  Filter: (count(runs_1.run_id) = '1'::bigint)
                                                  ->  Gather Merge  (cost=60418.46..62851.54 rows=19594 width=1284) (actual time=91712.210..91929.981 rows=127766 loops=1)
                                                        Workers Planned: 2
                                                        Workers Launched: 0
                                                        ->  Partial GroupAggregate  (cost=59418.44..59589.88 rows=9797 width=1284) (actual time=91711.490..91912.673 rows=127766 loops=1)
                                                              Group Key: runs_1.id
                                                              ->  Sort  (cost=59418.44..59442.93 rows=9797 width=1276) (actual time=91711.475..91816.025 rows=127766 loops=1)
                                                                    Sort Key: runs_1.id
                                                                    Sort Method: external merge  Disk: 159912kB
                                                                    ->  Parallel Hash Join  (cost=19853.41..53308.99 rows=9797 width=1276) (actual time=90133.738..90588.433 rows=127766 loops=1)
                                                                          Hash Cond: ((runs_1.run_id)::text = (run_tags_1.run_id)::text)
                                                                          ->  Parallel Seq Scan on runs runs_1  (cost=0.00..33127.30 rows=57530 width=1276) (actual time=0.010..288.472 rows=137976 loops=1)
                                                                          ->  Parallel Hash  (cost=19730.95..19730.95 rows=9797 width=37) (actual time=90133.333..90133.334 rows=127766 loops=1)
                                                                                Buckets: 131072 (originally 32768)  Batches: 1 (originally 1)  Memory Usage: 10816kB
                                                                                ->  Parallel Bitmap Heap Scan on run_tags run_tags_1  (cost=2485.55..19730.95 rows=9797 width=37) (actual time=61986.157..90074.310 rows=127766 loops=1)
                                                                                      Recheck Cond: ((key = '.dagster/repository'::text) AND (value = 'main@bi'::text))
                                                                                      Heap Blocks: exact=16153
                                                                                      ->  Bitmap Index Scan on idx_run_tags  (cost=0.00..2479.67 rows=23512 width=0) (actual time=61981.621..61981.622 rows=134266 loops=1)
                                                                                            Index Cond: ((key = '.dagster/repository'::text) AND (value = 'main@bi'::text))
                          ->  Index Scan using runs_run_id_key on runs  (cost=0.42..3.99 rows=1 width=1316) (actual time=6.431..6.431 rows=1 loops=125973)
                                Index Cond: ((run_id)::text = (run_tags.run_id)::text)
Planning Time: 101.391 ms
Execution Time: 961896.227 ms

alex

08/10/2022, 10:04 PM
Hmm, there's probably some work we can do on the query as well.

Chris Evans

08/10/2022, 10:29 PM
I see, any other suggestions in the meantime?
If it helps, I see this operation being called quite a bit and taking a long time to run:
Copy code
SELECT runs.run_body, runs.status 
FROM runs JOIN run_tags ON runs.run_id = run_tags.run_id 
WHERE run_tags.key = $1 AND run_tags.value = $2 OR run_tags.key = $3 AND run_tags.value = $4 GROUP BY runs.run_body, runs.id 
HAVING count(runs.run_id) = $5 ORDER BY runs.id DESC
Upgraded to db.r6g.large, which is working much better. Still feels like the queries are a bit slow, though.

alex

08/11/2022, 2:20 PM
I believe those queries are the sensor idempotence checks. If you get a chance to send over an explain analyze for one of those, that would be helpful.
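For anyone debugging something similar, a hedged sketch of running explain analyze on that statement with the $n parameters swapped for literals; the tag keys mirror the ones visible in the earlier query plan, the angle-bracket values are placeholders, and the HAVING count of 2 is a guess at the number of tag filters being matched:
Copy code
EXPLAIN (ANALYZE, BUFFERS)
SELECT runs.run_body, runs.status
FROM runs JOIN run_tags ON runs.run_id = run_tags.run_id
WHERE (run_tags.key = 'dagster/sensor_name' AND run_tags.value = '<sensor_name>')
   OR (run_tags.key = '.dagster/repository' AND run_tags.value = '<repository@location>')
GROUP BY runs.run_body, runs.id
HAVING count(runs.run_id) = 2
ORDER BY runs.id DESC;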