This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9c6e124 Log phases in two phase compaction (#1335) 9c6e124 is described below commit 9c6e124e2077c6a59d81b3b4de7866b9ac79a998 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed Mar 7 01:43:03 2018 +0100 Log phases in two phase compaction (#1335) Log when each phase in two phase compaction starts so that if it stalls we have something to debug by. --- .../src/main/java/org/apache/pulsar/client/api/RawReader.java | 7 +++++++ .../java/org/apache/pulsar/client/impl/RawReaderImpl.java | 11 +++++++++++ .../java/org/apache/pulsar/compaction/TwoPhaseCompactor.java | 9 +++++++-- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 7f7cd40..f9d297f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -38,6 +38,13 @@ public interface RawReader { } /** + * Get the topic for the reader + * + * @return topic for the reader + */ + String getTopic(); + + /** * Seek to a location in the topic. After the seek, the first message read will be the one with * with the specified message ID. * @param messageId the message ID to seek to diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 75cbbbc..61a1fc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -60,6 +60,12 @@ public class RawReaderImpl implements RawReader { } @Override + public String getTopic() { + return consumerConfiguration.getTopicNames().stream() + .findFirst().orElse(null); + } + + @Override public CompletableFuture<Void> seekAsync(MessageId messageId) { return consumer.seekAsync(messageId); } @@ -84,6 +90,11 @@ public class RawReaderImpl implements RawReader { return consumer.getLastMessageIdAsync(); } + @Override + public String toString() { + return "RawReader(topic=" + getTopic() + ")"; + } + static class RawConsumerImpl extends ConsumerImpl<byte[]> { final BlockingQueue<RawMessageAndCnx> incomingRawMessages; final Queue<CompletableFuture<RawMessage>> pendingRawReceives; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 7ae788c..3e2d259 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -83,6 +83,7 @@ public class TwoPhaseCompactor extends Compactor { if (exception != null) { loopPromise.completeExceptionally(exception); } else { + log.info("Commencing phase one of compaction for {}, reading to {}", reader, lastMessageId); phaseOneLoop(reader, Optional.empty(), lastMessageId, latestForKey, loopPromise); } }); @@ -136,8 +137,12 @@ public class TwoPhaseCompactor extends Compactor { private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, Map<String,MessageId> latestForKey, BookKeeper bk) { - return createLedger(bk).thenCompose( - (ledger) -> phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger)); + + return createLedger(bk).thenCompose((ledger) -> { + log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", + reader, from, to, latestForKey.size(), ledger.getId()); + return phaseTwoSeekThenLoop(reader, from, to, latestForKey, bk, ledger); + }); } private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to, -- To stop receiving notification emails like this one, please contact mme...@apache.org.