Updated Branches: refs/heads/trunk dafcaeb06 -> f41684fde
add Murmur3Partitioner and make it default for new installations patch by Dave Brosius and Pavel Yaskevich; reviewed by Vijay for CASSANDRA-3772 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f41684fd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f41684fd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f41684fd Branch: refs/heads/trunk Commit: f41684fdef7a9e8628cb40f66c13a88fcf7502e3 Parents: dafcaeb Author: Pavel Yaskevich <xe...@apache.org> Authored: Tue Aug 21 23:40:31 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Tue Aug 21 23:40:31 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 4 +- .../cassandra/dht/AbstractHashedPartitioner.java | 194 +++++++++++++++ .../apache/cassandra/dht/Murmur3Partitioner.java | 53 ++++ .../apache/cassandra/dht/RandomPartitioner.java | 160 +------------ 5 files changed, 255 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 426ac7d..8fe1770 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -38,6 +38,7 @@ * (cql3) Add support for 2ndary indexes (CASSANDRA-3680) * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477) * remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487) + * add Murmur3Partitioner and make it default for new installations (CASSANDRA-3772) 1.1.5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 1b89b2e..5e45961 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ 
-70,6 +70,8 @@ authority: org.apache.cassandra.auth.AllowAllAuthority # # - RandomPartitioner distributes rows across the cluster evenly by md5. # When in doubt, this is the best option. +# - Murmur3Partitioner is similar to RandomPartitioner but uses the Murmur3_128 +# hash function instead of md5 # - ByteOrderedPartitioner orders rows lexically by key bytes. BOP allows # scanning rows in key order, but the ordering can generate hot spots # for sequential insertion workloads. @@ -81,7 +83,7 @@ authority: org.apache.cassandra.auth.AllowAllAuthority # # See http://wiki.apache.org/cassandra/Operations for more on # partitioners and token selection. -partitioner: org.apache.cassandra.dht.RandomPartitioner +partitioner: org.apache.cassandra.dht.Murmur3Partitioner # directories where Cassandra should store data on disk. data_file_directories: http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java b/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java new file mode 100644 index 0000000..55dfb97 --- /dev/null +++ b/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.dht; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.util.*; + +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.GuidGenerator; +import org.apache.cassandra.utils.Pair; + +/** + * Base class for partitioners that derive a BigIntegerToken from a hash of the row key: subclasses supply hash(), while token parsing, midpoint, random-token generation and ownership calculation are shared here.
+ */ +public abstract class AbstractHashedPartitioner extends AbstractPartitioner<BigIntegerToken> +{ + /** Smallest valid token value (validate() rejects anything below it); midpoint() also substitutes it for MINIMUM. */ public static final BigInteger ZERO = new BigInteger("0"); + /** Sentinel minimum token (-1): getToken() returns it for an empty key and midpoint() treats it as ZERO. */ public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1"); + /** 2**127: the largest token validate() accepts and the ring size describeOwnership() divides by. */ public static final BigInteger MAXIMUM = new BigInteger("2").pow(127); + + /** Separator between the token string and the key bytes in the buffer parsed by convertFromDiskFormat(). NOTE(review): String.getBytes() uses the platform default charset; ':' encodes to the same byte in ASCII-compatible charsets. */ private static final byte DELIMITER_BYTE = ":".getBytes()[0]; + + /** + * Returns a hash of the byte buffer as a BigInteger in the range 0 to 2**127. + * + * @param buffer the buffer to hash + * @return the BigInteger hash value + */ + protected abstract BigInteger hash(ByteBuffer buffer); + + /** Pairs {@code key} with the token computed by getToken(key). */ public DecoratedKey decorateKey(ByteBuffer key) + { + return new DecoratedKey(getToken(key), key); + } + + /** Splits {@code fromdisk} at the first ':' byte: the bytes before it are decoded (ByteBufferUtil.string) into the token string, and the bytes after it become the key as a duplicate view, so the input buffer's position is left unchanged. Asserts that a ':' is present; a CharacterCodingException is rethrown wrapped in a RuntimeException. */ public DecoratedKey convertFromDiskFormat(ByteBuffer fromdisk) + { + // find the delimiter position + int splitPoint = -1; + for (int i = fromdisk.position(); i < fromdisk.limit(); i++) + { + if (fromdisk.get(i) == DELIMITER_BYTE) + { + splitPoint = i; + break; + } + } + assert splitPoint != -1; + + // and decode the token and key + String token = null; + try + { + token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position()); + } + catch (CharacterCodingException e) + { + throw new RuntimeException(e); + } + ByteBuffer key = fromdisk.duplicate(); + key.position(splitPoint + 1); + return new DecoratedKey(new BigIntegerToken(token), key); + } + + /** Midpoint of the range from ltoken to rtoken, computed by FBUtilities.midpoint over 127 bits with MINIMUM treated as ZERO; the Boolean half of the returned pair (the remainder) is ignored. */ public Token midpoint(Token ltoken, Token rtoken) + { + // the symbolic MINIMUM token should act as ZERO: the empty bit array + BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token; + BigInteger right = rtoken.equals(MINIMUM) ?
ZERO : ((BigIntegerToken)rtoken).token; + Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127); + // discard the remainder + return new BigIntegerToken(midpair.left); + } + + /** Returns the MINIMUM sentinel token. */ public BigIntegerToken getMinimumToken() + { + return MINIMUM; + } + + /** Hashes a freshly generated GUID (GuidGenerator.guidAsBytes()) and negates the result if it is negative. */ public BigIntegerToken getRandomToken() + { + BigInteger token = hash(GuidGenerator.guidAsBytes()); + if ( token.signum() == -1 ) + token = token.multiply(BigInteger.valueOf(-1L)); + return new BigIntegerToken(token); + } + + /** Converts tokens to and from bytes (BigInteger.toByteArray / new BigInteger(byte[])) and strings (BigInteger.toString / new BigInteger(String)); validate() accepts integers in [0, 2**127], and out-of-range or non-numeric strings raise ConfigurationException. */ private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() { + public ByteBuffer toByteArray(Token<BigInteger> bigIntegerToken) + { + return ByteBuffer.wrap(bigIntegerToken.token.toByteArray()); + } + + public Token<BigInteger> fromByteArray(ByteBuffer bytes) + { + return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes))); + } + + public String toString(Token<BigInteger> bigIntegerToken) + { + return bigIntegerToken.token.toString(); + } + + public void validate(String token) throws ConfigurationException + { + try + { + BigInteger i = new BigInteger(token); + if (i.compareTo(ZERO) < 0) + throw new ConfigurationException("Token must be >= 0"); + if (i.compareTo(MAXIMUM) > 0) + throw new ConfigurationException("Token must be <= 2**127"); + } + catch (NumberFormatException e) + { + throw new ConfigurationException(e.getMessage()); + } + } + + public Token<BigInteger> fromString(String string) + { + return new BigIntegerToken(new BigInteger(string)); + } + }; + + /** Returns the shared token factory instance. */ public Token.TokenFactory<BigInteger> getTokenFactory() + { + return tokenFactory; + } + + /** Always false: tokens are hashes, so token order does not follow key order. */ public boolean preservesOrder() + { + return false; + } + + /** Returns MINIMUM for an empty key (no remaining bytes), otherwise a token wrapping hash(key). */ public BigIntegerToken getToken(ByteBuffer key) + { + if (key.remaining() == 0) + return MINIMUM; + return new BigIntegerToken(hash(key)); + } + + /** Maps each token to the fraction of the 2**127 ring it owns: the distance from the preceding token (the first token wraps around to the last) divided by 2**127. A single token owns 1.0; an empty list throws RuntimeException. NOTE(review): assumes sortedTokens is sorted ascending, as its name implies - confirm at call sites. */ public Map<Token, Float> describeOwnership(List<Token> sortedTokens) + { + Map<Token, Float> ownerships = new HashMap<Token, Float>(); + Iterator i = sortedTokens.iterator(); +
// 0-case + if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. How did you call this?"); } + // 1-case + if (sortedTokens.size() == 1) { + ownerships.put((Token)i.next(), new Float(1.0)); + } + // n-case + else { + // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers. + final BigInteger ri = MAXIMUM; // (used for addition later) + final BigDecimal r = new BigDecimal(ri); // The entire range, 2**127 + Token start = (Token)i.next(); BigInteger ti = ((BigIntegerToken)start).token; // The first token and its value + Token t; BigInteger tim1 = ti; // The last token and its value (after loop) + while (i.hasNext()) { + t = (Token)i.next(); ti = ((BigIntegerToken)t).token; // The next token and its value + float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R + ownerships.put(t, x); // save (T(i) -> %age) + tim1 = ti; // -> advance loop + } + // The start token's range extends backward to the last token, which is why both were saved above. + float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue(); + ownerships.put(start, x); + } + return ownerships; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java new file mode 100644 index 0000000..775cef9 --- /dev/null +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.dht; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.apache.cassandra.utils.MurmurHash; + +/** + * This class generates a BigIntegerToken using the 128-bit x64 variant of MurmurHash3 (MurmurHash.hash3_x64_128, seed 0). + */ +public class Murmur3Partitioner extends AbstractHashedPartitioner +{ + /** Hashes the buffer with MurmurHash.hash3_x64_128 (offset buffer.position(), length buffer.remaining(), seed 0), packs the two 64-bit halves big-endian into 16 bytes and clears the top bit, so the result is a non-negative BigInteger below 2**127. */ protected BigInteger hash(ByteBuffer buffer) + { + long[] bufferHash = MurmurHash.hash3_x64_128(buffer, buffer.position(), buffer.remaining(), 0); + byte[] hashBytes = new byte[16]; + + writeLong(bufferHash[0], hashBytes, 0); + writeLong(bufferHash[1], hashBytes, 8); + // make sure it's positive: clearing the sign bit isn't the same as abs() but doesn't affect distribution + hashBytes[0] = (byte) (hashBytes[0] & 0x7F); + return new BigInteger(hashBytes); + } + + /** Writes {@code src} into dest[offset] .. dest[offset + 7], most significant byte first (big-endian). */ public static void writeLong(long src, byte[] dest, int offset) + { + dest[offset] = (byte) (src >> 56); + dest[offset + 1] = (byte) (src >> 48); + dest[offset + 2] = (byte) (src >> 40); + dest[offset + 3] = (byte) (src >> 32); + dest[offset + 4] = (byte) (src >> 24); + dest[offset + 5] = (byte) (src >> 16); + dest[offset + 6] = (byte) (src >> 8); + dest[offset + 7] = (byte) (src); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index cf3855b..5b95454 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -17,170 +17,18 @@ */ package org.apache.cassandra.dht; -import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.util.*; -import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.GuidGenerator; -import org.apache.cassandra.utils.Pair; /** - * This class generates a BigIntegerToken using MD5 hash. + * This class generates a BigIntegerToken using an MD5 hash (via FBUtilities.hashToBigInteger). */ -public class RandomPartitioner extends AbstractPartitioner<BigIntegerToken> +public class RandomPartitioner extends AbstractHashedPartitioner { - public static final BigInteger ZERO = new BigInteger("0"); - public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1"); - public static final BigInteger MAXIMUM = new BigInteger("2").pow(127); - - private static final byte DELIMITER_BYTE = ":".getBytes()[0]; - - public DecoratedKey decorateKey(ByteBuffer key) - { - return new DecoratedKey(getToken(key), key); - } - - public DecoratedKey convertFromDiskFormat(ByteBuffer fromdisk) - { - // find the delimiter position - int splitPoint = -1; - for (int i = fromdisk.position(); i < fromdisk.limit(); i++) - { - if (fromdisk.get(i) == DELIMITER_BYTE) - { - splitPoint = i; - break; - } - } - assert splitPoint != -1; - - // and decode the token and key - String token = null; - try - { - token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position()); - } - catch (CharacterCodingException e) - { - throw new RuntimeException(e); - } - ByteBuffer key = fromdisk.duplicate(); - key.position(splitPoint +
1); - return new DecoratedKey(new BigIntegerToken(token), key); - } - - public Token midpoint(Token ltoken, Token rtoken) + /** Delegates to FBUtilities.hashToBigInteger(buffer). */ protected BigInteger hash(ByteBuffer buffer) { - // the symbolic MINIMUM token should act as ZERO: the empty bit array - BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token; - BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token; - Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127); - // discard the remainder - return new BigIntegerToken(midpair.left); - } - - public BigIntegerToken getMinimumToken() - { - return MINIMUM; - } - - public BigIntegerToken getRandomToken() - { - BigInteger token = FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes()); - if ( token.signum() == -1 ) - token = token.multiply(BigInteger.valueOf(-1L)); - return new BigIntegerToken(token); - } - - private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() { - public ByteBuffer toByteArray(Token<BigInteger> bigIntegerToken) - { - return ByteBuffer.wrap(bigIntegerToken.token.toByteArray()); - } - - public Token<BigInteger> fromByteArray(ByteBuffer bytes) - { - return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes))); - } - - public String toString(Token<BigInteger> bigIntegerToken) - { - return bigIntegerToken.token.toString(); - } - - public void validate(String token) throws ConfigurationException - { - try - { - BigInteger i = new BigInteger(token); - if (i.compareTo(ZERO) < 0) - throw new ConfigurationException("Token must be >= 0"); - if (i.compareTo(MAXIMUM) > 0) - throw new ConfigurationException("Token must be <= 2**127"); - } - catch (NumberFormatException e) - { - throw new ConfigurationException(e.getMessage()); - } - } - - public Token<BigInteger> fromString(String string) - { - return new BigIntegerToken(new BigInteger(string)); - } - }; - - public Token.TokenFactory<BigInteger> getTokenFactory() - { - return
tokenFactory; - } - - public boolean preservesOrder() - { - return false; - } - - public BigIntegerToken getToken(ByteBuffer key) - { - if (key.remaining() == 0) - return MINIMUM; - return new BigIntegerToken(FBUtilities.hashToBigInteger(key)); - } - - public Map<Token, Float> describeOwnership(List<Token> sortedTokens) - { - Map<Token, Float> ownerships = new HashMap<Token, Float>(); - Iterator i = sortedTokens.iterator(); - - // 0-case - if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. How did you call this?"); } - // 1-case - if (sortedTokens.size() == 1) { - ownerships.put((Token)i.next(), new Float(1.0)); - } - // n-case - else { - // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers. - final BigInteger ri = MAXIMUM; // (used for addition later) - final BigDecimal r = new BigDecimal(ri); // The entire range, 2**127 - Token start = (Token)i.next(); BigInteger ti = ((BigIntegerToken)start).token; // The first token and its value - Token t; BigInteger tim1 = ti; // The last token and its value (after loop) - while (i.hasNext()) { - t = (Token)i.next(); ti = ((BigIntegerToken)t).token; // The next token and its value - float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R - ownerships.put(t, x); // save (T(i) -> %age) - tim1 = ti; // -> advance loop - } - // The start token's range extends backward to the last token, which is why both were saved above. - float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue(); - ownerships.put(start, x); - } - return ownerships; + return FBUtilities.hashToBigInteger(buffer); } }