On 14/07/2025 4:20 am, Zhijie Hou (Fujitsu) wrote:
> Thank you for the proposal! I find it to be a very interesting feature.
> I tested the patch you shared in your original email and encountered potential
> deadlocks when testing a pgbench TPC-B-like workload. Could you please provide an
> updated patch version so that I can conduct further performance experiments?
Sorry, it was fixed in my repo: https://github.com/knizhnik/postgres/pull/3
The updated patch is attached.
> Additionally, I was also exploring ways to improve performance and have tried an
> alternative version of prefetch for experimentation. The alternative design is
> that we assign each non-streaming transaction to a parallel apply worker, while
> strictly maintaining the order of commits. During parallel apply, if the
> transactions that need to be committed before the current transaction are not
> yet finished, the worker performs pre-fetch operations. Specifically, for
> updates and deletes, the worker finds and caches the target local tuple to be
> updated/deleted. Once all preceding transactions are committed, the parallel
> apply worker uses these cached tuples to execute the actual updates or deletes.
> What do you think about this alternative? I think the alternative might offer
> more stability in scenarios where shared buffer eviction occurs frequently
> and avoids leaving dead tuples in the buffer. However, it also presents some
> drawbacks, such as the need to add wait events to maintain commit order,
> compared to the approach discussed in this thread.
So, as far as I understand, your PoC does the same as approach 1 in my
proposal (prefetch of the replica identity), but the prefetch is done not by
dedicated prefetch workers but by the normal parallel apply workers while they
wait for the previous transaction to commit. I consider it more complex than
my approach, but it may be more efficient.
The obvious drawback of both your approach and mine is that they prefetch
only pages of the primary index (replica identity). If an update changes the
keys of other indexes, pages of those indexes will still be read from disk
when the update is applied. The same is true for inserts, where the new tuple
always has to be inserted into all indexes. This is why I have also
implemented another approach: apply the operation in the prefetch worker and
then roll back the transaction.
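A toy model of that difference, assuming a single secondary index and a fixed key-to-page mapping (`cache_model`, `page_of` and both prefetch functions are hypothetical, not part of the patch): replica-identity-only prefetch warms only the replica identity index page, while applying an update that changes the secondary key and then rolling it back also reads the secondary index leaf page for the new key. The model tracks page residency only, which is the part a rollback does not undo.

```c
#include <assert.h>
#include <stdbool.h>

#define NPAGES 16				/* hypothetical number of pages per index */

/* Hypothetical model: which index pages are resident in shared buffers. */
typedef struct cache_model
{
	bool		pk_page[NPAGES];	/* replica identity (primary key) index */
	bool		sk_page[NPAGES];	/* secondary index on a non-key column */
} cache_model;

/* Toy mapping from an index key to the leaf page that holds it. */
static int
page_of(int key)
{
	assert(key >= 0);
	return key % NPAGES;
}

/* Replica-identity-only prefetch: only the RI index is searched. */
static void
prefetch_ri_only(cache_model *c, int pk)
{
	c->pk_page[page_of(pk)] = true;
}

/*
 * Apply-and-roll-back prefetch of an UPDATE that changes the secondary key:
 * the RI index is searched to find the old tuple, and the new secondary
 * index entry is inserted, which reads that index's leaf page as well.
 * Only page residency is modeled; the rolled-back row change is not.
 */
static void
prefetch_apply_and_rollback(cache_model *c, int pk, int new_sk)
{
	c->pk_page[page_of(pk)] = true;
	c->sk_page[page_of(new_sk)] = true;
}
```

After both calls for the same row, the replica identity page is warm in both models, but only the apply-and-roll-back model has the secondary index page cached.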
Also, I do not quite understand how you handle invalidations. Assume that
we have two transactions, T1 and T2:
T1: ... W1 Commit
T2: ... W1
So T1 writes tuple 1 and then commits. Then T2 updates tuple 1.
If I understand your approach correctly, the parallel apply worker for T2
will try to prefetch tuple 1 before T1 is committed.
But in that case it will get the old version of the tuple. This is not a
problem if the parallel apply worker repeats the lookup instead of just using
the cached tuple.
One more point: since you assign each non-streaming transaction to
a parallel apply worker, the number of such transactions is limited by
the number of background workers. Usually it is not so large (~10). So if there
were 100 concurrent transactions at the publisher, then at the subscriber you
would still be able to execute no more than a few of them concurrently. In
this sense my approach with separate prefetch workers is more flexible:
each prefetch worker can prefetch as many operations as it can keep up with,
regardless of which transaction they belong to.
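The dispatch policy of `lr_do_prefetch()` in the attached patch (round-robin for INSERT/UPDATE/DELETE, broadcast for RELATION/TYPE messages, everything else ignored) can be modeled outside PostgreSQL as below. `msg_kind`, `NWORKERS`, `queued` and `dispatch_prefetch` are hypothetical names for this sketch only; the patch itself forwards the raw message to a worker's queue with `pa_send_data()`. The cursor here is unsigned so that it wraps around without undefined behavior on a long-running subscription.

```c
#include <assert.h>

#define NWORKERS 3				/* hypothetical number of prefetch workers */

/* Simplified message kinds; the patch switches on LogicalRepMsgType. */
typedef enum msg_kind
{
	MSG_INSERT, MSG_UPDATE, MSG_DELETE,	/* spread round-robin */
	MSG_RELATION, MSG_TYPE,				/* broadcast to every worker */
	MSG_OTHER							/* BEGIN, COMMIT, ...: not prefetched */
} msg_kind;

static int	queued[NWORKERS];			/* messages handed to each worker */
static unsigned int next_worker = 0;	/* unsigned: wraps without UB */

static void
dispatch_prefetch(msg_kind kind)
{
	switch (kind)
	{
		case MSG_INSERT:
		case MSG_UPDATE:
		case MSG_DELETE:
			queued[next_worker++ % NWORKERS]++;
			break;
		case MSG_RELATION:
		case MSG_TYPE:
			/* each worker needs the relation/type map to decode later rows */
			for (int i = 0; i < NWORKERS; i++)
				queued[i]++;
			break;
		default:
			break;
	}
}
```

With three workers, one RELATION message followed by seven UPDATEs leaves the workers with 4, 3 and 3 queued messages: each received the broadcast, and the updates were spread 3/2/2.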
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..ff7c4ed684d 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
* invoking table_tuple_lock.
*/
static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
{
bool refetch = false;
@@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
break;
case TM_Updated:
/* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ }
break;
case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ }
break;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +242,16 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
/* Found our tuple and it's not locked */
@@ -246,7 +260,7 @@ retry:
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -256,14 +270,14 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid),
GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
}
@@ -395,16 +409,23 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
-
/* Found our tuple and it's not locked */
break;
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -414,14 +435,14 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid),
GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
}
@@ -508,7 +529,7 @@ retry:
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, LockTupleShare))
goto retry;
return true;
@@ -560,7 +581,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
*/
void
ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
- EState *estate, TupleTableSlot *slot)
+ EState *estate, TupleTableSlot *slot, bool prefetch)
{
bool skip_tuple = false;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -604,7 +625,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
slot, estate, false,
- conflictindexes ? true : false,
+ conflictindexes || prefetch ? true : false,
&conflict,
conflictindexes, false);
@@ -623,7 +644,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
* be a frequent thing so we preferred to save the performance
* overhead of extra scan before each insertion.
*/
- if (conflict)
+ if (conflict && !prefetch)
CheckAndReportConflict(resultRelInfo, estate,
CT_INSERT_EXISTS,
recheckIndexes, NULL, slot);
@@ -650,7 +671,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
void
ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
EState *estate, EPQState *epqstate,
- TupleTableSlot *searchslot, TupleTableSlot *slot)
+ TupleTableSlot *searchslot, TupleTableSlot *slot, bool prefetch)
{
bool skip_tuple = false;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -701,7 +722,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
slot, estate, true,
- conflictindexes ? true : false,
+ conflictindexes || prefetch ? true : false,
&conflict, conflictindexes,
(update_indexes == TU_Summarizing));
@@ -710,7 +731,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
* ExecSimpleRelationInsert to understand why this check is done at
* this point.
*/
- if (conflict)
+ if (conflict && !prefetch)
CheckAndReportConflict(resultRelInfo, estate,
CT_UPDATE_EXISTS,
recheckIndexes, searchslot, slot);
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
*/
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
pa_launch_parallel_worker(void)
{
MemoryContext oldcontext;
@@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void)
}
}
+
+static void
+pa_apply_dispatch(StringInfo s)
+{
+ if (MyParallelShared->do_prefetch)
+ {
+ PG_TRY();
+ {
+ apply_dispatch(s);
+ }
+ PG_CATCH();
+ {
+ HOLD_INTERRUPTS();
+
+ elog(DEBUG1, "Failed to prefetch LR operation");
+
+ /* TODO: should we somehow dump the error or just silently ignore it? */
+ /* EmitErrorReport(); */
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ lr_prefetch_errors += 1;
+ }
+ PG_END_TRY();
+ if (!prefetch_replica_identity_only)
+ {
+ /* We need to abort transaction to undo insert */
+ AbortCurrentTransaction();
+ }
+ }
+ else
+ {
+ apply_dispatch(s);
+ }
+}
+
/* Parallel apply worker main loop. */
static void
LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
*/
s.cursor += SIZE_STATS_MESSAGE;
- apply_dispatch(&s);
+ pa_apply_dispatch(&s);
}
else if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
@@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializingApplyWorker = false;
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ if (!MyParallelShared->do_prefetch)
+ {
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
originname, sizeof(originname));
- originid = replorigin_by_name(originname, false);
-
- /*
- * The parallel apply worker doesn't need to monopolize this replication
- * origin which was already acquired by its leader process.
- */
- replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
- CommitTransactionCommand();
+ originid = replorigin_by_name(originname, false);
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ /* Do not write WAL for prefetch */
+ wal_level = WAL_LEVEL_MINIMAL;
+ }
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
- Assert(!winfo->serialize_changes);
+ if (!winfo->shared->do_prefetch)
+ {
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+ }
/*
* We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
+int max_parallel_prefetch_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8f1bec5c529 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static uint32 prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = true;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool InitializingApplyWorker = false;
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+/*
+ * If operation is performed by parallel prefetch worker
+ */
+#define is_prefetching() (am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
+ if (is_prefetching())
+ {
+ return false;
+ }
+
apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
@@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
RelationGetRelationName(rel))));
}
+#define SAFE_APPLY(call) \
+ do { \
+ if (is_prefetching()) \
+ { \
+ PG_TRY(); \
+ { \
+ call; \
+ } \
+ PG_CATCH(); \
+ { \
+ HOLD_INTERRUPTS(); \
+ elog(DEBUG1, "Failed to prefetch LR operation"); \
+ FlushErrorState(); \
+ RESUME_INTERRUPTS(); \
+ lr_prefetch_errors += 1; \
+ } \
+ PG_END_TRY(); \
+ } else \
+ call; \
+ } while (0)
+
/*
* Handle INSERT message.
*/
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_insert_internal(edata, relinfo, remoteslot);
+ SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
ExecCloseIndices(relinfo);
}
@@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
!relinfo->ri_RelationDesc->rd_rel->relhasindex ||
RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
- /* Caller will not have done this bit. */
- Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
- InitConflictIndexes(relinfo);
+ if (is_prefetching() && prefetch_replica_identity_only)
+ {
+ TupleTableSlot *localslot = NULL;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ Relation localrel = relinfo->ri_RelationDesc;
+ EPQState epqstate;
- /* Do the insert. */
- TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
- ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+
+ (void) FindReplTupleInLocalRel(edata, localrel,
+ &relmapentry->remoterel,
+ relmapentry->localindexoid,
+ remoteslot, &localslot);
+ }
+ else
+ {
+ /* Caller will not have done this bit. */
+ Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+ InitConflictIndexes(relinfo);
+
+ /* Do the insert. */
+ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot, is_prefetching());
+ }
+ if (is_prefetching())
+ {
+ lr_prefetch_inserts += 1;
+ }
}
/*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
apply_handle_tuple_routing(edata,
remoteslot,
&newtup, CMD_UPDATE);
else
- apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup, rel->localindexoid);
+ SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup, rel->localindexoid));
finish_edata(edata);
@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
localindexoid,
remoteslot, &localslot);
+ if (is_prefetching())
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ if (prefetch_replica_identity_only)
+ goto Cleanup;
+ }
+
/*
* Tuple found.
*
@@ -2722,7 +2796,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
- remoteslot);
+ remoteslot, is_prefetching());
}
else
{
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
remoteslot, newslot,
list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
}
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_delete_internal(edata, relinfo,
- remoteslot, rel->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+ remoteslot, rel->localindexoid));
ExecCloseIndices(relinfo);
}
@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel(edata, localrel, remoterel,
localindexoid,
remoteslot, &localslot);
+ if (is_prefetching())
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ goto Cleanup;
+ }
+
/* If found delete it. */
if (found)
{
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
remoteslot, NULL,
list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
EvalPlanQualEnd(&epqstate);
}
@@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
EState *estate = edata->estate;
bool found;
+ LockTupleMode lockmode = is_prefetching() ? (prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive) : LockTupleExclusive;
+
/*
* Regardless of the top-level operation, we're performing a read here, so
* check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
#endif
found = RelationFindReplTupleByIndex(localrel, localidxoid,
- LockTupleExclusive,
+ lockmode,
remoteslot, *localslot);
}
else
- found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+ found = RelationFindReplTupleSeq(localrel, lockmode,
remoteslot, *localslot);
return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
switch (operation)
{
case CMD_INSERT:
- apply_handle_insert_internal(edata, partrelinfo,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+ remoteslot_part));
break;
case CMD_DELETE:
- apply_handle_delete_internal(edata, partrelinfo,
- remoteslot_part,
- part_entry->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+ remoteslot_part,
+ part_entry->localindexoid));
break;
case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
+ if (is_prefetching())
+ return;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot;
+ if (is_prefetching())
+ return;
+
/* Store the new tuple for conflict reporting */
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
@@ -3144,7 +3235,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
ExecSimpleRelationUpdate(partrelinfo,
estate, &epqstate,
- localslot, remoteslot_part);
+ localslot, remoteslot_part, is_prefetching());
}
else
{
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
- apply_handle_insert_internal(edata, partrelinfo_new,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+ remoteslot_part));
}
EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
MemoryContextSwitchTo(ApplyMessageContext);
}
-
/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
+#define MSG_CODE_OFFSET (1 + 8*3) /* 'w' byte + start LSN + end LSN + send time */
+
+static void
+lr_do_prefetch(char* buf, int len)
+{
+ ParallelApplyWorkerInfo* winfo;
+
+ if (len <= MSG_CODE_OFFSET || buf[0] != 'w')
+ return;
+
+ switch (buf[MSG_CODE_OFFSET])
+ {
+ case LOGICAL_REP_MSG_INSERT:
+ case LOGICAL_REP_MSG_UPDATE:
+ case LOGICAL_REP_MSG_DELETE:
+ /* Round robin prefetch worker */
+ winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+ pa_send_data(winfo, len, buf);
+ break;
+
+ case LOGICAL_REP_MSG_TYPE:
+ case LOGICAL_REP_MSG_RELATION:
+ /* broadcast to all prefetch workers */
+ for (int i = 0; i < n_prefetch_workers; i++)
+ {
+ winfo = prefetch_worker[i];
+ pa_send_data(winfo, len, buf);
+ }
+ break;
+
+ default:
+ /* Ignore other messages */
+ break;
+ }
+}
+
/*
* Apply main loop.
*/
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ char* prefetch_buf = NULL;
+ size_t prefetch_buf_pos = 0;
+ size_t prefetch_buf_used = 0;
+ size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ if (max_parallel_prefetch_workers_per_subscription != 0)
+ {
+ int i;
+ for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+ {
+ prefetch_worker[i] = pa_launch_parallel_worker();
+ if (!prefetch_worker[i])
+ {
+ elog(LOG, "launched only %d of %d prefetch workers",
+ i, max_parallel_prefetch_workers_per_subscription);
+ break;
+ }
+ prefetch_worker[i]->in_use = true;
+ prefetch_worker[i]->shared->do_prefetch = true;
+ }
+ n_prefetch_workers = i;
+ prefetch_buf = palloc(prefetch_buf_size);
+ }
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
- int len;
+ int32 len;
char *buf = NULL;
bool endofstream = false;
+ bool no_more_data = false;
long wait_time;
CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
- if (len != 0)
+ /* Loop to process all available data (without blocking). */
+ for (;;)
{
- /* Loop to process all available data (without
blocking). */
- for (;;)
- {
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- if (len == 0)
+ if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+ {
+ prefetch_buf_used = 0;
+ do
{
- break;
- }
- else if (len < 0)
+ while (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ {
+ prefetch_buf_size *= 2;
+ elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+ prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+ }
+ memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+ memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+ prefetch_buf_used += 4 + len;
+ if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+ break;
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ } while (len > 0);
+
+ no_more_data = len <= 0;
+
+ for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
{
- ereport(LOG,
- (errmsg("data stream from publisher has ended")));
- endofstream = true;
- break;
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
}
- else
- {
- int c;
- StringInfoData s;
+ memcpy(&len, prefetch_buf, 4);
+ buf = &prefetch_buf[4];
+ prefetch_buf_pos = len + 4;
+ }
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- /* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyMessageContext);
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
- initReadOnlyStringInfo(&s, buf, len);
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
- c = pq_getmsgbyte(&s);
+ initReadOnlyStringInfo(&s, buf, len);
- if (c == 'w')
- {
- XLogRecPtr start_lsn;
- XLogRecPtr end_lsn;
- TimestampTz send_time;
+ c = pq_getmsgbyte(&s);
- start_lsn = pq_getmsgint64(&s);
- end_lsn = pq_getmsgint64(&s);
- send_time = pq_getmsgint64(&s);
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
- if (last_received < start_lsn)
- last_received = start_lsn;
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
- if (last_received < end_lsn)
- last_received = end_lsn;
+ if (last_received < start_lsn)
+ last_received = start_lsn;
- UpdateWorkerStats(last_received, send_time, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- apply_dispatch(&s);
- }
- else if (c == 'k')
- {
- XLogRecPtr end_lsn;
- TimestampTz timestamp;
- bool reply_requested;
+ UpdateWorkerStats(last_received, send_time, false);
- end_lsn = pq_getmsgint64(&s);
- timestamp = pq_getmsgint64(&s);
- reply_requested = pq_getmsgbyte(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
- if (last_received < end_lsn)
- last_received = end_lsn;
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
- send_feedback(last_received, reply_requested, false);
- UpdateWorkerStats(last_received, timestamp, true);
- }
- /* other message types are purposefully ignored */
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- MemoryContextReset(ApplyMessageContext);
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
}
+ /* other message types are purposefully ignored */
+ MemoryContextReset(ApplyMessageContext);
+ }
+ if (prefetch_buf_pos < prefetch_buf_used)
+ {
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ buf = &prefetch_buf[prefetch_buf_pos + 4];
+ prefetch_buf_pos += 4 + len;
+ }
+ else if (prefetch_buf_used != 0 && no_more_data)
+ {
+ break;
+ }
+ else
+ {
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
+#include "replication/worker_internal.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"prefetch_replica_identity_only",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Whether logical replication prefetch workers should prefetch only the replica identity index or all indexes."),
+ NULL,
+ },
+ &prefetch_replica_identity_only,
+ true,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"max_parallel_prefetch_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+ NULL,
+ },
+ &max_parallel_prefetch_workers_per_subscription,
+ 2, 0, MAX_LR_PREFETCH_WORKERS,
+ NULL, NULL, NULL
+ },
+
{
{"max_active_replication_origins",
PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544d..fbe705bfe3f 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -761,10 +761,10 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
TupleTableSlot *searchslot, TupleTableSlot *outslot);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
- EState *estate, TupleTableSlot *slot);
+ EState *estate, TupleTableSlot *slot, bool prefetch);
extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
EState *estate, EPQState *epqstate,
- TupleTableSlot *searchslot, TupleTableSlot *slot);
+ TupleTableSlot *searchslot, TupleTableSlot *slot, bool prefetch);
extern void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
EState *estate, EPQState *epqstate,
TupleTableSlot *searchslot);
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
LockTupleNoKeyExclusive,
/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
LockTupleExclusive,
+ /* Do not lock tuple */
+ LockTupleNoLock,
+ /* Try exclusive lock, silently give up in case of conflict */
+ LockTupleTryExclusive,
} LockTupleMode;
#endif /* LOCKOPTIONS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
*/
PartialFileSetState fileset_state;
FileSet fileset;
+
+ /*
+ * Prefetch worker
+ */
+ bool do_prefetch;
} ParallelApplyWorkerShared;
/*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action);
+
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_parallel_worker(void);
static inline bool
am_tablesync_worker(void)