This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7e035b0 Avoid boxing of checksum into a Long (#1467) 7e035b0 is described below commit 7e035b0bd5c1fcbe6cb583ae11e6674e0bddc4f0 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Mar 28 20:53:44 2018 -0700 Avoid boxing of checksum into a Long (#1467) --- .../org/apache/pulsar/broker/service/Consumer.java | 5 ++--- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../pulsar/broker/service/ReplicatorTest.java | 2 +- .../broker/service/persistent/ChecksumTest.java | 4 ++-- .../org/apache/pulsar/compaction/CompactorTest.java | 2 +- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- .../java/org/apache/pulsar/common/api/Commands.java | 21 ++++++++++++++------- .../pulsar/common/compression/CommandsTest.java | 2 +- 9 files changed, 24 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 4c5dbbc..12782c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.pulsar.common.api.Commands.readChecksum; import java.util.Collections; import java.util.Iterator; @@ -37,7 +36,7 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; + import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; @@ -225,7 +224,7 @@ public class Consumer { metadataAndPayload.retain(); // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { - readChecksum(metadataAndPayload); + Commands.skipChecksumIfPresent(metadataAndPayload); } if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 35bc315..2088035 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -176,7 +176,7 @@ public class Producer { int readerIndex = headersAndPayload.readerIndex(); try { - int checksum = readChecksum(headersAndPayload).intValue(); + int checksum = readChecksum(headersAndPayload); long computedChecksum = computeChecksum(headersAndPayload); if (checksum == computedChecksum) { return true; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 08e4fb7..09c807f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -861,7 +861,7 @@ public class ReplicatorTest extends ReplicatorTestBase { ByteBuf b = msg.getHeadersAndPayload(); assertTrue(Commands.hasChecksum(b)); - int parsedChecksum = Commands.readChecksum(b).intValue(); + int parsedChecksum = Commands.readChecksum(b); int computedChecksum = Crc32cIntChecksum.computeChecksum(b); assertEquals(parsedChecksum, computedChecksum); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java index 3e6b3bd..94c3762 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java @@ -74,7 +74,7 @@ public class ChecksumTest extends BrokerTestBase { ByteBuf b = entries.get(0).getDataBuffer(); assertTrue(Commands.hasChecksum(b)); - int parsedChecksum = Commands.readChecksum(b).intValue(); + int parsedChecksum = Commands.readChecksum(b); int computedChecksum = Crc32cIntChecksum.computeChecksum(b); assertEquals(parsedChecksum, computedChecksum); @@ -95,7 +95,7 @@ public class ChecksumTest extends BrokerTestBase { ByteBuf b = msg.getHeadersAndPayload(); assertTrue(Commands.hasChecksum(b)); - int parsedChecksum = Commands.readChecksum(b).intValue(); + int parsedChecksum = Commands.readChecksum(b); int computedChecksum = Crc32cIntChecksum.computeChecksum(b); assertEquals(parsedChecksum, computedChecksum); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 1867ff4..59c46ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -207,7 +207,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest { public ByteBuf extractPayload(RawMessage m) throws Exception { ByteBuf payloadAndMetadata = m.getHeadersAndPayload(); - Commands.readChecksum(payloadAndMetadata); + Commands.skipChecksumIfPresent(payloadAndMetadata); int metadataSize = payloadAndMetadata.readInt(); // metadata size byte[] metadata = new byte[metadataSize]; payloadAndMetadata.readBytes(metadata); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c215431..bc62d3a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1026,7 +1026,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) { if (hasChecksum(headersAndPayload)) { - int checksum = readChecksum(headersAndPayload).intValue(); + int checksum = readChecksum(headersAndPayload); int computedChecksum = computeChecksum(headersAndPayload); if (checksum != computedChecksum) { log.error( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e1a505a..27757d1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -733,7 +733,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne headerFrame.skipBytes(cmdSize); // verify if checksum present if (hasChecksum(headerFrame)) { - int checksum = readChecksum(headerFrame).intValue(); + int checksum = readChecksum(headerFrame); // msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload int metadataChecksum = computeChecksum(headerFrame); long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index dcea2b2..f5470cc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -236,12 +236,19 @@ public class Commands { return buffer.getShort(buffer.readerIndex()) == magicCrc32c; } - public static Long readChecksum(ByteBuf buffer) { - if(hasChecksum(buffer)) { - buffer.skipBytes(2); //skip magic bytes - return buffer.readUnsignedInt(); - } else{ - return null; + /** + * Read the checksum and advance the reader index in the buffer. + * + * Note: This method assume the checksum presence was already verified before. + */ + public static int readChecksum(ByteBuf buffer) { + buffer.skipBytes(2); //skip magic bytes + return buffer.readInt(); + } + + public static void skipChecksumIfPresent(ByteBuf buffer) { + if (hasChecksum(buffer)) { + readChecksum(buffer); } } @@ -249,7 +256,7 @@ public class Commands { try { // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata to parse // metadata - readChecksum(buffer); + skipChecksumIfPresent(buffer); int metadataSize = (int) buffer.readUnsignedInt(); int writerIndex = buffer.writerIndex(); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java index 1827416..186e7b2 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java @@ -58,7 +58,7 @@ public class CommandsTest { /*** 1. verify checksum and metadataParsing ***/ boolean hasChecksum = Commands.hasChecksum(receivedBuf); - int checksum = Commands.readChecksum(receivedBuf).intValue(); + int checksum = Commands.readChecksum(receivedBuf); // verify checksum is present -- To stop receiving notification emails like this one, please contact mme...@apache.org.