[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-16 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1071326907


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }
 
+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   I clean up old overlapping buckets when recovering buckets, so that we 
can persist the newly merged bucket first and then delete the original 
buckets.
   
   The approach is as follows: if the ranges `[1...10]`, `[30...40]`, and 
`[1...40]` all exist, only `[1...40]` is kept.
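
   The cleanup rule above can be sketched as follows. This is only an 
illustration under assumptions, not the code in the PR: `LedgerRange` and 
`dropEnclosedRanges` are hypothetical names, and plain `long` pairs stand in 
for the Guava ranges the tracker uses.

```java
import java.util.ArrayList;
import java.util.List;

class OverlapCleanupSketch {

    /** Closed ledger-id range [start, end] covered by one bucket (hypothetical type). */
    record LedgerRange(long start, long end) {
        boolean encloses(LedgerRange other) {
            return start <= other.start && other.end <= end;
        }
    }

    /**
     * Returns the ranges to keep after dropping every range that is enclosed
     * by another range. Sorting by start ascending, then end descending, puts
     * an enclosing range ahead of the ranges it covers.
     */
    static List<LedgerRange> dropEnclosedRanges(List<LedgerRange> recovered) {
        List<LedgerRange> sorted = new ArrayList<>(recovered);
        sorted.sort((a, b) -> a.start() != b.start()
                ? Long.compare(a.start(), b.start())
                : Long.compare(b.end(), a.end()));

        List<LedgerRange> kept = new ArrayList<>();
        for (LedgerRange candidate : sorted) {
            boolean enclosed = kept.stream().anyMatch(k -> k.encloses(candidate));
            if (!enclosed) {
                kept.add(candidate);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<LedgerRange> recovered = List.of(
                new LedgerRange(1, 10), new LedgerRange(30, 40), new LedgerRange(1, 40));
        // prints [LedgerRange[start=1, end=40]]
        System.out.println(dropEnclosedRanges(recovered));
    }
}
```

   In this sketch the dropped ranges are exactly the ones whose snapshots 
would then be deleted during recovery; ranges that only partially overlap are 
left alone.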



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-16 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1070985106


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }
 
+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   > Can we mark the old segment as deleting or merging? So that we can 
continue the merge operation after the broker crashes.
   
   This would introduce more state in the metadata; we can improve it when we 
really need it.



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-16 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1070929495


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {

Review Comment:
   Sorry, I missed this



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-13 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1069264203


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##
@@ -144,7 +155,12 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
         }
 
         try {
-            FutureUtil.waitForAll(futures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+            FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
+                toBeDeletedBucketMap.forEach((k, immutableBucket) -> {
+                    immutableBuckets.remove(k);
+                    immutableBucket.asyncDeleteBucketSnapshot();

Review Comment:
   ditto



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-13 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1069262861


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##
@@ -308,6 +375,10 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa
                 try {
                     bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
                         if (CollectionUtils.isEmpty(indexList)) {
+                            synchronized (immutableBuckets) {
+                                immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+                            }
+                            bucket.asyncDeleteBucketSnapshot();

Review Comment:
   Add exception log in `asyncDeleteBucketSnapshot`
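
   One way such a log could be attached is sketched below. It is an 
assumption-laden illustration, not the PR's implementation: `logOnFailure` 
and the `bucketId` parameter are hypothetical, and `java.util.logging` stands 
in for whatever logger the broker uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

class DeleteWithLoggingSketch {

    private static final Logger LOG = Logger.getLogger(DeleteWithLoggingSketch.class.getName());

    /**
     * Wraps a delete future so that a failure is logged, while the failure is
     * still propagated to any caller that chains on the returned future.
     */
    static CompletableFuture<Void> logOnFailure(CompletableFuture<Void> deleteFuture, long bucketId) {
        return deleteFuture.whenComplete((ignore, ex) -> {
            if (ex != null) {
                LOG.log(Level.WARNING, "Failed to delete bucket snapshot, bucketId: " + bucketId, ex);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failing =
                CompletableFuture.failedFuture(new IllegalStateException("storage unavailable"));
        CompletableFuture<Void> result = logOnFailure(failing, 42L);
        // prints true: the failure is logged and still visible to the caller
        System.out.println(result.isCompletedExceptionally());
    }
}
```

   Logging inside the delete path means fire-and-forget call sites, like the 
one in this hunk, do not silently drop errors.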



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-13 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1069231457


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }
 
+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   > When is the index rebuilt?
   
   If the index is lost, the tracker rebuilds it as the messages are re-read 
by the managed cursor.
   
   > Should we unload the topic to rebuild the index if this operation fails?
   
   Yes, we can unload the topic to rebuild the index.



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-11 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066889404


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final List<SnapshotSegment> segmentListA;
+    private final List<SnapshotSegment> segmentListB;
+
+    private int segmentListACursor = 0;
+    private int segmentListBCursor = 0;
+    private int segmentACursor = 0;
+    private int segmentBCursor = 0;
+
+    private CombinedSegmentDelayedIndexQueue(List<SnapshotSegment> segmentListA,
+                                             List<SnapshotSegment> segmentListB) {
+        this.segmentListA = segmentListA;
+        this.segmentListB = segmentListB;
+    }
+
+    public static CombinedSegmentDelayedIndexQueue wrap(
+            List<SnapshotSegment> segmentListA,
+            List<SnapshotSegment> segmentListB) {
+        return new CombinedSegmentDelayedIndexQueue(segmentListA, segmentListB);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return segmentListACursor >= segmentListA.size() && segmentListBCursor >= segmentListB.size();
+    }
+
+    @Override
+    public DelayedIndex peek() {
+        return getValue(false);
+    }
+
+    @Override
+    public DelayedIndex pop() {
+        return getValue(true);
+    }
+
+    private DelayedIndex getValue(boolean needAdvanceCursor) {
+        while (segmentListACursor < segmentListA.size()
+                && segmentACursor >= segmentListA.get(segmentListACursor).getIndexesCount()) {
+            segmentListACursor++;
+        }
+        while (segmentListBCursor < segmentListB.size()
+                && segmentBCursor >= segmentListB.get(segmentListBCursor).getIndexesCount()) {
+            segmentListBCursor++;
+        }

Review Comment:
   > Sorry, I'm not clear about this. If we want to skip the empty segments, 
could we check whether the index count is zero or not?
   
   You are right. This logic is a little redundant; it should only check 
whether the index count is zero.
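
   A minimal sketch of the simplified check, under assumptions: segments are 
modelled as plain `List<Long>` timestamps rather than `SnapshotSegment`, and 
all names here are hypothetical. Because the in-segment cursor is reset to 0 
whenever a segment is exhausted, skipping only needs to test for an empty 
segment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

class CombinedSegmentQueueSketch {
    private final List<List<Long>> segmentsA;
    private final List<List<Long>> segmentsB;
    private int listCursorA = 0; // index of the current segment in A
    private int listCursorB = 0; // index of the current segment in B
    private int cursorA = 0;     // position inside the current A segment
    private int cursorB = 0;     // position inside the current B segment

    CombinedSegmentQueueSketch(List<List<Long>> segmentsA, List<List<Long>> segmentsB) {
        this.segmentsA = segmentsA;
        this.segmentsB = segmentsB;
    }

    /** Skips segments whose element count is zero; no cursor comparison is needed. */
    private void skipEmptySegments() {
        while (listCursorA < segmentsA.size() && segmentsA.get(listCursorA).isEmpty()) {
            listCursorA++;
        }
        while (listCursorB < segmentsB.size() && segmentsB.get(listCursorB).isEmpty()) {
            listCursorB++;
        }
    }

    boolean isEmpty() {
        skipEmptySegments();
        return listCursorA >= segmentsA.size() && listCursorB >= segmentsB.size();
    }

    long peek() {
        return next(false);
    }

    long pop() {
        return next(true);
    }

    private long next(boolean advance) {
        if (isEmpty()) {
            throw new NoSuchElementException("queue is empty");
        }
        boolean hasA = listCursorA < segmentsA.size();
        boolean hasB = listCursorB < segmentsB.size();
        long a = hasA ? segmentsA.get(listCursorA).get(cursorA) : Long.MAX_VALUE;
        long b = hasB ? segmentsB.get(listCursorB).get(cursorB) : Long.MAX_VALUE;
        boolean takeA = hasA && (!hasB || a <= b);
        if (advance) {
            if (takeA) {
                if (++cursorA >= segmentsA.get(listCursorA).size()) {
                    listCursorA++;
                    cursorA = 0; // exhausted segment: move on and reset
                }
            } else if (++cursorB >= segmentsB.get(listCursorB).size()) {
                listCursorB++;
                cursorB = 0;
            }
        }
        return takeA ? a : b;
    }

    public static void main(String[] args) {
        CombinedSegmentQueueSketch queue = new CombinedSegmentQueueSketch(
                List.of(List.of(1L, 4L), List.<Long>of(), List.of(7L)),
                List.of(List.<Long>of(), List.of(2L, 3L), List.of(9L)));
        List<Long> drained = new ArrayList<>();
        while (!queue.isEmpty()) {
            drained.add(queue.pop());
        }
        // prints [1, 2, 3, 4, 7, 9]
        System.out.println(drained);
    }
}
```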




[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-11 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066718099


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final List<SnapshotSegment> segmentListA;
+    private final List<SnapshotSegment> segmentListB;
+
+    private int segmentListACursor = 0;
+    private int segmentListBCursor = 0;
+    private int segmentACursor = 0;
+    private int segmentBCursor = 0;
+
+    private CombinedSegmentDelayedIndexQueue(List<SnapshotSegment> segmentListA,
+                                             List<SnapshotSegment> segmentListB) {
+        this.segmentListA = segmentListA;
+        this.segmentListB = segmentListB;
+    }
+
+    public static CombinedSegmentDelayedIndexQueue wrap(
+            List<SnapshotSegment> segmentListA,
+            List<SnapshotSegment> segmentListB) {
+        return new CombinedSegmentDelayedIndexQueue(segmentListA, segmentListB);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return segmentListACursor >= segmentListA.size() && segmentListBCursor >= segmentListB.size();
+    }
+
+    @Override
+    public DelayedIndex peek() {
+        return getValue(false);
+    }
+
+    @Override
+    public DelayedIndex pop() {
+        return getValue(true);
+    }
+
+    private DelayedIndex getValue(boolean needAdvanceCursor) {
+        while (segmentListACursor < segmentListA.size()
+                && segmentACursor >= segmentListA.get(segmentListACursor).getIndexesCount()) {
+            segmentListACursor++;
+        }
+        while (segmentListBCursor < segmentListB.size()
+                && segmentBCursor >= segmentListB.get(segmentListBCursor).getIndexesCount()) {
+            segmentListBCursor++;
+        }

Review Comment:
   This is for skipping empty segments.



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-11 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066708378


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }
 
+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   If the bucket index is lost, the tracker can rebuild it, but two 
overlapping buckets would leave the index data inconsistent.



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-11 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066708787


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.Comparator;
+import java.util.Objects;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+
+public interface DelayedIndexQueue {

Review Comment:
   I will remove the `public` modifier.



[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8

2023-01-11 Thread GitBox


coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066704678


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+
+@NotThreadSafe
+public class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {

Review Comment:
   I will remove the `public` modifier.


