[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1071326907

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java: ##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }

+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   I now clean up old overlapping buckets when recovering buckets, so that we can persist the newly merged bucket first and then delete the original buckets.

   The approach is as follows: if the recovered ranges are `[1...10]`, `[30...40]`, and `[1...40]`, only `[1...40]` remains.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
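The recovery-time cleanup described in this comment can be sketched as follows. This is only an illustration under stated assumptions: each bucket is identified by a closed ledger range, and a recovered bucket is dropped when another recovered range strictly encloses it. `LedgerRange`, `strictlyEncloses`, and `dropEnclosed` are hypothetical names and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OverlapCleanupSketch {

    /** Hypothetical closed range [start, end] of ledger ids covered by one bucket. */
    static final class LedgerRange {
        final long start;
        final long end;

        LedgerRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** True when this range covers {@code other} and is not the same range. */
        boolean strictlyEncloses(LedgerRange other) {
            boolean covers = start <= other.start && other.end <= end;
            boolean same = start == other.start && end == other.end;
            return covers && !same;
        }

        @Override
        public String toString() {
            return "[" + start + "..." + end + "]";
        }
    }

    /** Keeps only the ranges that no other recovered range strictly encloses. */
    static List<LedgerRange> dropEnclosed(List<LedgerRange> recovered) {
        List<LedgerRange> kept = new ArrayList<>();
        for (LedgerRange candidate : recovered) {
            boolean enclosed = false;
            for (LedgerRange other : recovered) {
                if (other.strictlyEncloses(candidate)) {
                    enclosed = true;
                    break;
                }
            }
            if (!enclosed) {
                kept.add(candidate);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<LedgerRange> recovered = Arrays.asList(
                new LedgerRange(1, 10), new LedgerRange(30, 40), new LedgerRange(1, 40));
        System.out.println(dropEnclosed(recovered)); // prints [[1...40]]
    }
}
```

The sketch only handles full containment, which is the case a crash between "persist merged bucket" and "delete originals" would leave behind. Partially overlapping ranges are outside its scope.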
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1070985106

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java: ##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }

+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   > Can we mark the old segment as deleting or merging? So that we can continue the merge operation after the broker crashes.

   This would introduce more state in the metadata. We can improve it when we really need it.
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1070929495

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java: ##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {

Review Comment:
   Sorry, I missed this.
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1069264203

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java: ##
@@ -144,7 +155,12 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
         }

         try {
-            FutureUtil.waitForAll(futures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+            FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
+                toBeDeletedBucketMap.forEach((k, immutableBucket) -> {
+                    immutableBuckets.remove(k);
+                    immutableBucket.asyncDeleteBucketSnapshot();

Review Comment:
   ditto
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1069262861

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java: ##
@@ -308,6 +375,10 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa
             try {
                 bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
                     if (CollectionUtils.isEmpty(indexList)) {
+                        synchronized (immutableBuckets) {
+                            immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
+                        }
+                        bucket.asyncDeleteBucketSnapshot();

Review Comment:
   Add exception log in `asyncDeleteBucketSnapshot`
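One way to attach such an exception log to a fire-and-forget delete is sketched below. It assumes the delete returns a `CompletableFuture` and uses `java.util.logging` only to stay self-contained (the broker itself may use a different logging facade). `withFailureLog` and the `bucketId` parameter are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

final class DeleteLoggingSketch {
    private static final Logger LOG = Logger.getLogger(DeleteLoggingSketch.class.getName());

    /**
     * Wraps a (hypothetical) delete future so that a failure is logged
     * even when the caller ignores the returned future.
     */
    static CompletableFuture<Void> withFailureLog(long bucketId, CompletableFuture<Void> deleteFuture) {
        return deleteFuture.whenComplete((ignored, ex) -> {
            if (ex != null) {
                LOG.log(Level.WARNING, "Failed to delete bucket snapshot, bucketId=" + bucketId, ex);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("storage unavailable"));
        // The warning is logged here although nobody inspects the result.
        withFailureLog(42L, failing);
    }
}
```

`whenComplete` leaves the outcome unchanged, so callers that do chain on the returned future still observe the original failure.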
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1069231457

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java: ##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }

+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   > When is the index rebuilt?

   If the index is lost, the tracker will rebuild it when the messages are reread by the managed cursor.

   > Should we unload the topic to rebuild the index if this operation fails?

   Yes, we can unload the topic to rebuild the index.
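The pair-selection loop in the diff quoted above picks the adjacent bucket pair with the fewest delayed messages. As a side note, the quoted loop narrows the running minimum with an `(int)` cast and would index with `-1` when fewer than two buckets exist. The sketch below isolates the selection step under the assumption that message counts are plain `long` values. `selectMergeIndex` is a hypothetical helper, not code from the PR.

```java
import java.util.Arrays;
import java.util.List;

final class MergePairSketch {

    /**
     * Returns the index i that minimizes counts[i] + counts[i + 1],
     * or -1 when there are fewer than two buckets to merge.
     */
    static int selectMergeIndex(List<Long> messageCounts) {
        long minSum = Long.MAX_VALUE;
        int minIndex = -1;
        for (int i = 0; i + 1 < messageCounts.size(); i++) {
            long sum = messageCounts.get(i) + messageCounts.get(i + 1);
            if (sum < minSum) {
                minSum = sum; // kept as a long, no narrowing cast
                minIndex = i;
            }
        }
        return minIndex;
    }

    public static void main(String[] args) {
        // Adjacent sums are 6, 3 and 11, so the pair starting at index 1 is chosen.
        System.out.println(selectMergeIndex(Arrays.asList(5L, 1L, 2L, 9L))); // prints 1
    }
}
```

A caller would check for `-1` before merging, since a single bucket has no neighbor to merge with.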
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066889404

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java: ##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final List segmentListA;
+    private final List segmentListB;
+
+    private int segmentListACursor = 0;
+    private int segmentListBCursor = 0;
+    private int segmentACursor = 0;
+    private int segmentBCursor = 0;
+
+    private CombinedSegmentDelayedIndexQueue(List segmentListA,
+                                             List segmentListB) {
+        this.segmentListA = segmentListA;
+        this.segmentListB = segmentListB;
+    }
+
+    public static CombinedSegmentDelayedIndexQueue wrap(
+            List segmentListA,
+            List segmentListB) {
+        return new CombinedSegmentDelayedIndexQueue(segmentListA, segmentListB);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return segmentListACursor >= segmentListA.size() && segmentListBCursor >= segmentListB.size();
+    }
+
+    @Override
+    public DelayedIndex peek() {
+        return getValue(false);
+    }
+
+    @Override
+    public DelayedIndex pop() {
+        return getValue(true);
+    }
+
+    private DelayedIndex getValue(boolean needAdvanceCursor) {
+        while (segmentListACursor < segmentListA.size()
+                && segmentACursor >= segmentListA.get(segmentListACursor).getIndexesCount()) {
+            segmentListACursor++;
+        }
+        while (segmentListBCursor < segmentListB.size()
+                && segmentBCursor >= segmentListB.get(segmentListBCursor).getIndexesCount()) {
+            segmentListBCursor++;
+        }

Review Comment:
   > Sorry, I'm not clear about this. If we want to skip the empty segments, could we check whether the index count is zero or not?

   You are right. This logic is a little redundant; it should only check whether the index count is zero.
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066718099

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java: ##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+
+@NotThreadSafe
+public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
+
+    private final List segmentListA;
+    private final List segmentListB;
+
+    private int segmentListACursor = 0;
+    private int segmentListBCursor = 0;
+    private int segmentACursor = 0;
+    private int segmentBCursor = 0;
+
+    private CombinedSegmentDelayedIndexQueue(List segmentListA,
+                                             List segmentListB) {
+        this.segmentListA = segmentListA;
+        this.segmentListB = segmentListB;
+    }
+
+    public static CombinedSegmentDelayedIndexQueue wrap(
+            List segmentListA,
+            List segmentListB) {
+        return new CombinedSegmentDelayedIndexQueue(segmentListA, segmentListB);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return segmentListACursor >= segmentListA.size() && segmentListBCursor >= segmentListB.size();
+    }
+
+    @Override
+    public DelayedIndex peek() {
+        return getValue(false);
+    }
+
+    @Override
+    public DelayedIndex pop() {
+        return getValue(true);
+    }
+
+    private DelayedIndex getValue(boolean needAdvanceCursor) {
+        while (segmentListACursor < segmentListA.size()
+                && segmentACursor >= segmentListA.get(segmentListACursor).getIndexesCount()) {
+            segmentListACursor++;
+        }
+        while (segmentListBCursor < segmentListB.size()
+                && segmentBCursor >= segmentListB.get(segmentListBCursor).getIndexesCount()) {
+            segmentListBCursor++;
+        }

Review Comment:
   This is for skipping empty segments.
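To illustrate why empty segments have to be skipped before comparing heads, here is a simplified two-way merge over segmented lists. It is a sketch only: segments are modeled as sorted `List<Long>` values instead of the protobuf `SnapshotSegment` type, the in-segment cursor is reset whenever a segment is left behind, and `CombinedQueueSketch` with its members is a hypothetical stand-in for the class under review.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

final class CombinedQueueSketch {
    private final List<List<Long>> segmentsA;
    private final List<List<Long>> segmentsB;
    private int listCursorA = 0; // current segment in A
    private int listCursorB = 0; // current segment in B
    private int cursorA = 0;     // position inside the current A segment
    private int cursorB = 0;     // position inside the current B segment

    CombinedQueueSketch(List<List<Long>> segmentsA, List<List<Long>> segmentsB) {
        this.segmentsA = segmentsA;
        this.segmentsB = segmentsB;
    }

    boolean isEmpty() {
        skipExhaustedSegments();
        return listCursorA >= segmentsA.size() && listCursorB >= segmentsB.size();
    }

    long peek() {
        return next(false);
    }

    long pop() {
        return next(true);
    }

    /** Moves past segments that are empty or fully consumed. */
    private void skipExhaustedSegments() {
        while (listCursorA < segmentsA.size() && cursorA >= segmentsA.get(listCursorA).size()) {
            listCursorA++;
            cursorA = 0;
        }
        while (listCursorB < segmentsB.size() && cursorB >= segmentsB.get(listCursorB).size()) {
            listCursorB++;
            cursorB = 0;
        }
    }

    private long next(boolean advance) {
        skipExhaustedSegments();
        boolean hasA = listCursorA < segmentsA.size();
        boolean hasB = listCursorB < segmentsB.size();
        if (!hasA && !hasB) {
            throw new NoSuchElementException("queue is empty");
        }
        long a = hasA ? segmentsA.get(listCursorA).get(cursorA) : Long.MAX_VALUE;
        long b = hasB ? segmentsB.get(listCursorB).get(cursorB) : Long.MAX_VALUE;
        boolean takeA = hasA && (!hasB || a <= b); // ties go to A
        if (advance) {
            if (takeA) {
                cursorA++;
            } else {
                cursorB++;
            }
        }
        return takeA ? a : b;
    }

    public static void main(String[] args) {
        CombinedQueueSketch queue = new CombinedQueueSketch(
                Arrays.asList(Arrays.asList(1L, 4L), Collections.<Long>emptyList(), Arrays.asList(7L)),
                Arrays.asList(Collections.<Long>emptyList(), Arrays.asList(2L, 3L), Arrays.asList(9L)));
        while (!queue.isEmpty()) {
            System.out.println(queue.pop()); // prints 1, 2, 3, 4, 7, 9 on separate lines
        }
    }
}
```

Without the skip step, reading the head of an empty segment would index past its end, which is the situation the `while` loops in the quoted diff guard against.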
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066708378

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java: ##
@@ -243,6 +263,53 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
         return true;
     }

+    private synchronized CompletableFuture asyncMergeBucketSnapshot() {
+        List values = immutableBuckets.asMapOfRanges().values().stream().toList();
+        long minNumberMessages = Long.MAX_VALUE;
+        int minIndex = -1;
+        for (int i = 0; i + 1 < values.size(); i++) {
+            ImmutableBucket bucketL = values.get(i);
+            ImmutableBucket bucketR = values.get(i + 1);
+            long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
+            if (numberMessages < minNumberMessages) {
+                minNumberMessages = (int) numberMessages;
+                minIndex = i;
+            }
+        }
+        return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+    }
+
+    private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA,
+                                                                    ImmutableBucket bucketB) {
+        immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
+        immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
+
+        CompletableFuture snapshotCreateFutureA =
+                bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+        CompletableFuture snapshotCreateFutureB =
+                bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
+
+        return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> {
+            CompletableFuture> futureA =
+                    bucketA.getRemainSnapshotSegment();
+            CompletableFuture> futureB =
+                    bucketB.getRemainSnapshotSegment();
+            return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
+                    .thenCompose(combinedDelayedIndexQueue -> {
+                        CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot();
+                        CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot();
+
+                        return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {

Review Comment:
   If the bucket index is lost, the tracker can rebuild it, but two overlapping buckets would cause data confusion.
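The trade-off in this comment can be made concrete with a small ordering sketch: read both buckets, delete the originals, and only then persist the merged bucket, so that a failure in between loses a (rebuildable) index instead of leaving overlapping buckets behind. The in-memory `store`, the string payloads, and the `failCreate` switch are all hypothetical stand-ins for the real snapshot storage.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class DeleteFirstMergeSketch {
    /** Hypothetical in-memory stand-in for snapshot storage, keyed by ledger range. */
    final Map<String, String> store = new ConcurrentHashMap<>();
    /** Test switch that simulates a failure while persisting the merged bucket. */
    boolean failCreate = false;

    CompletableFuture<Void> create(String key, String data) {
        if (failCreate) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("create failed"));
            return failed;
        }
        store.put(key, data);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> delete(String key) {
        store.remove(key);
        return CompletableFuture.completedFuture(null);
    }

    /** Deletes both source buckets before the merged bucket is written. */
    CompletableFuture<Void> mergeDeleteFirst(String a, String b, String mergedKey) {
        String mergedData = store.get(a) + "+" + store.get(b); // read before deleting
        return CompletableFuture.allOf(delete(a), delete(b))
                .thenCompose(ignored -> create(mergedKey, mergedData));
    }

    public static void main(String[] args) {
        DeleteFirstMergeSketch sketch = new DeleteFirstMergeSketch();
        sketch.store.put("1-10", "x");
        sketch.store.put("30-40", "y");
        sketch.mergeDeleteFirst("1-10", "30-40", "1-40").join();
        System.out.println(sketch.store); // prints {1-40=x+y}
    }
}
```

With this ordering the store never holds `1-10`, `30-40`, and `1-40` at the same time. The cost is that a failed create leaves no bucket at all, which matches the "index is lost but can be rebuilt" case described in the comment.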
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066708787

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java: ##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.Comparator;
+import java.util.Objects;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+
+public interface DelayedIndexQueue {

Review Comment:
   I will remove the `public` modifier.
[GitHub] [pulsar] coderzc commented on a diff in pull request #19138: [feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8
coderzc commented on code in PR #19138:
URL: https://github.com/apache/pulsar/pull/19138#discussion_r1066704678

## pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java: ##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+
+@NotThreadSafe
+public class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {

Review Comment:
   I will remove the `public` modifier.