Author: jbellis
Date: Tue Jun 28 12:31:51 2011
New Revision: 1140567

URL: http://svn.apache.org/viewvc?rev=1140567&view=rev
Log:
merge from 0.7

Modified:
    cassandra/branches/cassandra-0.8/   (props changed)
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/contrib/   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Propchange: cassandra/branches/cassandra-0.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7:1026516-1140565
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0:1125021-1130369

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1140567&r1=1140566&r2=1140567&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jun 28 12:31:51 2011
@@ -84,6 +84,8 @@
  * fix repair hanging if a neighbor has nothing to send (CASSANDRA-2797)
  * purge tombstone even if row is in only one sstable (CASSANDRA-2801)
  * Fix wrong purge of deleted cf during compaction (CASSANDRA-2786)
+ * fix race that could result in Hadoop writer failing to throw an
+   exception encountered after close() (CASSANDRA-2755)
 
 
 0.8.0-final
@@ -202,9 +204,6 @@
  * reduce contention on Table.flusherLock (CASSANDRA-1954)
  * try harder to detect failures during streaming, cleaning up temporary
    files more reliably (CASSANDRA-2088)
-
-
-0.6.13
  * shut down server for OOM on a Thrift thread (CASSANDRA-2269)
  * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
  * preserve version when streaming data from old sstables (CASSANDRA-2283)

Propchange: cassandra/branches/cassandra-0.8/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7/contrib:1026516-1140565
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140565
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140565
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140565
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140565
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369

Propchange: 
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 12:31:51 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1138710,1138996,1140550
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140565
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125041
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1140567&r1=1140566&r2=1140567&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 Tue Jun 28 12:31:51 2011
@@ -57,9 +57,9 @@ public class DebuggableThreadPoolExecuto
         super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
         allowCoreThreadTimeOut(true);
 
-        // preserve task serialization.  this is more complicated than it needs to be,
-        // since TPE rejects if queue.offer reports a full queue.  we'll just
-        // override this with a handler that retries until it gets in.  ugly, but effective.
+        // block task submissions until queue has room.
+        // this is fighting TPE's design a bit because TPE rejects if queue.offer reports a full queue.
+        // we'll just override this with a handler that retries until it gets in.  ugly, but effective.
         // (there is an extensive analysis of the options here at
         //  http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
         this.setRejectedExecutionHandler(new RejectedExecutionHandler()

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1140567&r1=1140566&r2=1140567&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Tue Jun 28 12:31:51 2011
@@ -92,7 +92,7 @@ public class HintedHandOffManager implem
     public static final String HINTS_CF = "HintsColumnFamily";
 
     private static final Logger logger_ = 
LoggerFactory.getLogger(HintedHandOffManager.class);
-    private static final int PAGE_SIZE = 10000;
+    private static final int PAGE_SIZE = 1024;
     private static final String SEPARATOR = "-";
     private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be 
enough for anybody.
 

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1140567&r1=1140566&r2=1140567&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Tue Jun 28 12:31:51 2011
@@ -729,7 +729,7 @@ public class CompactionManager implement
                         goodRows++;
                     }
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
-                        logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
+                        logger.warn("Index file contained a different key or row size; using key from data file");
                 }
                 catch (Throwable th)
                 {

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1140567&r1=1140566&r2=1140567&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
 Tue Jun 28 12:31:51 2011
@@ -32,15 +32,14 @@ import java.util.concurrent.TimeUnit;
 import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TSocket;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
@@ -150,27 +149,33 @@ implements org.apache.hadoop.mapred.Reco
     @Override
     public void close(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        close((org.apache.hadoop.mapred.Reporter)null);
+        close();
     }
 
     /** Fills the deprecated RecordWriter interface for streaming. */
     @Deprecated
     public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
     {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        // close all the clients before throwing anything
+        IOException clientException = null;
         for (RangeClient client : clients.values())
-            client.stopNicely();
-        try
         {
-            for (RangeClient client : clients.values())
+            try
             {
-                client.join();
                 client.close();
             }
+            catch (IOException e)
+            {
+                clientException = e;
+            }
         }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        if (clientException != null)
+            throw clientException;
     }
 
     /**
@@ -186,6 +191,9 @@ implements org.apache.hadoop.mapred.Reco
         private final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<Pair<ByteBuffer,Mutation>>(queueSize);
 
         private volatile boolean run = true;
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+        // when the client is closed.
         private volatile IOException lastException;
 
         private Cassandra.Client thriftClient;
@@ -222,15 +230,25 @@ implements org.apache.hadoop.mapred.Reco
             }
         }
 
-        public void stopNicely() throws IOException
+        public void close() throws IOException
         {
-            if (lastException != null)
-                throw lastException;
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
             run = false;
             interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (lastException != null)
+                throw lastException;
         }
 
-        public void close()
+        private void closeInternal()
         {
             if (thriftSocket != null)
             {
@@ -287,7 +305,7 @@ implements org.apache.hadoop.mapred.Reco
                     }
                     catch (Exception e)
                     {
-                        close();
+                        closeInternal();
                         if (!iter.hasNext())
                         {
                             lastException = new IOException(e);
@@ -304,7 +322,7 @@ implements org.apache.hadoop.mapred.Reco
                     }
                     catch (Exception e)
                     {
-                        close();
+                        closeInternal();
                         // TException means something unexpected went wrong to that endpoint, so
                         // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
                         if ((!(e instanceof TException)) || !iter.hasNext())


Reply via email to