This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 552249d Ensure the checksum is not stripped after validation in the broker (#1195) 552249d is described below commit 552249d9b79411b0972967156b3c99a8bbb858c3 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Feb 8 14:04:37 2018 -0800 Ensure the checksum is not stripped after validation in the broker (#1195) * Ensure the checksum is not stripped after validation in the broker * Fixed issue in C++ consumer to adjust size after reading the checksum * Fixed formatting * Added missing client protocol version check --- .../org/apache/pulsar/broker/service/Consumer.java | 2 +- .../org/apache/pulsar/broker/service/Producer.java | 4 +- .../pulsar/broker/service/ReplicatorTest.java | 31 ++++++ .../broker/service/persistent/ChecksumTest.java | 105 +++++++++++++++++++++ pulsar-client-cpp/lib/ClientConnection.cc | 14 ++- pulsar-client-cpp/lib/ClientConnection.h | 3 +- .../apache/pulsar/common/api/proto/PulsarApi.java | 3 + pulsar-common/src/main/proto/PulsarApi.proto | 1 + 8 files changed, 155 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index cf796d3..77c36e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -202,7 +202,7 @@ public class Consumer { // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release metadataAndPayload.retain(); // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification - if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) { + if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { readChecksum(metadataAndPayload); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 72b12d6..e964598 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -168,11 +168,11 @@ public class Producer { } private boolean verifyChecksum(ByteBuf headersAndPayload) { - if (hasChecksum(headersAndPayload)) { - int checksum = readChecksum(headersAndPayload).intValue(); int readerIndex = headersAndPayload.readerIndex(); + try { + int checksum = readChecksum(headersAndPayload).intValue(); long computedChecksum = computeChecksum(headersAndPayload); if (checksum == computedChecksum) { return true; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index f1394d1..cbef1d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -50,13 +50,17 @@ import org.apache.pulsar.broker.namespace.OwnedBundle; import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.checksum.utils.Crc32cChecksum; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; @@ -73,6 +77,8 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.testng.collections.Lists; +import io.netty.buffer.ByteBuf; + /** * Starts 2 brokers that are in 2 different clusters */ @@ -822,7 +828,32 @@ public class ReplicatorTest extends ReplicatorTestBase { producer1.close(); consumer1.close(); consumer2.close(); + } + + @Test(timeOut = 30000) + public void verifyChecksumAfterReplication() throws Exception { + final String topicName = "persistent://pulsar/global/ns/checksumAfterReplication"; + + PulsarClient c1 = PulsarClient.create(url1.toString()); + Producer p1 = c1.createProducer(topicName); + + PulsarClient c2 = PulsarClient.create(url2.toString()); + RawReader reader2 = RawReader.create(c2, topicName, "sub").get(); + + p1.send("Hello".getBytes()); + + RawMessage msg = reader2.readNextAsync().get(); + + ByteBuf b = msg.getHeadersAndPayload(); + + assertTrue(Commands.hasChecksum(b)); + int parsedChecksum = Commands.readChecksum(b).intValue(); + int computedChecksum = Crc32cChecksum.computeChecksum(b); + + assertEquals(parsedChecksum, computedChecksum); + p1.close(); + reader2.closeAsync().get(); } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java new file mode 100644 index 0000000..0d23ab2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.List; + +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.checksum.utils.Crc32cChecksum; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.common.api.Commands; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import io.netty.buffer.ByteBuf; + +/** + */ +@Test +public class ChecksumTest extends BrokerTestBase { + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void verifyChecksumStoredInManagedLedger() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/topic0"; + + Producer producer = pulsarClient.createProducer(topicName); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + + ManagedLedger ledger = topic.getManagedLedger(); + ManagedCursor cursor = ledger.openCursor("test"); + + producer.send("Hello".getBytes()); + + List<Entry> entries = cursor.readEntriesOrWait(1); + assertEquals(entries.size(), 1); + + ByteBuf b = entries.get(0).getDataBuffer(); + + assertTrue(Commands.hasChecksum(b)); + int parsedChecksum = Commands.readChecksum(b).intValue(); + int computedChecksum = Crc32cChecksum.computeChecksum(b); + assertEquals(parsedChecksum, computedChecksum); + + entries.get(0).release(); + producer.close(); + } + + @Test + public void verifyChecksumSentToConsumer() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/topic-1"; + + Producer producer = pulsarClient.createProducer(topicName); + RawReader reader = RawReader.create(pulsarClient, topicName, "sub").get(); + + producer.send("Hello".getBytes()); + + RawMessage msg = reader.readNextAsync().get(); + + ByteBuf b = msg.getHeadersAndPayload(); + assertTrue(Commands.hasChecksum(b)); + int parsedChecksum = Commands.readChecksum(b).intValue(); + int computedChecksum = Crc32cChecksum.computeChecksum(b); + assertEquals(parsedChecksum, computedChecksum); + + producer.close(); + reader.closeAsync().get(); + } + +} diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 08d5572..fc773fe 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -471,7 +471,8 @@ void ClientConnection::processIncomingBuffer() { MessageMetadata msgMetadata; // read checksum - bool isChecksumValid = verifyChecksum(incomingBuffer_, incomingCmd_); + uint32_t remainingBytes = frameSize - (cmdSize + 4); + bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd_); uint32_t metadataSize = incomingBuffer_.readUnsignedInt(); if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) { @@ -485,8 +486,9 @@ void ClientConnection::processIncomingBuffer() { } incomingBuffer_.consume(metadataSize); + remainingBytes -= (4 + metadataSize); - uint32_t payloadSize = frameSize - (cmdSize + 4) - (metadataSize + 4); + uint32_t payloadSize = remainingBytes; SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize); incomingBuffer_.consume(payloadSize); handleIncomingMessage(incomingCmd_.message(), isChecksumValid, msgMetadata, payload); @@ -518,13 +520,17 @@ void ClientConnection::processIncomingBuffer() { readNextCommand(); } -bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, proto::BaseCommand& incomingCmd_) { +bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes, + proto::BaseCommand& incomingCmd_) { int readerIndex = incomingBuffer_.readerIndex(); bool isChecksumValid = true; + if (incomingBuffer_.readUnsignedShort() == Commands::magicCrc32c) { uint32_t storedChecksum = incomingBuffer_.readUnsignedInt(); + remainingBytes -= (2 + 4) /* subtract size of checksum itself */; + // compute metadata-payload checksum - int metadataPayloadSize = incomingBuffer_.readableBytes(); + int metadataPayloadSize = remainingBytes; uint32_t computedChecksum = computeChecksum(0, incomingBuffer_.data(), metadataPayloadSize); // verify checksum isChecksumValid = (storedChecksum == computedChecksum); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 6b0d884..47f4994 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -166,7 +166,8 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> void handleRead(const boost::system::error_code& err, size_t bytesTransferred, uint32_t minReadSize); void processIncomingBuffer(); - bool verifyChecksum(SharedBuffer& incomingBuffer_, proto::BaseCommand& incomingCmd_); + bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes, + proto::BaseCommand& incomingCmd_); void handleIncomingCommand(); void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 8b1ee0e..2c13125 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -198,6 +198,7 @@ public final class PulsarApi { v8(8, 8), v9(9, 9), v10(10, 10), + v11(11, 11), ; public static final int v0_VALUE = 0; @@ -211,6 +212,7 @@ public final class PulsarApi { public static final int v8_VALUE = 8; public static final int v9_VALUE = 9; public static final int v10_VALUE = 10; + public static final int v11_VALUE = 11; public final int getNumber() { return value; } @@ -228,6 +230,7 @@ public final class PulsarApi { case 8: return v8; case 9: return v9; case 10: return v10; + case 11: return v11; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index db2eec4..964fd52 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -134,6 +134,7 @@ enum ProtocolVersion { v8 = 8; // Added CommandConsumerStats - Client fetches broker side consumer stats v9 = 9; // Added end of topic notification v10 = 10;// Added proxy to broker + v11 = 11;// C++ consumers before this version are not correctly handling the checksum field } message CommandConnect { -- To stop receiving notification emails like this one, please contact mme...@apache.org.