This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c6e124  Log phases in two phase compaction (#1335)
9c6e124 is described below

commit 9c6e124e2077c6a59d81b3b4de7866b9ac79a998
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Mar 7 01:43:03 2018 +0100

    Log phases in two phase compaction (#1335)
    
    Log when each phase in two phase compaction starts so that if it
    stalls we have something to debug by.
---
 .../src/main/java/org/apache/pulsar/client/api/RawReader.java |  7 +++++++
 .../java/org/apache/pulsar/client/impl/RawReaderImpl.java     | 11 +++++++++++
 .../java/org/apache/pulsar/compaction/TwoPhaseCompactor.java  |  9 +++++++--
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 7f7cd40..f9d297f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -38,6 +38,13 @@ public interface RawReader {
     }
 
     /**
+     * Get the topic for the reader
+     *
+     * @return topic for the reader
+     */
+    String getTopic();
+
+    /**
      * Seek to a location in the topic. After the seek, the first message read 
will be the one with
      * with the specified message ID.
      * @param messageId the message ID to seek to
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 75cbbbc..61a1fc2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -60,6 +60,12 @@ public class RawReaderImpl implements RawReader {
     }
 
     @Override
+    public String getTopic() {
+        return consumerConfiguration.getTopicNames().stream()
+            .findFirst().orElse(null);
+    }
+
+    @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
         return consumer.seekAsync(messageId);
     }
@@ -84,6 +90,11 @@ public class RawReaderImpl implements RawReader {
         return consumer.getLastMessageIdAsync();
     }
 
+    @Override
+    public String toString() {
+        return "RawReader(topic=" + getTopic() + ")";
+    }
+
     static class RawConsumerImpl extends ConsumerImpl<byte[]> {
         final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 7ae788c..3e2d259 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -83,6 +83,7 @@ public class TwoPhaseCompactor extends Compactor {
                     if (exception != null) {
                         loopPromise.completeExceptionally(exception);
                     } else {
+                        log.info("Commencing phase one of compaction for {}, 
reading to {}", reader, lastMessageId);
                         phaseOneLoop(reader, Optional.empty(), lastMessageId, 
latestForKey, loopPromise);
                     }
                 });
@@ -136,8 +137,12 @@ public class TwoPhaseCompactor extends Compactor {
 
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
                                              Map<String,MessageId> 
latestForKey, BookKeeper bk) {
-        return createLedger(bk).thenCompose(
-                (ledger) -> phaseTwoSeekThenLoop(reader, from, to, 
latestForKey, bk, ledger));
+
+        return createLedger(bk).thenCompose((ledger) -> {
+                log.info("Commencing phase two of compaction for {}, from {} 
to {}, compacting {} keys to ledger {}",
+                         reader, from, to, latestForKey.size(), 
ledger.getId());
+                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, 
bk, ledger);
+            });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to