This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8d159ef Fix MessageRouter hash inconsistent on C++/Java client (#1029) 8d159ef is described below commit 8d159efa5a4f25191fade5a9fddb45a245ef2e6b Author: Licht Takeuchi <lich...@outlook.jp> AuthorDate: Wed Jan 31 14:21:07 2018 +0900 Fix MessageRouter hash inconsistent on C++/Java client (#1029) * Fix hash inconsistent on between C++ and Java clients. * Add HashingScheme to select hash function on Java client * Fix the bug of Murmur3_32Hash on C++ client * Add Javadoc on makeHash method * Use JavaStringHash as default hash on Java client * Use BoostHash as default hash on C++ client * Make hash method always returns a signed integer * Re-implement hash classes as singleton on Java client * Move hash classes from include to lib * Change constructor argument of hash classes * Remove unused headers * Remove `auto` type * Fix C++ client Hash classes so that these return non-negative signed integer This is the same behavior as Hash classes on Java client * Add tests for C++/Java client Hash --- .../include/pulsar/ProducerConfiguration.h | 9 ++ .../{RoundRobinMessageRouter.cc => BoostHash.cc} | 19 +--- ...{SinglePartitionMessageRouter.h => BoostHash.h} | 24 ++--- .../lib/{SinglePartitionMessageRouter.h => Hash.h} | 28 +++--- ...oundRobinMessageRouter.cc => JavaStringHash.cc} | 27 ++--- ...lePartitionMessageRouter.h => JavaStringHash.h} | 25 +++-- ...dRobinMessageRouter.cc => MessageRouterBase.cc} | 34 ++++--- ...artitionMessageRouter.h => MessageRouterBase.h} | 26 ++--- pulsar-client-cpp/lib/Murmur3_32Hash.cc | 110 +++++++++++++++++++++ pulsar-client-cpp/lib/Murmur3_32Hash.h | 54 ++++++++++ pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 9 +- pulsar-client-cpp/lib/ProducerConfiguration.cc | 9 ++ pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 2 + pulsar-client-cpp/lib/RoundRobinMessageRouter.cc | 7 +- pulsar-client-cpp/lib/RoundRobinMessageRouter.h | 9 +- .../lib/SinglePartitionMessageRouter.cc | 7 +- .../lib/SinglePartitionMessageRouter.h | 11 ++- pulsar-client-cpp/tests/HashTest.cc | 67 +++++++++++++ .../tests/RoundRobinMessageRouterTest.cc | 8 +- .../tests/SinglePartitionMessageRouterTest.cc | 6 +- .../pulsar/client/api/ProducerConfiguration.java | 20 +++- .../java/org/apache/pulsar/client/impl/Hash.java | 26 ++--- ...nMessageRouterImpl.java => JavaStringHash.java} | 23 ++--- ...ssageRouterImpl.java => MessageRouterBase.java} | 28 +++--- .../apache/pulsar/client/impl/Murmur3_32Hash.java | 102 +++++++++++++++++++ .../client/impl/PartitionedProducerImpl.java | 4 +- .../impl/RoundRobinPartitionMessageRouterImpl.java | 10 +- .../impl/SinglePartitionMessageRouterImpl.java | 9 +- .../org/apache/pulsar/client/impl/HashTest.java} | 36 ++++--- .../RoundRobinPartitionMessageRouterImplTest.java | 5 +- .../impl/SinglePartitionMessageRouterImplTest.java | 5 +- 31 files changed, 557 insertions(+), 202 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 9d2d5f9..d12b65d 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -44,6 +44,12 @@ class ProducerConfiguration { RoundRobinDistribution, CustomPartition }; + enum HashingScheme + { + Murmur3_32Hash, + BoostHash, + JavaStringHash + }; ProducerConfiguration(); ~ProducerConfiguration(); ProducerConfiguration(const ProducerConfiguration&); @@ -86,6 +92,9 @@ class ProducerConfiguration { ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& router); const MessageRoutingPolicyPtr& getMessageRouterPtr() const; + ProducerConfiguration& setHashingScheme(const HashingScheme& scheme); + HashingScheme getHashingScheme() const; + ProducerConfiguration& setBlockIfQueueFull(bool); bool getBlockIfQueueFull() const; diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/BoostHash.cc similarity index 57% copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc copy to pulsar-client-cpp/lib/BoostHash.cc index af8f49a..90876b7 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client-cpp/lib/BoostHash.cc @@ -16,23 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -#include "RoundRobinMessageRouter.h" +#include "BoostHash.h" namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {} -RoundRobinMessageRouter::~RoundRobinMessageRouter() {} +BoostHash::BoostHash() : hash() {} -// override -int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { - // if message has a key, hash the key and return the partition - if (msg.hasPartitionKey()) { - static StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); - } else { - Lock lock(mutex_); - // else pick the next partition - return prevPartition_++ % topicMetadata.getNumPartitions(); - } +int32_t BoostHash::makeHash(const std::string& key) { + return static_cast<int32_t>(hash(key) & std::numeric_limits<int32_t>::max()); } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/BoostHash.h similarity index 60% copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h copy to pulsar-client-cpp/lib/BoostHash.h index 453f9ea..e5911fe 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h +++ b/pulsar-client-cpp/lib/BoostHash.h @@ -16,24 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ -#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ +#ifndef BOOST_HASH_HPP_ +#define BOOST_HASH_HPP_ -#include <pulsar/MessageRoutingPolicy.h> -#include <pulsar/TopicMetadata.h> +#include "Hash.h" + +#include <cstdint> +#include <string> #include <boost/functional/hash.hpp> namespace pulsar { - -class SinglePartitionMessageRouter : public MessageRoutingPolicy { +class BoostHash : public Hash { public: - explicit SinglePartitionMessageRouter(int partitionIndex); - typedef boost::hash<std::string> StringHash; - virtual ~SinglePartitionMessageRouter(); - virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); + BoostHash(); + int32_t makeHash(const std::string &key); private: - int selectedSinglePartition_; + boost::hash<std::string> hash; }; } // namespace pulsar -#endif // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ + +#endif /* BOOST_HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/Hash.h similarity index 57% copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h copy to pulsar-client-cpp/lib/Hash.h index 453f9ea..f278478 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h +++ b/pulsar-client-cpp/lib/Hash.h @@ -16,24 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ -#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ +#ifndef HASH_HPP_ +#define HASH_HPP_ -#include <pulsar/MessageRoutingPolicy.h> -#include <pulsar/TopicMetadata.h> -#include <boost/functional/hash.hpp> +#include <cstdint> +#include <string> namespace pulsar { - -class SinglePartitionMessageRouter : public MessageRoutingPolicy { +class Hash { public: - explicit SinglePartitionMessageRouter(int partitionIndex); - typedef boost::hash<std::string> StringHash; - virtual ~SinglePartitionMessageRouter(); - virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); - - private: - int selectedSinglePartition_; + /** + * Generate the hash of a given String + * + * @return The hash of {@param key}, which is non-negative integer. + */ + virtual int32_t makeHash(const std::string& key) = 0; }; } // namespace pulsar -#endif // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ + +#endif /* HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/JavaStringHash.cc similarity index 57% copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc copy to pulsar-client-cpp/lib/JavaStringHash.cc index af8f49a..7579e0e 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client-cpp/lib/JavaStringHash.cc @@ -16,23 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -#include "RoundRobinMessageRouter.h" +#include "JavaStringHash.h" namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {} -RoundRobinMessageRouter::~RoundRobinMessageRouter() {} +JavaStringHash::JavaStringHash() {} -// override -int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { - // if message has a key, hash the key and return the partition - if (msg.hasPartitionKey()) { - static StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); - } else { - Lock lock(mutex_); - // else pick the next partition - return prevPartition_++ % topicMetadata.getNumPartitions(); +int32_t JavaStringHash::makeHash(const std::string& key) { + uint64_t len = key.length(); + const char* val = key.c_str(); + uint32_t hash = 0; + + for (int i = 0; i < len; i++) { + hash = 31 * hash + val[i]; } + + hash &= std::numeric_limits<int32_t>::max(); + + return hash; } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/JavaStringHash.h similarity index 60% copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h copy to pulsar-client-cpp/lib/JavaStringHash.h index 453f9ea..3b01aa8 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h +++ b/pulsar-client-cpp/lib/JavaStringHash.h @@ -16,24 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ -#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ +#ifndef JAVA_DEFAULT_HASH_HPP_ +#define JAVA_DEFAULT_HASH_HPP_ -#include <pulsar/MessageRoutingPolicy.h> -#include <pulsar/TopicMetadata.h> +#include "Hash.h" + +#include <cstdint> +#include <string> #include <boost/functional/hash.hpp> namespace pulsar { - -class SinglePartitionMessageRouter : public MessageRoutingPolicy { +class JavaStringHash : public Hash { public: - explicit SinglePartitionMessageRouter(int partitionIndex); - typedef boost::hash<std::string> StringHash; - virtual ~SinglePartitionMessageRouter(); - virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); - - private: - int selectedSinglePartition_; + JavaStringHash(); + int32_t makeHash(const std::string &key); }; } // namespace pulsar -#endif // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ + +#endif /* JAVA_DEFAULT_HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/MessageRouterBase.cc similarity index 55% copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc copy to pulsar-client-cpp/lib/MessageRouterBase.cc index af8f49a..c0824f9 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client-cpp/lib/MessageRouterBase.cc @@ -16,23 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -#include "RoundRobinMessageRouter.h" +#include "MessageRouterBase.h" -namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {} - -RoundRobinMessageRouter::~RoundRobinMessageRouter() {} +#include "BoostHash.h" +#include "JavaStringHash.h" +#include "Murmur3_32Hash.h" -// override -int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { - // if message has a key, hash the key and return the partition - if (msg.hasPartitionKey()) { - static StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); - } else { - Lock lock(mutex_); - // else pick the next partition - return prevPartition_++ % topicMetadata.getNumPartitions(); +namespace pulsar { +MessageRouterBase::MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme) { + switch (hashingScheme) { + case ProducerConfiguration::BoostHash: + hash = HashPtr(new BoostHash()); + break; + case ProducerConfiguration::JavaStringHash: + hash = HashPtr(new JavaStringHash()); + break; + case ProducerConfiguration::Murmur3_32Hash: + default: + hash = HashPtr(new Murmur3_32Hash()); + break; } } -} // namespace pulsar +} // namespace pulsar \ No newline at end of file diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/MessageRouterBase.h similarity index 60% copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h copy to pulsar-client-cpp/lib/MessageRouterBase.h index 453f9ea..ca458d2 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h +++ b/pulsar-client-cpp/lib/MessageRouterBase.h @@ -16,24 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ -#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ +#ifndef PULSAR_CPP_MESSAGEROUTERBASE_H +#define PULSAR_CPP_MESSAGEROUTERBASE_H + +#include <boost/interprocess/smart_ptr/unique_ptr.hpp> +#include <boost/checked_delete.hpp> #include <pulsar/MessageRoutingPolicy.h> -#include <pulsar/TopicMetadata.h> -#include <boost/functional/hash.hpp> +#include <pulsar/ProducerConfiguration.h> +#include "Hash.h" namespace pulsar { +typedef boost::interprocess::unique_ptr<Hash, boost::checked_deleter<Hash> > HashPtr; -class SinglePartitionMessageRouter : public MessageRoutingPolicy { +class MessageRouterBase : public MessageRoutingPolicy { public: - explicit SinglePartitionMessageRouter(int partitionIndex); - typedef boost::hash<std::string> StringHash; - virtual ~SinglePartitionMessageRouter(); - virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); + MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme); - private: - int selectedSinglePartition_; + protected: + HashPtr hash; }; } // namespace pulsar -#endif // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ + +#endif // PULSAR_CPP_MESSAGEROUTERBASE_H diff --git a/pulsar-client-cpp/lib/Murmur3_32Hash.cc b/pulsar-client-cpp/lib/Murmur3_32Hash.cc new file mode 100644 index 0000000..1367514 --- /dev/null +++ b/pulsar-client-cpp/lib/Murmur3_32Hash.cc @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//----------------------------------------------------------------------------- +// The original MurmurHash3 was written by Austin Appleby, and is placed in the +// public domain. This source code, implemented by Licht Takeuchi, is based on +// the orignal MurmurHash3 source code. +#include "Murmur3_32Hash.h" + +#include <boost/predef.h> +#include <limits> + +#if BOOST_COMP_MSVC +#include <stdlib.h> +#define ROTATE_LEFT(x, y) _rotl(x, y) +#else +#define ROTATE_LEFT(x, y) rotate_left(x, y) +#endif + +#if BOOST_ENDIAN_LITTLE_BYTE +#define BYTESPWAP(x) (x) +#elif BOOST_ENDIAN_BIG_BYTE +#if BOOST_COMP_CLANG || BOOST_COMP_GNUC +#define BYTESPWAP(x) __builtin_bswap32(x) +#elif BOOST_COMP_MSVC +#define BYTESPWAP(x) _byteswap_uint32(x) +#else +#endif +#else +#endif + +namespace pulsar { + +Murmur3_32Hash::Murmur3_32Hash() : seed(0) {} + +int32_t Murmur3_32Hash::makeHash(const std::string &key) { + return static_cast<int32_t>(makeHash(&key.front(), key.length()) & std::numeric_limits<int32_t>::max()); +} + +uint32_t Murmur3_32Hash::makeHash(const void *key, const int64_t len) { + const uint8_t *data = reinterpret_cast<const uint8_t *>(key); + const int nblocks = len / CHUNK_SIZE; + uint32_t h1 = seed; + const uint32_t *blocks = reinterpret_cast<const uint32_t *>(data + nblocks * CHUNK_SIZE); + + for (int i = -nblocks; i != 0; i++) { + uint32_t k1 = BYTESPWAP(blocks[i]); + + k1 = mixK1(k1); + h1 = mixH1(h1, k1); + } + + uint32_t k1 = 0; + switch (len - nblocks * CHUNK_SIZE) { + case 3: + k1 ^= static_cast<uint32_t>(blocks[2]) << 16; + case 2: + k1 ^= static_cast<uint32_t>(blocks[1]) << 8; + case 1: + k1 ^= static_cast<uint32_t>(blocks[0]); + }; + + h1 ^= mixK1(k1); + h1 ^= len; + h1 = fmix(h1); + + return h1; +} + +uint32_t Murmur3_32Hash::fmix(uint32_t h) { + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + + return h; +} + +uint32_t Murmur3_32Hash::mixK1(uint32_t k1) { + k1 *= C1; + k1 = ROTATE_LEFT(k1, 15); + k1 *= C2; + return k1; +} + +uint32_t Murmur3_32Hash::mixH1(uint32_t h1, uint32_t k1) { + h1 ^= k1; + h1 = ROTATE_LEFT(h1, 13); + return h1 * 5 + 0xe6546b64; +} + +uint32_t Murmur3_32Hash::rotate_left(uint32_t x, uint8_t r) { return (x << r) | (x >> ((32 - r))); } +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/Murmur3_32Hash.h b/pulsar-client-cpp/lib/Murmur3_32Hash.h new file mode 100644 index 0000000..644d186 --- /dev/null +++ b/pulsar-client-cpp/lib/Murmur3_32Hash.h @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//----------------------------------------------------------------------------- +// The original MurmurHash3 was written by Austin Appleby, and is placed in the +// public domain. This source code, implemented by Licht Takeuchi, is based on +// the orignal MurmurHash3 source code. +#ifndef MURMUR3_32_HASH_HPP_ +#define MURMUR3_32_HASH_HPP_ + +#include "Hash.h" + +#include <cstdint> +#include <string> + +namespace pulsar { + +class Murmur3_32Hash : public Hash { + public: + Murmur3_32Hash(); + + int32_t makeHash(const std::string& key); + + private: + static constexpr int32_t CHUNK_SIZE = 4; + static constexpr uint32_t C1 = 0xcc9e2d51; + static constexpr uint32_t C2 = 0x1b873593; + uint32_t seed; + + static uint32_t fmix(uint32_t h); + static uint32_t mixK1(uint32_t k1); + static uint32_t mixH1(uint32_t h1, uint32_t k1); + static uint32_t rotate_left(uint32_t x, uint8_t r); + uint32_t makeHash(const void* key, const int64_t len); +}; +} // namespace pulsar + +#endif /* MURMUR3_32_HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 825bcc0..2c97ab4 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -56,14 +56,14 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { switch (conf_.getPartitionsRoutingMode()) { case ProducerConfiguration::RoundRobinDistribution: - return boost::make_shared<RoundRobinMessageRouter>(); + return boost::make_shared<RoundRobinMessageRouter>(conf_.getHashingScheme()); case ProducerConfiguration::CustomPartition: return conf_.getMessageRouterPtr(); case ProducerConfiguration::UseSinglePartition: default: unsigned int random = rand(); - return boost::make_shared<SinglePartitionMessageRouter>(random % - topicMetadata_->getNumPartitions()); + return boost::make_shared<SinglePartitionMessageRouter>( + random % topicMetadata_->getNumPartitions(), conf_.getHashingScheme()); } } @@ -160,8 +160,7 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const { /* * if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to - * create - * one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure + * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure */ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) { int producerIndex = 0; diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc index 011d859..c010a9f 100644 --- a/pulsar-client-cpp/lib/ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc @@ -104,6 +104,15 @@ const MessageRoutingPolicyPtr& ProducerConfiguration::getMessageRouterPtr() cons return impl_->messageRouter; } +ProducerConfiguration& ProducerConfiguration::setHashingScheme(const HashingScheme& scheme) { + impl_->hashingScheme = scheme; + return *this; +} + +ProducerConfiguration::HashingScheme ProducerConfiguration::getHashingScheme() const { + return impl_->hashingScheme; +} + ProducerConfiguration& ProducerConfiguration::setBlockIfQueueFull(bool flag) { impl_->blockIfQueueFull = flag; return *this; diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h index f211fb7..11fe7cc 100644 --- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -35,6 +35,7 @@ struct ProducerConfigurationImpl { int maxPendingMessagesAcrossPartitions; ProducerConfiguration::PartitionsRoutingMode routingMode; MessageRoutingPolicyPtr messageRouter; + ProducerConfiguration::HashingScheme hashingScheme; bool blockIfQueueFull; bool batchingEnabled; unsigned int batchingMaxMessages; @@ -46,6 +47,7 @@ struct ProducerConfigurationImpl { maxPendingMessages(1000), maxPendingMessagesAcrossPartitions(50000), routingMode(ProducerConfiguration::UseSinglePartition), + hashingScheme(ProducerConfiguration::BoostHash), blockIfQueueFull(false), batchingEnabled(false), batchingMaxMessages(1000), diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc index af8f49a..c47fb23 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc @@ -19,7 +19,8 @@ #include "RoundRobinMessageRouter.h" namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {} +RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme) + : MessageRouterBase(hashingScheme), prevPartition_(0) {} RoundRobinMessageRouter::~RoundRobinMessageRouter() {} @@ -27,12 +28,12 @@ RoundRobinMessageRouter::~RoundRobinMessageRouter() {} int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { // if message has a key, hash the key and return the partition if (msg.hasPartitionKey()) { - static StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); + return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); } else { Lock lock(mutex_); // else pick the next partition return prevPartition_++ % topicMetadata.getNumPartitions(); } } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h index 1fe337b..57c27b4 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h +++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h @@ -20,14 +20,16 @@ #define PULSAR_RR_MESSAGE_ROUTER_HEADER_ #include <pulsar/MessageRoutingPolicy.h> +#include <pulsar/ProducerConfiguration.h> #include <pulsar/TopicMetadata.h> -#include <boost/functional/hash.hpp> #include <boost/thread/mutex.hpp> +#include "Hash.h" +#include "MessageRouterBase.h" namespace pulsar { -class RoundRobinMessageRouter : public MessageRoutingPolicy { +class RoundRobinMessageRouter : public MessageRouterBase { public: - RoundRobinMessageRouter(); + RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme); virtual ~RoundRobinMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); @@ -35,7 +37,6 @@ class RoundRobinMessageRouter : public MessageRoutingPolicy { boost::mutex mutex_; unsigned int prevPartition_; }; -typedef boost::hash<std::string> StringHash; typedef boost::unique_lock<boost::mutex> Lock; } // namespace pulsar #endif // PULSAR_RR_MESSAGE_ROUTER_HEADER_ diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc index fca97b5..1e5ab75 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc +++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc @@ -20,7 +20,9 @@ namespace pulsar { SinglePartitionMessageRouter::~SinglePartitionMessageRouter() {} -SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIndex) { +SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIndex, + ProducerConfiguration::HashingScheme hashingScheme) + : MessageRouterBase(hashingScheme) { selectedSinglePartition_ = partitionIndex; } @@ -28,8 +30,7 @@ SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIn int SinglePartitionMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { // if message has a key, hash the key and return the partition if (msg.hasPartitionKey()) { - StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); + return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); } else { // else pick the next partition return selectedSinglePartition_; diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h index 453f9ea..409d31a 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h +++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h @@ -20,20 +20,23 @@ #define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ #include <pulsar/MessageRoutingPolicy.h> +#include <include/pulsar/ProducerConfiguration.h> +#include "Hash.h" #include <pulsar/TopicMetadata.h> -#include <boost/functional/hash.hpp> +#include "MessageRouterBase.h" namespace pulsar { -class SinglePartitionMessageRouter : public MessageRoutingPolicy { +class SinglePartitionMessageRouter : public MessageRouterBase { public: - explicit SinglePartitionMessageRouter(int partitionIndex); - typedef boost::hash<std::string> StringHash; + SinglePartitionMessageRouter(const int partitionIndex, + ProducerConfiguration::HashingScheme hashingScheme); virtual ~SinglePartitionMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); private: int selectedSinglePartition_; }; + } // namespace pulsar #endif // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ diff --git a/pulsar-client-cpp/tests/HashTest.cc b/pulsar-client-cpp/tests/HashTest.cc new file mode 100644 index 0000000..97dcb90 --- /dev/null +++ b/pulsar-client-cpp/tests/HashTest.cc @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <pulsar/Client.h> +#include <gtest/gtest.h> +#include <gmock/gmock.h> +#include <boost/functional/hash.hpp> + +#include "../lib/BoostHash.h" +#include "../lib/JavaStringHash.h" +#include "../lib/Murmur3_32Hash.h" + +using ::testing::AtLeast; +using ::testing::Return; +using ::testing::ReturnRef; + +using namespace pulsar; + +TEST(HashTest, testBoostHash) { + BoostHash hash; + boost::hash<std::string> boostHash; + + std::string key1 = "key1"; + std::string key2 = "key2"; + + ASSERT_EQ(boostHash(key1) & std::numeric_limits<int32_t>::max(), hash.makeHash(key1)); + ASSERT_EQ(boostHash(key2) & std::numeric_limits<int32_t>::max(), hash.makeHash(key2)); +} + +TEST(HashTest, testJavaStringHash) { + JavaStringHash hash; + + // Calculating `hashCode()` makes overflow as unsigned int32. + std::string key1 = "keykeykeykeykey1"; + + // `hashCode()` is negative as signed int32. + std::string key2 = "keykeykey2"; + + // Same as Java client + ASSERT_EQ(434058482, hash.makeHash(key1)); + ASSERT_EQ(42978643, hash.makeHash(key2)); +} + +TEST(HashTest, testMurmur3_32Hash) { + Murmur3_32Hash hash; + std::string key1 = "key1"; + std::string key2 = "key2"; + + // Same value as Java client + ASSERT_EQ(462881061, hash.makeHash(key1)); + ASSERT_EQ(1936800180, hash.makeHash(key2)); +} \ No newline at end of file diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc index 9542353..431dd6b 100644 --- a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc +++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc @@ -17,8 +17,10 @@ * under the License. */ #include <pulsar/Client.h> +#include <pulsar/ProducerConfiguration.h> #include <gtest/gtest.h> #include <gmock/gmock.h> +#include <boost/functional/hash.hpp> #include "tests/mocks/GMockMessage.h" @@ -37,8 +39,8 @@ TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { const int numPartitions1 = 5; const int numPartitions2 = 3; - RoundRobinMessageRouter router1; - RoundRobinMessageRouter router2; + RoundRobinMessageRouter router1(ProducerConfiguration::BoostHash); + RoundRobinMessageRouter router2(ProducerConfiguration::BoostHash); GMockMessage message; EXPECT_CALL(message, hasPartitionKey()).Times(20).WillRepeatedly(Return(false)); @@ -52,7 +54,7 @@ TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) { const int numPartitons = 1234; - RoundRobinMessageRouter router; + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash); std::string partitionKey1 = "key1"; std::string partitionKey2 = "key2"; diff --git a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc index 9457acc..b62d286 100644 --- a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc +++ b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc @@ -17,6 +17,8 @@ * under the License. */ #include <pulsar/Client.h> +#include <pulsar/ProducerConfiguration.h> +#include <boost/functional/hash.hpp> #include <gtest/gtest.h> #include <gmock/gmock.h> @@ -36,7 +38,7 @@ using namespace pulsar; TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { const int selectedPartition = 1234; - SinglePartitionMessageRouter router(selectedPartition); + SinglePartitionMessageRouter router(selectedPartition, ProducerConfiguration::BoostHash); GMockMessage message; EXPECT_CALL(message, hasPartitionKey()).Times(1).WillOnce(Return(false)); @@ -48,7 +50,7 @@ TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithPartitionKey) { const int numPartitons = 1234; - SinglePartitionMessageRouter router(1); + SinglePartitionMessageRouter router(1, ProducerConfiguration::BoostHash); std::string partitionKey1 = "key1"; std::string partitionKey2 = "key2"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index 08b73c2..678e3d5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -45,6 +45,7 @@ public class ProducerConfiguration implements Serializable { private int maxPendingMessages = 1000; private int maxPendingMessagesAcrossPartitions = 50000; private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition; + private HashingScheme hashingScheme = HashingScheme.JavaStringHash; private MessageRouter customMessageRouter = null; private long batchingMaxPublishDelayMs = 10; private int batchingMaxMessages = 1000; @@ -64,6 +65,11 @@ public class ProducerConfiguration implements Serializable { SinglePartition, RoundRobinPartition, CustomPartition } + public enum HashingScheme { + JavaStringHash, + Murmur3_32Hash + } + private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL; /** @@ -136,6 +142,15 @@ public class ProducerConfiguration implements Serializable { return this; } + public HashingScheme getHashingScheme() { + return hashingScheme; + } + + public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) { + this.hashingScheme = hashingScheme; + return this; + } + /** * * @return the maximum number of pending messages allowed across all the partitions @@ -171,7 +186,7 @@ public class ProducerConfiguration implements Serializable { * message queue is full. * <p> * Default is <code>false</code>. If set to <code>false</code>, send operations will immediately fail with - * {@link ProducerQueueIsFullError} when there is no space left in pending queue. + * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue. * * @param blockIfQueueFull * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full @@ -481,7 +496,8 @@ public class ProducerConfiguration implements Serializable { ProducerConfiguration other = (ProducerConfiguration) obj; return Objects.equal(this.sendTimeoutMs, other.sendTimeoutMs) && Objects.equal(maxPendingMessages, other.maxPendingMessages) - && Objects.equal(this.messageRouteMode, other.messageRouteMode); + && Objects.equal(this.messageRouteMode, other.messageRouteMode) + && Objects.equal(this.hashingScheme, other.hashingScheme); } return false; diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java similarity index 54% copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java index af8f49a..31c771a 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java @@ -16,23 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -#include "RoundRobinMessageRouter.h" +package org.apache.pulsar.client.impl; -namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {} - -RoundRobinMessageRouter::~RoundRobinMessageRouter() {} - -// override -int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { - // if message has a key, hash the key and return the partition - if (msg.hasPartitionKey()) { - static StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); - } else { - Lock lock(mutex_); - // else pick the next partition - return prevPartition_++ % topicMetadata.getNumPartitions(); - } +public interface Hash { + /** + * Generate the hash of a given String + * + * @return The hash of {@param s}, which is non-negative integer. + */ + int makeHash(String s); } -} // namespace pulsar diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java similarity index 55% copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java index e1ab878..2f6fc28 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java @@ -18,26 +18,17 @@ */ package org.apache.pulsar.client.impl; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRouter; -import org.apache.pulsar.client.api.TopicMetadata; +public class JavaStringHash implements Hash { + private static final JavaStringHash instance = new JavaStringHash(); -public class SinglePartitionMessageRouterImpl implements MessageRouter { + private JavaStringHash(){ } - private final int partitionIndex; - - public SinglePartitionMessageRouterImpl(int partitionIndex) { - this.partitionIndex = partitionIndex; + public static Hash getInstance() { + return instance; } @Override - public int choosePartition(Message msg, TopicMetadata metadata) { - // If the message has a key, it supersedes the single partition routing policy - if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % metadata.numPartitions()); - } - - return partitionIndex; + public int makeHash(String s) { + return s.hashCode() & Integer.MAX_VALUE; } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java similarity index 58% copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java index e1ab878..4825dab 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java @@ -18,26 +18,20 @@ */ package org.apache.pulsar.client.impl; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRouter; -import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.client.api.ProducerConfiguration; -public class SinglePartitionMessageRouterImpl implements MessageRouter { +public abstract class MessageRouterBase implements MessageRouter { + protected final Hash hash; - private final int partitionIndex; - - public SinglePartitionMessageRouterImpl(int partitionIndex) { - this.partitionIndex = partitionIndex; - } - - @Override - public int choosePartition(Message msg, TopicMetadata metadata) { - // If the message has a key, it supersedes the single partition routing policy - if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % metadata.numPartitions()); + MessageRouterBase(ProducerConfiguration.HashingScheme hashingScheme) { + switch (hashingScheme) { + case JavaStringHash: + this.hash = JavaStringHash.getInstance(); + break; + case Murmur3_32Hash: + default: + this.hash = Murmur3_32Hash.getInstance(); } - - return partitionIndex; } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java new file mode 100644 index 0000000..80552da --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * The original MurmurHash3 was written by Austin Appleby, and is placed in the + * public domain. This source code, implemented by Licht Takeuchi, is based on + * the orignal MurmurHash3 source code. + */ +package org.apache.pulsar.client.impl; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +import com.google.common.primitives.UnsignedBytes; + +public class Murmur3_32Hash implements Hash { + private static final Murmur3_32Hash instance = new Murmur3_32Hash(); + + private static final int CHUNK_SIZE = 4; + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + private final int seed; + + private Murmur3_32Hash() { + seed = 0; + } + + public static Hash getInstance() { + return instance; + } + + @Override + public int makeHash(String s) { + return makeHash(s.getBytes(StandardCharsets.UTF_8)) & Integer.MAX_VALUE; + } + + private int makeHash(byte[] bytes) { + int len = bytes.length; + int reminder = len % CHUNK_SIZE; + int h1 = seed; + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + + while (byteBuffer.remaining() >= CHUNK_SIZE) { + int k1 = byteBuffer.getInt(); + + k1 = mixK1(k1); + h1 = mixH1(h1, k1); + } + + int k1 = 0; + for (int i = 0; i < reminder; i++) { + k1 ^= UnsignedBytes.toInt(byteBuffer.get()) << (i * 8); + } + + h1 ^= mixK1(k1); + h1 ^= len; + h1 = fmix(h1); + + return h1; + } + + private int fmix(int h) { + h ^= h >>> 16; + h *= 0x85ebca6b; + h ^= h >>> 13; + h *= 0xc2b2ae35; + h ^= h >>> 16; + + return h; + } + + private int mixK1(int k1) { + k1 *= C1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= C2; + return k1; + } + + private int mixH1(int h1, int k1) { + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + return h1 * 5 + 0xe6546b64; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 04269f3..fe9ae6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -75,12 +75,12 @@ public class PartitionedProducerImpl extends ProducerBase { messageRouter = customMessageRouter; break; case RoundRobinPartition: - messageRouter = new RoundRobinPartitionMessageRouterImpl(); + messageRouter = new RoundRobinPartitionMessageRouterImpl(conf.getHashingScheme()); break; case SinglePartition: default: messageRouter = new SinglePartitionMessageRouterImpl( - ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions())); + ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), conf.getHashingScheme()); } return messageRouter; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index fc56500..a7c25c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -21,16 +21,17 @@ package org.apache.pulsar.client.impl; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.TopicMetadata; -public class RoundRobinPartitionMessageRouterImpl implements MessageRouter { +public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase { private static final AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> PARTITION_INDEX_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex"); private volatile int partitionIndex = 0; - public RoundRobinPartitionMessageRouterImpl() { + public RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme hashingScheme) { + super(hashingScheme); PARTITION_INDEX_UPDATER.set(this, 0); } @@ -38,8 +39,9 @@ public class RoundRobinPartitionMessageRouterImpl implements MessageRouter { public int choosePartition(Message msg, TopicMetadata topicMetadata) { // If the message has a key, it supersedes the round robin routing policy if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % topicMetadata.numPartitions()); + return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions(); } + return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % topicMetadata.numPartitions()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java index e1ab878..3a0cd54 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java @@ -19,14 +19,15 @@ package org.apache.pulsar.client.impl; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.TopicMetadata; -public class SinglePartitionMessageRouterImpl implements MessageRouter { +public class SinglePartitionMessageRouterImpl extends MessageRouterBase { private final int partitionIndex; - public SinglePartitionMessageRouterImpl(int partitionIndex) { + public SinglePartitionMessageRouterImpl(int partitionIndex, ProducerConfiguration.HashingScheme hashingScheme) { + super(hashingScheme); this.partitionIndex = partitionIndex; } @@ -34,7 +35,7 @@ public class SinglePartitionMessageRouterImpl implements MessageRouter { public int choosePartition(Message msg, TopicMetadata metadata) { // If the message has a key, it supersedes the single partition routing policy if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % metadata.numPartitions()); + return hash.makeHash(msg.getKey()) % metadata.numPartitions(); } return partitionIndex; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java similarity index 53% copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java copy to pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java index e1ab878..f53205d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java @@ -18,26 +18,32 @@ */ package org.apache.pulsar.client.impl; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRouter; -import org.apache.pulsar.client.api.TopicMetadata; +import org.testng.annotations.Test; -public class SinglePartitionMessageRouterImpl implements MessageRouter { +import static org.testng.Assert.*; - private final int partitionIndex; +public class HashTest { + @Test + public void javaStringHashTest() { + Hash h = JavaStringHash.getInstance(); - public SinglePartitionMessageRouterImpl(int partitionIndex) { - this.partitionIndex = partitionIndex; - } + // Calculating `hashCode()` makes overflow as unsigned int32. + String key1 = "keykeykeykeykey1"; - @Override - public int choosePartition(Message msg, TopicMetadata metadata) { - // If the message has a key, it supersedes the single partition routing policy - if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % metadata.numPartitions()); - } + // `hashCode()` is negative as signed int32. + String key2 = "keykeykey2"; - return partitionIndex; + // Same value as C++ client + assertEquals(434058482, h.makeHash(key1)); + assertEquals(42978643, h.makeHash(key2)); } + @Test + public void murmur3_32HashTest() { + Hash h = Murmur3_32Hash.getInstance(); + + // Same value as C++ client + assertEquals(462881061, h.makeHash("key1")); + assertEquals(1936800180, h.makeHash("key2")); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java index e3cb6c0..d0e4d51 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java @@ -23,6 +23,7 @@ import static org.powermock.api.mockito.PowerMockito.when; import static org.testng.Assert.assertEquals; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.testng.annotations.Test; /** @@ -35,7 +36,7 @@ public class RoundRobinPartitionMessageRouterImplTest { Message msg = mock(Message.class); when(msg.getKey()).thenReturn(null); - RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(); + RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash); for (int i = 0; i < 10; i++) { assertEquals(i % 5, router.choosePartition(msg, new TopicMetadataImpl(5))); } @@ -52,7 +53,7 @@ public class RoundRobinPartitionMessageRouterImplTest { when(msg2.hasKey()).thenReturn(true); when(msg2.getKey()).thenReturn(key2); - RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(); + RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash); TopicMetadataImpl metadata = new TopicMetadataImpl(100); assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata)); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java index 7b5ea4e..c8ea0b0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.testng.annotations.Test; /** @@ -35,7 +36,7 @@ public class SinglePartitionMessageRouterImplTest { Message msg = mock(Message.class); when(msg.getKey()).thenReturn(null); - SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234); + SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash); assertEquals(1234, router.choosePartition(msg, new TopicMetadataImpl(2468))); } @@ -50,7 +51,7 @@ public class SinglePartitionMessageRouterImplTest { when(msg2.hasKey()).thenReturn(true); when(msg2.getKey()).thenReturn(key2); - SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234); + SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash); TopicMetadataImpl metadata = new TopicMetadataImpl(100); assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata)); -- To stop receiving notification emails like this one, please contact mme...@apache.org.