Author: jbellis Date: Tue Jun 28 12:31:51 2011 New Revision: 1140567 URL: http://svn.apache.org/viewvc?rev=1140567&view=rev Log: merge from 0.7
Modified: cassandra/branches/cassandra-0.8/ (props changed) cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/contrib/ (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Propchange: cassandra/branches/cassandra-0.8/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291 -/cassandra/branches/cassandra-0.7:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7:1026516-1140565 /cassandra/branches/cassandra-0.7.0:1053690-1055654 /cassandra/branches/cassandra-0.8:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0:1125021-1130369 Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1140567&r1=1140566&r2=1140567&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jun 28 12:31:51 2011 @@ -84,6 +84,8 @@ * fix repair hanging if a neighbor has nothing to send (CASSANDRA-2797) * purge tombstone even if row is in only one sstable (CASSANDRA-2801) * Fix wrong purge of deleted cf during compaction (CASSANDRA-2786) + * fix race that could result in Hadoop writer failing to throw an + exception encountered after close() (CASSANDRA-2755) 0.8.0-final @@ -202,9 +204,6 @@ * reduce contention on Table.flusherLock (CASSANDRA-1954) * try harder to detect failures during streaming, cleaning up temporary files more reliably (CASSANDRA-2088) - - -0.6.13 * shut down server for OOM on a Thrift thread (CASSANDRA-2269) * fix tombstone handling in repair and sstable2json (CASSANDRA-2279) * preserve version when streaming data from old sstables (CASSANDRA-2283) Propchange: cassandra/branches/cassandra-0.8/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009 -/cassandra/branches/cassandra-0.7/contrib:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7/contrib:1026516-1140565 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140565 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140565 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140565 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140565 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jun 28 12:31:51 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1138710,1138996,1140550 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140565 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125041 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1140567&r1=1140566&r2=1140567&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Jun 28 12:31:51 2011 @@ -57,9 +57,9 @@ public class DebuggableThreadPoolExecuto super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory); allowCoreThreadTimeOut(true); - // preserve task serialization. this is more complicated than it needs to be, - // since TPE rejects if queue.offer reports a full queue. we'll just - // override this with a handler that retries until it gets in. ugly, but effective. + // block task submissions until queue has room. + // this is fighting TPE's design a bit because TPE rejects if queue.offer reports a full queue. + // we'll just override this with a handler that retries until it gets in. ugly, but effective. // (there is an extensive analysis of the options here at // http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html) this.setRejectedExecutionHandler(new RejectedExecutionHandler() Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1140567&r1=1140566&r2=1140567&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Jun 28 12:31:51 2011 @@ -92,7 +92,7 @@ public class HintedHandOffManager implem public static final String HINTS_CF = "HintsColumnFamily"; private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class); - private static final int PAGE_SIZE = 10000; + private static final int PAGE_SIZE = 1024; private static final String SEPARATOR = "-"; private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody. Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1140567&r1=1140566&r2=1140567&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jun 28 12:31:51 2011 @@ -729,7 +729,7 @@ public class CompactionManager implement goodRows++; } if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex) - logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html"); + logger.warn("Index file contained a different key or row size; using key from data file"); } catch (Throwable th) { Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140567&r1=1140566&r2=1140567&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Jun 28 12:31:51 2011 @@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit; import org.apache.cassandra.client.RingCache; import org.apache.cassandra.dht.Range; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; import org.apache.thrift.transport.TSocket; -import org.apache.cassandra.utils.ByteBufferUtil; /** * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value> @@ -150,27 +149,33 @@ implements org.apache.hadoop.mapred.Reco @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - close((org.apache.hadoop.mapred.Reporter)null); + close(); } /** Fills the deprecated RecordWriter interface for streaming. */ @Deprecated public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException { + close(); + } + + private void close() throws IOException + { + // close all the clients before throwing anything + IOException clientException = null; for (RangeClient client : clients.values()) - client.stopNicely(); - try { - for (RangeClient client : clients.values()) + try { - client.join(); client.close(); } + catch (IOException e) + { + clientException = e; + } } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + if (clientException != null) + throw clientException; } /** @@ -186,6 +191,9 @@ implements org.apache.hadoop.mapred.Reco private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize); private volatile boolean run = true; + // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing + // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls, + // when the client is closed. private volatile IOException lastException; private Cassandra.Client thriftClient; @@ -222,15 +230,25 @@ implements org.apache.hadoop.mapred.Reco } } - public void stopNicely() throws IOException + public void close() throws IOException { - if (lastException != null) - throw lastException; + // stop the run loop. this will result in closeInternal being called by the time join() finishes. run = false; interrupt(); + try + { + this.join(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + if (lastException != null) + throw lastException; } - public void close() + private void closeInternal() { if (thriftSocket != null) { @@ -287,7 +305,7 @@ implements org.apache.hadoop.mapred.Reco } catch (Exception e) { - close(); + closeInternal(); if (!iter.hasNext()) { lastException = new IOException(e); @@ -304,7 +322,7 @@ implements org.apache.hadoop.mapred.Reco } catch (Exception e) { - close(); + closeInternal(); // TException means something unexpected went wrong to that endpoint, so // we should try again to another. Other exceptions (auth or invalid request) are fatal. if ((!(e instanceof TException)) || !iter.hasNext())