From a2516f302b7a4eca62cb6f80bedca0543114a4ba Mon Sep 17 00:00:00 2001
From: EC2 Default User <ec2-user@ip-172-31-26-221.ec2.internal>
Date: Tue, 21 Mar 2023 12:49:56 +0000
Subject: [PATCH 1/1] Correct accumulation of counters for extended query
 protocol

In pg_stat_statements, EState->es_processed cannot reliably
be used to count the rows processed for a statement that
goes through extended query protocol. This is because, such
a statement may go through ExecutorRun more than once, and
es_processed is reset after every call. This fix addresses
this issue by accuumulating the # of calls and row processed
in Instrumentation and the correct totals are available to
ExecutorEnd ( and pgss_store ) before the Instrumentation
is destroyed.
---
 .../pg_stat_statements/pg_stat_statements.c    | 18 ++++++++++++++----
 src/include/executor/instrument.h              |  2 ++
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 5285c3f7fa..821cb4dad7 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -340,7 +340,7 @@ static void pgss_store(const char *query, uint64 queryId,
 					   int query_location, int query_len,
 					   pgssStoreKind kind,
 					   double total_time, uint64 rows,
-					   const BufferUsage *bufusage,
+					   int64 calls, const BufferUsage *bufusage,
 					   const WalUsage *walusage,
 					   const struct JitInstrumentation *jitusage,
 					   JumbleState *jstate);
@@ -857,6 +857,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate)
 				   PGSS_INVALID,
 				   0,
 				   0,
+				   0,
 				   NULL,
 				   NULL,
 				   NULL,
@@ -942,6 +943,7 @@ pgss_planner(Query *parse,
 				   PGSS_PLAN,
 				   INSTR_TIME_GET_MILLISEC(duration),
 				   0,
+				   1,
 				   &bufusage,
 				   &walusage,
 				   NULL,
@@ -1014,6 +1016,12 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
 		exec_nested_level--;
 	}
 	PG_END_TRY();
+
+	if (queryDesc->totaltime)
+	{
+		queryDesc->totaltime->calls++;
+		queryDesc->totaltime->rows_processed += queryDesc->estate->es_processed;
+	}
 }
 
 /*
@@ -1060,7 +1068,8 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
 				   queryDesc->plannedstmt->stmt_len,
 				   PGSS_EXEC,
 				   queryDesc->totaltime->total * 1000.0,	/* convert to msec */
-				   queryDesc->estate->es_processed,
+				   queryDesc->totaltime->rows_processed,
+				   queryDesc->totaltime->calls,
 				   &queryDesc->totaltime->bufusage,
 				   &queryDesc->totaltime->walusage,
 				   queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL,
@@ -1191,6 +1200,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
 				   PGSS_EXEC,
 				   INSTR_TIME_GET_MILLISEC(duration),
 				   rows,
+				   1,
 				   &bufusage,
 				   &walusage,
 				   NULL,
@@ -1225,7 +1235,7 @@ pgss_store(const char *query, uint64 queryId,
 		   int query_location, int query_len,
 		   pgssStoreKind kind,
 		   double total_time, uint64 rows,
-		   const BufferUsage *bufusage,
+		   int64 calls, const BufferUsage *bufusage,
 		   const WalUsage *walusage,
 		   const struct JitInstrumentation *jitusage,
 		   JumbleState *jstate)
@@ -1350,7 +1360,7 @@ pgss_store(const char *query, uint64 queryId,
 		if (IS_STICKY(e->counters))
 			e->counters.usage = USAGE_INIT;
 
-		e->counters.calls[kind] += 1;
+		e->counters.calls[kind] += calls;
 		e->counters.total_time[kind] += total_time;
 
 		if (e->counters.calls[kind] == 1)
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 87e5e2183b..5436564568 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -88,6 +88,8 @@ typedef struct Instrumentation
 	double		nfiltered2;		/* # of tuples removed by "other" quals */
 	BufferUsage bufusage;		/* total buffer usage */
 	WalUsage	walusage;		/* total WAL usage */
+	int64       calls;			/* # of total calls to ExecutorRun */
+	int64       rows_processed;	/* # of total rows processed in ExecutorRun */
 } Instrumentation;
 
 typedef struct WorkerInstrumentation
-- 
2.39.2

