This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d159ef  Fix MessageRouter hash inconsistent on C++/Java client (#1029)
8d159ef is described below

commit 8d159efa5a4f25191fade5a9fddb45a245ef2e6b
Author: Licht Takeuchi <lich...@outlook.jp>
AuthorDate: Wed Jan 31 14:21:07 2018 +0900

    Fix MessageRouter hash inconsistent on C++/Java client (#1029)
    
    * Fix hash inconsistency between C++ and Java clients.
    
    * Add HashingScheme to select hash function on Java client
    
    * Fix the bug of Murmur3_32Hash on C++ client
    
    * Add Javadoc on makeHash method
    
    * Use JavaStringHash as default hash on Java client
    
    * Use BoostHash as default hash on C++ client
    
    * Make hash method always return a signed integer
    
    * Re-implement hash classes as singletons on Java client
    
    * Move hash classes from include to lib
    
    * Change constructor argument of hash classes
    
    * Remove unused headers
    
    * Remove `auto` type
    
    * Fix C++ client Hash classes so that they return a non-negative signed integer.
    This is the same behavior as the Hash classes on the Java client.
    
    * Add tests for C++/Java client Hash
---
 .../include/pulsar/ProducerConfiguration.h         |   9 ++
 .../{RoundRobinMessageRouter.cc => BoostHash.cc}   |  19 +---
 ...{SinglePartitionMessageRouter.h => BoostHash.h} |  24 ++---
 .../lib/{SinglePartitionMessageRouter.h => Hash.h} |  28 +++---
 ...oundRobinMessageRouter.cc => JavaStringHash.cc} |  27 ++---
 ...lePartitionMessageRouter.h => JavaStringHash.h} |  25 +++--
 ...dRobinMessageRouter.cc => MessageRouterBase.cc} |  34 ++++---
 ...artitionMessageRouter.h => MessageRouterBase.h} |  26 ++---
 pulsar-client-cpp/lib/Murmur3_32Hash.cc            | 110 +++++++++++++++++++++
 pulsar-client-cpp/lib/Murmur3_32Hash.h             |  54 ++++++++++
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |   9 +-
 pulsar-client-cpp/lib/ProducerConfiguration.cc     |   9 ++
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |   2 +
 pulsar-client-cpp/lib/RoundRobinMessageRouter.cc   |   7 +-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.h    |   9 +-
 .../lib/SinglePartitionMessageRouter.cc            |   7 +-
 .../lib/SinglePartitionMessageRouter.h             |  11 ++-
 pulsar-client-cpp/tests/HashTest.cc                |  67 +++++++++++++
 .../tests/RoundRobinMessageRouterTest.cc           |   8 +-
 .../tests/SinglePartitionMessageRouterTest.cc      |   6 +-
 .../pulsar/client/api/ProducerConfiguration.java   |  20 +++-
 .../java/org/apache/pulsar/client/impl/Hash.java   |  26 ++---
 ...nMessageRouterImpl.java => JavaStringHash.java} |  23 ++---
 ...ssageRouterImpl.java => MessageRouterBase.java} |  28 +++---
 .../apache/pulsar/client/impl/Murmur3_32Hash.java  | 102 +++++++++++++++++++
 .../client/impl/PartitionedProducerImpl.java       |   4 +-
 .../impl/RoundRobinPartitionMessageRouterImpl.java |  10 +-
 .../impl/SinglePartitionMessageRouterImpl.java     |   9 +-
 .../org/apache/pulsar/client/impl/HashTest.java}   |  36 ++++---
 .../RoundRobinPartitionMessageRouterImplTest.java  |   5 +-
 .../impl/SinglePartitionMessageRouterImplTest.java |   5 +-
 31 files changed, 557 insertions(+), 202 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 9d2d5f9..d12b65d 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -44,6 +44,12 @@ class ProducerConfiguration {
         RoundRobinDistribution,
         CustomPartition
     };
+    enum HashingScheme
+    {
+        Murmur3_32Hash,
+        BoostHash,
+        JavaStringHash
+    };
     ProducerConfiguration();
     ~ProducerConfiguration();
     ProducerConfiguration(const ProducerConfiguration&);
@@ -86,6 +92,9 @@ class ProducerConfiguration {
     ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& 
router);
     const MessageRoutingPolicyPtr& getMessageRouterPtr() const;
 
+    ProducerConfiguration& setHashingScheme(const HashingScheme& scheme);
+    HashingScheme getHashingScheme() const;
+
     ProducerConfiguration& setBlockIfQueueFull(bool);
     bool getBlockIfQueueFull() const;
 
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc 
b/pulsar-client-cpp/lib/BoostHash.cc
similarity index 57%
copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
copy to pulsar-client-cpp/lib/BoostHash.cc
index af8f49a..90876b7 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/BoostHash.cc
@@ -16,23 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "RoundRobinMessageRouter.h"
+#include "BoostHash.h"
 
 namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {}
 
-RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
+BoostHash::BoostHash() : hash() {}
 
-// override
-int RoundRobinMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
-    // if message has a key, hash the key and return the partition
-    if (msg.hasPartitionKey()) {
-        static StringHash hash;
-        return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
-    } else {
-        Lock lock(mutex_);
-        // else pick the next partition
-        return prevPartition_++ % topicMetadata.getNumPartitions();
-    }
+int32_t BoostHash::makeHash(const std::string& key) {
+    return static_cast<int32_t>(hash(key) & 
std::numeric_limits<int32_t>::max());
 }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h 
b/pulsar-client-cpp/lib/BoostHash.h
similarity index 60%
copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
copy to pulsar-client-cpp/lib/BoostHash.h
index 453f9ea..e5911fe 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
+++ b/pulsar-client-cpp/lib/BoostHash.h
@@ -16,24 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
-#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+#ifndef BOOST_HASH_HPP_
+#define BOOST_HASH_HPP_
 
-#include <pulsar/MessageRoutingPolicy.h>
-#include <pulsar/TopicMetadata.h>
+#include "Hash.h"
+
+#include <cstdint>
+#include <string>
 #include <boost/functional/hash.hpp>
 
 namespace pulsar {
-
-class SinglePartitionMessageRouter : public MessageRoutingPolicy {
+class BoostHash : public Hash {
    public:
-    explicit SinglePartitionMessageRouter(int partitionIndex);
-    typedef boost::hash<std::string> StringHash;
-    virtual ~SinglePartitionMessageRouter();
-    virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
+    BoostHash();
+    int32_t makeHash(const std::string &key);
 
    private:
-    int selectedSinglePartition_;
+    boost::hash<std::string> hash;
 };
 }  // namespace pulsar
-#endif  // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+
+#endif /* BOOST_HASH_HPP_ */
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h 
b/pulsar-client-cpp/lib/Hash.h
similarity index 57%
copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
copy to pulsar-client-cpp/lib/Hash.h
index 453f9ea..f278478 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
+++ b/pulsar-client-cpp/lib/Hash.h
@@ -16,24 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
-#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+#ifndef HASH_HPP_
+#define HASH_HPP_
 
-#include <pulsar/MessageRoutingPolicy.h>
-#include <pulsar/TopicMetadata.h>
-#include <boost/functional/hash.hpp>
+#include <cstdint>
+#include <string>
 
 namespace pulsar {
-
-class SinglePartitionMessageRouter : public MessageRoutingPolicy {
+class Hash {
    public:
-    explicit SinglePartitionMessageRouter(int partitionIndex);
-    typedef boost::hash<std::string> StringHash;
-    virtual ~SinglePartitionMessageRouter();
-    virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
-
-   private:
-    int selectedSinglePartition_;
+    /**
+     * Generate the hash of a given String
+     *
+     * @return The hash of @p key, which is a non-negative integer.
+     */
+    virtual int32_t makeHash(const std::string& key) = 0;
 };
 }  // namespace pulsar
-#endif  // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+
+#endif /* HASH_HPP_ */
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc 
b/pulsar-client-cpp/lib/JavaStringHash.cc
similarity index 57%
copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
copy to pulsar-client-cpp/lib/JavaStringHash.cc
index af8f49a..7579e0e 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/JavaStringHash.cc
@@ -16,23 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "RoundRobinMessageRouter.h"
+#include "JavaStringHash.h"
 
 namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {}
 
-RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
+JavaStringHash::JavaStringHash() {}
 
-// override
-int RoundRobinMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
-    // if message has a key, hash the key and return the partition
-    if (msg.hasPartitionKey()) {
-        static StringHash hash;
-        return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
-    } else {
-        Lock lock(mutex_);
-        // else pick the next partition
-        return prevPartition_++ % topicMetadata.getNumPartitions();
+int32_t JavaStringHash::makeHash(const std::string& key) {
+    uint64_t len = key.length();
+    const char* val = key.c_str();
+    uint32_t hash = 0;
+
+    for (int i = 0; i < len; i++) {
+        hash = 31 * hash + val[i];
     }
+
+    hash &= std::numeric_limits<int32_t>::max();
+
+    return hash;
 }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h 
b/pulsar-client-cpp/lib/JavaStringHash.h
similarity index 60%
copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
copy to pulsar-client-cpp/lib/JavaStringHash.h
index 453f9ea..3b01aa8 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
+++ b/pulsar-client-cpp/lib/JavaStringHash.h
@@ -16,24 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
-#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+#ifndef JAVA_DEFAULT_HASH_HPP_
+#define JAVA_DEFAULT_HASH_HPP_
 
-#include <pulsar/MessageRoutingPolicy.h>
-#include <pulsar/TopicMetadata.h>
+#include "Hash.h"
+
+#include <cstdint>
+#include <string>
 #include <boost/functional/hash.hpp>
 
 namespace pulsar {
-
-class SinglePartitionMessageRouter : public MessageRoutingPolicy {
+class JavaStringHash : public Hash {
    public:
-    explicit SinglePartitionMessageRouter(int partitionIndex);
-    typedef boost::hash<std::string> StringHash;
-    virtual ~SinglePartitionMessageRouter();
-    virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
-
-   private:
-    int selectedSinglePartition_;
+    JavaStringHash();
+    int32_t makeHash(const std::string &key);
 };
 }  // namespace pulsar
-#endif  // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+
+#endif /* JAVA_DEFAULT_HASH_HPP_ */
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc 
b/pulsar-client-cpp/lib/MessageRouterBase.cc
similarity index 55%
copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
copy to pulsar-client-cpp/lib/MessageRouterBase.cc
index af8f49a..c0824f9 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/MessageRouterBase.cc
@@ -16,23 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "RoundRobinMessageRouter.h"
+#include "MessageRouterBase.h"
 
-namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {}
-
-RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
+#include "BoostHash.h"
+#include "JavaStringHash.h"
+#include "Murmur3_32Hash.h"
 
-// override
-int RoundRobinMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
-    // if message has a key, hash the key and return the partition
-    if (msg.hasPartitionKey()) {
-        static StringHash hash;
-        return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
-    } else {
-        Lock lock(mutex_);
-        // else pick the next partition
-        return prevPartition_++ % topicMetadata.getNumPartitions();
+namespace pulsar {
+MessageRouterBase::MessageRouterBase(ProducerConfiguration::HashingScheme 
hashingScheme) {
+    switch (hashingScheme) {
+        case ProducerConfiguration::BoostHash:
+            hash = HashPtr(new BoostHash());
+            break;
+        case ProducerConfiguration::JavaStringHash:
+            hash = HashPtr(new JavaStringHash());
+            break;
+        case ProducerConfiguration::Murmur3_32Hash:
+        default:
+            hash = HashPtr(new Murmur3_32Hash());
+            break;
     }
 }
-}  // namespace pulsar
+}  // namespace pulsar
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h 
b/pulsar-client-cpp/lib/MessageRouterBase.h
similarity index 60%
copy from pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
copy to pulsar-client-cpp/lib/MessageRouterBase.h
index 453f9ea..ca458d2 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
+++ b/pulsar-client-cpp/lib/MessageRouterBase.h
@@ -16,24 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
-#define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+#ifndef PULSAR_CPP_MESSAGEROUTERBASE_H
+#define PULSAR_CPP_MESSAGEROUTERBASE_H
+
+#include <boost/interprocess/smart_ptr/unique_ptr.hpp>
+#include <boost/checked_delete.hpp>
 
 #include <pulsar/MessageRoutingPolicy.h>
-#include <pulsar/TopicMetadata.h>
-#include <boost/functional/hash.hpp>
+#include <pulsar/ProducerConfiguration.h>
+#include "Hash.h"
 
 namespace pulsar {
+typedef boost::interprocess::unique_ptr<Hash, boost::checked_deleter<Hash> > 
HashPtr;
 
-class SinglePartitionMessageRouter : public MessageRoutingPolicy {
+class MessageRouterBase : public MessageRoutingPolicy {
    public:
-    explicit SinglePartitionMessageRouter(int partitionIndex);
-    typedef boost::hash<std::string> StringHash;
-    virtual ~SinglePartitionMessageRouter();
-    virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
+    MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme);
 
-   private:
-    int selectedSinglePartition_;
+   protected:
+    HashPtr hash;
 };
 }  // namespace pulsar
-#endif  // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
+
+#endif  // PULSAR_CPP_MESSAGEROUTERBASE_H
diff --git a/pulsar-client-cpp/lib/Murmur3_32Hash.cc 
b/pulsar-client-cpp/lib/Murmur3_32Hash.cc
new file mode 100644
index 0000000..1367514
--- /dev/null
+++ b/pulsar-client-cpp/lib/Murmur3_32Hash.cc
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//-----------------------------------------------------------------------------
+// The original MurmurHash3 was written by Austin Appleby, and is placed in the
+// public domain. This source code, implemented by Licht Takeuchi, is based on
+// the original MurmurHash3 source code.
+#include "Murmur3_32Hash.h"
+
+#include <boost/predef.h>
+#include <limits>
+
+#if BOOST_COMP_MSVC
+#include <stdlib.h>
+#define ROTATE_LEFT(x, y) _rotl(x, y)
+#else
+#define ROTATE_LEFT(x, y) rotate_left(x, y)
+#endif
+
+#if BOOST_ENDIAN_LITTLE_BYTE
+#define BYTESPWAP(x) (x)
+#elif BOOST_ENDIAN_BIG_BYTE
+#if BOOST_COMP_CLANG || BOOST_COMP_GNUC
+#define BYTESPWAP(x) __builtin_bswap32(x)
+#elif BOOST_COMP_MSVC
+#define BYTESPWAP(x) _byteswap_uint32(x)
+#else
+#endif
+#else
+#endif
+
+namespace pulsar {
+
+Murmur3_32Hash::Murmur3_32Hash() : seed(0) {}
+
+int32_t Murmur3_32Hash::makeHash(const std::string &key) {
+    return static_cast<int32_t>(makeHash(&key.front(), key.length()) & 
std::numeric_limits<int32_t>::max());
+}
+
+uint32_t Murmur3_32Hash::makeHash(const void *key, const int64_t len) {
+    const uint8_t *data = reinterpret_cast<const uint8_t *>(key);
+    const int nblocks = len / CHUNK_SIZE;
+    uint32_t h1 = seed;
+    const uint32_t *blocks = reinterpret_cast<const uint32_t *>(data + nblocks 
* CHUNK_SIZE);
+
+    for (int i = -nblocks; i != 0; i++) {
+        uint32_t k1 = BYTESPWAP(blocks[i]);
+
+        k1 = mixK1(k1);
+        h1 = mixH1(h1, k1);
+    }
+
+    uint32_t k1 = 0;
+    switch (len - nblocks * CHUNK_SIZE) {
+        case 3:
+            k1 ^= static_cast<uint32_t>(blocks[2]) << 16;
+        case 2:
+            k1 ^= static_cast<uint32_t>(blocks[1]) << 8;
+        case 1:
+            k1 ^= static_cast<uint32_t>(blocks[0]);
+    };
+
+    h1 ^= mixK1(k1);
+    h1 ^= len;
+    h1 = fmix(h1);
+
+    return h1;
+}
+
+uint32_t Murmur3_32Hash::fmix(uint32_t h) {
+    h ^= h >> 16;
+    h *= 0x85ebca6b;
+    h ^= h >> 13;
+    h *= 0xc2b2ae35;
+    h ^= h >> 16;
+
+    return h;
+}
+
+uint32_t Murmur3_32Hash::mixK1(uint32_t k1) {
+    k1 *= C1;
+    k1 = ROTATE_LEFT(k1, 15);
+    k1 *= C2;
+    return k1;
+}
+
+uint32_t Murmur3_32Hash::mixH1(uint32_t h1, uint32_t k1) {
+    h1 ^= k1;
+    h1 = ROTATE_LEFT(h1, 13);
+    return h1 * 5 + 0xe6546b64;
+}
+
+uint32_t Murmur3_32Hash::rotate_left(uint32_t x, uint8_t r) { return (x << r) 
| (x >> ((32 - r))); }
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Murmur3_32Hash.h 
b/pulsar-client-cpp/lib/Murmur3_32Hash.h
new file mode 100644
index 0000000..644d186
--- /dev/null
+++ b/pulsar-client-cpp/lib/Murmur3_32Hash.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//-----------------------------------------------------------------------------
+// The original MurmurHash3 was written by Austin Appleby, and is placed in the
+// public domain. This source code, implemented by Licht Takeuchi, is based on
+// the original MurmurHash3 source code.
+#ifndef MURMUR3_32_HASH_HPP_
+#define MURMUR3_32_HASH_HPP_
+
+#include "Hash.h"
+
+#include <cstdint>
+#include <string>
+
+namespace pulsar {
+
+class Murmur3_32Hash : public Hash {
+   public:
+    Murmur3_32Hash();
+
+    int32_t makeHash(const std::string& key);
+
+   private:
+    static constexpr int32_t CHUNK_SIZE = 4;
+    static constexpr uint32_t C1 = 0xcc9e2d51;
+    static constexpr uint32_t C2 = 0x1b873593;
+    uint32_t seed;
+
+    static uint32_t fmix(uint32_t h);
+    static uint32_t mixK1(uint32_t k1);
+    static uint32_t mixH1(uint32_t h1, uint32_t k1);
+    static uint32_t rotate_left(uint32_t x, uint8_t r);
+    uint32_t makeHash(const void* key, const int64_t len);
+};
+}  // namespace pulsar
+
+#endif /* MURMUR3_32_HASH_HPP_ */
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 825bcc0..2c97ab4 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -56,14 +56,14 @@ 
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client,
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
     switch (conf_.getPartitionsRoutingMode()) {
         case ProducerConfiguration::RoundRobinDistribution:
-            return boost::make_shared<RoundRobinMessageRouter>();
+            return 
boost::make_shared<RoundRobinMessageRouter>(conf_.getHashingScheme());
         case ProducerConfiguration::CustomPartition:
             return conf_.getMessageRouterPtr();
         case ProducerConfiguration::UseSinglePartition:
         default:
             unsigned int random = rand();
-            return boost::make_shared<SinglePartitionMessageRouter>(random %
-                                                                    
topicMetadata_->getNumPartitions());
+            return boost::make_shared<SinglePartitionMessageRouter>(
+                random % topicMetadata_->getNumPartitions(), 
conf_.getHashingScheme());
     }
 }
 
@@ -160,8 +160,7 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
 
 /*
  * if createProducerCallback is set, it means the closeAsync is called from 
CreateProducer API which failed to
- * create
- * one or many producers for partitions. So, we have to notify with ERROR on 
createProducerFailure
+ * create one or many producers for partitions. So, we have to notify with 
ERROR on createProducerFailure
  */
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
     int producerIndex = 0;
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 011d859..c010a9f 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -104,6 +104,15 @@ const MessageRoutingPolicyPtr& 
ProducerConfiguration::getMessageRouterPtr() cons
     return impl_->messageRouter;
 }
 
+ProducerConfiguration& ProducerConfiguration::setHashingScheme(const 
HashingScheme& scheme) {
+    impl_->hashingScheme = scheme;
+    return *this;
+}
+
+ProducerConfiguration::HashingScheme ProducerConfiguration::getHashingScheme() 
const {
+    return impl_->hashingScheme;
+}
+
 ProducerConfiguration& ProducerConfiguration::setBlockIfQueueFull(bool flag) {
     impl_->blockIfQueueFull = flag;
     return *this;
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index f211fb7..11fe7cc 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -35,6 +35,7 @@ struct ProducerConfigurationImpl {
     int maxPendingMessagesAcrossPartitions;
     ProducerConfiguration::PartitionsRoutingMode routingMode;
     MessageRoutingPolicyPtr messageRouter;
+    ProducerConfiguration::HashingScheme hashingScheme;
     bool blockIfQueueFull;
     bool batchingEnabled;
     unsigned int batchingMaxMessages;
@@ -46,6 +47,7 @@ struct ProducerConfigurationImpl {
           maxPendingMessages(1000),
           maxPendingMessagesAcrossPartitions(50000),
           routingMode(ProducerConfiguration::UseSinglePartition),
+          hashingScheme(ProducerConfiguration::BoostHash),
           blockIfQueueFull(false),
           batchingEnabled(false),
           batchingMaxMessages(1000),
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc 
b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
index af8f49a..c47fb23 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
@@ -19,7 +19,8 @@
 #include "RoundRobinMessageRouter.h"
 
 namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {}
+RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme
 hashingScheme)
+    : MessageRouterBase(hashingScheme), prevPartition_(0) {}
 
 RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
 
@@ -27,12 +28,12 @@ RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
 int RoundRobinMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
     // if message has a key, hash the key and return the partition
     if (msg.hasPartitionKey()) {
-        static StringHash hash;
-        return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
+        return hash->makeHash(msg.getPartitionKey()) % 
topicMetadata.getNumPartitions();
     } else {
         Lock lock(mutex_);
         // else pick the next partition
         return prevPartition_++ % topicMetadata.getNumPartitions();
     }
 }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h 
b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
index 1fe337b..57c27b4 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
@@ -20,14 +20,16 @@
 #define PULSAR_RR_MESSAGE_ROUTER_HEADER_
 
 #include <pulsar/MessageRoutingPolicy.h>
+#include <pulsar/ProducerConfiguration.h>
 #include <pulsar/TopicMetadata.h>
-#include <boost/functional/hash.hpp>
 #include <boost/thread/mutex.hpp>
+#include "Hash.h"
+#include "MessageRouterBase.h"
 
 namespace pulsar {
-class RoundRobinMessageRouter : public MessageRoutingPolicy {
+class RoundRobinMessageRouter : public MessageRouterBase {
    public:
-    RoundRobinMessageRouter();
+    RoundRobinMessageRouter(ProducerConfiguration::HashingScheme 
hashingScheme);
     virtual ~RoundRobinMessageRouter();
     virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
 
@@ -35,7 +37,6 @@ class RoundRobinMessageRouter : public MessageRoutingPolicy {
     boost::mutex mutex_;
     unsigned int prevPartition_;
 };
-typedef boost::hash<std::string> StringHash;
 typedef boost::unique_lock<boost::mutex> Lock;
 }  // namespace pulsar
 #endif  // PULSAR_RR_MESSAGE_ROUTER_HEADER_
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc 
b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
index fca97b5..1e5ab75 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
+++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
@@ -20,7 +20,9 @@
 
 namespace pulsar {
 SinglePartitionMessageRouter::~SinglePartitionMessageRouter() {}
-SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int 
partitionIndex) {
+SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int 
partitionIndex,
+                                                           
ProducerConfiguration::HashingScheme hashingScheme)
+    : MessageRouterBase(hashingScheme) {
     selectedSinglePartition_ = partitionIndex;
 }
 
@@ -28,8 +30,7 @@ 
SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIn
 int SinglePartitionMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
     // if message has a key, hash the key and return the partition
     if (msg.hasPartitionKey()) {
-        StringHash hash;
-        return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
+        return hash->makeHash(msg.getPartitionKey()) % 
topicMetadata.getNumPartitions();
     } else {
         // else pick the next partition
         return selectedSinglePartition_;
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h 
b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
index 453f9ea..409d31a 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
+++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
@@ -20,20 +20,23 @@
 #define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
 
 #include <pulsar/MessageRoutingPolicy.h>
+#include <include/pulsar/ProducerConfiguration.h>
+#include "Hash.h"
 #include <pulsar/TopicMetadata.h>
-#include <boost/functional/hash.hpp>
+#include "MessageRouterBase.h"
 
 namespace pulsar {
 
-class SinglePartitionMessageRouter : public MessageRoutingPolicy {
+class SinglePartitionMessageRouter : public MessageRouterBase {
    public:
-    explicit SinglePartitionMessageRouter(int partitionIndex);
-    typedef boost::hash<std::string> StringHash;
+    SinglePartitionMessageRouter(const int partitionIndex,
+                                 ProducerConfiguration::HashingScheme 
hashingScheme);
     virtual ~SinglePartitionMessageRouter();
     virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata);
 
    private:
     int selectedSinglePartition_;
 };
+
 }  // namespace pulsar
 #endif  // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
diff --git a/pulsar-client-cpp/tests/HashTest.cc 
b/pulsar-client-cpp/tests/HashTest.cc
new file mode 100644
index 0000000..97dcb90
--- /dev/null
+++ b/pulsar-client-cpp/tests/HashTest.cc
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+#include <boost/functional/hash.hpp>
+
+#include "../lib/BoostHash.h"
+#include "../lib/JavaStringHash.h"
+#include "../lib/Murmur3_32Hash.h"
+
+using ::testing::AtLeast;
+using ::testing::Return;
+using ::testing::ReturnRef;
+
+using namespace pulsar;
+
+TEST(HashTest, testBoostHash) {
+    BoostHash hash;
+    boost::hash<std::string> boostHash;
+
+    std::string key1 = "key1";
+    std::string key2 = "key2";
+
+    ASSERT_EQ(boostHash(key1) & std::numeric_limits<int32_t>::max(), 
hash.makeHash(key1));
+    ASSERT_EQ(boostHash(key2) & std::numeric_limits<int32_t>::max(), 
hash.makeHash(key2));
+}
+
+TEST(HashTest, testJavaStringHash) {
+    JavaStringHash hash;
+
+    // Computing `hashCode()` for this key overflows an unsigned 32-bit integer.
+    std::string key1 = "keykeykeykeykey1";
+
+    // For this key, `hashCode()` is negative when interpreted as a signed 32-bit integer.
+    std::string key2 = "keykeykey2";
+
+    // Same as Java client
+    ASSERT_EQ(434058482, hash.makeHash(key1));
+    ASSERT_EQ(42978643, hash.makeHash(key2));
+}
+
+TEST(HashTest, testMurmur3_32Hash) {
+    Murmur3_32Hash hash;
+    std::string key1 = "key1";
+    std::string key2 = "key2";
+
+    // Same value as Java client
+    ASSERT_EQ(462881061, hash.makeHash(key1));
+    ASSERT_EQ(1936800180, hash.makeHash(key2));
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc 
b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
index 9542353..431dd6b 100644
--- a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
+++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
@@ -17,8 +17,10 @@
  * under the License.
  */
 #include <pulsar/Client.h>
+#include <pulsar/ProducerConfiguration.h>
 #include <gtest/gtest.h>
 #include <gmock/gmock.h>
+#include <boost/functional/hash.hpp>
 
 #include "tests/mocks/GMockMessage.h"
 
@@ -37,8 +39,8 @@ TEST(RoundRobinMessageRouterTest, 
DISABLED_getPartitionWithoutPartitionKey) {
     const int numPartitions1 = 5;
     const int numPartitions2 = 3;
 
-    RoundRobinMessageRouter router1;
-    RoundRobinMessageRouter router2;
+    RoundRobinMessageRouter router1(ProducerConfiguration::BoostHash);
+    RoundRobinMessageRouter router2(ProducerConfiguration::BoostHash);
 
     GMockMessage message;
     EXPECT_CALL(message, 
hasPartitionKey()).Times(20).WillRepeatedly(Return(false));
@@ -52,7 +54,7 @@ TEST(RoundRobinMessageRouterTest, 
DISABLED_getPartitionWithoutPartitionKey) {
 TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
     const int numPartitons = 1234;
 
-    RoundRobinMessageRouter router;
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash);
 
     std::string partitionKey1 = "key1";
     std::string partitionKey2 = "key2";
diff --git a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc 
b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc
index 9457acc..b62d286 100644
--- a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc
+++ b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc
@@ -17,6 +17,8 @@
  * under the License.
  */
 #include <pulsar/Client.h>
+#include <pulsar/ProducerConfiguration.h>
+#include <boost/functional/hash.hpp>
 #include <gtest/gtest.h>
 #include <gmock/gmock.h>
 
@@ -36,7 +38,7 @@ using namespace pulsar;
 TEST(SinglePartitionMessageRouterTest, 
DISABLED_getPartitionWithoutPartitionKey) {
     const int selectedPartition = 1234;
 
-    SinglePartitionMessageRouter router(selectedPartition);
+    SinglePartitionMessageRouter router(selectedPartition, 
ProducerConfiguration::BoostHash);
 
     GMockMessage message;
     EXPECT_CALL(message, hasPartitionKey()).Times(1).WillOnce(Return(false));
@@ -48,7 +50,7 @@ TEST(SinglePartitionMessageRouterTest, 
DISABLED_getPartitionWithoutPartitionKey)
 TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
     const int numPartitons = 1234;
 
-    SinglePartitionMessageRouter router(1);
+    SinglePartitionMessageRouter router(1, ProducerConfiguration::BoostHash);
 
     std::string partitionKey1 = "key1";
     std::string partitionKey2 = "key2";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 08b73c2..678e3d5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -45,6 +45,7 @@ public class ProducerConfiguration implements Serializable {
     private int maxPendingMessages = 1000;
     private int maxPendingMessagesAcrossPartitions = 50000;
     private MessageRoutingMode messageRouteMode = 
MessageRoutingMode.SinglePartition;
+    private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
     private MessageRouter customMessageRouter = null;
     private long batchingMaxPublishDelayMs = 10;
     private int batchingMaxMessages = 1000;
@@ -64,6 +65,11 @@ public class ProducerConfiguration implements Serializable {
         SinglePartition, RoundRobinPartition, CustomPartition
     }
 
+    public enum HashingScheme {
+        JavaStringHash,
+        Murmur3_32Hash
+    }
+
     private ProducerCryptoFailureAction cryptoFailureAction = 
ProducerCryptoFailureAction.FAIL;
 
     /**
@@ -136,6 +142,15 @@ public class ProducerConfiguration implements Serializable 
{
         return this;
     }
 
+    public HashingScheme getHashingScheme() {
+        return hashingScheme;
+    }
+
+    public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) 
{
+        this.hashingScheme = hashingScheme;
+        return this;
+    }
+
     /**
      *
      * @return the maximum number of pending messages allowed across all the 
partitions
@@ -171,7 +186,7 @@ public class ProducerConfiguration implements Serializable {
      * message queue is full.
      * <p>
      * Default is <code>false</code>. If set to <code>false</code>, send 
operations will immediately fail with
-     * {@link ProducerQueueIsFullError} when there is no space left in pending 
queue.
+     * {@link PulsarClientException.ProducerQueueIsFullError} when there is no 
space left in pending queue.
      *
      * @param blockIfQueueFull
      *            whether to block {@link Producer#send} and {@link 
Producer#sendAsync} operations on queue full
@@ -481,7 +496,8 @@ public class ProducerConfiguration implements Serializable {
             ProducerConfiguration other = (ProducerConfiguration) obj;
             return Objects.equal(this.sendTimeoutMs, other.sendTimeoutMs)
                     && Objects.equal(maxPendingMessages, 
other.maxPendingMessages)
-                    && Objects.equal(this.messageRouteMode, 
other.messageRouteMode);
+                    && Objects.equal(this.messageRouteMode, 
other.messageRouteMode)
+                    && Objects.equal(this.hashingScheme, other.hashingScheme);
         }
 
         return false;
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
similarity index 54%
copy from pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
index af8f49a..31c771a 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java
@@ -16,23 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include "RoundRobinMessageRouter.h"
+package org.apache.pulsar.client.impl;
 
-namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {}
-
-RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
-
-// override
-int RoundRobinMessageRouter::getPartition(const Message& msg, const 
TopicMetadata& topicMetadata) {
-    // if message has a key, hash the key and return the partition
-    if (msg.hasPartitionKey()) {
-        static StringHash hash;
-        return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
-    } else {
-        Lock lock(mutex_);
-        // else pick the next partition
-        return prevPartition_++ % topicMetadata.getNumPartitions();
-    }
+public interface Hash {
+    /**
+     * Generate the hash of a given String
+     *
+     * @return The hash of {@code s}, which is a non-negative integer.
+     */
+    int makeHash(String s);
 }
-}  // namespace pulsar
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
 b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java
similarity index 55%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java
index e1ab878..2f6fc28 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java
@@ -18,26 +18,17 @@
  */
 package org.apache.pulsar.client.impl;
 
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.TopicMetadata;
+public class JavaStringHash implements Hash {
+    private static final JavaStringHash instance = new JavaStringHash();
 
-public class SinglePartitionMessageRouterImpl implements MessageRouter {
+    private JavaStringHash(){ }
 
-    private final int partitionIndex;
-
-    public SinglePartitionMessageRouterImpl(int partitionIndex) {
-        this.partitionIndex = partitionIndex;
+    public static Hash getInstance() {
+        return instance;
     }
 
     @Override
-    public int choosePartition(Message msg, TopicMetadata metadata) {
-        // If the message has a key, it supersedes the single partition 
routing policy
-        if (msg.hasKey()) {
-            return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % 
metadata.numPartitions());
-        }
-
-        return partitionIndex;
+    public int makeHash(String s) {
+        return s.hashCode() & Integer.MAX_VALUE;
     }
-
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
similarity index 58%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
index e1ab878..4825dab 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
@@ -18,26 +18,20 @@
  */
 package org.apache.pulsar.client.impl;
 
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.ProducerConfiguration;
 
-public class SinglePartitionMessageRouterImpl implements MessageRouter {
+public abstract class MessageRouterBase implements MessageRouter {
+    protected final Hash hash;
 
-    private final int partitionIndex;
-
-    public SinglePartitionMessageRouterImpl(int partitionIndex) {
-        this.partitionIndex = partitionIndex;
-    }
-
-    @Override
-    public int choosePartition(Message msg, TopicMetadata metadata) {
-        // If the message has a key, it supersedes the single partition 
routing policy
-        if (msg.hasKey()) {
-            return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % 
metadata.numPartitions());
+    MessageRouterBase(ProducerConfiguration.HashingScheme hashingScheme) {
+        switch (hashingScheme) {
+            case JavaStringHash:
+                this.hash = JavaStringHash.getInstance();
+                break;
+            case Murmur3_32Hash:
+            default:
+                this.hash = Murmur3_32Hash.getInstance();
         }
-
-        return partitionIndex;
     }
-
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
new file mode 100644
index 0000000..80552da
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * The original MurmurHash3 was written by Austin Appleby, and is placed in the
+ * public domain. This source code, implemented by Licht Takeuchi, is based on
+ * the original MurmurHash3 source code.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.primitives.UnsignedBytes;
+
+public class Murmur3_32Hash implements Hash {
+    private static final Murmur3_32Hash instance = new Murmur3_32Hash();
+
+    private static final int CHUNK_SIZE = 4;
+    private static final int C1 = 0xcc9e2d51;
+    private static final int C2 = 0x1b873593;
+    private final int seed;
+
+    private Murmur3_32Hash() {
+        seed = 0;
+    }
+
+    public static Hash getInstance() {
+        return instance;
+    }
+
+    @Override
+    public int makeHash(String s) {
+        return makeHash(s.getBytes(StandardCharsets.UTF_8)) & 
Integer.MAX_VALUE;
+    }
+
+    private int makeHash(byte[] bytes) {
+        int len = bytes.length;
+        int reminder = len % CHUNK_SIZE;
+        int h1 = seed;
+
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+        while (byteBuffer.remaining() >= CHUNK_SIZE) {
+            int k1 = byteBuffer.getInt();
+
+            k1 = mixK1(k1);
+            h1 = mixH1(h1, k1);
+        }
+
+        int k1 = 0;
+        for (int i = 0; i < reminder; i++) {
+            k1 ^= UnsignedBytes.toInt(byteBuffer.get()) << (i * 8);
+        }
+
+        h1 ^= mixK1(k1);
+        h1 ^= len;
+        h1 = fmix(h1);
+
+        return h1;
+    }
+
+    private int fmix(int h) {
+        h ^= h >>> 16;
+        h *= 0x85ebca6b;
+        h ^= h >>> 13;
+        h *= 0xc2b2ae35;
+        h ^= h >>> 16;
+
+        return h;
+    }
+
+    private int mixK1(int k1) {
+        k1 *= C1;
+        k1 = Integer.rotateLeft(k1, 15);
+        k1 *= C2;
+        return k1;
+    }
+
+    private int mixH1(int h1, int k1) {
+        h1 ^= k1;
+        h1 = Integer.rotateLeft(h1, 13);
+        return h1 * 5 + 0xe6546b64;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 04269f3..fe9ae6c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -75,12 +75,12 @@ public class PartitionedProducerImpl extends ProducerBase {
             messageRouter = customMessageRouter;
             break;
         case RoundRobinPartition:
-            messageRouter = new RoundRobinPartitionMessageRouterImpl();
+            messageRouter = new 
RoundRobinPartitionMessageRouterImpl(conf.getHashingScheme());
             break;
         case SinglePartition:
         default:
             messageRouter = new SinglePartitionMessageRouterImpl(
-                
ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()));
+                
ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), 
conf.getHashingScheme());
         }
 
         return messageRouter;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index fc56500..a7c25c1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -21,16 +21,17 @@ package org.apache.pulsar.client.impl;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.TopicMetadata;
 
-public class RoundRobinPartitionMessageRouterImpl implements MessageRouter {
+public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
 
     private static final 
AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> 
PARTITION_INDEX_UPDATER =
             
AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class,
 "partitionIndex");
     private volatile int partitionIndex = 0;
 
-    public RoundRobinPartitionMessageRouterImpl() {
+    public 
RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme 
hashingScheme) {
+        super(hashingScheme);
         PARTITION_INDEX_UPDATER.set(this, 0);
     }
 
@@ -38,8 +39,9 @@ public class RoundRobinPartitionMessageRouterImpl implements 
MessageRouter {
     public int choosePartition(Message msg, TopicMetadata topicMetadata) {
         // If the message has a key, it supersedes the round robin routing 
policy
         if (msg.hasKey()) {
-            return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % 
topicMetadata.numPartitions());
+            return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
         }
+
         return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & 
Integer.MAX_VALUE) % topicMetadata.numPartitions());
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
index e1ab878..3a0cd54 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
@@ -19,14 +19,15 @@
 package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.TopicMetadata;
 
-public class SinglePartitionMessageRouterImpl implements MessageRouter {
+public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
 
     private final int partitionIndex;
 
-    public SinglePartitionMessageRouterImpl(int partitionIndex) {
+    public SinglePartitionMessageRouterImpl(int partitionIndex, 
ProducerConfiguration.HashingScheme hashingScheme) {
+        super(hashingScheme);
         this.partitionIndex = partitionIndex;
     }
 
@@ -34,7 +35,7 @@ public class SinglePartitionMessageRouterImpl implements 
MessageRouter {
     public int choosePartition(Message msg, TopicMetadata metadata) {
         // If the message has a key, it supersedes the single partition 
routing policy
         if (msg.hasKey()) {
-            return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % 
metadata.numPartitions());
+            return hash.makeHash(msg.getKey()) % metadata.numPartitions();
         }
 
         return partitionIndex;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
 b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java
similarity index 53%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
copy to pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java
index e1ab878..f53205d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java
@@ -18,26 +18,32 @@
  */
 package org.apache.pulsar.client.impl;
 
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.TopicMetadata;
+import org.testng.annotations.Test;
 
-public class SinglePartitionMessageRouterImpl implements MessageRouter {
+import static org.testng.Assert.*;
 
-    private final int partitionIndex;
+public class HashTest {
+    @Test
+    public void javaStringHashTest() {
+        Hash h = JavaStringHash.getInstance();
 
-    public SinglePartitionMessageRouterImpl(int partitionIndex) {
-        this.partitionIndex = partitionIndex;
-    }
+        // Computing `hashCode()` for this key overflows an unsigned 32-bit integer.
+        String key1 = "keykeykeykeykey1";
 
-    @Override
-    public int choosePartition(Message msg, TopicMetadata metadata) {
-        // If the message has a key, it supersedes the single partition 
routing policy
-        if (msg.hasKey()) {
-            return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % 
metadata.numPartitions());
-        }
+        // `hashCode()` of this key is negative as a signed 32-bit integer.
+        String key2 = "keykeykey2";
 
-        return partitionIndex;
+        // Same value as C++ client
+        assertEquals(434058482, h.makeHash(key1));
+        assertEquals(42978643, h.makeHash(key2));
     }
 
+    @Test
+    public void murmur3_32HashTest() {
+        Hash h = Murmur3_32Hash.getInstance();
+
+        // Same value as C++ client
+        assertEquals(462881061, h.makeHash("key1"));
+        assertEquals(1936800180, h.makeHash("key2"));
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
index e3cb6c0..d0e4d51 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
@@ -23,6 +23,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.testng.annotations.Test;
 
 /**
@@ -35,7 +36,7 @@ public class RoundRobinPartitionMessageRouterImplTest {
         Message msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl();
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
         for (int i = 0; i < 10; i++) {
             assertEquals(i % 5, router.choosePartition(msg, new 
TopicMetadataImpl(5)));
         }
@@ -52,7 +53,7 @@ public class RoundRobinPartitionMessageRouterImplTest {
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
 
-        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl();
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, 
metadata));
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
index 7b5ea4e..c8ea0b0 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.testng.annotations.Test;
 
 /**
@@ -35,7 +36,7 @@ public class SinglePartitionMessageRouterImplTest {
         Message msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        SinglePartitionMessageRouterImpl router = new 
SinglePartitionMessageRouterImpl(1234);
+        SinglePartitionMessageRouterImpl router = new 
SinglePartitionMessageRouterImpl(1234, 
ProducerConfiguration.HashingScheme.JavaStringHash);
         assertEquals(1234, router.choosePartition(msg, new 
TopicMetadataImpl(2468)));
     }
 
@@ -50,7 +51,7 @@ public class SinglePartitionMessageRouterImplTest {
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
 
-        SinglePartitionMessageRouterImpl router = new 
SinglePartitionMessageRouterImpl(1234);
+        SinglePartitionMessageRouterImpl router = new 
SinglePartitionMessageRouterImpl(1234, 
ProducerConfiguration.HashingScheme.JavaStringHash);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, 
metadata));

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to