On Wed, 18 Sep 2019 at 12:24, Amit Khandekar <amitdkhan...@gmail.com> wrote:
> Probably, for now at least, what everyone seems to agree is to take my
> earlier attached patch forward.
>
> I am going to see if I can add a TAP test for the patch, and will add
> the patch into the commitfest soon.

Attached is an updated patch v2. Has a new test scenario added in
contrib/test_decoding/sql/spill test, and some minor code cleanup.

Going to add this into Nov commitfest.





--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company
diff --git a/contrib/test_decoding/expected/spill.out b/contrib/test_decoding/expected/spill.out
index 10734bd..e1d150b6 100644
--- a/contrib/test_decoding/expected/spill.out
+++ b/contrib/test_decoding/expected/spill.out
@@ -247,6 +247,25 @@ GROUP BY 1 ORDER BY 1;
  'serialize-nested-subbig-subbigabort-subbig-3 |  5000 | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:5001' | table public.spill_test: INSERT: data[text]:'serialize-nested-subbig-subbigabort-subbig-3:10000'
 (2 rows)
 
+-- large number of spilling subxacts. Should not exceed maxAllocatedDescs
+BEGIN;
+DO $$
+BEGIN
+    FOR i IN 1..600 LOOP
+        BEGIN
+            INSERT INTO spill_test select generate_series(1, 5000) ;
+        EXCEPTION
+            when division_by_zero then perform 'dummy';
+        END;
+    END LOOP;
+END $$;
+COMMIT;
+SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
 DROP TABLE spill_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql
index e638cac..715d1f9 100644
--- a/contrib/test_decoding/sql/spill.sql
+++ b/contrib/test_decoding/sql/spill.sql
@@ -174,6 +174,21 @@ SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(d
 FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
 GROUP BY 1 ORDER BY 1;
 
+-- large number of spilling subxacts. Should not exceed maxAllocatedDescs
+BEGIN;
+DO $$
+BEGIN
+    FOR i IN 1..600 LOOP
+        BEGIN
+            INSERT INTO spill_test select generate_series(1, 5000) ;
+        EXCEPTION
+            when division_by_zero then perform 'dummy';
+        END;
+    END LOOP;
+END $$;
+COMMIT;
+SELECT 1 from pg_logical_slot_get_changes('regression_slot', NULL,NULL) LIMIT 1;
+
 DROP TABLE spill_test;
 
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8ce28ad..1e5a725 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
 	CommandId	combocid;		/* just for debugging */
 } ReorderBufferTupleCidEnt;
 
+/* Virtual file descriptor with file offset tracking */
+typedef struct TXNEntryFile
+{
+	File		vfd;			/* -1 when the file is closed */
+	off_t		curOffset;		/* offset for next write or read. Reset to 0
+								 * when vfd is opened. */
+} TXNEntryFile;
+
 /* k-way in-order change iteration support structures */
 typedef struct ReorderBufferIterTXNEntry
 {
 	XLogRecPtr	lsn;
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
-	int			fd;
+	TXNEntryFile file;
 	XLogSegNo	segno;
 } ReorderBufferIterTXNEntry;
 
@@ -194,7 +202,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										int *fd, XLogSegNo *segno);
+										TXNEntryFile *file, XLogSegNo *segno);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -988,7 +996,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	for (off = 0; off < state->nr_txns; off++)
 	{
-		state->entries[off].fd = -1;
+		state->entries[off].file.vfd = -1;
 		state->entries[off].segno = 0;
 	}
 
@@ -1013,7 +1021,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		{
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
-			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
+			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
 										&state->entries[off].segno);
 		}
 
@@ -1043,7 +1051,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				/* serialize remaining changes */
 				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
-											&state->entries[off].fd,
+											&state->entries[off].file,
 											&state->entries[off].segno);
 			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1124,7 +1132,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		dlist_delete(&change->node);
 		dlist_push_tail(&state->old_change, &change->node);
 
-		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
+		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
 			/* successfully restored changes from disk */
@@ -1163,8 +1171,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 
 	for (off = 0; off < state->nr_txns; off++)
 	{
-		if (state->entries[off].fd != -1)
-			CloseTransientFile(state->entries[off].fd);
+		if (state->entries[off].file.vfd != -1)
+			FileClose(state->entries[off].file.vfd);
 	}
 
 	/* free memory we might have "leaked" in the last *Next call */
@@ -2517,11 +2525,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							int *fd, XLogSegNo *segno)
+							TXNEntryFile *file, XLogSegNo *segno)
 {
 	Size		restored = 0;
 	XLogSegNo	last_segno;
 	dlist_mutable_iter cleanup_iter;
+	File	   *fd = &file->vfd;
 
 	Assert(txn->first_lsn != InvalidXLogRecPtr);
 	Assert(txn->final_lsn != InvalidXLogRecPtr);
@@ -2562,7 +2571,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
 										*segno);
 
-			*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
+			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+			/* No harm in resetting the offset even in case of failure */
+			file->curOffset = 0;
+
 			if (*fd < 0 && errno == ENOENT)
 			{
 				*fd = -1;
@@ -2582,23 +2595,26 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * end of this file.
 		 */
 		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-		readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_end();
+		readBytes = FileRead(file->vfd, rb->outbuf,
+							 sizeof(ReorderBufferDiskChange),
+							 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
 
 		/* eof */
 		if (readBytes == 0)
 		{
-			CloseTransientFile(*fd);
+			FileClose(*fd);
 			*fd = -1;
 			(*segno)++;
 			continue;
 		}
-		else if (readBytes < 0)
+		if (readBytes < 0)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from reorderbuffer spill file: %m")));
-		else if (readBytes != sizeof(ReorderBufferDiskChange))
+
+		file->curOffset += readBytes;
+
+		if (readBytes != sizeof(ReorderBufferDiskChange))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
@@ -2611,10 +2627,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									  sizeof(ReorderBufferDiskChange) + ondisk->size);
 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 
-		pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-		readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
-						 ondisk->size - sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_end();
+		readBytes = FileRead(file->vfd,
+							 rb->outbuf + sizeof(ReorderBufferDiskChange),
+							 ondisk->size - sizeof(ReorderBufferDiskChange),
+							 file->curOffset,
+							 WAIT_EVENT_REORDER_BUFFER_READ);
 
 		if (readBytes < 0)
 			ereport(ERROR,
@@ -2627,6 +2644,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 							readBytes,
 							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
 
+		file->curOffset += readBytes;
+
 		/*
 		 * ok, read a full change from disk, now restore it into proper
 		 * in-memory format

Reply via email to