sijie closed pull request #2591: Fix: Compaction with last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it for merged fork PRs:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710249..425e04921b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
     @Override
     protected CompletableFuture<Long> doCompaction(RawReader reader, 
BookKeeper bk) {
         return phaseOne(reader).thenCompose(
-                (r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+                (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, 
r.latestForKey, bk));
     }
 
     private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
                     } else {
                         log.info("Commencing phase one of compaction for {}, 
reading to {}",
                                  reader.getTopic(), lastMessageId);
-                        phaseOneLoop(reader, Optional.empty(), lastMessageId, 
latestForKey, loopPromise);
+                        phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastMessageId, latestForKey,
+                                loopPromise);
                     }
                 });
         return loopPromise;
@@ -98,6 +99,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
 
     private void phaseOneLoop(RawReader reader,
                               Optional<MessageId> firstMessageId,
+                              Optional<MessageId> toMessageId,
                               MessageId lastMessageId,
                               Map<String,MessageId> latestForKey,
                               CompletableFuture<PhaseOneResult> loopPromise) {
@@ -114,6 +116,7 @@ private void phaseOneLoop(RawReader reader,
                             return;
                         }
                         MessageId id = m.getMessageId();
+                        boolean deletedMessage = false;
                         if (RawBatchConverter.isReadableBatch(m)) {
                             try {
                                 RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ private void phaseOneLoop(RawReader reader,
                         } else {
                             Pair<String,Integer> keyAndSize = 
extractKeyAndSize(m);
                             if (keyAndSize != null) {
-                                latestForKey.put(keyAndSize.getLeft(), id);
+                                if(keyAndSize.getRight() > 0) {
+                                    latestForKey.put(keyAndSize.getLeft(), 
id);    
+                                } else {
+                                    deletedMessage = true;
+                                    latestForKey.remove(keyAndSize.getLeft());
+                                }
                             }
                         }
 
+                        MessageId first = firstMessageId.orElse(deletedMessage 
? null : id);
+                        MessageId to = deletedMessage ? 
toMessageId.orElse(null) : id;
                         if (id.compareTo(lastMessageId) == 0) {
-                            loopPromise.complete(new 
PhaseOneResult(firstMessageId.orElse(id),
-                                                                    id, 
latestForKey));
+                            loopPromise.complete(new PhaseOneResult(first, to, 
lastMessageId, latestForKey));
                         } else {
                             phaseOneLoop(reader,
-                                         
Optional.of(firstMessageId.orElse(id)),
+                                         Optional.ofNullable(first),
+                                         Optional.ofNullable(to),
                                          lastMessageId,
                                          latestForKey, loopPromise);
                         }
@@ -153,40 +163,38 @@ private void 
scheduleTimeout(CompletableFuture<RawMessage> future) {
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
-                                             Map<String,MessageId> 
latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to, MessageId lastReadId,
+            Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", 
reader.getTopic().getBytes(UTF_8),
-                                                       "compactedTo", 
to.toByteArray());
+                "compactedTo", to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
-                log.info("Commencing phase two of compaction for {}, from {} 
to {}, compacting {} keys to ledger {}",
-                         reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
-                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, 
bk, ledger);
-            });
+            log.info("Commencing phase two of compaction for {}, from {} to 
{}, compacting {} keys to ledger {}",
+                    reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
+            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, 
latestForKey, bk, ledger);
+        });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,
-                                                         Map<String, 
MessageId> latestForKey,
-                                                         BookKeeper bk, 
LedgerHandle ledger) {
+            MessageId lastReadId, Map<String, MessageId> latestForKey, 
BookKeeper bk, LedgerHandle ledger) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
 
         reader.seekAsync(from).thenCompose((v) -> {
-                Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-                CompletableFuture<Void> loopPromise = new 
CompletableFuture<Void>();
-                phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
-                return loopPromise;
-            }).thenCompose((v) -> closeLedger(ledger))
-            .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
-                                 to, 
ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
-            .whenComplete((res, exception) -> {
+            Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+            CompletableFuture<Void> loopPromise = new 
CompletableFuture<Void>();
+            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
+            return loopPromise;
+        }).thenCompose((v) -> closeLedger(ledger))
+                .thenCompose((v) -> 
reader.acknowledgeCumulativeAsync(lastReadId,
+                        ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, 
ledger.getId())))
+                .whenComplete((res, exception) -> {
                     if (exception != null) {
-                        deleteLedger(bk, ledger)
-                            .whenComplete((res2, exception2) -> {
-                                    if (exception2 != null) {
-                                        log.warn("Cleanup of ledger {} for 
failed", ledger, exception2);
-                                    }
-                                    // complete with original exception
-                                    promise.completeExceptionally(exception);
-                                });
+                        deleteLedger(bk, ledger).whenComplete((res2, 
exception2) -> {
+                            if (exception2 != null) {
+                                log.warn("Cleanup of ledger {} for failed", 
ledger, exception2);
+                            }
+                            // complete with original exception
+                            promise.completeExceptionally(exception);
+                        });
                     } else {
                         promise.complete(ledger.getId());
                     }
@@ -217,13 +225,23 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map<String, MessageId>
                         }
                     } else {
                         Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                        MessageId msg;
                         if (keyAndSize == null) { // pass through messages 
without a key
                             messageToAdd = Optional.of(m);
-                        } else if 
(latestForKey.get(keyAndSize.getLeft()).equals(id)
-                                   && keyAndSize.getRight() > 0) {
+                        } else if ((msg = 
latestForKey.get(keyAndSize.getLeft())) != null 
+                                && msg.equals(id)) { // consider message only 
if present into latestForKey map
+                            if (keyAndSize.getRight() <= 0) {
+                                promise.completeExceptionally(new 
IllegalArgumentException(
+                                        "Compaction phase found empty record 
from sorted key-map"));
+                            }
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
+                            // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so, not
+                            // present under latestForKey. Complete the 
compaction.
+                            if (to.equals(id)) {
+                                promise.complete(null);
+                            }
                         }
                     }
 
@@ -330,12 +348,14 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map<String, MessageId>
 
     private static class PhaseOneResult {
         final MessageId from;
-        final MessageId to;
+        final MessageId to; // last undeleted messageId
+        final MessageId lastReadId; // last read messageId
         final Map<String,MessageId> latestForKey;
 
-        PhaseOneResult(MessageId from, MessageId to, Map<String,MessageId> 
latestForKey) {
+        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, 
Map<String,MessageId> latestForKey) {
             this.from = from;
             this.to = to;
+            this.lastReadId = lastReadId;
             this.latestForKey = latestForKey;
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 95a10fab21..32c93b4626 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -60,6 +60,9 @@
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -1212,4 +1215,57 @@ public void testEmptyPayloadDeletesWhenEncrypted() 
throws Exception {
             Assert.assertEquals(new String(message5.getData()), 
"my-message-4");
         }
     }
+    
+    @Test(timeOut = 20000)
+    public void testCompactionWithLastDeletedKey() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("3").value("3".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        Set<String> expected = Sets.newHashSet("3");
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertTrue(expected.remove(m.getKey()));
+        }
+    }
+    
+    @Test(timeOut = 20000)
+    public void testEmptyCompactionLedger() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(m);
+        }
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to