rdhabalia commented on a change in pull request #2591: Fix: Compaction with
last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591#discussion_r218200057
##
File path:
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
##
@@ -153,40 +163,42 @@ private void
scheduleTimeout(CompletableFuture future) {
});
}
-private CompletableFuture phaseTwo(RawReader reader, MessageId from,
MessageId to,
- Map
latestForKey, BookKeeper bk) {
+private CompletableFuture phaseTwo(RawReader reader, MessageId from,
MessageId to, MessageId lastReadId,
+Map latestForKey, BookKeeper bk) {
Map metadata = ImmutableMap.of("compactedTopic",
reader.getTopic().getBytes(UTF_8),
- "compactedTo",
to.toByteArray());
+"compactedTo", to.toByteArray());
return createLedger(bk, metadata).thenCompose((ledger) -> {
-log.info("Commencing phase two of compaction for {}, from {}
to {}, compacting {} keys to ledger {}",
- reader.getTopic(), from, to, latestForKey.size(),
ledger.getId());
-return phaseTwoSeekThenLoop(reader, from, to, latestForKey,
bk, ledger);
-});
+log.info("Commencing phase two of compaction for {}, from {} to
{}, compacting {} keys to ledger {}",
+reader.getTopic(), from, to, latestForKey.size(),
ledger.getId());
+return phaseTwoSeekThenLoop(reader, from, to, lastReadId,
latestForKey, bk, ledger);
+});
}
private CompletableFuture phaseTwoSeekThenLoop(RawReader reader,
MessageId from, MessageId to,
- Map latestForKey,
- BookKeeper bk,
LedgerHandle ledger) {
+MessageId lastReadId, Map latestForKey,
BookKeeper bk, LedgerHandle ledger) {
CompletableFuture promise = new CompletableFuture<>();
-reader.seekAsync(from).thenCompose((v) -> {
-Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-CompletableFuture loopPromise = new
CompletableFuture();
-phaseTwoLoop(reader, to, latestForKey, ledger, outstanding,
loopPromise);
-return loopPromise;
-}).thenCompose((v) -> closeLedger(ledger))
-.thenCompose((v) -> reader.acknowledgeCumulativeAsync(
- to,
ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId(
-.whenComplete((res, exception) -> {
+boolean emptyCompactedLedger = to == null;
Review comment:
sure, will add it.
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services