Hi All,
 Logical replication protocol uses single byte character to identify
different chunks of logical repliation messages. The code uses
character literals for the same. These literals are used as bare
constants in code as well. That's true for almost all the code that
deals with wire protocol. With that it becomes difficult to identify
the code which deals with a particular message. For example code that
deals with message type 'B'. In various protocol 'B' has different
meaning and it gets difficult and time consuming to differentiate one
usage from other and find all places which deal with one usage. Here's
a patch simplifying that for top level logical replication messages.

I think I have covered the places that need change. But I might have
missed something, given that these literals are used at several other
places (a problem this patch tries to fix :)).

Initially I had used #define for the same, but Peter E suggested using
Enums so that switch cases can detect any remaining items along with
stronger type checks.

Pavan offleast suggested to create a wrapper
pg_send_logical_rep_message() on top of pg_sendbyte(), similarly for
pg_getmsgbyte(). I wanted to see if this change is acceptable. If so,
I will change that as well. Comments/suggestions welcome.

-- 
Best Wishes,
Ashutosh Bapat
From fa2447ff73cc94b27e3641eda1a3f3d26fd72381 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com>
Date: Fri, 16 Oct 2020 12:31:35 +0530
Subject: [PATCH] Enumize top level logical replication actions

Logical replication protocol uses single byte character to identify
different chunks of logical repliation messages. The code uses string
literals for the same. Enumize those so that
1. All the string literals used can be found at a single place. This
makes it easy to add more actions without the risk of conflicts.
2. It's easy to locate the code handling a given action.

We could create such enums even for the string literals that
differentiate parts of a message, but I have not attempted that in this
commit.

Ashutosh Bapat
---
 src/backend/replication/logical/proto.c  | 26 ++++++------
 src/backend/replication/logical/worker.c | 52 ++++++++++++------------
 src/include/replication/logicalproto.h   | 19 +++++++++
 3 files changed, 58 insertions(+), 39 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index eb19142b48..09bce22d96 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
 void
 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
 {
-	pq_sendbyte(out, 'B');		/* BEGIN */
+	pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);		/* BEGIN */
 
 	/* fixed fields */
 	pq_sendint64(out, txn->final_lsn);
@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 {
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'C');		/* sending COMMIT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);		/* sending COMMIT */
 
 	/* send the flags field (unused for now) */
 	pq_sendbyte(out, flags);
@@ -112,7 +112,7 @@ void
 logicalrep_write_origin(StringInfo out, const char *origin,
 						XLogRecPtr origin_lsn)
 {
-	pq_sendbyte(out, 'O');		/* ORIGIN */
+	pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);		/* ORIGIN */
 
 	/* fixed fields */
 	pq_sendint64(out, origin_lsn);
@@ -141,7 +141,7 @@ void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 						HeapTuple newtuple, bool binary)
 {
-	pq_sendbyte(out, 'I');		/* action INSERT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);		/* action INSERT */
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -185,7 +185,7 @@ void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 {
-	pq_sendbyte(out, 'U');		/* action UPDATE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);		/* action UPDATE */
 
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
-	pq_sendbyte(out, 'D');		/* action DELETE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);		/* action DELETE */
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
 	int			i;
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'T');		/* action TRUNCATE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);		/* action TRUNCATE */
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 {
 	char	   *relname;
 
-	pq_sendbyte(out, 'R');		/* sending RELATION */
+	pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);		/* sending RELATION */
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
 	HeapTuple	tup;
 	Form_pg_type typtup;
 
-	pq_sendbyte(out, 'Y');		/* sending TYPE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);		/* sending TYPE */
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -755,7 +755,7 @@ void
 logicalrep_write_stream_start(StringInfo out,
 							  TransactionId xid, bool first_segment)
 {
-	pq_sendbyte(out, 'S');		/* action STREAM START */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);		/* action STREAM START */
 
 	Assert(TransactionIdIsValid(xid));
 
@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
 void
 logicalrep_write_stream_stop(StringInfo out)
 {
-	pq_sendbyte(out, 'E');		/* action STREAM END */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);		/* action STREAM END */
 }
 
 /*
@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 {
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'c');		/* action STREAM COMMIT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);		/* action STREAM COMMIT */
 
 	Assert(TransactionIdIsValid(txn->xid));
 
@@ -849,7 +849,7 @@ void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 							  TransactionId subxid)
 {
-	pq_sendbyte(out, 'A');		/* action STREAM ABORT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);		/* action STREAM ABORT */
 
 	Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b8e297c5d3..e7e7f8403c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1904,58 +1904,58 @@ apply_dispatch(StringInfo s)
 
 	switch (action)
 	{
-			/* BEGIN */
-		case 'B':
+		case LOGICAL_REP_MSG_BEGIN:
 			apply_handle_begin(s);
 			break;
-			/* COMMIT */
-		case 'C':
+
+		case LOGICAL_REP_MSG_COMMIT:
 			apply_handle_commit(s);
 			break;
-			/* INSERT */
-		case 'I':
+
+		case LOGICAL_REP_MSG_INSERT:
 			apply_handle_insert(s);
 			break;
-			/* UPDATE */
-		case 'U':
+
+		case LOGICAL_REP_MSG_UPDATE:
 			apply_handle_update(s);
 			break;
-			/* DELETE */
-		case 'D':
+
+		case LOGICAL_REP_MSG_DELETE:
 			apply_handle_delete(s);
 			break;
-			/* TRUNCATE */
-		case 'T':
+
+		case LOGICAL_REP_MSG_TRUNCATE:
 			apply_handle_truncate(s);
 			break;
-			/* RELATION */
-		case 'R':
+
+		case LOGICAL_REP_MSG_RELATION:
 			apply_handle_relation(s);
 			break;
-			/* TYPE */
-		case 'Y':
+
+		case LOGICAL_REP_MSG_TYPE:
 			apply_handle_type(s);
 			break;
-			/* ORIGIN */
-		case 'O':
+
+		case LOGICAL_REP_MSG_ORIGIN:
 			apply_handle_origin(s);
 			break;
-			/* STREAM START */
-		case 'S':
+
+		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			break;
-			/* STREAM END */
-		case 'E':
+
+		case LOGICAL_REP_MSG_STREAM_END:
 			apply_handle_stream_stop(s);
 			break;
-			/* STREAM ABORT */
-		case 'A':
+
+		case LOGICAL_REP_MSG_STREAM_ABORT:
 			apply_handle_stream_abort(s);
 			break;
-			/* STREAM COMMIT */
-		case 'c':
+
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
 			apply_handle_stream_commit(s);
 			break;
+
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0c2cda264e..abe864c934 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -33,6 +33,25 @@
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
 
+/* Logical message types */
+typedef enum
+{
+	LOGICAL_REP_MSG_BEGIN = 'B',
+	LOGICAL_REP_MSG_COMMIT = 'C',
+	LOGICAL_REP_MSG_ORIGIN = 'O',
+	LOGICAL_REP_MSG_INSERT = 'I',
+	LOGICAL_REP_MSG_UPDATE = 'U',
+	LOGICAL_REP_MSG_DELETE = 'D',
+	LOGICAL_REP_MSG_TRUNCATE = 'T',
+	LOGICAL_REP_MSG_RELATION = 'R',
+	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_STREAM_START = 'S',
+	LOGICAL_REP_MSG_STREAM_END = 'E',
+	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
+	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+
+} LogicalRepMsgType;
+
 /*
  * This struct stores a tuple received via logical replication.
  * Keep in mind that the columns correspond to the *remote* table.
-- 
2.17.1

Reply via email to