This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8d50617 Fix: Compaction with last deleted keys not completing compaction (#2591) 8d50617 is described below commit 8d5061779e7a869e6e5981d2010e671ed0afa322 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Sep 18 03:09:50 2018 -0700 Fix: Compaction with last deleted keys not completing compaction (#2591) ### Motivation Right now, topic-compaction ignores a message-id with an empty payload, but if the last message in the ledger has an empty payload then the compactor doesn't complete the compaction: the compactor ignores the last message and doesn't complete the result-future, so the caller never sees a complete result. ### Modifications - Compactor calculates the `from` and `to` positions for the compacted ledger according to the last non-deleted active key. - Compactor handles tail-deleted keys in the ledger and completes the compaction process gracefully. ### Result The compactor can successfully compact a ledger whose last message is also deleted. 
--- .../pulsar/compaction/TwoPhaseCompactor.java | 90 +++++++++++++--------- .../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++++ 2 files changed, 111 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index cc3f710..425e049 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -76,7 +76,7 @@ public class TwoPhaseCompactor extends Compactor { @Override protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) { return phaseOne(reader).thenCompose( - (r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk)); + (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk)); } private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) { @@ -90,7 +90,8 @@ public class TwoPhaseCompactor extends Compactor { } else { log.info("Commencing phase one of compaction for {}, reading to {}", reader.getTopic(), lastMessageId); - phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise); + phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastMessageId, latestForKey, + loopPromise); } }); return loopPromise; @@ -98,6 +99,7 @@ public class TwoPhaseCompactor extends Compactor { private void phaseOneLoop(RawReader reader, Optional<MessageId> firstMessageId, + Optional<MessageId> toMessageId, MessageId lastMessageId, Map<String,MessageId> latestForKey, CompletableFuture<PhaseOneResult> loopPromise) { @@ -114,6 +116,7 @@ public class TwoPhaseCompactor extends Compactor { return; } MessageId id = m.getMessageId(); + boolean deletedMessage = false; if (RawBatchConverter.isReadableBatch(m)) { try { RawBatchConverter.extractIdsAndKeys(m) @@ -125,16 +128,23 @@ public class TwoPhaseCompactor extends Compactor { } else { 
Pair<String,Integer> keyAndSize = extractKeyAndSize(m); if (keyAndSize != null) { - latestForKey.put(keyAndSize.getLeft(), id); + if(keyAndSize.getRight() > 0) { + latestForKey.put(keyAndSize.getLeft(), id); + } else { + deletedMessage = true; + latestForKey.remove(keyAndSize.getLeft()); + } } } + MessageId first = firstMessageId.orElse(deletedMessage ? null : id); + MessageId to = deletedMessage ? toMessageId.orElse(null) : id; if (id.compareTo(lastMessageId) == 0) { - loopPromise.complete(new PhaseOneResult(firstMessageId.orElse(id), - id, latestForKey)); + loopPromise.complete(new PhaseOneResult(first, to, lastMessageId, latestForKey)); } else { phaseOneLoop(reader, - Optional.of(firstMessageId.orElse(id)), + Optional.ofNullable(first), + Optional.ofNullable(to), lastMessageId, latestForKey, loopPromise); } @@ -153,40 +163,38 @@ public class TwoPhaseCompactor extends Compactor { }); } - private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, - Map<String,MessageId> latestForKey, BookKeeper bk) { + private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, + Map<String, MessageId> latestForKey, BookKeeper bk) { Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", reader.getTopic().getBytes(UTF_8), - "compactedTo", to.toByteArray()); + "compactedTo", to.toByteArray()); return createLedger(bk, metadata).thenCompose((ledger) -> { - log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", - reader.getTopic(), from, to, latestForKey.size(), ledger.getId()); - return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger); - }); + log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", + reader.getTopic(), from, to, latestForKey.size(), ledger.getId()); + return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger); + }); } private 
CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to, - Map<String, MessageId> latestForKey, - BookKeeper bk, LedgerHandle ledger) { + MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) { CompletableFuture<Long> promise = new CompletableFuture<>(); reader.seekAsync(from).thenCompose((v) -> { - Semaphore outstanding = new Semaphore(MAX_OUTSTANDING); - CompletableFuture<Void> loopPromise = new CompletableFuture<Void>(); - phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise); - return loopPromise; - }).thenCompose((v) -> closeLedger(ledger)) - .thenCompose((v) -> reader.acknowledgeCumulativeAsync( - to, ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()))) - .whenComplete((res, exception) -> { + Semaphore outstanding = new Semaphore(MAX_OUTSTANDING); + CompletableFuture<Void> loopPromise = new CompletableFuture<Void>(); + phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise); + return loopPromise; + }).thenCompose((v) -> closeLedger(ledger)) + .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId, + ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()))) + .whenComplete((res, exception) -> { if (exception != null) { - deleteLedger(bk, ledger) - .whenComplete((res2, exception2) -> { - if (exception2 != null) { - log.warn("Cleanup of ledger {} for failed", ledger, exception2); - } - // complete with original exception - promise.completeExceptionally(exception); - }); + deleteLedger(bk, ledger).whenComplete((res2, exception2) -> { + if (exception2 != null) { + log.warn("Cleanup of ledger {} for failed", ledger, exception2); + } + // complete with original exception + promise.completeExceptionally(exception); + }); } else { promise.complete(ledger.getId()); } @@ -217,13 +225,23 @@ public class TwoPhaseCompactor extends Compactor { } } else { Pair<String,Integer> keyAndSize = extractKeyAndSize(m); + MessageId msg; if 
(keyAndSize == null) { // pass through messages without a key messageToAdd = Optional.of(m); - } else if (latestForKey.get(keyAndSize.getLeft()).equals(id) - && keyAndSize.getRight() > 0) { + } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null + && msg.equals(id)) { // consider message only if present into latestForKey map + if (keyAndSize.getRight() <= 0) { + promise.completeExceptionally(new IllegalArgumentException( + "Compaction phase found empty record from sorted key-map")); + } messageToAdd = Optional.of(m); } else { m.close(); + // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not + // present under latestForKey. Complete the compaction. + if (to.equals(id)) { + promise.complete(null); + } } } @@ -330,12 +348,14 @@ public class TwoPhaseCompactor extends Compactor { private static class PhaseOneResult { final MessageId from; - final MessageId to; + final MessageId to; // last undeleted messageId + final MessageId lastReadId; // last read messageId final Map<String,MessageId> latestForKey; - PhaseOneResult(MessageId from, MessageId to, Map<String,MessageId> latestForKey) { + PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map<String,MessageId> latestForKey) { this.from = from; this.to = to; + this.lastReadId = lastReadId; this.latestForKey = latestForKey; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 95a10fa..32c93b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -60,6 +60,9 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; + +import static org.testng.Assert.assertNull; 
+import static org.testng.Assert.assertTrue; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -1212,4 +1215,57 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(new String(message5.getData()), "my-message-4"); } } + + @Test(timeOut = 20000) + public void testCompactionWithLastDeletedKey() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("1").value("1".getBytes()).send(); + producer.newMessage().key("2").value("2".getBytes()).send(); + producer.newMessage().key("3").value("3".getBytes()).send(); + producer.newMessage().key("1").value("".getBytes()).send(); + producer.newMessage().key("2").value("".getBytes()).send(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + Set<String> expected = Sets.newHashSet("3"); + // consumer with readCompacted enabled only get compacted entries + try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); + assertTrue(expected.remove(m.getKey())); + } + } + + @Test(timeOut = 20000) + public void testEmptyCompactionLedger() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + 
+ producer.newMessage().key("1").value("1".getBytes()).send(); + producer.newMessage().key("2").value("2".getBytes()).send(); + producer.newMessage().key("1").value("".getBytes()).send(); + producer.newMessage().key("2").value("".getBytes()).send(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS); + assertNull(m); + } + } + }