Author: slebresne Date: Mon May 23 08:51:54 2011 New Revision: 1126389 URL: http://svn.apache.org/viewvc?rev=1126389&view=rev Log: Assert ranges are not overlapping in AbstractBounds.normalize patch by stuhood; reviewed by slebresne for CASSANDRA-2641
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/dht/AbstractBounds.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1126389&r1=1126388&r2=1126389&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon May 23 08:51:54 2011 @@ -15,6 +15,7 @@ * add DROP INDEX support to CLI (CASSANDRA-2616) * don't perform HH to client-mode [storageproxy] nodes (CASSANDRA-2668) * Improve forceDeserialize/getCompactedRow encapsulation (CASSANDRA-2659) + * Assert ranges are not overlapping in AB.normalize (CASSANDRA-2641) 0.8.0-final Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/dht/AbstractBounds.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=1126389&r1=1126388&r2=1126389&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/dht/AbstractBounds.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/dht/AbstractBounds.java Mon May 23 08:51:54 2011 @@ -90,13 +90,21 @@ public abstract class AbstractBounds imp /** * @return A copy of the given list of non-intersecting bounds with all bounds unwrapped, sorted by bound.left. + * This method does not allow overlapping ranges as input. */ public static List<AbstractBounds> normalize(Collection<? 
extends AbstractBounds> bounds) { // unwrap all List<AbstractBounds> output = new ArrayList<AbstractBounds>(); + AbstractBounds previous = null; for (AbstractBounds bound : bounds) - output.addAll(bound.unwrap()); + { + List<AbstractBounds> unwrapped = bound.unwrap(); + assert previous == null || previous.right.compareTo(unwrapped.get(0).left) <= 0 : + "Overlapping ranges passed to normalize: see CASSANDRA-2641: " + previous + " and " + unwrapped; + output.addAll(unwrapped); + previous = unwrapped.get(unwrapped.size() - 1); + } // sort by left Collections.sort(output, new Comparator<AbstractBounds>() Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1126389&r1=1126388&r2=1126389&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Mon May 23 08:51:54 2011 @@ -115,90 +115,90 @@ public class StreamingTransferTest exten @Test public void testTransferTableMultiple() throws Exception { - // write a temporary SSTable, but don't register it + // write temporary SSTables, but don't register them Set<String> content = new HashSet<String>(); - content.add("transfer1"); - content.add("transfer2"); - content.add("transfer3"); + content.add("test"); + content.add("test2"); + content.add("test3"); SSTableReader sstable = SSTableUtils.prepare().write(content); String tablename = sstable.getTableName(); String cfname = sstable.getColumnFamilyName(); - Set<String> content2 = new HashSet<String>(); - content2.add("test"); - content2.add("test2"); - content2.add("test3"); - SSTableReader sstable2 = 
SSTableUtils.prepare().write(content2); + content = new HashSet<String>(); + content.add("transfer1"); + content.add("transfer2"); + content.add("transfer3"); + SSTableReader sstable2 = SSTableUtils.prepare().write(content); // transfer the first and last key IPartitioner p = StorageService.getPartitioner(); List<Range> ranges = new ArrayList<Range>(); - ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("transfer1")))); - ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("test2")), p.getMinimumToken())); + ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("test")))); + ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("transfer2")), p.getMinimumToken())); StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null); StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP); session.await(); - // confirm that the SSTable was transferred and registered + // confirm that the sstables were transferred and registered and that 2 keys arrived ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname); List<Row> rows = Util.getRangeSlice(cfstore); - assertEquals(6, rows.size()); + assertEquals(2, rows.size()); assert rows.get(0).key.key.equals(ByteBufferUtil.bytes("test")); - assert rows.get(3).key.key.equals(ByteBufferUtil.bytes("transfer1")); + assert rows.get(1).key.key.equals(ByteBufferUtil.bytes("transfer3")); assert rows.get(0).cf.getColumnsMap().size() == 1; - assert rows.get(3).cf.getColumnsMap().size() == 1; + assert rows.get(1).cf.getColumnsMap().size() == 1; - // these keys fall outside of the ranges and should not be transferred. 
- assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), new QueryPath("Standard1"))); - assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer3"), new QueryPath("Standard1"))); - - // and that the index and filter were properly recovered - assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test"), new QueryPath("Standard1"))); - assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), new QueryPath("Standard1"))); + // these keys fall outside of the ranges and should not be transferred + assert null == cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), new QueryPath("Standard1"))); + assert null == cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), new QueryPath("Standard1"))); + assert null == cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test2"), new QueryPath("Standard1"))); + assert null == cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test3"), new QueryPath("Standard1"))); } @Test public void testTransferOfMultipleColumnFamilies() throws Exception { - String keyspace = "Keyspace1"; + String keyspace = "KeyCacheSpace"; IPartitioner p = StorageService.getPartitioner(); String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" }; List<SSTableReader> ssTableReaders = new ArrayList<SSTableReader>(); - // ranges to transfer - List<Range> ranges = new ArrayList<Range>(); - + NavigableMap<DecoratedKey,String> keys = new TreeMap<DecoratedKey,String>(); for (String cf : columnFamilies) { Set<String> content = new HashSet<String>(); - content.add("data-" + cf + "-1"); content.add("data-" + cf + "-2"); content.add("data-" + cf + "-3"); - SSTableUtils.Context context = SSTableUtils.prepare().ks(keyspace).cf(cf); - ssTableReaders.add(context.write(content)); - ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("data-" + 
cf + "-3")))); + + // collect dks for each string key + for (String str : content) + keys.put(Util.dk(str), cf); } + // transfer the first and last keys + Map.Entry<DecoratedKey,String> first = keys.firstEntry(); + Map.Entry<DecoratedKey,String> last = keys.lastEntry(); + Map.Entry<DecoratedKey,String> secondtolast = keys.lowerEntry(last.getKey()); + List<Range> ranges = new ArrayList<Range>(); + ranges.add(new Range(p.getMinimumToken(), first.getKey().token)); + // the left hand side of the range is exclusive, so we transfer from the second-to-last token + ranges.add(new Range(secondtolast.getKey().token, p.getMinimumToken())); + StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null); StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP); session.await(); - for (String cf : columnFamilies) + // check that only two keys were transferred + for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last)) { - ColumnFamilyStore store = Table.open(keyspace).getColumnFamilyStore(cf); + ColumnFamilyStore store = Table.open(keyspace).getColumnFamilyStore(entry.getValue()); List<Row> rows = Util.getRangeSlice(store); - - assert rows.size() >= 3; - - for (int i = 0; i < 3; i++) - { - String expectedKey = "data-" + cf + "-" + (i + 1); - assertEquals(p.decorateKey(ByteBufferUtil.bytes(expectedKey)), rows.get(i).key); - } + assertEquals(rows.toString(), 1, rows.size()); + assertEquals(entry.getKey(), rows.get(0).key); } } }