This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6b7a76f Fixed refcounting when processing entries from sql (#2316) 6b7a76f is described below commit 6b7a76fba4b1c08bf845409c6e4cabf48a454a5e Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Aug 7 09:12:47 2018 +0900 Fixed refcounting when processing entries from sql (#2316) --- .../java/org/apache/pulsar/client/impl/MessageParser.java | 5 ----- .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java | 12 +++++------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java index 8cf0328..b95b22d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java @@ -95,15 +95,10 @@ public class MessageParser { if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) { final MessageImpl<?> message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload, null, null); processor.process(msgId, message, uncompressedPayload); - - uncompressedPayload.release(); - } else { // handle batch message enqueuing; uncompressed payload has all messages in batch receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, null, -1, processor); - uncompressedPayload.release(); } - } finally { if (uncompressedPayload != null) { uncompressedPayload.release(); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index 618b33b..3523228 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -52,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.type.BigintType.BIGINT; @@ -202,10 +201,9 @@ public class PulsarRecordCursor implements RecordCursor { throw new RuntimeException(e); } - newEntries.forEach(new Consumer<Entry>() { - @Override - public void accept(Entry entry) { - completedBytes += entry.getData().length; + newEntries.forEach(entry -> { + try { + completedBytes += entry.getDataBuffer().readableBytes(); // filter entries that is not part of my split if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) { try { @@ -217,9 +215,9 @@ public class PulsarRecordCursor implements RecordCursor { log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString()); throw new RuntimeException(e); } - } else { - entry.release(); } + } finally { + entry.release(); } }); }