This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d50617  Fix: Compaction with last deleted keys not completing 
compaction (#2591)
8d50617 is described below

commit 8d5061779e7a869e6e5981d2010e671ed0afa322
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Sep 18 03:09:50 2018 -0700

    Fix: Compaction with last deleted keys not completing compaction (#2591)
    
    ### Motivation
    
    Currently, topic compaction skips messages with an empty payload (deleted keys).
    However, if the last message in the ledger has an empty payload, the compactor
    skips that last message and never completes the result future. As a result, the
    compaction never finishes and the caller never sees a completed result.
    
    ### Modifications
    
    - The compactor calculates the `from` and `to` positions for the compacted
      ledger based on the last non-deleted (active) key.
    - The compactor handles deleted keys at the tail of the ledger and completes
      the compaction process gracefully.
    
    ### Result
    
    The compactor can now successfully compact a ledger whose last message is a
    deletion (a message with an empty payload).
---
 .../pulsar/compaction/TwoPhaseCompactor.java       | 90 +++++++++++++---------
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++++
 2 files changed, 111 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index cc3f710..425e049 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -76,7 +76,7 @@ public class TwoPhaseCompactor extends Compactor {
     @Override
     protected CompletableFuture<Long> doCompaction(RawReader reader, 
BookKeeper bk) {
         return phaseOne(reader).thenCompose(
-                (r) -> phaseTwo(reader, r.from, r.to, r.latestForKey, bk));
+                (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, 
r.latestForKey, bk));
     }
 
     private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
@@ -90,7 +90,8 @@ public class TwoPhaseCompactor extends Compactor {
                     } else {
                         log.info("Commencing phase one of compaction for {}, 
reading to {}",
                                  reader.getTopic(), lastMessageId);
-                        phaseOneLoop(reader, Optional.empty(), lastMessageId, 
latestForKey, loopPromise);
+                        phaseOneLoop(reader, Optional.empty(), 
Optional.empty(), lastMessageId, latestForKey,
+                                loopPromise);
                     }
                 });
         return loopPromise;
@@ -98,6 +99,7 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void phaseOneLoop(RawReader reader,
                               Optional<MessageId> firstMessageId,
+                              Optional<MessageId> toMessageId,
                               MessageId lastMessageId,
                               Map<String,MessageId> latestForKey,
                               CompletableFuture<PhaseOneResult> loopPromise) {
@@ -114,6 +116,7 @@ public class TwoPhaseCompactor extends Compactor {
                             return;
                         }
                         MessageId id = m.getMessageId();
+                        boolean deletedMessage = false;
                         if (RawBatchConverter.isReadableBatch(m)) {
                             try {
                                 RawBatchConverter.extractIdsAndKeys(m)
@@ -125,16 +128,23 @@ public class TwoPhaseCompactor extends Compactor {
                         } else {
                             Pair<String,Integer> keyAndSize = 
extractKeyAndSize(m);
                             if (keyAndSize != null) {
-                                latestForKey.put(keyAndSize.getLeft(), id);
+                                if(keyAndSize.getRight() > 0) {
+                                    latestForKey.put(keyAndSize.getLeft(), 
id);    
+                                } else {
+                                    deletedMessage = true;
+                                    latestForKey.remove(keyAndSize.getLeft());
+                                }
                             }
                         }
 
+                        MessageId first = firstMessageId.orElse(deletedMessage 
? null : id);
+                        MessageId to = deletedMessage ? 
toMessageId.orElse(null) : id;
                         if (id.compareTo(lastMessageId) == 0) {
-                            loopPromise.complete(new 
PhaseOneResult(firstMessageId.orElse(id),
-                                                                    id, 
latestForKey));
+                            loopPromise.complete(new PhaseOneResult(first, to, 
lastMessageId, latestForKey));
                         } else {
                             phaseOneLoop(reader,
-                                         
Optional.of(firstMessageId.orElse(id)),
+                                         Optional.ofNullable(first),
+                                         Optional.ofNullable(to),
                                          lastMessageId,
                                          latestForKey, loopPromise);
                         }
@@ -153,40 +163,38 @@ public class TwoPhaseCompactor extends Compactor {
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
-                                             Map<String,MessageId> 
latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to, MessageId lastReadId,
+            Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", 
reader.getTopic().getBytes(UTF_8),
-                                                       "compactedTo", 
to.toByteArray());
+                "compactedTo", to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
-                log.info("Commencing phase two of compaction for {}, from {} 
to {}, compacting {} keys to ledger {}",
-                         reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
-                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, 
bk, ledger);
-            });
+            log.info("Commencing phase two of compaction for {}, from {} to 
{}, compacting {} keys to ledger {}",
+                    reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
+            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, 
latestForKey, bk, ledger);
+        });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,
-                                                         Map<String, 
MessageId> latestForKey,
-                                                         BookKeeper bk, 
LedgerHandle ledger) {
+            MessageId lastReadId, Map<String, MessageId> latestForKey, 
BookKeeper bk, LedgerHandle ledger) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
 
         reader.seekAsync(from).thenCompose((v) -> {
-                Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-                CompletableFuture<Void> loopPromise = new 
CompletableFuture<Void>();
-                phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
-                return loopPromise;
-            }).thenCompose((v) -> closeLedger(ledger))
-            .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
-                                 to, 
ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
-            .whenComplete((res, exception) -> {
+            Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+            CompletableFuture<Void> loopPromise = new 
CompletableFuture<Void>();
+            phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
+            return loopPromise;
+        }).thenCompose((v) -> closeLedger(ledger))
+                .thenCompose((v) -> 
reader.acknowledgeCumulativeAsync(lastReadId,
+                        ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, 
ledger.getId())))
+                .whenComplete((res, exception) -> {
                     if (exception != null) {
-                        deleteLedger(bk, ledger)
-                            .whenComplete((res2, exception2) -> {
-                                    if (exception2 != null) {
-                                        log.warn("Cleanup of ledger {} for 
failed", ledger, exception2);
-                                    }
-                                    // complete with original exception
-                                    promise.completeExceptionally(exception);
-                                });
+                        deleteLedger(bk, ledger).whenComplete((res2, 
exception2) -> {
+                            if (exception2 != null) {
+                                log.warn("Cleanup of ledger {} for failed", 
ledger, exception2);
+                            }
+                            // complete with original exception
+                            promise.completeExceptionally(exception);
+                        });
                     } else {
                         promise.complete(ledger.getId());
                     }
@@ -217,13 +225,23 @@ public class TwoPhaseCompactor extends Compactor {
                         }
                     } else {
                         Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                        MessageId msg;
                         if (keyAndSize == null) { // pass through messages 
without a key
                             messageToAdd = Optional.of(m);
-                        } else if 
(latestForKey.get(keyAndSize.getLeft()).equals(id)
-                                   && keyAndSize.getRight() > 0) {
+                        } else if ((msg = 
latestForKey.get(keyAndSize.getLeft())) != null 
+                                && msg.equals(id)) { // consider message only 
if present into latestForKey map
+                            if (keyAndSize.getRight() <= 0) {
+                                promise.completeExceptionally(new 
IllegalArgumentException(
+                                        "Compaction phase found empty record 
from sorted key-map"));
+                            }
                             messageToAdd = Optional.of(m);
                         } else {
                             m.close();
+                            // Reached to last-id and phase-one found it 
deleted-message while iterating on ledger so, not
+                            // present under latestForKey. Complete the 
compaction.
+                            if (to.equals(id)) {
+                                promise.complete(null);
+                            }
                         }
                     }
 
@@ -330,12 +348,14 @@ public class TwoPhaseCompactor extends Compactor {
 
     private static class PhaseOneResult {
         final MessageId from;
-        final MessageId to;
+        final MessageId to; // last undeleted messageId
+        final MessageId lastReadId; // last read messageId
         final Map<String,MessageId> latestForKey;
 
-        PhaseOneResult(MessageId from, MessageId to, Map<String,MessageId> 
latestForKey) {
+        PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, 
Map<String,MessageId> latestForKey) {
             this.from = from;
             this.to = to;
+            this.lastReadId = lastReadId;
             this.latestForKey = latestForKey;
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 95a10fa..32c93b4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -60,6 +60,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -1212,4 +1215,57 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(new String(message5.getData()), 
"my-message-4");
         }
     }
+    
+    @Test(timeOut = 20000)
+    public void testCompactionWithLastDeletedKey() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("3").value("3".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        Set<String> expected = Sets.newHashSet("3");
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertTrue(expected.remove(m.getKey()));
+        }
+    }
+    
+    @Test(timeOut = 20000)
+    public void testEmptyCompactionLedger() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        producer.newMessage().key("1").value("1".getBytes()).send();
+        producer.newMessage().key("2").value("2".getBytes()).send();
+        producer.newMessage().key("1").value("".getBytes()).send();
+        producer.newMessage().key("2").value("".getBytes()).send();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(m);
+        }
+    }
+
 }

Reply via email to