On Thu, Mar 20, 2025 at 3:06 PM Nisha Moond wrote: > > Attached is v6 patch with above comments addressed.
Thanks updating the patch. I have some comments: 1. The naming style of variables changed in this function seems a bit Inconsistent with existing ones, I feel we'd better use similar style, e.g., conflictSlots => conflictslots I included the suggested changes in 0001. ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - TupleTableSlot *localslot, TupleTableSlot *remoteslot, - Oid indexoid, TransactionId localxmin, - RepOriginId localorigin, TimestampTz localts) + List *conflictSlots, TupleTableSlot *remoteslot, + List *conflictIndexes, List *localxmins, + List *localorigins, List *localts) 2. I modified the documents a bit for consistency. Please see 0001 attachment. 3. I have been thinking whether the codes in ReportApplyConflict() can be improved further, e.g., avoid the extra checks in do while(). One idea could be that each caller of ReportApplyConflict() can always pass a valid list for all list-parameter(e.g., conflictIndexes, localxmins ...). And for the cases when the caller could not provide a valid item, it would save an invalid value in the list and pass it to the function. In this approach, ReportApplyConflict() seems cleaner. I am sharing a POC diff (0002) for reference, it can pass regression test, but I have not confirmed every case yet. 4. + origin = list_nth_int(localorigins, conflictNum); ... + localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs))); I personally feel this could be improved, because 1) RepOriginId, being a 16-bit value, is smaller than an int, which might not cause issues but appears somewhat odd when storing a 32-bit value within it; 2) The approach used to store 'committs' seems inelegant (and I didn't find precedents). An alternative approach is to introduce a new structure, ConflictTupleInfo, containing items like slot, origin, committs, and xmin for list integration. This way, the code could be simpler, and we can avoid the above coding. Please see 0003 for reference. (Note that some comments in this patch could be improved) Best Regards, Hou zj
From 66dab4059c010435d41fa7b548e7d4cf43e6e1be Mon Sep 17 00:00:00 2001 From: Hou Zhijie <houzj.f...@cn.fujitsu.com> Date: Thu, 20 Mar 2025 18:52:59 +0800 Subject: [PATCH] cosmetic changes --- doc/src/sgml/logical-replication.sgml | 12 ++++++------ src/backend/executor/execReplication.c | 12 ++++++------ src/backend/replication/logical/conflict.c | 8 ++++---- src/include/replication/conflict.h | 6 +++--- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 4817206af7d..518520d3ff4 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1881,12 +1881,12 @@ test_sub=# SELECT * from tab_gen_to_gen; <term><literal>multiple_unique_conflicts</literal></term> <listitem> <para> - Inserting a row or updating values of a row violates more than one - <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log - the origin and commit timestamp details of the conflicting key, - <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link> - should be enabled on the subscriber. In this case, an error will be - raised until the conflict is resolved manually. + Inserting or updating a row violates multiple + <literal>NOT DEFERRABLE</literal> unique constraints. Note that to log + the origin and commit timestamp details of conflicting keys, ensure + that <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link> + is enabled on the subscriber. In this case, an error will be raised until + the conflict is resolved manually. </para> </listitem> </varlistentry> diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index efd93cda69a..a2a0ab90ab0 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -494,8 +494,8 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *searchslot, TupleTableSlot *remoteslot) { int conflicts = 0; - List *conflictSlots = NIL; - List *conflictIndexes = NIL; + List *conflictslots = NIL; + List *conflictindexes = NIL; List *localxmins = NIL; List *localorigins = NIL; List *localts = NIL; @@ -518,8 +518,8 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, * Add the conflict slot, index, and the transaction info to the * respective lists. */ - conflictSlots = lappend(conflictSlots, conflictslot); - conflictIndexes = lappend_oid(conflictIndexes, uniqueidx); + conflictslots = lappend(conflictslots, conflictslot); + conflictindexes = lappend_oid(conflictindexes, uniqueidx); localxmins = lappend_xid(localxmins, xmin); localorigins = lappend_int(localorigins, origin); localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs))); @@ -532,8 +532,8 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, if (conflicts) ReportApplyConflict(estate, resultRelInfo, ERROR, conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type, - searchslot, conflictSlots, remoteslot, - conflictIndexes, localxmins, localorigins, localts); + searchslot, conflictslots, remoteslot, + conflictindexes, localxmins, localorigins, localts); } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 2074ef17321..4b50467373d 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -91,13 +91,13 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, * 'searchslot' should contain the tuple used to search the local tuple to be * updated or deleted. * - * 'conflictSlots' list contain the existing local tuples, if any, that + * 'conflictslots' list contains the existing local tuples, if any, that * conflicts with the remote tuple. 'localxmins', 'localorigins', and 'localts' * provide the transaction information related to the existing local tuples. * * 'remoteslot' should contain the remote new tuple, if any. * - * The 'conflictIndexes' list represents the OIDs of the unique index that + * The 'conflictindexes' list represents the OIDs of the unique index that * triggered the constraint violation error. We use this to report the key * values for conflicting tuple. * @@ -107,8 +107,8 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - List *conflictSlots, TupleTableSlot *remoteslot, - List *conflictIndexes, List *localxmins, + List *conflictslots, TupleTableSlot *remoteslot, + List *conflictindexes, List *localxmins, List *localorigins, List *localts) { int conflictNum = 0; diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index bba7f7156b5..f32c3266719 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -60,9 +60,9 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - List *conflictSlots, + List *conflictslots, TupleTableSlot *remoteslot, - List *conflictIndexes, List *localxmin, - List *localorigin, List *localts); + List *conflictindexes, List *localxmins, + List *localorigins, List *localts); extern void InitConflictIndexes(ResultRelInfo *relInfo); #endif -- 2.30.0.windows.2
From 1b809742f1de7a87e159ac8b4f120e66ed7172ce Mon Sep 17 00:00:00 2001 From: Hou Zhijie <houzj.f...@cn.fujitsu.com> Date: Thu, 20 Mar 2025 18:55:11 +0800 Subject: [PATCH 2/3] refactor code --- src/backend/replication/logical/conflict.c | 71 +++++++++------------- src/backend/replication/logical/worker.c | 55 ++++++++++++----- 2 files changed, 68 insertions(+), 58 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 84ef4648747..9beded483d4 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -111,59 +111,36 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, List *conflictindexes, List *localxmins, List *localorigins, List *localts) { - int conflictNum = 0; Relation localrel = relinfo->ri_RelationDesc; - ListCell *slotlc = list_head(conflictSlots); StringInfoData err_detail; + ListCell *lslot; + ListCell *lindex; + ListCell *lxmin; + ListCell *lorigin; + ListCell *lts; initStringInfo(&err_detail); - do + /* + * Iterate over conflicting tuples, along with their commit timestamps, + * origins, and the conflicting indexes to assemble an errdetail() line. + */ + forfive(lslot, conflictslots, lindex, conflictindexes, lxmin, localxmins, + lorigin, localorigins, lts, localts) { - Oid indexoid = InvalidOid; - TimestampTz committs = 0; - RepOriginId origin = InvalidRepOriginId; - TransactionId xmin = InvalidTransactionId; - TupleTableSlot *slot = NULL; - - if (slotlc) - { - Assert(localxmins && localorigins && localts); - - slot = lfirst(slotlc); - origin = list_nth_int(localorigins, conflictNum); - xmin = lfirst_xid(list_nth_cell(localxmins, conflictNum)); - committs = (TimestampTz) lfirst(list_nth_cell(localts, conflictNum)); - - slotlc = lnext(conflictSlots, slotlc); - } - - if (conflictIndexes) - indexoid = list_nth_oid(conflictIndexes, conflictNum); - - Assert(!OidIsValid(indexoid) || - CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); - - /* - * Build the error detail message containing the conflicting key and - * tuple information. The details for each conflict will be appended - * to err_detail. - */ errdetail_apply_conflict(estate, relinfo, type, searchslot, - slot, remoteslot, indexoid, - xmin, origin, committs, &err_detail); - - conflictNum++; - - } while (slotlc); + lfirst(lslot), remoteslot, + lfirst_oid(lindex), + lfirst_xid(lxmin), + lfirst_int(lorigin), + (TimestampTz) lfirst(lts), + &err_detail); + } /* Conflict stats are not gathered for multiple_unique_conflicts */ if (type != CT_MULTIPLE_UNIQUE_CONFLICTS) pgstat_report_subscription_conflict(MySubscription->oid, type); - /* Remove the extra newline at the end of err_detail */ - err_detail.data[err_detail.len - 1] = '\0'; - ereport(elevel, errcode_apply_conflict(type), errmsg("conflict detected on relation \"%s.%s\": conflict=%s", @@ -257,7 +234,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: case CT_MULTIPLE_UNIQUE_CONFLICTS: - Assert(OidIsValid(indexoid)); + Assert(OidIsValid(indexoid) && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); if (localts) { @@ -339,7 +317,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (val_desc) appendStringInfo(&err_detail, "\n%s", val_desc); - appendStringInfo(err_msg, "%s\n", err_detail.data); + /* + * Insert a blank line to visually separate the new detail line from the + * existing ones. + */ + if (err_msg->len > 0) + appendStringInfoChar(err_msg, '\n'); + + appendStringInfo(err_msg, "%s", err_detail.data); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4a595e7906f..c19c4f938db 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2674,7 +2674,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; - TupleTableSlot *localslot; + TupleTableSlot *localslot = NULL; + Oid conflictindex = InvalidOid; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxmin = InvalidTransactionId; + TimestampTz localts = 0; bool found; MemoryContext oldctx; @@ -2693,10 +2697,6 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. @@ -2712,7 +2712,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot, list_make1(localslot), newslot, - NULL, list_make1_xid(localxmin), + list_make1_oid(conflictindex), + list_make1_xid(localxmin), list_make1_int(localorigin), list_make1(DatumGetPointer(Int64GetDatum(localts)))); } @@ -2735,6 +2736,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; + localslot = NULL; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); @@ -2743,8 +2746,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, NULL, newslot, NULL, - NULL, NULL, NULL); + remoteslot, list_make1(localslot), newslot, + list_make1_oid(conflictindex), + list_make1_xid(localxmin), + list_make1_int(localorigin), + list_make1(DatumGetPointer(Int64GetDatum(localts)))); } /* Cleanup. */ @@ -2862,6 +2868,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata, LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; + Oid conflictindex = InvalidOid; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxmin = InvalidTransactionId; + TimestampTz localts = 0; bool found; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); @@ -2889,7 +2899,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata, localorigin != replorigin_session_origin) ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, remoteslot, list_make1(localslot), NULL, - NULL, list_make1_xid(localxmin), + list_make1_oid(conflictindex), + list_make1_xid(localxmin), list_make1_int(localorigin), list_make1(DatumGetPointer(Int64GetDatum(localts)))); @@ -2901,12 +2912,18 @@ apply_handle_delete_internal(ApplyExecutionData *edata, } else { + localslot = NULL; + /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, - remoteslot, NULL, NULL, NULL, NULL, NULL, NULL); + remoteslot, list_make1(localslot), NULL, + list_make1_oid(conflictindex), + list_make1_xid(localxmin), + list_make1_int(localorigin), + list_make1(DatumGetPointer(Int64GetDatum(localts)))); } /* Cleanup. */ @@ -3074,9 +3091,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxmin = InvalidTransactionId; + TimestampTz localts = 0; + Oid conflictindex = InvalidOid; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3087,6 +3105,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; + localslot = NULL; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); @@ -3096,7 +3116,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, */ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_MISSING, remoteslot_part, - NULL, newslot, NULL, NULL, NULL, NULL); + list_make1(localslot), newslot, + list_make1_oid(conflictindex), + list_make1_xid(localxmin), + list_make1_int(localorigin), + list_make1(DatumGetPointer(Int64GetDatum(localts)))); return; } @@ -3116,7 +3140,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot_part, list_make1(localslot), - newslot, NULL, list_make1_xid(localxmin), + newslot, list_make1_oid(conflictindex), + list_make1_xid(localxmin), list_make1_int(localorigin), list_make1(DatumGetPointer(Int64GetDatum(localts)))); } -- 2.30.0.windows.2
From cf93ea5272c3ea225af1a52b979a21a1bcb00c95 Mon Sep 17 00:00:00 2001 From: Hou Zhijie <houzj.f...@cn.fujitsu.com> Date: Thu, 20 Mar 2025 19:20:17 +0800 Subject: [PATCH 3/3] add a struct --- src/backend/executor/execReplication.c | 33 +++----- src/backend/replication/logical/conflict.c | 17 ++--- src/backend/replication/logical/worker.c | 87 ++++++++-------------- src/include/replication/conflict.h | 17 ++++- 4 files changed, 59 insertions(+), 95 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index a2a0ab90ab0..bf352f69e2f 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -493,12 +493,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot) { - int conflicts = 0; - List *conflictslots = NIL; - List *conflictindexes = NIL; - List *localxmins = NIL; - List *localorigins = NIL; - List *localts = NIL; + List *conflicttuples = NIL; TupleTableSlot *conflictslot; /* Check all the unique indexes for conflicts */ @@ -508,32 +503,22 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot, &conflictslot)) { - RepOriginId origin; - TimestampTz committs; - TransactionId xmin; + ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo); - GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs); + conflicttuple->slot = conflictslot; - /* - * Add the conflict slot, index, and the transaction info to the - * respective lists. - */ - conflictslots = lappend(conflictslots, conflictslot); - conflictindexes = lappend_oid(conflictindexes, uniqueidx); - localxmins = lappend_xid(localxmins, xmin); - localorigins = lappend_int(localorigins, origin); - localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs))); + GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin, + &conflicttuple->origin, &conflicttuple->ts); - conflicts++; + conflicttuples = lappend(conflicttuples, conflicttuple); } } /* Report the conflict if found */ - if (conflicts) + if (conflicttuples) ReportApplyConflict(estate, resultRelInfo, ERROR, - conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type, - searchslot, conflictslots, remoteslot, - conflictindexes, localxmins, localorigins, localts); + list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type, + searchslot, remoteslot, conflicttuples); } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 9beded483d4..10f07e18a4b 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -107,9 +107,7 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - List *conflictslots, TupleTableSlot *remoteslot, - List *conflictindexes, List *localxmins, - List *localorigins, List *localts) + TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; StringInfoData err_detail; @@ -125,15 +123,14 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, * Iterate over conflicting tuples, along with their commit timestamps, * origins, and the conflicting indexes to assemble an errdetail() line. */ - forfive(lslot, conflictslots, lindex, conflictindexes, lxmin, localxmins, - lorigin, localorigins, lts, localts) + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { errdetail_apply_conflict(estate, relinfo, type, searchslot, - lfirst(lslot), remoteslot, - lfirst_oid(lindex), - lfirst_xid(lxmin), - lfirst_int(lorigin), - (TimestampTz) lfirst(lts), + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, &err_detail); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c19c4f938db..058c353f992 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2675,10 +2675,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; TupleTableSlot *localslot = NULL; - Oid conflictindex = InvalidOid; - RepOriginId localorigin = InvalidRepOriginId; - TransactionId localxmin = InvalidTransactionId; - TimestampTz localts = 0; + ConflictTupleInfo conflicttuple = {0}; bool found; MemoryContext oldctx; @@ -2701,8 +2698,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -2710,12 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, newslot = table_slot_create(localrel, &estate->es_tupleTable); slot_store_data(newslot, relmapentry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot, list_make1(localslot), newslot, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, newslot, + list_make1(&conflicttuple)); } /* Process and store remote tuple in the slot */ @@ -2736,8 +2733,6 @@ apply_handle_update_internal(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; - localslot = NULL; - /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); @@ -2746,11 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, list_make1(localslot), newslot, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, newslot, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -2868,10 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; - Oid conflictindex = InvalidOid; - RepOriginId localorigin = InvalidRepOriginId; - TransactionId localxmin = InvalidTransactionId; - TimestampTz localts = 0; + ConflictTupleInfo conflicttuple = {0}; bool found; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); @@ -2887,22 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + { + conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, - remoteslot, list_make1(localslot), NULL, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, NULL, + list_make1(&conflicttuple)); + } EvalPlanQualSetSlot(&epqstate, localslot); @@ -2912,18 +2897,12 @@ apply_handle_delete_internal(ApplyExecutionData *edata, } else { - localslot = NULL; - /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, - remoteslot, list_make1(localslot), NULL, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, NULL, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -3091,10 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin = InvalidRepOriginId; - TransactionId localxmin = InvalidTransactionId; - TimestampTz localts = 0; - Oid conflictindex = InvalidOid; + ConflictTupleInfo conflicttuple = {0}; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3105,8 +3081,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; - localslot = NULL; - /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); @@ -3116,11 +3090,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, */ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_MISSING, remoteslot_part, - list_make1(localslot), newslot, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + newslot, list_make1(&conflicttuple)); return; } @@ -3129,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a * different origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -3138,12 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, newslot = table_slot_create(partrel, &estate->es_tupleTable); slot_store_data(newslot, part_entry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot_part, list_make1(localslot), - newslot, list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot_part, newslot, + list_make1(&conflicttuple)); } /* diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index f32c3266719..976f1708095 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -53,6 +53,19 @@ typedef enum #define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1) + +/* + * Information for the exiting local tuple that caused the conflict. + */ +typedef struct ConflictTupleInfo +{ + TupleTableSlot *slot; + Oid indexoid; /* conflicting index */ + TransactionId xmin; /* transaction ID that modified the existing local tuple */ + RepOriginId origin; /* which origin modified the exiting local tuple */ + TimestampTz ts; /* when the exiting local tuple was modified by the origin */ +} ConflictTupleInfo; + extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, @@ -60,9 +73,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - List *conflictslots, TupleTableSlot *remoteslot, - List *conflictindexes, List *localxmins, - List *localorigins, List *localts); + List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); #endif -- 2.30.0.windows.2