Amit Kapila <amit.kapil...@gmail.com> writes:
> On Wed, Jun 9, 2021 at 8:44 PM Mark Dilger <mark.dil...@enterprisedb.com> 
> wrote:
>> On Jun 9, 2021, at 7:52 AM, Tom Lane <t...@sss.pgh.pa.us> wrote:
>>> Somewhat unrelated, but ... am I reading the code correctly that
>>> apply_handle_stream_start and related routines are using Asserts
>>> to check that the remote sent stream-control messages in the correct
>>> order?

> This also needs to be changed to test-and-elog.

Here's a proposed patch for this.  It looks like pretty much all of the
bogosity arrived with the streaming-transaction support.  You might quibble
with which things I thought deserved elog versus ereport.  I'm also
wondering how many of these messages really need to be translated;
perhaps we could use errmsg_internal and avoid burdening the
translators.

                        regards, tom lane

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 98c26002e8..26738d3589 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -177,7 +177,7 @@ bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
 /* fields valid only when processing streamed transaction */
-bool		in_streamed_transaction = false;
+static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
@@ -345,7 +345,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	 */
 	xid = pq_getmsgint(s, 4);
 
-	Assert(TransactionIdIsValid(xid));
+	if (!TransactionIdIsValid(xid))
+		elog(ERROR, "invalid transaction ID in streamed replication transaction");
 
 	/* Add the new subxact to the array (unless already there). */
 	subxact_info_add(xid);
@@ -785,7 +786,10 @@ apply_handle_commit(StringInfo s)
 
 	logicalrep_read_commit(s, &commit_data);
 
-	Assert(commit_data.commit_lsn == remote_final_lsn);
+	if (commit_data.commit_lsn != remote_final_lsn)
+		elog(ERROR, "incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+			 LSN_FORMAT_ARGS(commit_data.commit_lsn),
+			 LSN_FORMAT_ARGS(remote_final_lsn));
 
 	apply_handle_commit_internal(s, &commit_data);
 
@@ -824,7 +828,10 @@ apply_handle_stream_start(StringInfo s)
 	bool		first_segment;
 	HASHCTL		hash_ctl;
 
-	Assert(!in_streamed_transaction);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("duplicate STREAM START message")));
 
 	/*
 	 * Start a transaction on stream start, this transaction will be committed
@@ -873,7 +880,10 @@ apply_handle_stream_start(StringInfo s)
 static void
 apply_handle_stream_stop(StringInfo s)
 {
-	Assert(in_streamed_transaction);
+	if (!in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("STREAM STOP message without STREAM START")));
 
 	/*
 	 * Close the file with serialized changes, and serialize information about
@@ -905,7 +915,10 @@ apply_handle_stream_abort(StringInfo s)
 	TransactionId xid;
 	TransactionId subxid;
 
-	Assert(!in_streamed_transaction);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("STREAM ABORT message without STREAM STOP")));
 
 	logicalrep_read_stream_abort(s, &xid, &subxid);
 
@@ -967,13 +980,12 @@ apply_handle_stream_abort(StringInfo s)
 			return;
 		}
 
-		Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
 		ent = (StreamXidHash *) hash_search(xidhash,
 											(void *) &xid,
 											HASH_FIND,
 											&found);
-		Assert(found);
+		if (!found)
+			elog(ERROR, "transaction %u not found in hash table", xid);
 
 		/* open the changes file */
 		changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1012,7 +1024,10 @@ apply_handle_stream_commit(StringInfo s)
 	MemoryContext oldcxt;
 	BufFile    *fd;
 
-	Assert(!in_streamed_transaction);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("STREAM COMMIT message without STREAM STOP")));
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
 
@@ -1031,11 +1046,14 @@ apply_handle_stream_commit(StringInfo s)
 	/* open the spool file for the committed transaction */
 	changes_filename(path, MyLogicalRepWorker->subid, xid);
 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
 	ent = (StreamXidHash *) hash_search(xidhash,
 										(void *) &xid,
 										HASH_FIND,
 										&found);
-	Assert(found);
+	if (!found)
+		elog(ERROR, "transaction %u not found in hash table", xid);
+
 	fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
 
 	buffer = palloc(BLCKSZ);
@@ -1080,7 +1098,9 @@ apply_handle_stream_commit(StringInfo s)
 					 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
 							path)));
 
-		Assert(len > 0);
+		if (len <= 0)
+			elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+				 len, path);
 
 		/* make sure we have sufficiently large buffer */
 		buffer = repalloc(buffer, len);
@@ -1108,7 +1128,7 @@ apply_handle_stream_commit(StringInfo s)
 		nchanges++;
 
 		if (nchanges % 1000 == 0)
-			elog(DEBUG1, "replayed %d changes from file '%s'",
+			elog(DEBUG1, "replayed %d changes from file \"%s\"",
 				 nchanges, path);
 	}
 

Reply via email to