This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b7a76f  Fixed refcounting when processing entries from sql (#2316)
6b7a76f is described below

commit 6b7a76fba4b1c08bf845409c6e4cabf48a454a5e
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Aug 7 09:12:47 2018 +0900

    Fixed refcounting when processing entries from sql (#2316)
---
 .../java/org/apache/pulsar/client/impl/MessageParser.java    |  5 -----
 .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java     | 12 +++++-------
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
index 8cf0328..b95b22d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -95,15 +95,10 @@ public class MessageParser {
             if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
                 final MessageImpl<?> message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload, null, null);
                 processor.process(msgId, message, uncompressedPayload);
-
-                uncompressedPayload.release();
-
             } else {
                 // handle batch message enqueuing; uncompressed payload has all messages in batch
                 receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, null, -1, processor);
-                uncompressedPayload.release();
             }
-
         } finally {
             if (uncompressedPayload != null) {
                 uncompressedPayload.release();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 618b33b..3523228 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -52,7 +52,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
@@ -202,10 +201,9 @@ public class PulsarRecordCursor implements RecordCursor {
                 throw new RuntimeException(e);
             }
 
-            newEntries.forEach(new Consumer<Entry>() {
-                @Override
-                public void accept(Entry entry) {
-                    completedBytes += entry.getData().length;
+            newEntries.forEach(entry -> {
+                try {
+                    completedBytes += entry.getDataBuffer().readableBytes();
                     // filter entries that is not part of my split
                     if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
                         try {
@@ -217,9 +215,9 @@ public class PulsarRecordCursor implements RecordCursor {
                             log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
                             throw new RuntimeException(e);
                         }
-                    } else {
-                        entry.release();
                     }
+                } finally {
+                    entry.release();
                 }
             });
         }

Reply via email to