AndrewJSchofield commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1756270704
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
+ * This method checks any overlap between current state batches and new state batches.
+ * Based on various conditions it creates new non-overlapping records preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ // will take care of overlapping batches
+ Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
Review Comment:
Given that this is processing the batches we have so far, I'm not convinced
that it is worth merging and pruning at this point. I understand that the start
offset might have changed, but surely it's already optimised by the previous
pass through.
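The merge-and-prune step under discussion can be sketched as follows. This is an illustrative stand-alone model, not the PR's actual `mergeBatches`/`pruneBatches` implementations (which are not shown in this hunk); the `Batch` record is a hypothetical stand-in for `PersisterStateBatch`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PersisterStateBatch: an inclusive offset interval
// carrying a delivery state and count.
record Batch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) {}

public class BatchCombineSketch {

    // Drop batches that sit entirely below startOffset (per the javadoc:
    // "removes any batches where the lastOffset < startOffset, if the
    // startOffset > -1").
    static List<Batch> prune(List<Batch> sortedBatches, long startOffset) {
        List<Batch> out = new ArrayList<>();
        for (Batch b : sortedBatches) {
            if (startOffset > -1 && b.lastOffset() < startOffset) {
                continue; // expired batch
            }
            out.add(b);
        }
        return out;
    }

    // Coalesce adjacent intervals that are contiguous and carry the same
    // delivery state and count into a single wider interval.
    static List<Batch> merge(List<Batch> sortedBatches) {
        List<Batch> out = new ArrayList<>();
        for (Batch b : sortedBatches) {
            if (!out.isEmpty()) {
                Batch prev = out.get(out.size() - 1);
                if (prev.lastOffset() + 1 == b.firstOffset()
                        && prev.deliveryState() == b.deliveryState()
                        && prev.deliveryCount() == b.deliveryCount()) {
                    out.set(out.size() - 1, new Batch(
                            prev.firstOffset(), b.lastOffset(),
                            prev.deliveryState(), prev.deliveryCount()));
                    continue;
                }
            }
            out.add(b);
        }
        return out;
    }
}
```

On this model, the reviewer's point is that `batchesSoFar` should already be in merged form from the previous pass, so only the prune (for a moved start offset) would do any work.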
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
+ * This method checks any overlap between current state batches and new state batches.
+ * Based on various conditions it creates new non-overlapping records preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ // will take care of overlapping batches
+ Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(batchesSoFar),
+ startOffset
+ )
+ ));
+
+ // will take care of overlapping batches
+ List<PersisterStateBatch> modifiedNewBatches = new ArrayList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(newBatches),
+ startOffset
+ )
+ ));
+
+ for (PersisterStateBatch batch : modifiedNewBatches) {
+ for (int i = 0; i < batchQueue.size(); i++) {
+ PersisterStateBatch cur = batchQueue.poll();
+ // cur batch under inspection has no overlap with the new one
+ // we will need to add to our result
+ if (batch.lastOffset() < cur.firstOffset() || batch.firstOffset() > cur.lastOffset()) {
+ batchQueue.add(cur);
+ continue;
+ }
+
+ // Covers cases where we need to create a new interval
+ // from the current one such that they do not
+ // overlap with the new one.
+ // Following cases will not produce any new records so need not be handled.
+ // cur: ____ ______ ______ _____
+ // new: ________ ______ _________ _________
+
+
+ // covers
+ // cur: ______ _____ _____ ______
+ // batch: ___ _____ ___ ___
+ if (batch.firstOffset() >= cur.firstOffset() && batch.lastOffset() <= cur.lastOffset()) {
+ // extra batch needs to be created
+ if (batch.firstOffset() > cur.firstOffset()) {
+ batchQueue.add(
+ new PersisterStateBatch(
+ cur.firstOffset(),
+ batch.firstOffset() - 1,
+ cur.deliveryState(),
+ cur.deliveryCount()
+ )
+ );
+ }
+
+ // extra batch needs to be created
+ if (batch.lastOffset() < cur.lastOffset()) {
+ batchQueue.add(
+ new PersisterStateBatch(
+ batch.lastOffset() + 1,
+ cur.lastOffset(),
+ cur.deliveryState(),
+ cur.deliveryCount()
+ )
+ );
+ }
+ } else if (batch.firstOffset() < cur.firstOffset() && batch.lastOffset() < cur.lastOffset()) {
+ // covers
+ // ______
Review Comment:
I like the little diagrams. I suggest adding the `cur:` and `batch:` to
these on lines 630 and 640 too.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
+ * This method checks any overlap between current state batches and new state batches.
+ * Based on various conditions it creates new non-overlapping records preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ // will take care of overlapping batches
+ Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(batchesSoFar),
+ startOffset
+ )
+ ));
+
+ // will take care of overlapping batches
+ List<PersisterStateBatch> modifiedNewBatches = new ArrayList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(newBatches),
+ startOffset
+ )
+ ));
+
+ for (PersisterStateBatch batch : modifiedNewBatches) {
+ for (int i = 0; i < batchQueue.size(); i++) {
+ PersisterStateBatch cur = batchQueue.poll();
+ // cur batch under inspection has no overlap with the new one
+ // we will need to add to our result
+ if (batch.lastOffset() < cur.firstOffset() || batch.firstOffset() > cur.lastOffset()) {
+ batchQueue.add(cur);
+ continue;
+ }
+
+ // Covers cases where we need to create a new interval
+ // from the current one such that they do not
+ // overlap with the new one.
+ // Following cases will not produce any new records so need not be handled.
+ // cur: ____ ______ ______ _____
+ // new: ________ ______ _________ _________
+
+
+ // covers
+ // cur: ______ _____ _____ ______
Review Comment:
The second case here overlaps with the second case above (matching first and
last offsets). Logically that might be fine in the code, but it looks like a
contradiction in the comments.
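The case analysis the in-code comments are trying to describe can be made unambiguous by classifying each batch/cur pair exactly once, with a fixed precedence between checks. The sketch below uses hypothetical names (it is not code from the PR) and treats matching first and last offsets as the containment case, which is one way to resolve the overlap between the two comment blocks the reviewer points out.

```java
public class OverlapCases {

    enum Kind { NONE, BATCH_INSIDE_CUR, BATCH_COVERS_CUR, BATCH_EXTENDS_LEFT, BATCH_EXTENDS_RIGHT }

    // Classifies how a new batch [bf, bl] relates to a current batch [cf, cl].
    // The checks are ordered, so a batch whose first and last offsets both
    // match cur lands in exactly one case (BATCH_INSIDE_CUR), never two.
    static Kind classify(long bf, long bl, long cf, long cl) {
        if (bl < cf || bf > cl) {
            return Kind.NONE;             // disjoint intervals
        }
        if (bf >= cf && bl <= cl) {
            return Kind.BATCH_INSIDE_CUR; // includes exact endpoint match
        }
        if (bf <= cf && bl >= cl) {
            return Kind.BATCH_COVERS_CUR; // cur is superseded entirely
        }
        return bf < cf ? Kind.BATCH_EXTENDS_LEFT : Kind.BATCH_EXTENDS_RIGHT;
    }
}
```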
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -537,38 +539,195 @@ private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue n
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
- .map(PersisterOffsetsStateBatch::from)
- .collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
- .build();
+ .map(ShareCoordinatorShard::toPersisterStateBatch)
+ .collect(Collectors.toList()), newStartOffset))
+ .build();
}
/**
- * Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
+ * Util method which takes in 2 collections containing {@link PersisterStateBatch}
* and the startOffset.
- * It removes all batches from the 1st collection which have the same first and last offset
- * as the batches in 2nd collection. It then creates a final list of batches which contains the
- * former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
- * which have same first and last offset in A).
- * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
- * @param currentBatch - collection containing current soft state of batches
+ * This method checks any overlap between current state batches and new state batches.
+ * Based on various conditions it creates new non-overlapping records preferring new batches.
+ * Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1 and
+ * merges any contiguous intervals with same state.
+ * @param batchesSoFar - collection containing current soft state of batches
+ * @param newBatches - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
- private static List<PersisterOffsetsStateBatch> combineStateBatches(
- Collection<PersisterOffsetsStateBatch> currentBatch,
- Collection<PersisterOffsetsStateBatch> newBatch,
+ // visibility for testing
+ static List<PersisterStateBatch> combineStateBatches(
+ List<PersisterStateBatch> batchesSoFar,
+ List<PersisterStateBatch> newBatches,
long startOffset
) {
- currentBatch.removeAll(newBatch);
- List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
- batchesToAdd.addAll(newBatch);
- // Any batches where the last offset is < the current start offset
- // are now expired. We should remove them from the persister.
+ // will take care of overlapping batches
+ Queue<PersisterStateBatch> batchQueue = new LinkedList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(batchesSoFar),
+ startOffset
+ )
+ ));
+
+ // will take care of overlapping batches
+ List<PersisterStateBatch> modifiedNewBatches = new ArrayList<>(
+ mergeBatches(
+ pruneBatches(
+ getSortedList(newBatches),
+ startOffset
+ )
+ ));
+
+ for (PersisterStateBatch batch : modifiedNewBatches) {
+ for (int i = 0; i < batchQueue.size(); i++) {
+ PersisterStateBatch cur = batchQueue.poll();
+ // cur batch under inspection has no overlap with the new one
+ // we will need to add to our result
+ if (batch.lastOffset() < cur.firstOffset() || batch.firstOffset() > cur.lastOffset()) {
+ batchQueue.add(cur);
+ continue;
+ }
+
+ // Covers cases where we need to create a new interval
+ // from the current one such that they do not
+ // overlap with the new one.
+ // Following cases will not produce any new records so need not be handled.
+ // cur: ____ ______ ______ _____
+ // new: ________ ______ _________ _________
Review Comment:
Probably ought to use `batch:` in the diagram. For these cases, I think the
idea is that `cur` is superseded by `batch`, so `batchQueue.add(batch)` at line
650 is sufficient to add the new batch.
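The superseding idea in this comment can be captured by a one-line predicate; the class and method names below are illustrative only, not part of the PR. When the new batch fully covers `cur` (including the equal-endpoints boundary), no leftover sub-interval of `cur` survives, so dropping `cur` and adding only the new batch is enough, as the reviewer suggests.

```java
public class SupersedeCheck {

    // True when the new batch [bf, bl] fully covers the current batch
    // [cf, cl]. cur then contributes no leftover sub-interval and can be
    // discarded, leaving a single add of the new batch as the only work.
    static boolean supersedes(long bf, long bl, long cf, long cl) {
        return bf <= cf && bl >= cl;
    }
}
```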
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]