On Fri, 23 Oct 2020 at 17:02, Kyotaro Horiguchi <horikyota....@gmail.com>
wrote:

> At Fri, 23 Oct 2020 19:53:00 +1100, Peter Smith <smithpb2...@gmail.com>
> wrote in
> > On Fri, Oct 23, 2020 at 5:20 PM Kyotaro Horiguchi
> > <horikyota....@gmail.com> wrote:
> > >
> > > At Thu, 22 Oct 2020 22:31:41 -0300, Alvaro Herrera <
> alvhe...@alvh.no-ip.org> wrote in
> > > > On 2020-Oct-22, Ashutosh Bapat wrote:
> > > >
> > > > > On Thu, 22 Oct 2020 at 14:46, Kyotaro Horiguchi <
> horikyota....@gmail.com>
> > > > > wrote:
> > > >
> > > > > > pg_send_logicalrep_msg_type() looks somewhat too-much.  If we
> need
> > > > > > something like that we shouldn't do this refactoring, I think.
> > > > >
> > > > > Enum is an integer, and we want to send byte. The function asserts
> that the
> > > > > enum fits a byte. If there's a way to declare byte long enums I
> would use
> > > > > that. But I didn't find a way to do that.
> >
> > The pq_send_logicalrep_msg_type() function seemed a bit overkill to me.
>
> Ah, yes, it is what I meant. I didn't come up with the word "overkill".
>
> > The comment in the LogicalRepMsgType enum will sufficiently ensure
> > nobody is going to accidentally add any bad replication message codes.
> > And it's not like these are going to be changed often.
>
> Agreed.
>
> > Why not simply downcast your enums when calling pq_sendbyte?
> > There are only a few of them.
> >
> > e.g. pq_sendbyte(out, (uint8)LOGICAL_REP_MSG_STREAM_COMMIT);
>
> If you are worried about compiler warning, that explicit cast is not
> required. Even if the symbol is larger than 0xff, the upper bytes are
> silently truncated off.
>
>
I agree with Peter that the prologue of  LogicalRepMsgType is enough.

I also agree with Kyotaro, that explicit cast is unnecessary.

All this together makes the second patch useless. Removed it. Instead used
Kyotaro's idea in previous mail.

PFA updated patch.

-- 
Best Wishes,
Ashutosh
From 76f578416503478bf5e39993eec4bbd0f17d5a17 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com>
Date: Fri, 16 Oct 2020 12:31:35 +0530
Subject: [PATCH] Enumize top level logical replication actions

Logical replication protocol uses single byte character to identify
different chunks of logical repliation messages. The code uses string
literals for the same. Enumize those so that
1. All the string literals used can be found at a single place. This
makes it easy to add more actions without the risk of conflicts.
2. It's easy to locate the code handling a given action.

Ashutosh Bapat
---
 src/backend/replication/logical/proto.c  | 26 +++----
 src/backend/replication/logical/worker.c | 87 ++++++++++++------------
 src/include/replication/logicalproto.h   | 25 +++++++
 3 files changed, 81 insertions(+), 57 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index eb19142b48..fdb31182d7 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
 void
 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
 {
-	pq_sendbyte(out, 'B');		/* BEGIN */
+	pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
 
 	/* fixed fields */
 	pq_sendint64(out, txn->final_lsn);
@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 {
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'C');		/* sending COMMIT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
 
 	/* send the flags field (unused for now) */
 	pq_sendbyte(out, flags);
@@ -112,7 +112,7 @@ void
 logicalrep_write_origin(StringInfo out, const char *origin,
 						XLogRecPtr origin_lsn)
 {
-	pq_sendbyte(out, 'O');		/* ORIGIN */
+	pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
 
 	/* fixed fields */
 	pq_sendint64(out, origin_lsn);
@@ -141,7 +141,7 @@ void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 						HeapTuple newtuple, bool binary)
 {
-	pq_sendbyte(out, 'I');		/* action INSERT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -185,7 +185,7 @@ void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 {
-	pq_sendbyte(out, 'U');		/* action UPDATE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
-	pq_sendbyte(out, 'D');		/* action DELETE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
 	int			i;
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'T');		/* action TRUNCATE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 {
 	char	   *relname;
 
-	pq_sendbyte(out, 'R');		/* sending RELATION */
+	pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
 	HeapTuple	tup;
 	Form_pg_type typtup;
 
-	pq_sendbyte(out, 'Y');		/* sending TYPE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -755,7 +755,7 @@ void
 logicalrep_write_stream_start(StringInfo out,
 							  TransactionId xid, bool first_segment)
 {
-	pq_sendbyte(out, 'S');		/* action STREAM START */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
 
 	Assert(TransactionIdIsValid(xid));
 
@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
 void
 logicalrep_write_stream_stop(StringInfo out)
 {
-	pq_sendbyte(out, 'E');		/* action STREAM END */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
 }
 
 /*
@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 {
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'c');		/* action STREAM COMMIT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
 
 	Assert(TransactionIdIsValid(txn->xid));
 
@@ -849,7 +849,7 @@ void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 							  TransactionId subxid)
 {
-	pq_sendbyte(out, 'A');		/* action STREAM ABORT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
 
 	Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3a5b733ee3..1e3a3e63ac 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1897,67 +1897,66 @@ apply_handle_truncate(StringInfo s)
 static void
 apply_dispatch(StringInfo s)
 {
-	char		action = pq_getmsgbyte(s);
+	LogicalRepMsgType action = pq_getmsgbyte(s);
 
 	switch (action)
 	{
-			/* BEGIN */
-		case 'B':
+		case LOGICAL_REP_MSG_BEGIN:
 			apply_handle_begin(s);
-			break;
-			/* COMMIT */
-		case 'C':
+			return;
+
+		case LOGICAL_REP_MSG_COMMIT:
 			apply_handle_commit(s);
-			break;
-			/* INSERT */
-		case 'I':
+			return;
+
+		case LOGICAL_REP_MSG_INSERT:
 			apply_handle_insert(s);
-			break;
-			/* UPDATE */
-		case 'U':
+			return;
+
+		case LOGICAL_REP_MSG_UPDATE:
 			apply_handle_update(s);
-			break;
-			/* DELETE */
-		case 'D':
+			return;
+
+		case LOGICAL_REP_MSG_DELETE:
 			apply_handle_delete(s);
-			break;
-			/* TRUNCATE */
-		case 'T':
+			return;
+
+		case LOGICAL_REP_MSG_TRUNCATE:
 			apply_handle_truncate(s);
-			break;
-			/* RELATION */
-		case 'R':
+			return;
+
+		case LOGICAL_REP_MSG_RELATION:
 			apply_handle_relation(s);
-			break;
-			/* TYPE */
-		case 'Y':
+			return;
+
+		case LOGICAL_REP_MSG_TYPE:
 			apply_handle_type(s);
-			break;
-			/* ORIGIN */
-		case 'O':
+			return;
+
+		case LOGICAL_REP_MSG_ORIGIN:
 			apply_handle_origin(s);
-			break;
-			/* STREAM START */
-		case 'S':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
-			break;
-			/* STREAM END */
-		case 'E':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_END:
 			apply_handle_stream_stop(s);
-			break;
-			/* STREAM ABORT */
-		case 'A':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_ABORT:
 			apply_handle_stream_abort(s);
-			break;
-			/* STREAM COMMIT */
-		case 'c':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
 			apply_handle_stream_commit(s);
-			break;
-		default:
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("invalid logical replication message type \"%c\"", action)));
+			return;
 	}
+
+	ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("invalid logical replication message type \"%c\"", action)));
 }
 
 /*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0c2cda264e..15ee2304c8 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -33,6 +33,31 @@
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
 
+/*
+ * Logical message types
+ *
+ * Used by logical replication wire protocol.
+ *
+ * Note: though this is an enum it should fit a single byte and should be a
+ * printable character.
+ */
+typedef enum
+{
+	LOGICAL_REP_MSG_BEGIN = 'B',
+	LOGICAL_REP_MSG_COMMIT = 'C',
+	LOGICAL_REP_MSG_ORIGIN = 'O',
+	LOGICAL_REP_MSG_INSERT = 'I',
+	LOGICAL_REP_MSG_UPDATE = 'U',
+	LOGICAL_REP_MSG_DELETE = 'D',
+	LOGICAL_REP_MSG_TRUNCATE = 'T',
+	LOGICAL_REP_MSG_RELATION = 'R',
+	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_STREAM_START = 'S',
+	LOGICAL_REP_MSG_STREAM_END = 'E',
+	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
+	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+} LogicalRepMsgType;
+
 /*
  * This struct stores a tuple received via logical replication.
  * Keep in mind that the columns correspond to the *remote* table.
-- 
2.17.1

Reply via email to