Hi All, The logical replication protocol uses a single-byte character to identify different chunks of logical replication messages. The code uses character literals for these, and the literals appear as bare constants throughout the code. That's true for almost all the code that deals with the wire protocol. As a result, it is difficult to identify the code that deals with a particular message. Take, for example, code that deals with message type 'B': in different protocols 'B' has different meanings, so it is difficult and time-consuming to tell one usage from another and to find all the places that deal with a given usage. Here's a patch simplifying that for top-level logical replication messages.
I think I have covered the places that need to change, but I might have missed something, given that these literals are used in several other places (a problem this patch tries to fix :)). Initially I had used #define for these, but Peter E suggested using enums so that switch statements can detect any unhandled items, along with stronger type checking. Pavan suggested off-list creating a wrapper pg_send_logical_rep_message() on top of pq_sendbyte(), and similarly for pq_getmsgbyte(). I first wanted to see if this change is acceptable. If so, I will change that as well. Comments/suggestions welcome. -- Best Wishes, Ashutosh Bapat
From fa2447ff73cc94b27e3641eda1a3f3d26fd72381 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com> Date: Fri, 16 Oct 2020 12:31:35 +0530 Subject: [PATCH] Enumize top level logical replication actions Logical replication protocol uses single byte character to identify different chunks of logical replication messages. The code uses character literals for the same. Enumize those so that 1. All the character literals used can be found at a single place. This makes it easy to add more actions without the risk of conflicts. 2. It's easy to locate the code handling a given action. We could create such enums even for the character literals that differentiate parts of a message, but I have not attempted that in this commit. Ashutosh Bapat --- src/backend/replication/logical/proto.c | 26 ++++++------ src/backend/replication/logical/worker.c | 52 ++++++++++++------------ src/include/replication/logicalproto.h | 19 +++++++++ 3 files changed, 58 insertions(+), 39 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index eb19142b48..09bce22d96 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in); void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) { - pq_sendbyte(out, 'B'); /* BEGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); /* BEGIN */ /* fixed fields */ pq_sendint64(out, txn->final_lsn); @@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'C'); /* sending COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); /* sending COMMIT */ /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -112,7 +112,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn) { - pq_sendbyte(out, 'O'); /* ORIGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); /* ORIGIN */ /* 
fixed fields */ pq_sendint64(out, origin_lsn); @@ -141,7 +141,7 @@ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'I'); /* action INSERT */ + pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); /* action INSERT */ /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -185,7 +185,7 @@ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'U'); /* action UPDATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); /* action UPDATE */ Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, 'D'); /* action DELETE */ + pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); /* action DELETE */ /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out, int i; uint8 flags = 0; - pq_sendbyte(out, 'T'); /* action TRUNCATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); /* action TRUNCATE */ /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; - pq_sendbyte(out, 'R'); /* sending RELATION */ + pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); /* sending RELATION */ /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) HeapTuple tup; Form_pg_type typtup; - pq_sendbyte(out, 'Y'); /* sending TYPE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); /* sending TYPE */ /* transaction 
ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -755,7 +755,7 @@ void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment) { - pq_sendbyte(out, 'S'); /* action STREAM START */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); /* action STREAM START */ Assert(TransactionIdIsValid(xid)); @@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) void logicalrep_write_stream_stop(StringInfo out) { - pq_sendbyte(out, 'E'); /* action STREAM END */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END); /* action STREAM END */ } /* @@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); /* action STREAM COMMIT */ Assert(TransactionIdIsValid(txn->xid)); @@ -849,7 +849,7 @@ void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid) { - pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); /* action STREAM ABORT */ Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b8e297c5d3..e7e7f8403c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1904,58 +1904,58 @@ apply_dispatch(StringInfo s) switch (action) { - /* BEGIN */ - case 'B': + case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); break; - /* COMMIT */ - case 'C': + + case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); break; - /* INSERT */ - case 'I': + + case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); break; - /* UPDATE */ - case 'U': + + case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); break; - /* DELETE */ - case 'D': + + case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); break; - /* TRUNCATE */ - case 'T': + + case 
LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); break; - /* RELATION */ - case 'R': + + case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); break; - /* TYPE */ - case 'Y': + + case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); break; - /* ORIGIN */ - case 'O': + + case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); break; - /* STREAM START */ - case 'S': + + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; - /* STREAM END */ - case 'E': + + case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); break; - /* STREAM ABORT */ - case 'A': + + case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); break; - /* STREAM COMMIT */ - case 'c': + + case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); break; + default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 0c2cda264e..abe864c934 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -33,6 +33,25 @@ #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM +/* Logical message types */ +typedef enum +{ + LOGICAL_REP_MSG_BEGIN = 'B', + LOGICAL_REP_MSG_COMMIT = 'C', + LOGICAL_REP_MSG_ORIGIN = 'O', + LOGICAL_REP_MSG_INSERT = 'I', + LOGICAL_REP_MSG_UPDATE = 'U', + LOGICAL_REP_MSG_DELETE = 'D', + LOGICAL_REP_MSG_TRUNCATE = 'T', + LOGICAL_REP_MSG_RELATION = 'R', + LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_STREAM_START = 'S', + LOGICAL_REP_MSG_STREAM_END = 'E', + LOGICAL_REP_MSG_STREAM_COMMIT = 'c', + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + +} LogicalRepMsgType; + /* * This struct stores a tuple received via logical replication. * Keep in mind that the columns correspond to the *remote* table. -- 2.17.1