[GitHub] rdhabalia commented on a change in pull request #2591: Fix: Compaction with last deleted keys not completing compaction

2018-09-17 Thread GitBox
rdhabalia commented on a change in pull request #2591: Fix: Compaction with 
last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591#discussion_r218200167
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 ##
 @@ -217,10 +229,11 @@ private void phaseTwoLoop(RawReader reader, MessageId 
to, Map
 }
 } else {
 Pair keyAndSize = extractKeyAndSize(m);
+MessageId msg;
 if (keyAndSize == null) { // pass through messages 
without a key
 messageToAdd = Optional.of(m);
-} else if 
(latestForKey.get(keyAndSize.getLeft()).equals(id)
-   && keyAndSize.getRight() > 0) {
+} else if ((msg = 
latestForKey.get(keyAndSize.getLeft())) != null && msg.equals(id)
+&& keyAndSize.getRight() > 0) { // consider 
message only if present into latestForKey map
 
 Review comment:
   that's correct. it's redundant so, I think I will remove it as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rdhabalia commented on a change in pull request #2591: Fix: Compaction with last deleted keys not completing compaction

2018-09-17 Thread GitBox
rdhabalia commented on a change in pull request #2591: Fix: Compaction with 
last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591#discussion_r218200057
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 ##
 @@ -153,40 +163,42 @@ private void 
scheduleTimeout(CompletableFuture future) {
 });
 }
 
-private CompletableFuture phaseTwo(RawReader reader, MessageId from, 
MessageId to,
- Map 
latestForKey, BookKeeper bk) {
+private CompletableFuture phaseTwo(RawReader reader, MessageId from, 
MessageId to, MessageId lastReadId,
+Map latestForKey, BookKeeper bk) {
 Map metadata = ImmutableMap.of("compactedTopic", 
reader.getTopic().getBytes(UTF_8),
-   "compactedTo", 
to.toByteArray());
+"compactedTo", to.toByteArray());
 return createLedger(bk, metadata).thenCompose((ledger) -> {
-log.info("Commencing phase two of compaction for {}, from {} 
to {}, compacting {} keys to ledger {}",
- reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
-return phaseTwoSeekThenLoop(reader, from, to, latestForKey, 
bk, ledger);
-});
+log.info("Commencing phase two of compaction for {}, from {} to 
{}, compacting {} keys to ledger {}",
+reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
+return phaseTwoSeekThenLoop(reader, from, to, lastReadId, 
latestForKey, bk, ledger);
+});
 }
 
 private CompletableFuture phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,
- Map latestForKey,
- BookKeeper bk, 
LedgerHandle ledger) {
+MessageId lastReadId, Map latestForKey, 
BookKeeper bk, LedgerHandle ledger) {
 CompletableFuture promise = new CompletableFuture<>();
 
-reader.seekAsync(from).thenCompose((v) -> {
-Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-CompletableFuture loopPromise = new 
CompletableFuture();
-phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
-return loopPromise;
-}).thenCompose((v) -> closeLedger(ledger))
-.thenCompose((v) -> reader.acknowledgeCumulativeAsync(
- to, 
ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId(
-.whenComplete((res, exception) -> {
+boolean emptyCompactedLedger = to == null;
 
 Review comment:
   sure, will add it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services