From 3810ff2dbd58eb812b95eb6c801fa23a010e82bc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 15 Mar 2023 15:57:13 +0800
Subject: [PATCH v2] simplify the code in pgoutput_change

---
 src/backend/replication/pgoutput/pgoutput.c | 224 ++++++--------------
 1 file changed, 68 insertions(+), 156 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 3a2d2e357e..baccca30e8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1448,187 +1448,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/* Send the data */
-	switch (action)
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
-
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuple if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
-
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
-				break;
-
-			/*
-			 * Send BEGIN if we haven't yet.
-			 *
-			 * We send the BEGIN message after ensuring that we will actually
-			 * send the change. This avoids sending a pair of BEGIN/COMMIT
-			 * messages for empty transactions.
-			 */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
-
-			/*
-			 * Schema should be sent using the original relation because it
-			 * also sends the ancestor's relation.
-			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
 
-			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
-			OutputPluginWrite(ctx, true);
-			break;
-		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-			}
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuples if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
+		}
+	}
 
-					if (old_slot)
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
+		/* Convert tuple if needed. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
 
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
-				break;
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
+		}
+	}
 
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
 
-			maybe_send_schema(ctx, change, relation, relentry);
+	/*
+	 * Send BEGIN if we haven't yet.
+	 *
+	 * We send the BEGIN message after ensuring that we will actually send the
+	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
+	 * transactions.
+	 */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
 
-			OutputPluginPrepareWrite(ctx, true);
+	/*
+	 * Schema should be sent using the original relation because it also sends
+	 * the ancestor's relation.
+	 */
+	maybe_send_schema(ctx, change, relation, relentry);
 
-			/*
-			 * Updates could be transformed to inserts or deletes based on the
-			 * results of the row filter for old and new tuple.
-			 */
-			switch (action)
-			{
-				case REORDER_BUFFER_CHANGE_INSERT:
-					logicalrep_write_insert(ctx->out, xid, targetrel,
-											new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_UPDATE:
-					logicalrep_write_update(ctx->out, xid, targetrel,
-											old_slot, new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_DELETE:
-					logicalrep_write_delete(ctx->out, xid, targetrel,
-											old_slot, data->binary,
-											relentry->columns);
-					break;
-				default:
-					Assert(false);
-			}
+	OutputPluginPrepareWrite(ctx, true);
 
-			OutputPluginWrite(ctx, true);
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+									data->binary, relentry->columns);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-					targetrel = ancestor;
-					/* Convert tuple if needed. */
-					if (relentry->attrmap)
-					{
-						TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-					}
-				}
-
-				/* Check row filter */
-				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
-					break;
-
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
-
-				maybe_send_schema(ctx, change, relation, relentry);
-
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, xid, targetrel,
-										old_slot, data->binary,
-										relentry->columns);
-				OutputPluginWrite(ctx, true);
-			}
-			else
-				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+									data->binary, relentry->columns);
 			break;
 		default:
 			Assert(false);
 	}
 
+	OutputPluginWrite(ctx, true);
+
+cleanup:
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
 		ancestor = NULL;
 	}
 
-	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 }
-- 
2.30.0.windows.2

