On 14/07/2025 4:20 am, Zhijie Hou (Fujitsu) wrote:
Thank you for the proposal! I find it to be a very interesting feature.

I tested the patch you shared in your original email and encountered potential
deadlocks when testing a pgbench TPC-B-like workload. Could you please provide an
updated patch version so that I can conduct further performance experiments?

Sorry, this was fixed in my repo: https://github.com/knizhnik/postgres/pull/3
The updated patch is attached.


Additionally, I was also exploring ways to improve performance and have tried an
alternative version of prefetch for experimentation. The alternative design is
that we assign each non-streaming transaction to a parallel apply worker, while
strictly maintaining the order of commits. During parallel apply, if the
transactions that need to be committed before the current transaction are not
yet finished, the worker performs prefetch operations. Specifically, for
updates and deletes, the worker finds and caches the target local tuple to be
updated/deleted. Once all preceding transactions are committed, the parallel
apply worker uses these cached tuples to execute the actual updates or deletes.
What do you think about this alternative? I think it might offer more stability
in scenarios where shared buffer eviction occurs frequently, and it avoids
leaving dead tuples in the buffer. However, it also presents some drawbacks
compared to the approach discussed in this thread, such as the need to add wait
events to maintain commit order.

So, as far as I understand, your PoC does the same thing as approach 1 in my proposal (prefetch of the replica identity), except that it is performed not by dedicated prefetch workers but by normal parallel apply workers while they have to wait until the previous transaction is committed. I consider it more complex, but it may well be more efficient than my approach.
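If I understand it correctly, the control flow of each of your parallel apply workers is roughly the following (just a hypothetical sketch of my understanding, not code from your patch; all the types and helper functions here are invented):

    /*
     * Sketch of the alternative scheme: a parallel apply worker prefetches
     * target tuples while earlier transactions are still in flight, then
     * applies its changes in commit order using the cached tuples.
     */
    static void
    pa_apply_xact_in_commit_order(ApplyTxn *txn)
    {
        /* Phase 1: preceding transactions still running, so only prefetch. */
        while (!preceding_xacts_committed(txn))
        {
            ApplyChange *change = next_unprefetched_change(txn);

            if (change == NULL)
                break;          /* nothing left to prefetch, just wait */

            if (change->kind == CHANGE_UPDATE || change->kind == CHANGE_DELETE)
                change->cached_tuple = lookup_local_tuple(change);  /* warms pages */
        }

        /* Phase 2: block until the commit order allows us to proceed. */
        wait_for_preceding_xacts(txn);

        /* Phase 3: the real apply, reusing cached tuples where still valid. */
        apply_changes(txn);
    }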

The obvious drawback of both your approach and mine is that only pages of the primary index (replica identity) are prefetched. If there are other indexes whose keys are changed by an update, the pages of those indexes will still be read from disk when the update is applied. The same is true for inserts (in that case the new tuple always has to be inserted into all indexes). This is why I also implemented another approach: apply the operation in the prefetch worker and then roll back the transaction.
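In the attached patch this second approach boils down to the following (simplified from pa_apply_dispatch() in the diff below; the real code also holds and resumes interrupts):

    /*
     * Apply-and-rollback prefetch: run the normal apply path so that heap
     * pages and the pages of *all* affected indexes are loaded into shared
     * buffers, then throw the change itself away.
     */
    PG_TRY();
    {
        apply_dispatch(s);          /* normal insert/update/delete apply */
    }
    PG_CATCH();
    {
        FlushErrorState();          /* prefetch failures are not fatal */
        lr_prefetch_errors += 1;
    }
    PG_END_TRY();

    /* Abort to undo the change; the pages it touched stay cached. */
    AbortCurrentTransaction();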

Also, I do not quite understand how you handle invalidations. Assume that we have two transactions, T1 and T2:

T1: ... W1 Commit

T2: ...                     W1


So T1 writes tuple 1 and then commits its transaction. Then T2 updates tuple 1.

If I understand your approach correctly, the parallel apply worker for T2 will try to prefetch tuple 1 before T1 is committed. But in this case it will get the old version of the tuple. That is not a problem as long as the parallel apply worker repeats the lookup instead of just using the cached tuple.
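In other words, the cached tuple can serve only as a hint. I would expect something like the following recheck before using it (a hypothetical sketch; cached_tid is an invented field, though table_tuple_fetch_row_version() is the existing table AM API):

    /*
     * Re-fetch the cached row version under the current snapshot; if it is
     * no longer visible (e.g. T1 committed a newer version after the
     * prefetch), fall back to a normal replica identity lookup.
     */
    if (!table_tuple_fetch_row_version(localrel, &cached_tid,
                                       GetActiveSnapshot(), localslot))
        found = FindReplTupleInLocalRel(edata, localrel, remoterel,
                                        localindexoid, remoteslot, &localslot);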

One more point: since you assign each non-streaming transaction to a parallel apply worker, the number of such transactions is limited by the number of background workers, which is usually not very large (~10). So if there were 100 concurrent transactions at the publisher, at the subscriber you would still be able to execute no more than a few of them concurrently. In this sense my approach with separate prefetch workers is more flexible: each prefetch worker can prefetch as many operations as it is able to.
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..ff7c4ed684d 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
  * invoking table_tuple_lock.
  */
 static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
 {
 	bool		refetch = false;
 
@@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
 			break;
 		case TM_Updated:
 			/* XXX: Improve handling here */
-			if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-			else
-				ereport(LOG,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("concurrent update, retrying")));
-			refetch = true;
+			if (lockmode != LockTupleTryExclusive)
+			{
+				if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+					ereport(LOG,
+							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+				else
+					ereport(LOG,
+							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+							 errmsg("concurrent update, retrying")));
+				refetch = true;
+			}
 			break;
 		case TM_Deleted:
-			/* XXX: Improve handling here */
-			ereport(LOG,
-					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-					 errmsg("concurrent delete, retrying")));
-			refetch = true;
+			if (lockmode != LockTupleTryExclusive)
+			{
+				/* XXX: Improve handling here */
+				ereport(LOG,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("concurrent delete, retrying")));
+				refetch = true;
+			}
 			break;
 		case TM_Invisible:
 			elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +242,16 @@ retry:
 		 */
 		if (TransactionIdIsValid(xwait))
 		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
+			if (lockmode == LockTupleTryExclusive)
+			{
+				found = false;
+				break;
+			}
+			else if (lockmode != LockTupleNoLock)
+			{
+				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+				goto retry;
+			}
 		}
 
 		/* Found our tuple and it's not locked */
@@ -246,7 +260,7 @@ retry:
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
-	if (found)
+	if (found && lockmode != LockTupleNoLock)
 	{
 		TM_FailureData tmfd;
 		TM_Result	res;
@@ -256,14 +270,14 @@ retry:
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
-							   lockmode,
+							   lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
 							   LockWaitBlock,
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
 		PopActiveSnapshot();
 
-		if (should_refetch_tuple(res, &tmfd))
+		if (should_refetch_tuple(res, &tmfd, lockmode))
 			goto retry;
 	}
 
@@ -395,16 +409,23 @@ retry:
 		 */
 		if (TransactionIdIsValid(xwait))
 		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
+			if (lockmode == LockTupleTryExclusive)
+			{
+				found = false;
+				break;
+			}
+			else if (lockmode != LockTupleNoLock)
+			{
+				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+				goto retry;
+			}
 		}
-
 		/* Found our tuple and it's not locked */
 		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
-	if (found)
+	if (found && lockmode != LockTupleNoLock)
 	{
 		TM_FailureData tmfd;
 		TM_Result	res;
@@ -414,14 +435,14 @@ retry:
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
-							   lockmode,
+							   lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
 							   LockWaitBlock,
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
 		PopActiveSnapshot();
 
-		if (should_refetch_tuple(res, &tmfd))
+		if (should_refetch_tuple(res, &tmfd, lockmode))
 			goto retry;
 	}
 
@@ -508,7 +529,7 @@ retry:
 
 	PopActiveSnapshot();
 
-	if (should_refetch_tuple(res, &tmfd))
+	if (should_refetch_tuple(res, &tmfd, LockTupleShare))
 		goto retry;
 
 	return true;
@@ -560,7 +581,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
  */
 void
 ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
-						 EState *estate, TupleTableSlot *slot)
+						 EState *estate, TupleTableSlot *slot, bool prefetch)
 {
 	bool		skip_tuple = false;
 	Relation	rel = resultRelInfo->ri_RelationDesc;
@@ -604,7 +625,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 		if (resultRelInfo->ri_NumIndices > 0)
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 												   slot, estate, false,
-												   conflictindexes ? true : false,
+												   conflictindexes || prefetch ? true : false,
 												   &conflict,
 												   conflictindexes, false);
 
@@ -623,7 +644,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 		 * be a frequent thing so we preferred to save the performance
 		 * overhead of extra scan before each insertion.
 		 */
-		if (conflict)
+		if (conflict && !prefetch)
 			CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
 								   recheckIndexes, NULL, slot);
 
@@ -650,7 +671,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 void
 ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 						 EState *estate, EPQState *epqstate,
-						 TupleTableSlot *searchslot, TupleTableSlot *slot)
+						 TupleTableSlot *searchslot, TupleTableSlot *slot, bool prefetch)
 {
 	bool		skip_tuple = false;
 	Relation	rel = resultRelInfo->ri_RelationDesc;
@@ -701,7 +722,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 												   slot, estate, true,
-												   conflictindexes ? true : false,
+												   conflictindexes || prefetch ? true : false,
 												   &conflict, conflictindexes,
 												   (update_indexes == TU_Summarizing));
 
@@ -710,7 +731,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		 * ExecSimpleRelationInsert to understand why this check is done at
 		 * this point.
 		 */
-		if (conflict)
+		if (conflict && !prefetch)
 			CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
 								   recheckIndexes, searchslot, slot);
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
  */
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
 pa_launch_parallel_worker(void)
 {
 	MemoryContext oldcontext;
@@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void)
 	}
 }
 
+
+static void
+pa_apply_dispatch(StringInfo s)
+{
+	if (MyParallelShared->do_prefetch)
+	{
+		PG_TRY();
+		{
+			apply_dispatch(s);
+		}
+		PG_CATCH();
+		{
+			HOLD_INTERRUPTS();
+
+			elog(DEBUG1, "Failed to prefetch LR operation");
+
+			/* TODO: should we somehow dump the error or just silently ignore it? */
+			/* EmitErrorReport(); */
+			FlushErrorState();
+
+			RESUME_INTERRUPTS();
+
+			lr_prefetch_errors += 1;
+		}
+		PG_END_TRY();
+		if (!prefetch_replica_identity_only)
+		{
+			/* We need to abort transaction to undo insert */
+			AbortCurrentTransaction();
+		}
+	}
+	else
+	{
+		apply_dispatch(s);
+	}
+}
+
 /* Parallel apply worker main loop. */
 static void
 LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 			 */
 			s.cursor += SIZE_STATS_MESSAGE;
 
-			apply_dispatch(&s);
+			pa_apply_dispatch(&s);
 		}
 		else if (shmq_res == SHM_MQ_WOULD_BLOCK)
 		{
@@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg)
 
 	InitializingApplyWorker = false;
 
-	/* Setup replication origin tracking. */
-	StartTransactionCommand();
-	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+	if (!MyParallelShared->do_prefetch)
+	{
+		/* Setup replication origin tracking. */
+		StartTransactionCommand();
+		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
 									   originname, sizeof(originname));
-	originid = replorigin_by_name(originname, false);
-
-	/*
-	 * The parallel apply worker doesn't need to monopolize this replication
-	 * origin which was already acquired by its leader process.
-	 */
-	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
-	CommitTransactionCommand();
+		originid = replorigin_by_name(originname, false);
 
+		/*
+		 * The parallel apply worker doesn't need to monopolize this replication
+		 * origin which was already acquired by its leader process.
+		 */
+		replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+		replorigin_session_origin = originid;
+		CommitTransactionCommand();
+	}
+	else
+	{
+		/* Do not write WAL for prefetch */
+		wal_level = WAL_LEVEL_MINIMAL;
+	}
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
@@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
-	Assert(!winfo->serialize_changes);
+	if (!winfo->shared->do_prefetch)
+	{
+		Assert(!IsTransactionState());
+		Assert(!winfo->serialize_changes);
+	}
 
 	/*
 	 * We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
+int			max_parallel_prefetch_workers_per_subscription = 2;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8f1bec5c529 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool		InitializingApplyWorker = false;
 
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = true;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool		InitializingApplyWorker = false;
 static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 
+/*
+ * If operation is performed by parallel prefetch worker
+ */
+#define is_prefetching()	(am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
+	if (is_prefetching())
+	{
+		return false;
+	}
+
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
 	/* not in streaming mode */
@@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
+#define SAFE_APPLY(call)									\
+	if (is_prefetching())									\
+	{														\
+		PG_TRY();											\
+		{													\
+			call;											\
+		}													\
+		PG_CATCH();											\
+		{													\
+			HOLD_INTERRUPTS();								\
+			elog(DEBUG1, "Failed to prefetch LR operation");\
+			FlushErrorState();								\
+			RESUME_INTERRUPTS();							\
+			lr_prefetch_errors += 1;						\
+		}													\
+		PG_END_TRY();										\
+	} else {												\
+		call;												\
+	}
+
+
 /*
  * Handle INSERT message.
 */
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
 		ResultRelInfo *relinfo = edata->targetRelInfo;
 
 		ExecOpenIndices(relinfo, false);
-		apply_handle_insert_internal(edata, relinfo, remoteslot);
+		SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
 		ExecCloseIndices(relinfo);
 	}
 
@@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
 		   !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
 		   RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
 
-	/* Caller will not have done this bit. */
-	Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-	InitConflictIndexes(relinfo);
+	if (is_prefetching() && prefetch_replica_identity_only)
+	{
+		TupleTableSlot *localslot = NULL;
+		LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+		Relation	localrel = relinfo->ri_RelationDesc;
+		EPQState	epqstate;
 
-	/* Do the insert. */
-	TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-	ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+		EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+
+		(void) FindReplTupleInLocalRel(edata, localrel,
+									   &relmapentry->remoterel,
+									   relmapentry->localindexoid,
+									   remoteslot, &localslot);
+	}
+	else
+	{
+		/* Caller will not have done this bit. */
+		Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+		InitConflictIndexes(relinfo);
+
+		/* Do the insert. */
+		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+		ExecSimpleRelationInsert(relinfo, estate, remoteslot, is_prefetching());
+	}
+	if (is_prefetching())
+	{
+		lr_prefetch_inserts += 1;
+	}
 }
 
 /*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
 		apply_handle_tuple_routing(edata,
 								   remoteslot, &newtup, CMD_UPDATE);
 	else
-		apply_handle_update_internal(edata, edata->targetRelInfo,
-									 remoteslot, &newtup, rel->localindexoid);
+		SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
												remoteslot, &newtup, rel->localindexoid));
 
 	finish_edata(edata);
 
@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									localindexoid,
 									remoteslot, &localslot);
 
+	if (is_prefetching())
+	{
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		if (prefetch_replica_identity_only)
+			goto Cleanup;
+	}
+
 	/*
 	 * Tuple found.
 	 *
@@ -2722,7 +2796,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		/* Do the actual update. */
 		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
 		ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
-								 remoteslot);
+								 remoteslot, is_prefetching());
 	}
 	else
 	{
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 							remoteslot, newslot, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+  Cleanup:
 	ExecCloseIndices(relinfo);
 	EvalPlanQualEnd(&epqstate);
 }
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
 		ResultRelInfo *relinfo = edata->targetRelInfo;
 
 		ExecOpenIndices(relinfo, false);
-		apply_handle_delete_internal(edata, relinfo,
-									 remoteslot, rel->localindexoid);
+		SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
												remoteslot, rel->localindexoid));
 		ExecCloseIndices(relinfo);
 	}
 
@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
 									remoteslot, &localslot);
 
+	if (is_prefetching())
+	{
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		goto Cleanup;
+	}
+
 	/* If found delete it. */
 	if (found)
 	{
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 							remoteslot, NULL, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+  Cleanup:
 	EvalPlanQualEnd(&epqstate);
 }
 
@@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 	EState	   *estate = edata->estate;
 	bool		found;
 
+	LockTupleMode lockmode = is_prefetching() ? prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive : LockTupleExclusive;
+
 	/*
 	 * Regardless of the top-level operation, we're performing a read here, so
 	 * check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 #endif
 
 		found = RelationFindReplTupleByIndex(localrel, localidxoid,
-											 LockTupleExclusive,
+											 lockmode,
 											 remoteslot, *localslot);
 	}
 	else
-		found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+		found = RelationFindReplTupleSeq(localrel, lockmode,
 										 remoteslot, *localslot);
 
 	return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	switch (operation)
 	{
 		case CMD_INSERT:
-			apply_handle_insert_internal(edata, partrelinfo,
-										 remoteslot_part);
+			SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
													remoteslot_part));
 			break;
 
 		case CMD_DELETE:
-			apply_handle_delete_internal(edata, partrelinfo,
-										 remoteslot_part,
-										 part_entry->localindexoid);
+			SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
													remoteslot_part,
													part_entry->localindexoid));
 			break;
 
 		case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				{
 					TupleTableSlot *newslot = localslot;
 
+					if (is_prefetching())
+						return;
+
 					/* Store the new tuple for conflict reporting */
 					slot_store_data(newslot, part_entry, newtup);
 
@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				{
 					TupleTableSlot *newslot;
 
+					if (is_prefetching())
+						return;
+
 					/* Store the new tuple for conflict reporting */
 					newslot = table_slot_create(partrel, &estate->es_tupleTable);
 					slot_store_data(newslot, part_entry, newtup);
@@ -3144,7 +3235,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
 										  ACL_UPDATE);
 					ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
-											 localslot, remoteslot_part);
+											 localslot, remoteslot_part, is_prefetching());
 				}
 				else
 				{
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 						slot_getallattrs(remoteslot);
 					}
 					MemoryContextSwitchTo(oldctx);
-					apply_handle_insert_internal(edata, partrelinfo_new,
-												 remoteslot_part);
+					SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
															remoteslot_part));
 				}
 
 				EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	MemoryContextSwitchTo(ApplyMessageContext);
 }
 
-
 /* Update statistics of the worker. */
 static void
 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 	}
 }
 
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+static void
+lr_do_prefetch(char* buf, int len)
+{
+	ParallelApplyWorkerInfo* winfo;
+
+	if (buf[0] != 'w')
+		return;
+
+	switch (buf[MSG_CODE_OFFSET])
+	{
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			/* Round robin prefetch worker */
+			winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+			pa_send_data(winfo, len, buf);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_RELATION:
+			/* broadcast to all prefetch workers */
+			for (int i = 0; i < n_prefetch_workers; i++)
+			{
+				winfo = prefetch_worker[i];
+				pa_send_data(winfo, len, buf);
+			}
+			break;

+		default:
+			/* Ignore other messages */
+			break;
+	}
+}
+
 /*
 * Apply main loop.
 */
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	char* prefetch_buf = NULL;
+	size_t prefetch_buf_pos = 0;
+	size_t prefetch_buf_used = 0;
+	size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 										"LogicalStreamingContext",
 										ALLOCSET_DEFAULT_SIZES);
 
+	if (max_parallel_prefetch_workers_per_subscription != 0)
+	{
+		int i;
+		for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+		{
+			prefetch_worker[i] = pa_launch_parallel_worker();
+			if (!prefetch_worker[i])
+			{
+				elog(LOG, "Launched only %d of %d prefetch workers",
					 i, max_parallel_prefetch_workers_per_subscription);
+				break;
+			}
+			prefetch_worker[i]->in_use = true;
+			prefetch_worker[i]->shared->do_prefetch = true;
+		}
+		n_prefetch_workers = i;
+		prefetch_buf = palloc(prefetch_buf_size);
+	}
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
-		int			len;
+		int32		len;
 		char	   *buf = NULL;
 		bool		endofstream = false;
+		bool		no_more_data = false;
 		long		wait_time;
 
 		CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
-		if (len != 0)
+		/* Loop to process all available data (without blocking). */
+		for (;;)
 		{
-			/* Loop to process all available data (without blocking). */
-			for (;;)
-			{
-				CHECK_FOR_INTERRUPTS();
+			CHECK_FOR_INTERRUPTS();
 
-				if (len == 0)
+			if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+			{
+				prefetch_buf_used = 0;
+				do
 				{
-					break;
-				}
-				else if (len < 0)
+					if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+					{
+						prefetch_buf_size *= 2;
+						elog(DEBUG1, "Increase prefetch buffer size to %ld", prefetch_buf_size);
+						prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+					}
+					memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+					memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+					prefetch_buf_used += 4 + len;
+					if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+						break;
+					len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				} while (len > 0);
+
+				no_more_data = len <= 0;
+
+				for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
 				{
-					ereport(LOG,
-							(errmsg("data stream from publisher has ended")));
-					endofstream = true;
-					break;
+					memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+					lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
 				}
-				else
-				{
-					int			c;
-					StringInfoData s;
+				memcpy(&len, prefetch_buf, 4);
+				buf = &prefetch_buf[4];
+				prefetch_buf_pos = len + 4;
+			}
 
-					if (ConfigReloadPending)
-					{
-						ConfigReloadPending = false;
-						ProcessConfigFile(PGC_SIGHUP);
-					}
+			if (len == 0)
+			{
+				break;
+			}
+			else if (len < 0)
+			{
+				ereport(LOG,
+						(errmsg("data stream from publisher has ended")));
+				endofstream = true;
+				break;
+			}
+			else
+			{
+				int			c;
+				StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+				if (ConfigReloadPending)
+				{
+					ConfigReloadPending = false;
+					ProcessConfigFile(PGC_SIGHUP);
+				}
 
-					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyMessageContext);
+				/* Reset timeout. */
+				last_recv_timestamp = GetCurrentTimestamp();
+				ping_sent = false;
 
-					initReadOnlyStringInfo(&s, buf, len);
+				/* Ensure we are reading the data into our memory context. */
+				MemoryContextSwitchTo(ApplyMessageContext);
 
-					c = pq_getmsgbyte(&s);
+				initReadOnlyStringInfo(&s, buf, len);
 
-					if (c == 'w')
-					{
-						XLogRecPtr	start_lsn;
-						XLogRecPtr	end_lsn;
-						TimestampTz send_time;
+				c = pq_getmsgbyte(&s);
 
-						start_lsn = pq_getmsgint64(&s);
-						end_lsn = pq_getmsgint64(&s);
-						send_time = pq_getmsgint64(&s);
+				if (c == 'w')
+				{
+					XLogRecPtr	start_lsn;
+					XLogRecPtr	end_lsn;
+					TimestampTz send_time;
 
-						if (last_received < start_lsn)
-							last_received = start_lsn;
+					start_lsn = pq_getmsgint64(&s);
+					end_lsn = pq_getmsgint64(&s);
+					send_time = pq_getmsgint64(&s);
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					if (last_received < start_lsn)
+						last_received = start_lsn;
 
-						UpdateWorkerStats(last_received, send_time, false);
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-						apply_dispatch(&s);
-					}
-					else if (c == 'k')
-					{
-						XLogRecPtr	end_lsn;
-						TimestampTz timestamp;
-						bool		reply_requested;
+					UpdateWorkerStats(last_received, send_time, false);
 
-						end_lsn = pq_getmsgint64(&s);
-						timestamp = pq_getmsgint64(&s);
-						reply_requested = pq_getmsgbyte(&s);
+					apply_dispatch(&s);
+				}
+				else if (c == 'k')
+				{
+					XLogRecPtr	end_lsn;
+					TimestampTz timestamp;
+					bool		reply_requested;
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					end_lsn = pq_getmsgint64(&s);
+					timestamp = pq_getmsgint64(&s);
+					reply_requested = pq_getmsgbyte(&s);
 
-						send_feedback(last_received, reply_requested, false);
-						UpdateWorkerStats(last_received, timestamp, true);
-					}
-					/* other message types are purposefully ignored */
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-					MemoryContextReset(ApplyMessageContext);
+					send_feedback(last_received, reply_requested, false);
+					UpdateWorkerStats(last_received, timestamp, true);
 				}
+				/* other message types are purposefully ignored */
 
+				MemoryContextReset(ApplyMessageContext);
+			}
+			if (prefetch_buf_pos < prefetch_buf_used)
+			{
+				memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+				buf = &prefetch_buf[prefetch_buf_pos + 4];
+				prefetch_buf_pos += 4 + len;
+			}
+			else if (prefetch_buf_used != 0 && no_more_data)
+			{
+				break;
+			}
+			else
+			{
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"prefetch_replica_identity_only",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Whether the LR prefetch worker should prefetch only the replica identity index or all other indexes too."),
+			NULL,
+		},
+		&prefetch_replica_identity_only,
+		true,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_parallel_prefetch_workers_per_subscription",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+			NULL,
+		},
+		&max_parallel_prefetch_workers_per_subscription,
+		2, 0, MAX_LR_PREFETCH_WORKERS,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_active_replication_origins",
 			PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544d..fbe705bfe3f 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -761,10 +761,10 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 									 TupleTableSlot *searchslot, TupleTableSlot *outslot);
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
-									 EState *estate, TupleTableSlot *slot);
+									 EState *estate, TupleTableSlot *slot, bool prefetch);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
-									 TupleTableSlot *searchslot, TupleTableSlot *slot);
+									 TupleTableSlot *searchslot, TupleTableSlot *slot, bool prefetch);
 extern void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot);
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
 	LockTupleNoKeyExclusive,
 	/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
 	LockTupleExclusive,
+	/* Do not lock tuple */
+	LockTupleNoLock,
+	/* Try exclusive lock, silently give up in case of conflict */
+	LockTupleTryExclusive,
 } LockTupleMode;
 
 #endif							/* LOCKOPTIONS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
 	 */
 	PartialFileSetState fileset_state;
 	FileSet		fileset;
+
+	/*
+	 * Prefetch worker
+	 */
+	bool		do_prefetch;
 } ParallelApplyWorkerShared;
 
 /*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
@@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action);
+
+#define isParallelApplyWorker(worker) ((worker)->in_use &&			\
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_parallel_worker(void);
 
 static inline bool
 am_tablesync_worker(void)
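
For anyone who wants to experiment with the patch: the two new GUCs it adds (names, defaults and ranges as defined in guc_tables.c above) can be set on the subscriber, for example:

    # postgresql.conf on the subscriber
    max_parallel_prefetch_workers_per_subscription = 8   # default 2, 0 disables prefetch
    prefetch_replica_identity_only = on                  # off enables apply-and-rollback prefetch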
