Author: jbellis Date: Tue Jun 28 17:59:21 2011 New Revision: 1140760 URL: http://svn.apache.org/viewvc?rev=1140760&view=rev Log: revert r1133167 (#2280)
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jun 28 17:59:21 2011 @@ -50,14 +50,12 @@ * throttle migration replay (CASSANDRA-2714) * optimize column serializer creation (CASSANDRA-2716) * Added support for making bootstrap retry if nodes flap (CASSANDRA-2644) - * Added statusthrift to nodetool to report if thrift server is running - (CASSANDRA-2722) + * Added statusthrift to nodetool to report if thrift server is running (CASSANDRA-2722) * Fixed rows being cached if they do not exist (CASSANDRA-2723) * fix truncate/compaction race (CASSANDRA-2673) * Support passing tableName and cfName to RowCacheProviders (CASSANDRA-2702) * workaround large resultsets causing large allocation retention by nio sockets (CASSANDRA-2654) - * restrict repair streaming to specific columnfamilies (CASSANDRA-2280) * fix nodetool ring use with Ec2Snitch (CASSANDRA-2733) * fix inconsistency window during bootstrap (CASSANDRA-833) * fix removing columns and subcolumns that are supressed by a row or Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java Tue Jun 28 17:59:21 2011 @@ -253,7 +253,7 @@ public class Table } /** - * @return A list of open SSTableReaders + * @return A list of open SSTableReaders (TODO: ensure that the caller doesn't modify these). */ public List<SSTableReader> getAllSSTables() { Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Tue Jun 28 17:59:21 2011 @@ -58,9 +58,7 @@ import org.cliffc.high_scale_lib.NonBloc public final class MessagingService implements MessagingServiceMBean { public static final int VERSION_07 = 1; - public static final int VERSION_080 = 2; - public static final int version_ = 81; - + public static final int version_ = 2; //TODO: make this parameter dynamic somehow. Not sure if config is appropriate. private SerializerType serializerType_ = SerializerType.BINARY; Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Jun 28 17:59:21 2011 @@ -494,7 +494,7 @@ public class AntiEntropyService StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback); StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES); // request ranges from the remote node - StreamIn.requestRanges(request.endpoint, request.cf.left, Collections.singletonList(cfstore), differences, callback, OperationType.AES); + StreamIn.requestRanges(request.endpoint, request.cf.left, differences, callback, OperationType.AES); } catch(Exception e) { Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Tue Jun 28 17:59:21 2011 @@ -2329,7 +2329,7 @@ public class StorageService implements I public void run() { // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range), callback, OperationType.UNBOOTSTRAP); + StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback, OperationType.UNBOOTSTRAP); } }); } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Jun 28 17:59:21 2011 @@ -24,11 +24,7 @@ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.InetAddress; import java.util.Collection; -import java.util.Collections; -import java.util.ArrayList; -import java.util.List; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.gms.Gossiper; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -51,29 +47,22 @@ public class StreamIn { private static Logger logger = LoggerFactory.getLogger(StreamIn.class); - /** Request ranges for all column families in the given keyspace. */ - public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type) - { - requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type); - } - /** - * Request ranges to be transferred from specific CFs + * Request ranges to be transferred from source to local node */ - public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range> ranges, Runnable callback, OperationType type) + public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type) { assert ranges.size() > 0; if (logger.isDebugEnabled()) logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", ")); StreamInSession session = StreamInSession.create(source, callback); - StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getLocalAddress(), - ranges, - tableName, - columnFamilies, - session.getSessionId(), - type); - Message message = srm.getMessage(Gossiper.instance.getVersion(source)); + Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), + ranges, + tableName, + session.getSessionId(), + type) + .getMessage(Gossiper.instance.getVersion(source)); MessagingService.instance().sendOneWay(message, source); } @@ -89,5 +78,5 @@ public class StreamIn Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, remote.desc.version)); return new PendingFile(localdesc, remote); - } + } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java Tue Jun 28 17:59:21 2011 @@ -22,21 +22,21 @@ package org.apache.cassandra.streaming; import java.io.IOError; import java.io.IOException; import java.net.InetAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.Range; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -78,48 +78,74 @@ public class StreamOut private static Logger logger = LoggerFactory.getLogger(StreamOut.class); /** - * Stream the given ranges to the target endpoint from each CF in the given keyspace. + * Split out files for all tables on disk locally for each range and then stream them to the target endpoint. */ - public static void transferRanges(InetAddress target, Table table, Collection<Range> ranges, Runnable callback, OperationType type) + public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback, OperationType type) { - StreamOutSession session = StreamOutSession.create(table.name, target, callback); - transferRanges(session, table.getColumnFamilyStores(), ranges, type); + assert ranges.size() > 0; + + // this is so that this target shows up as a destination while anticompaction is happening. + StreamOutSession session = StreamOutSession.create(tableName, target, callback); + + logger.info("Beginning transfer to {}", target); + logger.debug("Ranges are {}", StringUtils.join(ranges, ",")); + + try + { + Table table = flushSSTable(tableName); + // send the matching portion of every sstable in the keyspace + transferSSTables(session, table.getAllSSTables(), ranges, type); + } + catch (IOException e) + { + throw new IOError(e); + } } /** - * Flushes matching column families from the given keyspace, or all columnFamilies - * if the cf list is empty. + * (1) dump all the memtables to disk. + * (2) determine the minimal file sections we need to send for the given ranges + * (3) transfer the data. */ - private static void flushSSTables(Iterable<ColumnFamilyStore> stores) throws IOException + private static Table flushSSTable(String tableName) throws IOException { - logger.info("Flushing memtables for {}...", stores); - List<Future<?>> flushes; - flushes = new ArrayList<Future<?>>(); - for (ColumnFamilyStore cfstore : stores) - { - Future<?> flush = cfstore.forceFlush(); - if (flush != null) - flushes.add(flush); + Table table = Table.open(tableName); + logger.info("Flushing memtables for {}...", tableName); + for (Future<?> f : table.flush()) + { + try + { + f.get(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } } - FBUtilities.waitOnFutures(flushes); + return table; } /** - * Stream the given ranges to the target endpoint from each of the given CFs. + * Split out files for all tables on disk locally for each range and then stream them to the target endpoint. */ - public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range> ranges, OperationType type) + public static void transferRangesForRequest(StreamOutSession session, Collection<Range> ranges, OperationType type) { assert ranges.size() > 0; logger.info("Beginning transfer to {}", session.getHost()); logger.debug("Ranges are {}", StringUtils.join(ranges, ",")); + try { - flushSSTables(cfses); - Iterable<SSTableReader> sstables = Collections.emptyList(); - for (ColumnFamilyStore cfStore : cfses) - sstables = Iterables.concat(sstables, cfStore.getSSTables()); - transferSSTables(session, sstables, ranges, type); + Table table = flushSSTable(session.table); + // send the matching portion of every sstable in the keyspace + List<PendingFile> pending = createPendingFiles(table.getAllSSTables(), ranges, type); + session.addFilesToStream(pending); + session.begin(); } catch (IOException e) { @@ -128,10 +154,9 @@ public class StreamOut } /** - * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint. - * You should probably call transferRanges instead. + * Transfers matching portions of a group of sstables from a single table to the target endpoint. */ - public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException + public static void transferSSTables(StreamOutSession session, Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException { List<PendingFile> pending = createPendingFiles(sstables, ranges, type); @@ -142,7 +167,7 @@ public class StreamOut } // called prior to sending anything. - private static List<PendingFile> createPendingFiles(Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type) + private static List<PendingFile> createPendingFiles(Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type) { List<PendingFile> pending = new ArrayList<PendingFile>(); for (SSTableReader sstable : sstables) @@ -153,7 +178,7 @@ public class StreamOut continue; pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type)); } - logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables)); + logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size()); return pending; } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Jun 28 17:59:21 2011 @@ -23,13 +23,10 @@ package org.apache.cassandra.streaming; import java.io.*; import java.net.InetAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; -import com.google.common.collect.Iterables; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.io.ICompactSerializer; @@ -69,15 +66,13 @@ class StreamRequestMessage implements Me // if these are specified, file shoud not be. protected final Collection<Range> ranges; protected final String table; - protected final Iterable<ColumnFamilyStore> columnFamilies; protected final OperationType type; - StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type) + StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, long sessionId, OperationType type) { this.target = target; this.ranges = ranges; this.table = table; - this.columnFamilies = columnFamilies; this.sessionId = sessionId; this.type = type; file = null; @@ -91,7 +86,6 @@ class StreamRequestMessage implements Me this.type = file.type; ranges = null; table = null; - columnFamilies = null; } public Message getMessage(Integer version) @@ -116,8 +110,6 @@ class StreamRequestMessage implements Me { sb.append(table); sb.append("@"); - sb.append(columnFamilies.toString()); - sb.append("@"); sb.append(target); sb.append("------->"); for ( Range range : ranges ) @@ -154,16 +146,8 @@ class StreamRequestMessage implements Me { AbstractBounds.serializer().serialize(range, dos); } - if (version > MessagingService.VERSION_07) dos.writeUTF(srm.type.name()); - - if (version > MessagingService.VERSION_080) - { - dos.writeInt(Iterables.size(srm.columnFamilies)); - for (ColumnFamilyStore cfs : srm.columnFamilies) - dos.writeInt(cfs.metadata.cfId); - } } } @@ -189,16 +173,7 @@ class StreamRequestMessage implements Me OperationType type = OperationType.RESTORE_REPLICA_COUNT; if (version > MessagingService.VERSION_07) type = OperationType.valueOf(dis.readUTF()); - - List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>(); - if (version > MessagingService.VERSION_080) - { - int cfsSize = dis.readInt(); - for (int i = 0; i < cfsSize; ++i) - stores.add(Table.open(table).getColumnFamilyStore(dis.readInt())); - } - - return new StreamRequestMessage(target, ranges, table, stores, sessionId, type); + return new StreamRequestMessage(target, ranges, table, sessionId, type); } } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Jun 28 17:59:21 2011 @@ -51,7 +51,7 @@ public class StreamRequestVerbHandler im logger.debug(srm.toString()); StreamOutSession session = StreamOutSession.create(srm.table, message.getFrom(), srm.sessionId); - StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type); + StreamOut.transferRangesForRequest(session, srm.ranges, srm.type); } catch (IOException ex) { Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Tue Jun 28 17:59:21 2011 @@ -22,7 +22,6 @@ package org.apache.cassandra.streaming; import org.apache.cassandra.AbstractSerializationsTester; -import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.filter.QueryPath; @@ -42,7 +41,9 @@ import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; public class SerializationsTest extends AbstractSerializationsTester { @@ -145,8 +146,7 @@ public class SerializationsTest extends Collection<Range> ranges = new ArrayList<Range>(); for (int i = 0; i < 5; i++) ranges.add(new Range(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5))))); - List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1")); - StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT); + StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L, OperationType.RESTORE_REPLICA_COUNT); StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L); StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L); Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1140760&r1=1140759&r2=1140760&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Jun 28 17:59:21 2011 @@ -23,10 +23,12 @@ import static junit.framework.Assert.ass import static org.apache.cassandra.Util.column; import java.net.InetAddress; +import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.columniterator.IdentityQueryFilter;