Repository: cassandra
Updated Branches:
  refs/heads/trunk 7b24b8703 -> 8a91a1a59


RefCount native frames from netty to avoid corruption bugs

patch by tjake; reviewed by bes for CASSANDRA-7245


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4722fe70
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4722fe70
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4722fe70

Branch: refs/heads/trunk
Commit: 4722fe70aa9ae1b62772cfa1a1de58ef289445d5
Parents: b29d882
Author: Jake Luciani <j...@apache.org>
Authored: Thu Jun 5 14:15:32 2014 -0400
Committer: Jake Luciani <j...@apache.org>
Committed: Thu Jun 5 14:15:32 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 lib/netty-all-4.0.17.Final.jar                  | Bin 1613159 -> 0 bytes
 lib/netty-all-4.0.19.Final.jar                  | Bin 0 -> 1678810 bytes
 .../cql3/statements/ModificationStatement.java  |  12 +-
 .../apache/cassandra/db/CounterMutation.java    |  12 ++
 src/java/org/apache/cassandra/db/IMutation.java |  11 ++
 src/java/org/apache/cassandra/db/Mutation.java  |  37 +++-
 .../apache/cassandra/net/MessagingService.java  |  20 +-
 .../cassandra/net/ResponseVerbHandler.java      |   7 +
 .../apache/cassandra/service/QueryState.java    |  12 ++
 .../apache/cassandra/service/StorageProxy.java  | 197 ++++++++++++-------
 .../org/apache/cassandra/transport/CBUtil.java  |  14 +-
 .../org/apache/cassandra/transport/Frame.java   |  14 +-
 .../cassandra/transport/FrameCompressor.java    |   8 +-
 .../org/apache/cassandra/transport/Message.java |  13 +-
 .../org/apache/cassandra/utils/ExpiringMap.java |  10 +-
 17 files changed, 262 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eed2c09..3cea1e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Add optional keyspace to DROP INDEX statement (CASSANDRA-7314)
  * Reduce run time for CQL tests (CASSANDRA-7327)
  * Fix heap size calculation on Windows (CASSANDRA-7352)
+ * RefCount native frames from netty (CASSANDRA-7245)
 Merged from 2.0:
  * Add per-CF range read request latency metrics (CASSANDRA-7338)
  * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7e34904..15986d1 100644
--- a/build.xml
+++ b/build.xml
@@ -396,7 +396,7 @@
           <dependency groupId="com.addthis.metrics" 
artifactId="reporter-config" version="2.1.0" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" 
version="0.3m" />
           <dependency groupId="io.airlift" artifactId="airline" version="0.6" 
/>
-          <dependency groupId="io.netty" artifactId="netty-all" 
version="4.0.17.Final" />
+          <dependency groupId="io.netty" artifactId="netty-all" 
version="4.0.19.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" 
version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" 
version="2.5.2" />
           <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" version="2.0.1" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/lib/netty-all-4.0.17.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.17.Final.jar b/lib/netty-all-4.0.17.Final.jar
deleted file mode 100644
index baaa5b8..0000000
Binary files a/lib/netty-all-4.0.17.Final.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/lib/netty-all-4.0.19.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.19.Final.jar b/lib/netty-all-4.0.19.Final.jar
new file mode 100644
index 0000000..66d58f3
Binary files /dev/null and b/lib/netty-all-4.0.19.Final.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 621006b..f0ab603 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.transport.Frame;
 import org.github.jamm.MemoryMeter;
 
 import org.apache.cassandra.auth.Permission;
@@ -497,7 +498,7 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options, 
false, options.getTimestamp(queryState));
+        Collection<? extends IMutation> mutations = getMutations(options, 
false, options.getTimestamp(queryState), queryState.getSourceFrame());
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -635,10 +636,11 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(options, true, 
queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(options, true, 
queryState.getTimestamp(), queryState.getSourceFrame()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;
+
             ((Mutation) mutation).apply();
         }
         return null;
@@ -654,7 +656,7 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options, 
boolean local, long now)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, 
boolean local, long now, Frame sourceFrame)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -662,13 +664,15 @@ public abstract class ModificationStatement implements 
CQLStatement, MeasurableF
 
         UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, 
options, local, now);
 
-        Collection<IMutation> mutations = new ArrayList<IMutation>();
+        Collection<IMutation> mutations = new 
ArrayList<IMutation>(keys.size());
         for (ByteBuffer key: keys)
         {
             ThriftValidation.validateKey(cfm, key);
             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
             Mutation mut = new Mutation(cfm.ksName, key, cf);
+            mut.setSourceFrame(sourceFrame);
+
             mutations.add(isCounter() ? new CounterMutation(mut, 
options.getConsistency()) : mut);
         }
         return mutations;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java 
b/src/java/org/apache/cassandra/db/CounterMutation.java
index 58889c1..95f4ce3 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -69,6 +69,18 @@ public class CounterMutation implements IMutation
         return mutation.getColumnFamilies();
     }
 
+    @Override
+    public void retain()
+    {
+        mutation.retain();
+    }
+
+    @Override
+    public void release()
+    {
+        mutation.release();
+    }
+
     public Mutation getMutation()
     {
         return mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java 
b/src/java/org/apache/cassandra/db/IMutation.java
index 44df104..3e037c3 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -30,4 +30,15 @@ public interface IMutation
     public String toString(boolean shallow);
     public void addAll(IMutation m);
     public Collection<ColumnFamily> getColumnFamilies();
+
+    /**
+     * Call to increment the underlying network buffer's refcount
+     * so that the buffer is not recycled too soon
+     */
+    public void retain();
+
+    /**
+     * Call to decrement underlying network buffer refcount
+     */
+    public void release();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java 
b/src/java/org/apache/cassandra/db/Mutation.java
index b64c675..6eb56b7 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.transport.Frame;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -34,12 +35,15 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // TODO convert this to a Builder pattern instead of encouraging M.add 
directly,
 // which is less-efficient since we have to keep a mutable HashMap around
 public class Mutation implements IMutation
 {
     public static final MutationSerializer serializer = new 
MutationSerializer();
+    private static final Logger logger = 
LoggerFactory.getLogger(Mutation.class);
 
     public static final String FORWARD_TO = "FWD_TO";
     public static final String FORWARD_FROM = "FWD_FRM";
@@ -52,6 +56,8 @@ public class Mutation implements IMutation
     // map of column family id to mutations for that column family.
     private final Map<UUID, ColumnFamily> modifications;
 
+    private Frame sourceFrame;
+
     public Mutation(String keyspaceName, ByteBuffer key)
     {
         this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
@@ -81,7 +87,10 @@ public class Mutation implements IMutation
 
     public Mutation copy()
     {
-        return new Mutation(keyspaceName, key, new HashMap<>(modifications));
+        Mutation copy = new Mutation(keyspaceName, key, new 
HashMap<>(modifications));
+        copy.setSourceFrame(getSourceFrame());
+
+        return copy;
     }
 
     public String getKeyspaceName()
@@ -104,6 +113,20 @@ public class Mutation implements IMutation
         return modifications.values();
     }
 
+    @Override
+    public void retain()
+    {
+        if (sourceFrame != null)
+            sourceFrame.retain();
+    }
+
+    @Override
+    public void release()
+    {
+        if (sourceFrame != null)
+            sourceFrame.release();
+    }
+
     public ColumnFamily getColumnFamily(UUID cfId)
     {
         return modifications.get(cfId);
@@ -206,6 +229,8 @@ public class Mutation implements IMutation
      */
     public void apply()
     {
+        assert sourceFrame == null || sourceFrame.body.refCnt() > 0;
+
         Keyspace ks = Keyspace.open(keyspaceName);
         ks.apply(this, ks.metadata.durableWrites);
     }
@@ -265,6 +290,16 @@ public class Mutation implements IMutation
         return mutation;
     }
 
+    public Frame getSourceFrame()
+    {
+        return sourceFrame;
+    }
+
+    public void setSourceFrame(Frame sourceFrame)
+    {
+        this.sourceFrame = sourceFrame;
+    }
+
     public static class MutationSerializer implements 
IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int 
version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 3e88b37..b3d6ae5 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -349,10 +349,19 @@ public final class MessagingService implements 
MessagingServiceMBean
                     });
                 }
 
-                if (expiredCallbackInfo.shouldHint())
+                Mutation mutation = (Mutation) ((WriteCallbackInfo) 
expiredCallbackInfo).sentMessage.payload;
+
+                try
+                {
+                    if (expiredCallbackInfo.shouldHint())
+                    {
+                        return StorageProxy.submitHint(mutation, 
expiredCallbackInfo.target, null);
+                    }
+                }
+                finally
                 {
-                    Mutation mutation = (Mutation) ((WriteCallbackInfo) 
expiredCallbackInfo).sentMessage.payload;
-                    return StorageProxy.submitHint(mutation, 
expiredCallbackInfo.target, null);
+                    //The callback has expired (hint submitted or not), so drop the 
reference that was retained when the callback was registered
+                    mutation.release();
                 }
 
                 return null;
@@ -570,6 +579,11 @@ public final class MessagingService implements 
MessagingServiceMBean
     {
         assert message.verb == Verb.MUTATION || message.verb == 
Verb.COUNTER_MUTATION;
         int messageId = nextId();
+
+        //keep the underlying buffer around till the request completes or 
times out and
+        //a hint is stored
+        message.payload.retain();
+
         CallbackInfo previous = callbacks.put(messageId,
                                               new WriteCallbackInfo(to,
                                                                     cb,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 1d9aa98..1e1a278 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.db.IMutation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,5 +53,11 @@ public class ResponseVerbHandler implements IVerbHandler
             MessagingService.instance().maybeAddLatency(cb, message.from, 
latency);
             cb.response(message);
         }
+
+        // We don't need to track the mutation anymore since write succeeded
+        if (callbackInfo instanceof WriteCallbackInfo)
+        {
+            ((IMutation)((WriteCallbackInfo) 
callbackInfo).sentMessage.payload).release();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java 
b/src/java/org/apache/cassandra/service/QueryState.java
index 12fc392..f2e0809 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 import java.util.UUID;
 
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Frame;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -30,6 +31,7 @@ public class QueryState
     private final ClientState clientState;
     private volatile long clock;
     private volatile UUID preparedTracingSession;
+    private Frame sourceFrame;
 
     public QueryState(ClientState clientState)
     {
@@ -60,6 +62,16 @@ public class QueryState
         return clock;
     }
 
+    public Frame getSourceFrame()
+    {
+        return sourceFrame;
+    }
+
+    public void setSourceFrame(Frame sourceFrame)
+    {
+        this.sourceFrame = sourceFrame;
+    }
+
     public boolean traceNextQuery()
     {
         if (preparedTracingSession != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2cbc475..890315a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -451,12 +451,14 @@ public class StorageProxy implements StorageProxyMBean
         final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
-        List<AbstractWriteResponseHandler> responseHandlers = new 
ArrayList<AbstractWriteResponseHandler>(mutations.size());
+        List<AbstractWriteResponseHandler> responseHandlers = new 
ArrayList<>(mutations.size());
 
         try
         {
             for (IMutation mutation : mutations)
             {
+                mutation.retain();
+
                 if (mutation instanceof CounterMutation)
                 {
                     
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
@@ -517,6 +519,13 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
+            //Release the mutations we dispatched so far.
+            //An exception may be thrown at any time.
+            //We can infer how many mutations were dispatched from the size of responseHandlers
+            Iterator<? extends IMutation> it = mutations.iterator();
+            for (int i = 0; i < responseHandlers.size(); i++)
+                it.next().release();
+            
             writeMetrics.addNano(System.nanoTime() - startTime);
         }
     }
@@ -781,72 +790,79 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Mutation> message = null;
 
         boolean insertLocal = false;
-        for (InetAddress destination : targets)
-        {
-            // avoid OOMing due to excess hints.  we need to do this check 
even for "live" nodes, since we can
-            // still generate hints for those if it's overloaded or simply 
dead but not yet known-to-be-dead.
-            // The idea is that if we have over maxHintsInProgress hints in 
flight, this is probably due to
-            // a small number of nodes causing problems, so we should avoid 
shutting down writes completely to
-            // healthy nodes.  Any node with no hintsInProgress is considered 
healthy.
-            if (StorageMetrics.totalHintsInProgress.count() > 
maxHintsInProgress
-                && (getHintsInProgressFor(destination).get() > 0 && 
shouldHint(destination)))
-            {
-                throw new OverloadedException("Too many in flight hints: " + 
StorageMetrics.totalHintsInProgress.count());
-            }
 
-            if (FailureDetector.instance.isAlive(destination))
+
+        mutation.retain();
+        try        
+        {
+            for (InetAddress destination : targets)
             {
-                if (destination.equals(FBUtilities.getBroadcastAddress()) && 
OPTIMIZE_LOCAL_REQUESTS)
+                // avoid OOMing due to excess hints.  we need to do this check 
even for "live" nodes, since we can
+                // still generate hints for those if it's overloaded or simply 
dead but not yet known-to-be-dead.
+                // The idea is that if we have over maxHintsInProgress hints 
in flight, this is probably due to
+                // a small number of nodes causing problems, so we should 
avoid shutting down writes completely to
+                // healthy nodes.  Any node with no hintsInProgress is 
considered healthy.
+                if (StorageMetrics.totalHintsInProgress.count() > 
maxHintsInProgress
+                        && (getHintsInProgressFor(destination).get() > 0 && 
shouldHint(destination)))
                 {
-                    insertLocal = true;
+                    throw new OverloadedException("Too many in flight hints: " 
+ StorageMetrics.totalHintsInProgress.count());
                 }
-                else
+
+                if (FailureDetector.instance.isAlive(destination))
                 {
-                    // belongs on a different server
-                    if (message == null)
-                        message = mutation.createMessage();
-                    String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                    // direct writes to local DC or old Cassandra versions
-                    // (1.1 knows how to forward old-style String message IDs; 
updated to int in 2.0)
-                    if (localDataCenter.equals(dc))
+                    if (destination.equals(FBUtilities.getBroadcastAddress()) 
&& OPTIMIZE_LOCAL_REQUESTS)
                     {
-                        MessagingService.instance().sendRR(message, 
destination, responseHandler, true);
-                    }
-                    else
+                        insertLocal = true;
+                    } else
                     {
-                        Collection<InetAddress> messages = (dcGroups != null) 
? dcGroups.get(dc) : null;
-                        if (messages == null)
+                        // belongs on a different server
+                        if (message == null)
+                            message = mutation.createMessage();
+                        String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+                        // direct writes to local DC or old Cassandra versions
+                        // (1.1 knows how to forward old-style String message 
IDs; updated to int in 2.0)
+                        if (localDataCenter.equals(dc))
                         {
-                            messages = new ArrayList<InetAddress>(3); // most 
DCs will have <= 3 replicas
-                            if (dcGroups == null)
-                                dcGroups = new HashMap<String, 
Collection<InetAddress>>();
-                            dcGroups.put(dc, messages);
+                            MessagingService.instance().sendRR(message, 
destination, responseHandler, true);
+                        } else
+                        {
+                            Collection<InetAddress> messages = (dcGroups != 
null) ? dcGroups.get(dc) : null;
+                            if (messages == null)
+                            {
+                                messages = new ArrayList<InetAddress>(3); // 
most DCs will have <= 3 replicas
+                                if (dcGroups == null)
+                                    dcGroups = new HashMap<String, 
Collection<InetAddress>>();
+                                dcGroups.put(dc, messages);
+                            }
+                            messages.add(destination);
                         }
-                        messages.add(destination);
                     }
+                } else
+                {
+                    if (!shouldHint(destination))
+                        continue;
+
+                    // Schedule a local hint
+                    submitHint(mutation, destination, responseHandler);
                 }
             }
-            else
+
+            if (insertLocal)
+                insertLocal(mutation, responseHandler);
+
+            if (dcGroups != null)
             {
-                if (!shouldHint(destination))
-                    continue;
+                // for each datacenter, send the message to one node to relay 
the write to other replicas
+                if (message == null)
+                    message = mutation.createMessage();
 
-                // Schedule a local hint
-                submitHint(mutation, destination, responseHandler);
+                for (Collection<InetAddress> dcTargets : dcGroups.values())
+                    sendMessagesToNonlocalDC(message, dcTargets, 
responseHandler);
             }
         }
-
-        if (insertLocal)
-            insertLocal(mutation, responseHandler);
-
-        if (dcGroups != null)
+        finally
         {
-            // for each datacenter, send the message to one node to relay the 
write to other replicas
-            if (message == null)
-                message = mutation.createMessage();
-
-            for (Collection<InetAddress> dcTargets : dcGroups.values())
-                sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
+            mutation.release();
         }
     }
 
@@ -868,23 +884,30 @@ public class StorageProxy implements StorageProxyMBean
     {
         // local write that time out should be handled by LocalMutationRunnable
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
+        mutation.retain();
 
         HintRunnable runnable = new HintRunnable(target)
         {
             public void runMayThrow()
             {
-                int ttl = HintedHandOffManager.calculateHintTTL(mutation);
-                if (ttl > 0)
+                try
                 {
-                    logger.debug("Adding hint for {}", target);
-                    writeHintForMutation(mutation, System.currentTimeMillis(), 
ttl, target);
-                    // Notify the handler only for CL == ANY
-                    if (responseHandler != null && 
responseHandler.consistencyLevel == ConsistencyLevel.ANY)
-                        responseHandler.response(null);
+                    int ttl = HintedHandOffManager.calculateHintTTL(mutation);
+                    if (ttl > 0)
+                    {
+                        logger.debug("Adding hint for {}", target);
+                        writeHintForMutation(mutation, 
System.currentTimeMillis(), ttl, target);
+                        // Notify the handler only for CL == ANY
+                        if (responseHandler != null && 
responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+                            responseHandler.response(null);
+                    } else
+                    {
+                        logger.debug("Skipped writing hint for {} (ttl {})", 
target, ttl);
+                    }
                 }
-                else
+                finally
                 {
-                    logger.debug("Skipped writing hint for {} (ttl {})", 
target, ttl);
+                    mutation.release();
                 }
             }
         };
@@ -948,15 +971,24 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void insertLocal(final Mutation mutation, final 
AbstractWriteResponseHandler responseHandler)
     {
+        mutation.retain();
+
         StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new 
LocalMutationRunnable()
         {
             public void runMayThrow()
             {
-                IMutation processed = 
SinkManager.processWriteRequest(mutation);
-                if (processed != null)
+                try
+                {
+                    IMutation processed = 
SinkManager.processWriteRequest(mutation);
+                    if (processed != null)
+                    {
+                        ((Mutation) processed).apply();
+                        responseHandler.response(null);
+                    }
+                }
+                finally
                 {
-                    ((Mutation) processed).apply();
-                    responseHandler.response(null);
+                    mutation.release();
                 }
             }
         });
@@ -1062,8 +1094,11 @@ public class StorageProxy implements StorageProxyMBean
                                              final 
AbstractWriteResponseHandler responseHandler,
                                              final String localDataCenter)
     {
+        mutation.retain();
+
         return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
         {
+            @Override
             public void runMayThrow() throws OverloadedException, 
WriteTimeoutException
             {
                 IMutation processed = 
SinkManager.processWriteRequest(mutation);
@@ -1077,10 +1112,16 @@ public class StorageProxy implements StorageProxyMBean
                 responseHandler.response(null);
 
                 Set<InetAddress> remotes = 
Sets.difference(ImmutableSet.copyOf(targets),
-                                                           
ImmutableSet.of(FBUtilities.getBroadcastAddress()));
+                            
ImmutableSet.of(FBUtilities.getBroadcastAddress()));
                 if (!remotes.isEmpty())
                     sendToHintedEndpoints(result, remotes, responseHandler, 
localDataCenter);
             }
+
+            @Override
+            public void cleanup()
+            {
+                mutation.release();
+            }
         };
     }
 
@@ -2000,22 +2041,32 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
constructionTime) > DatabaseDescriptor.getTimeout(verb))
-            {
-                MessagingService.instance().incrementDroppedMessages(verb);
-                return;
-            }
-
             try
             {
-                runMayThrow();
+                if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
constructionTime) > DatabaseDescriptor.getTimeout(verb))
+                {
+                    MessagingService.instance().incrementDroppedMessages(verb);
+                    return;
+                }
+
+                try
+                {
+                    runMayThrow();
+                } catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
-            catch (Exception e)
+            finally
             {
-                throw new RuntimeException(e);
+                cleanup();
             }
         }
 
+        public void cleanup()
+        {
+        }
+
         abstract protected void runMayThrow() throws Exception;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java 
b/src/java/org/apache/cassandra/transport/CBUtil.java
index e5e6f05..6cc6d47 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -29,16 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.util.AttributeKey;
+import io.netty.buffer.*;
 import io.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -52,7 +47,6 @@ import org.apache.cassandra.utils.UUIDGen;
 public abstract class CBUtil
 {
     public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
-    public static final ByteBufAllocator onHeapAllocator = new PooledByteBufAllocator(false);
 
     private CBUtil() {}
 
@@ -303,7 +297,11 @@ public abstract class CBUtil
         if (length < 0)
             return null;
         ByteBuf slice = cb.readSlice(length);
-        return ByteBuffer.wrap(readRawBytes(slice));
+        if (slice.nioBufferCount() == 1)
+            return slice.nioBuffer();
+        else
+            return ByteBuffer.wrap(readRawBytes(slice));
+
     }
 
     public static void writeValue(byte[] bytes, ByteBuf cb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 3e66ff7..b29e80c 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -65,9 +66,14 @@ public class Frame
         this.body = body;
     }
 
-    public void release()
+    public void retain()
+    {
+        body.retain();
+    }
+
+    public boolean release()
     {
-        body.release();
+        return body.release();
     }
 
     public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
@@ -226,7 +232,9 @@ public class Frame
                 return;
 
             // extract body
-            ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx, (int) bodyLength));
+            ByteBuf body = buffer.slice(idx, (int) bodyLength);
+            body.retain();
+            
             idx += bodyLength;
             buffer.readerIndex(idx);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index 9617ec2..8ab735f 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -75,7 +75,7 @@ public interface FrameCompressor
         public Frame compress(Frame frame) throws IOException
         {
             byte[] input = CBUtil.readRawBytes(frame.body);
-            ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.maxCompressedLength(input.length));
+            ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.maxCompressedLength(input.length));
 
             try
             {
@@ -103,7 +103,7 @@ public interface FrameCompressor
             if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
                 throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
 
-            ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.uncompressedLength(input));
+            ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.uncompressedLength(input));
 
             try
             {
@@ -153,7 +153,7 @@ public interface FrameCompressor
             byte[] input = CBUtil.readRawBytes(frame.body);
 
             int maxCompressedLength = compressor.maxCompressedLength(input.length);
-            ByteBuf outputBuf = CBUtil.onHeapAllocator.buffer(INTEGER_BYTES + maxCompressedLength);
+            ByteBuf outputBuf = CBUtil.allocator.heapBuffer(INTEGER_BYTES + maxCompressedLength);
 
             byte[] output = outputBuf.array();
             int outputOffset = outputBuf.arrayOffset();
@@ -191,7 +191,7 @@ public interface FrameCompressor
                                    | ((input[2] & 0xFF) <<  8)
                                    | ((input[3] & 0xFF));
 
-            ByteBuf output = CBUtil.onHeapAllocator.buffer(uncompressedLength);
+            ByteBuf output = CBUtil.allocator.heapBuffer(uncompressedLength);
 
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 9e8719e..f27d545 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -124,9 +124,9 @@ public abstract class Message
     }
 
     public final Type type;
-    protected volatile Connection connection;
-    private volatile int streamId;
-    private volatile Frame sourceFrame;
+    protected Connection connection;
+    private int streamId;
+    private Frame sourceFrame = null;
 
     protected Message(Type type)
     {
@@ -360,10 +360,8 @@ public abstract class Message
                     for (ChannelHandlerContext channel : channels)
                         channel.flush();
                     for (FlushItem item : flushed)
-                    {
-                        if (item.response.getSourceFrame().body.refCnt() > 0)
-                            item.response.getSourceFrame().release();
-                    }
+                        item.response.getSourceFrame().release();
+
                     channels.clear();
                     flushed.clear();
                     runsSinceFlush = 0;
@@ -407,6 +405,7 @@ public abstract class Message
                 assert request.connection() instanceof ServerConnection;
                 connection = (ServerConnection)request.connection();
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
+                qstate.setSourceFrame(request.getSourceFrame());
 
                 logger.debug("Received: {}, v={}", request, connection.getVersion());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 7eec40e..e7b626c 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -91,10 +91,12 @@ public class ExpiringMap<K, V>
                 {
                     if (entry.getValue().isReadyToDieAt(start))
                     {
-                        cache.remove(entry.getKey());
-                        n++;
-                        if (postExpireHook != null)
-                            postExpireHook.apply(Pair.create(entry.getKey(), entry.getValue()));
+                        if (cache.remove(entry.getKey()) != null)
+                        {
+                            n++;
+                            if (postExpireHook != null)
+                                postExpireHook.apply(Pair.create(entry.getKey(), entry.getValue()));
+                        }
                     }
                 }
                 logger.trace("Expired {} entries", n);

Reply via email to