Shekharrajak commented on code in PR #22572:
URL: https://github.com/apache/kafka/pull/22572#discussion_r3412305517
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombiner.java:
##########
@@ -20,385 +20,170 @@
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Comparator;
import java.util.List;
-import java.util.Objects;
-import java.util.TreeSet;
+import java.util.TreeMap;
+/**
+ * Combines an existing list of {@link PersisterStateBatch} entries with a
list of newly produced
+ * entries and returns the shortest non-overlapping, {@code (deliveryState,
deliveryCount)}-distinct
+ * cover of the union, clipped at {@code startOffset} (SPSO).
+ *
+ * <p>The merge is performed by an event-driven sweep-line over the union of
both inputs. Each
+ * input batch contributes one BEGIN event at {@code firstOffset} and one END
event at
+ * {@code lastOffset + 1}. Events are processed in offset order (END before
BEGIN at the same
+ * offset). A counted ordered map tracks the currently active priorities; the
first key defines the
+ * {@code (state, count)} that wins on the current sub-range. Successive
sub-ranges with identical
+ * {@code (state, count)} are coalesced on the fly.
+ *
+ * <p>Complexity: {@code O((n + k) log p)} where {@code n} is the total number
of input batches,
+ * {@code k} is the number of overlap transitions encountered, and {@code p}
is the number of
+ * distinct {@code (deliveryState, deliveryCount)} priorities.
+ */
public class PersisterStateBatchCombiner {
- private List<PersisterStateBatch> combinedBatchList; // link between
pruning and merging
+ private static final Comparator<BatchPriority> PRIORITY_DESC = (a, b) -> {
+ int cmpCount = Short.compare(b.deliveryCount(), a.deliveryCount());
+ if (cmpCount != 0) {
+ return cmpCount;
+ }
+ return Byte.compare(b.deliveryState(), a.deliveryState());
+ };
+
+ private final List<PersisterStateBatch> batchesSoFar;
+ private final List<PersisterStateBatch> newBatches;
private final long startOffset;
- private TreeSet<PersisterStateBatch> sortedBatches;
- private List<PersisterStateBatch> finalBatchList; // final list is built
here
- private final List<PersisterStateBatch> nonOverlappingBuffer = new
ArrayList<>(); // reused per findMergeCandidatePair call
public PersisterStateBatchCombiner(
List<PersisterStateBatch> batchesSoFar,
List<PersisterStateBatch> newBatches,
long startOffset
) {
- initializeCombinedList(batchesSoFar, newBatches);
- int estimatedResultSize = (combinedBatchList.size() * 3) / 2; //
heuristic size - 50% overallocation
- finalBatchList = new ArrayList<>(estimatedResultSize);
+ this.batchesSoFar = batchesSoFar == null ? List.of() : batchesSoFar;
+ this.newBatches = newBatches == null ? List.of() : newBatches;
this.startOffset = startOffset;
}
- private void initializeCombinedList(List<PersisterStateBatch>
batchesSoFar, List<PersisterStateBatch> newBatches) {
- boolean soFarEmpty = batchesSoFar == null || batchesSoFar.isEmpty();
- boolean newBatchesEmpty = newBatches == null || newBatches.isEmpty();
-
- if (soFarEmpty && newBatchesEmpty) {
- combinedBatchList = new ArrayList<>();
- } else if (soFarEmpty) {
- combinedBatchList = new ArrayList<>(newBatches); // new list as
the original one could be unmodifiable
- } else if (newBatchesEmpty) {
- combinedBatchList = new ArrayList<>(batchesSoFar); // new list as
the original one could be unmodifiable
- } else {
- combinedBatchList = new ArrayList<>(batchesSoFar.size() +
newBatches.size());
- combinedBatchList.addAll(batchesSoFar);
- combinedBatchList.addAll(newBatches);
- }
- }
-
/**
- * Algorithm: Merge current state batches and new batches into a single
non-overlapping batch list.
- * Input: batchesSoFar, newBatches, startOffset
- * Output: combined list with non-overlapping batches (finalBatchList)
- * <p>
- * - Add both currentBatches and newBatches into a single list
combinedBatchList
- * - if combinedBatchList.size() <= 1 return combinedBatchList
- * <p>
- * - Remove/prune any batches from the combinedBatchList:
- * - if batch.lastOffset < startOffset then remove batch from
combinedBatchList
- * - else if batch.firstOffset > startOffset then we will keep the batch
- * - else if batch.firstOffset <= startOffset <= batch.lastOffset then
keep [startOffset, batch.lastOffset] part only and discard rest.
- * <p>
- * - create a treeset sortedBatches using pruned combinedBatchList
- * - find first 2 mergeable batches in sortedBatches set, say, prev and
candidate.
- * - remove any non-overlapping batches from sortedBatches encountered
during the find operation and add them to a finalBatchList
- * - do repeat until a mergeable pair is not found:
- * - based on various conditions of offset overlap and batch state
differences combine the batches or
- * create new batches, if required, and add to the sortedBatches.
- * - find first 2 mergeable batches in sortedBatches set, say, prev and
candidate.
- * - remove any non-mergeable batches from sortedBatches encountered
during the find operation and add them to a finalBatchList
- * - done
- * - return the finalBatchList
- *
- * @return list of {@link PersisterStateBatch} representing
non-overlapping combined batches
+ * Produces the merged, pruned, non-overlapping batch list.
*/
public List<PersisterStateBatch> combineStateBatches() {
- pruneBatches();
- mergeBatches();
- return finalBatchList;
- }
-
- private void mergeBatches() {
- if (combinedBatchList.size() < 2) {
- finalBatchList = combinedBatchList;
- return;
- }
-
- sortedBatches = new TreeSet<>(combinedBatchList);
-
- MergeCandidatePair overlapState = getMergeCandidatePair();
-
- while (overlapState != MergeCandidatePair.EMPTY) {
- PersisterStateBatch prev = overlapState.prev();
- PersisterStateBatch candidate = overlapState.candidate();
-
- // remove both previous and candidate for easier
- // assessment about adding batches to sortedBatches
- sortedBatches.remove(prev);
- sortedBatches.remove(candidate);
-
- int cmp = compareBatchDeliveryInfo(candidate, prev);
- if (cmp == 0) { // same state and overlap or contiguous
- // overlap and same state (prev.firstOffset <=
candidate.firstOffset) due to sort
- // covers:
- // case: 1 2 3 4 5
6 7 (contiguous)
- // prev: ------ ------- ------- -------
------- -------- -------
- // candidate: ------ ---- ---------- ---
---- ------- -------
- handleSameStateMerge(prev, candidate); // pair can be
contiguous or overlapping
- } else {
- // If we reach here then it is guaranteed that the batch pair
is overlapping and
- // non-contiguous because getMergeCandidatePair only returns
contiguous pair if
- // the constituents have the same delivery count and state.
- // covers:
- // case: 1 2* 3 4 5
6 7*
- // prev: ------ ------- ------- -------
------- -------- ------
- // candidate: ------ ---- --------- ----
---- ------- -------
- // max batches: 1 2 2 3
2 2 2
- // min batches: 1 1 1 1
1 2 1
- // * not possible with treeset
- handleDiffStateOverlap(prev, candidate);
- }
- overlapState = getMergeCandidatePair();
+ List<PersisterStateBatch> pruned = prune();
+ if (pruned.isEmpty()) {
+ return pruned;
}
- finalBatchList.addAll(sortedBatches); // some non overlapping
batches might have remained
- }
-
- /**
- * Compares the non-offset state of 2 batches i.e. the deliveryCount and
deliverState.
- * <p>
- * Uses standard compareTo contract x < y => +int, x > y => -int, x == y
=> 0
- *
- * @param b1 - {@link PersisterStateBatch} to compare
- * @param b2 - {@link PersisterStateBatch} to compare
- * @return int representing comparison result.
- */
- private int compareBatchDeliveryInfo(PersisterStateBatch b1,
PersisterStateBatch b2) {
- int deltaCount = Short.compare(b1.deliveryCount(), b2.deliveryCount());
-
- // Delivery state could be:
- // 0 - AVAILABLE (non-terminal)
- // 1 - ACQUIRED - should not be persisted yet
- // 2 - ACKNOWLEDGED (terminal)
- // 3 - ARCHIVING - not implemented in KIP-932 - non-terminal - leads
only to ARCHIVED
- // 4 - ARCHIVED (terminal)
-
- if (deltaCount == 0) { // same delivery count
- return Byte.compare(b1.deliveryState(), b2.deliveryState());
+ if (pruned.size() == 1) {
+ return pruned;
}
- return deltaCount;
+ return sweepMerge(pruned);
}
/**
- * Accepts a sorted set of state batches and finds the first 2 batches
which can be merged.
- * Merged implies that they have some offsets in common or, they are
contiguous with the same state.
- * <p>
- * Any non-mergeable batches prefixing a good mergeable pair are removed
from the sortedBatches.
- * For example:
- * ----- ---- ----- ----- -----
- * ------
- * <---------------> <-------->
- * non-overlapping 1st overlapping pair
- *
- * @return object representing the overlap state
+ * Drops or clips ranges below {@code startOffset}. Returns a new list;
never modifies inputs.
*/
- private MergeCandidatePair getMergeCandidatePair() {
- if (sortedBatches == null || sortedBatches.isEmpty()) {
- return MergeCandidatePair.EMPTY;
- }
- Iterator<PersisterStateBatch> iter = sortedBatches.iterator();
- PersisterStateBatch prev = iter.next();
- nonOverlappingBuffer.clear();
- while (iter.hasNext()) {
- PersisterStateBatch candidate = iter.next();
- if (candidate.firstOffset() <= prev.lastOffset() || // overlap
- prev.lastOffset() + 1 == candidate.firstOffset() &&
compareBatchDeliveryInfo(prev, candidate) == 0) { // contiguous
- updateBatchContainers(nonOverlappingBuffer);
- return new MergeCandidatePair(
- prev,
- candidate
- );
+ private List<PersisterStateBatch> prune() {
+ int estimate = batchesSoFar.size() + newBatches.size();
+ List<PersisterStateBatch> out = new ArrayList<>(estimate);
+ addPruned(out, batchesSoFar);
+ addPruned(out, newBatches);
+ return out;
+ }
+
+ private void addPruned(List<PersisterStateBatch> out,
List<PersisterStateBatch> src) {
+ for (PersisterStateBatch b : src) {
+ if (startOffset != -1 && b.lastOffset() < startOffset) {
+ // batch fully expired
+ continue;
+ }
+ if (startOffset == -1 || b.firstOffset() >= startOffset) {
+ out.add(b);
+ } else {
+ // start offset intersects batch -> clip
+ out.add(new PersisterStateBatch(startOffset, b.lastOffset(),
b.deliveryState(), b.deliveryCount()));
}
- nonOverlappingBuffer.add(prev);
- prev = candidate;
}
-
- updateBatchContainers(nonOverlappingBuffer);
- return MergeCandidatePair.EMPTY;
- }
-
- private void updateBatchContainers(List<PersisterStateBatch>
nonOverlappingBatches) {
- sortedBatches.removeAll(nonOverlappingBatches);
- finalBatchList.addAll(nonOverlappingBatches);
}
/**
- * Accepts a list of {@link PersisterStateBatch} and checks:
- * - last offset is < start offset => batch is removed
- * - first offset > start offset => batch is preserved
- * - start offset intersects the batch => part of batch before start
offset is removed and
- * the part after it is preserved.
+ * Event-driven sweep. Linear time after the initial event sort.
*/
- private void pruneBatches() {
- if (startOffset == -1 || combinedBatchList.isEmpty()) {
- return;
- }
- List<PersisterStateBatch> retainedBatches = new
ArrayList<>(combinedBatchList.size());
- combinedBatchList.forEach(batch -> {
- if (batch.lastOffset() < startOffset) {
- // batch is expired, skip current iteration
- // -------
- // | -> start offset
- return;
- }
-
- if (batch.firstOffset() >= startOffset) {
- // complete batch is valid
- // ---------
- // | -> start offset
- retainedBatches.add(batch);
- } else {
- // start offset intersects batch
- // ---------
- // | -> start offset
- retainedBatches.add(new PersisterStateBatch(startOffset,
batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()));
+ private List<PersisterStateBatch> sweepMerge(List<PersisterStateBatch>
batches) {
+ int n = batches.size();
+ Event[] events = new Event[n * 2];
+ for (int i = 0; i < n; i++) {
+ PersisterStateBatch b = batches.get(i);
+ BatchPriority priority = BatchPriority.from(b);
+ events[i * 2] = new Event(b.firstOffset(), true, priority);
+ events[i * 2 + 1] = new Event(b.lastOffset() + 1, false, priority);
+ }
+ // END (isBegin=false) sorts before BEGIN (isBegin=true) at the same
offset so that
+ // contiguous same-state ranges meeting at offset X collapse to a
single emit.
+ java.util.Arrays.sort(events, (e1, e2) -> {
Review Comment:
Smaller offsets sort first. If two events have the same offset, END goes
before BEGIN because:
false = END (isBegin is false)
true = BEGIN (isBegin is true)
Boolean.compare(false, true) returns a negative value, which puts END first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]