Repository: cassandra
Updated Branches:
  refs/heads/trunk 7b24b8703 -> 8a91a1a59
RefCount native frames from netty to avoid corruption bugs patch by tjake; reviewed by bes for CASSANDRA-7245 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4722fe70 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4722fe70 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4722fe70 Branch: refs/heads/trunk Commit: 4722fe70aa9ae1b62772cfa1a1de58ef289445d5 Parents: b29d882 Author: Jake Luciani <j...@apache.org> Authored: Thu Jun 5 14:15:32 2014 -0400 Committer: Jake Luciani <j...@apache.org> Committed: Thu Jun 5 14:15:32 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 2 +- lib/netty-all-4.0.17.Final.jar | Bin 1613159 -> 0 bytes lib/netty-all-4.0.19.Final.jar | Bin 0 -> 1678810 bytes .../cql3/statements/ModificationStatement.java | 12 +- .../apache/cassandra/db/CounterMutation.java | 12 ++ src/java/org/apache/cassandra/db/IMutation.java | 11 ++ src/java/org/apache/cassandra/db/Mutation.java | 37 +++- .../apache/cassandra/net/MessagingService.java | 20 +- .../cassandra/net/ResponseVerbHandler.java | 7 + .../apache/cassandra/service/QueryState.java | 12 ++ .../apache/cassandra/service/StorageProxy.java | 197 ++++++++++++------- .../org/apache/cassandra/transport/CBUtil.java | 14 +- .../org/apache/cassandra/transport/Frame.java | 14 +- .../cassandra/transport/FrameCompressor.java | 8 +- .../org/apache/cassandra/transport/Message.java | 13 +- .../org/apache/cassandra/utils/ExpiringMap.java | 10 +- 17 files changed, 262 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index eed2c09..3cea1e7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ * Add optional keyspace to 
DROP INDEX statement (CASSANDRA-7314) * Reduce run time for CQL tests (CASSANDRA-7327) * Fix heap size calculation on Windows (CASSANDRA-7352) + * RefCount native frames from netty (CASSANDRA-7245) Merged from 2.0: * Add per-CF range read request latency metrics (CASSANDRA-7338) * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 7e34904..15986d1 100644 --- a/build.xml +++ b/build.xml @@ -396,7 +396,7 @@ <dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" /> <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" /> <dependency groupId="io.airlift" artifactId="airline" version="0.6" /> - <dependency groupId="io.netty" artifactId="netty-all" version="4.0.17.Final" /> + <dependency groupId="io.netty" artifactId="netty-all" version="4.0.19.Final" /> <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" /> <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" /> <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" /> http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/lib/netty-all-4.0.17.Final.jar ---------------------------------------------------------------------- diff --git a/lib/netty-all-4.0.17.Final.jar b/lib/netty-all-4.0.17.Final.jar deleted file mode 100644 index baaa5b8..0000000 Binary files a/lib/netty-all-4.0.17.Final.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/lib/netty-all-4.0.19.Final.jar ---------------------------------------------------------------------- diff --git a/lib/netty-all-4.0.19.Final.jar b/lib/netty-all-4.0.19.Final.jar new file mode 100644 index 0000000..66d58f3 Binary files /dev/null and b/lib/netty-all-4.0.19.Final.jar differ 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 621006b..f0ab603 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -22,6 +22,7 @@ import java.util.*; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.apache.cassandra.transport.Frame; import org.github.jamm.MemoryMeter; import org.apache.cassandra.auth.Permission; @@ -497,7 +498,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF else cl.validateForWrite(cfm.ksName); - Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState)); + Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryState.getSourceFrame()); if (!mutations.isEmpty()) StorageProxy.mutateWithTriggers(mutations, cl, false); @@ -635,10 +636,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF if (hasConditions()) throw new UnsupportedOperationException(); - for (IMutation mutation : getMutations(options, true, queryState.getTimestamp())) + for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryState.getSourceFrame())) { // We don't use counters internally. assert mutation instanceof Mutation; + ((Mutation) mutation).apply(); } return null; @@ -654,7 +656,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF * @return list of the mutations * @throws InvalidRequestException on invalid requests */ - private Collection<? 
extends IMutation> getMutations(QueryOptions options, boolean local, long now) + private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, Frame sourceFrame) throws RequestExecutionException, RequestValidationException { List<ByteBuffer> keys = buildPartitionKeyNames(options); @@ -662,13 +664,15 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now); - Collection<IMutation> mutations = new ArrayList<IMutation>(); + Collection<IMutation> mutations = new ArrayList<IMutation>(keys.size()); for (ByteBuffer key: keys) { ThriftValidation.validateKey(cfm, key); ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm); addUpdateForKey(cf, key, clusteringPrefix, params); Mutation mut = new Mutation(cfm.ksName, key, cf); + mut.setSourceFrame(sourceFrame); + mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut); } return mutations; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 58889c1..95f4ce3 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -69,6 +69,18 @@ public class CounterMutation implements IMutation return mutation.getColumnFamilies(); } + @Override + public void retain() + { + mutation.retain(); + } + + @Override + public void release() + { + mutation.release(); + } + public Mutation getMutation() { return mutation; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/IMutation.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index 44df104..3e037c3 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -30,4 +30,15 @@ public interface IMutation public String toString(boolean shallow); public void addAll(IMutation m); public Collection<ColumnFamily> getColumnFamilies(); + + /** + * Call to increment underlying network buffer refcount + * So we can avoid recycling too soon + */ + public void retain(); + + /** + * Call to decrement underlying network buffer refcount + */ + public void release(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index b64c675..6eb56b7 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.transport.Frame; import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.CFMetaData; @@ -34,12 +35,15 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO convert this to a Builder pattern instead of encouraging M.add directly, // which is less-efficient since we have to keep a mutable HashMap around public class Mutation implements IMutation { public static final MutationSerializer serializer = new MutationSerializer(); + private static final Logger logger = LoggerFactory.getLogger(Mutation.class); public static final String FORWARD_TO = "FWD_TO"; public static 
final String FORWARD_FROM = "FWD_FRM"; @@ -52,6 +56,8 @@ public class Mutation implements IMutation // map of column family id to mutations for that column family. private final Map<UUID, ColumnFamily> modifications; + private Frame sourceFrame; + public Mutation(String keyspaceName, ByteBuffer key) { this(keyspaceName, key, new HashMap<UUID, ColumnFamily>()); @@ -81,7 +87,10 @@ public class Mutation implements IMutation public Mutation copy() { - return new Mutation(keyspaceName, key, new HashMap<>(modifications)); + Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications)); + copy.setSourceFrame(getSourceFrame()); + + return copy; } public String getKeyspaceName() @@ -104,6 +113,20 @@ public class Mutation implements IMutation return modifications.values(); } + @Override + public void retain() + { + if (sourceFrame != null) + sourceFrame.retain(); + } + + @Override + public void release() + { + if (sourceFrame != null) + sourceFrame.release(); + } + public ColumnFamily getColumnFamily(UUID cfId) { return modifications.get(cfId); @@ -206,6 +229,8 @@ public class Mutation implements IMutation */ public void apply() { + assert sourceFrame == null || sourceFrame.body.refCnt() > 0; + Keyspace ks = Keyspace.open(keyspaceName); ks.apply(this, ks.metadata.durableWrites); } @@ -265,6 +290,16 @@ public class Mutation implements IMutation return mutation; } + public Frame getSourceFrame() + { + return sourceFrame; + } + + public void setSourceFrame(Frame sourceFrame) + { + this.sourceFrame = sourceFrame; + } + public static class MutationSerializer implements IVersionedSerializer<Mutation> { public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java index 3e88b37..b3d6ae5 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -349,10 +349,19 @@ public final class MessagingService implements MessagingServiceMBean }); } - if (expiredCallbackInfo.shouldHint()) + Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload; + + try + { + if (expiredCallbackInfo.shouldHint()) + { + return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null); + } + } + finally { - Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload; - return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null); + //We serialized a hint so we don't need this mutation anymore + mutation.release(); } return null; @@ -570,6 +579,11 @@ public final class MessagingService implements MessagingServiceMBean { assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION; int messageId = nextId(); + + //keep the underlying buffer around till the request completes or times out and + //a hint is stored + message.payload.retain(); + CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 1d9aa98..1e1a278 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -19,6 +19,7 @@ package org.apache.cassandra.net; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.db.IMutation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,5 +53,11 @@ public class 
ResponseVerbHandler implements IVerbHandler MessagingService.instance().maybeAddLatency(cb, message.from, latency); cb.response(message); } + + // We don't need to track the mutation anymore since write succeeded + if (callbackInfo instanceof WriteCallbackInfo) + { + ((IMutation)((WriteCallbackInfo) callbackInfo).sentMessage.payload).release(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index 12fc392..f2e0809 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service; import java.util.UUID; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.Frame; import org.apache.cassandra.utils.FBUtilities; /** @@ -30,6 +31,7 @@ public class QueryState private final ClientState clientState; private volatile long clock; private volatile UUID preparedTracingSession; + private Frame sourceFrame; public QueryState(ClientState clientState) { @@ -60,6 +62,16 @@ public class QueryState return clock; } + public Frame getSourceFrame() + { + return sourceFrame; + } + + public void setSourceFrame(Frame sourceFrame) + { + this.sourceFrame = sourceFrame; + } + public boolean traceNextQuery() { if (preparedTracingSession != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 2cbc475..890315a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ 
b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -451,12 +451,14 @@ public class StorageProxy implements StorageProxyMBean final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); long startTime = System.nanoTime(); - List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>(mutations.size()); + List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<>(mutations.size()); try { for (IMutation mutation : mutations) { + mutation.retain(); + if (mutation instanceof CounterMutation) { responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter)); @@ -517,6 +519,13 @@ public class StorageProxy implements StorageProxyMBean } finally { + //Release the mutations we dispatched so far. + //An exception may be thrown at anytime. + //We can infer the mutations that were dispatched from this list + Iterator<? extends IMutation> it = mutations.iterator(); + for (int i = 0; i < responseHandlers.size(); i++) + it.next().release(); + writeMetrics.addNano(System.nanoTime() - startTime); } } @@ -781,72 +790,79 @@ public class StorageProxy implements StorageProxyMBean MessageOut<Mutation> message = null; boolean insertLocal = false; - for (InetAddress destination : targets) - { - // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can - // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. - // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to - // a small number of nodes causing problems, so we should avoid shutting down writes completely to - // healthy nodes. Any node with no hintsInProgress is considered healthy. 
- if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress - && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination))) - { - throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count()); - } - if (FailureDetector.instance.isAlive(destination)) + + mutation.retain(); + try + { + for (InetAddress destination : targets) { - if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) + // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can + // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. + // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to + // a small number of nodes causing problems, so we should avoid shutting down writes completely to + // healthy nodes. Any node with no hintsInProgress is considered healthy. + if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress + && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination))) { - insertLocal = true; + throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count()); } - else + + if (FailureDetector.instance.isAlive(destination)) { - // belongs on a different server - if (message == null) - message = mutation.createMessage(); - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); - // direct writes to local DC or old Cassandra versions - // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) - if (localDataCenter.equals(dc)) + if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) { - MessagingService.instance().sendRR(message, destination, responseHandler, true); - } - else + insertLocal = true; + } else { - Collection<InetAddress> messages = (dcGroups != null) ? 
dcGroups.get(dc) : null; - if (messages == null) + // belongs on a different server + if (message == null) + message = mutation.createMessage(); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); + // direct writes to local DC or old Cassandra versions + // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) + if (localDataCenter.equals(dc)) { - messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas - if (dcGroups == null) - dcGroups = new HashMap<String, Collection<InetAddress>>(); - dcGroups.put(dc, messages); + MessagingService.instance().sendRR(message, destination, responseHandler, true); + } else + { + Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null; + if (messages == null) + { + messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas + if (dcGroups == null) + dcGroups = new HashMap<String, Collection<InetAddress>>(); + dcGroups.put(dc, messages); + } + messages.add(destination); } - messages.add(destination); } + } else + { + if (!shouldHint(destination)) + continue; + + // Schedule a local hint + submitHint(mutation, destination, responseHandler); } } - else + + if (insertLocal) + insertLocal(mutation, responseHandler); + + if (dcGroups != null) { - if (!shouldHint(destination)) - continue; + // for each datacenter, send the message to one node to relay the write to other replicas + if (message == null) + message = mutation.createMessage(); - // Schedule a local hint - submitHint(mutation, destination, responseHandler); + for (Collection<InetAddress> dcTargets : dcGroups.values()) + sendMessagesToNonlocalDC(message, dcTargets, responseHandler); } } - - if (insertLocal) - insertLocal(mutation, responseHandler); - - if (dcGroups != null) + finally { - // for each datacenter, send the message to one node to relay the write to other replicas - if (message == null) - message = mutation.createMessage(); - - for 
(Collection<InetAddress> dcTargets : dcGroups.values()) - sendMessagesToNonlocalDC(message, dcTargets, responseHandler); + mutation.release(); } } @@ -868,23 +884,30 @@ public class StorageProxy implements StorageProxyMBean { // local write that time out should be handled by LocalMutationRunnable assert !target.equals(FBUtilities.getBroadcastAddress()) : target; + mutation.retain(); HintRunnable runnable = new HintRunnable(target) { public void runMayThrow() { - int ttl = HintedHandOffManager.calculateHintTTL(mutation); - if (ttl > 0) + try { - logger.debug("Adding hint for {}", target); - writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target); - // Notify the handler only for CL == ANY - if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY) - responseHandler.response(null); + int ttl = HintedHandOffManager.calculateHintTTL(mutation); + if (ttl > 0) + { + logger.debug("Adding hint for {}", target); + writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target); + // Notify the handler only for CL == ANY + if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY) + responseHandler.response(null); + } else + { + logger.debug("Skipped writing hint for {} (ttl {})", target, ttl); + } } - else + finally { - logger.debug("Skipped writing hint for {} (ttl {})", target, ttl); + mutation.release(); } } }; @@ -948,15 +971,24 @@ public class StorageProxy implements StorageProxyMBean private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler) { + mutation.retain(); + StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable() { public void runMayThrow() { - IMutation processed = SinkManager.processWriteRequest(mutation); - if (processed != null) + try + { + IMutation processed = SinkManager.processWriteRequest(mutation); + if (processed != null) + { + ((Mutation) processed).apply(); + 
responseHandler.response(null); + } + } + finally { - ((Mutation) processed).apply(); - responseHandler.response(null); + mutation.release(); } } }); @@ -1062,8 +1094,11 @@ public class StorageProxy implements StorageProxyMBean final AbstractWriteResponseHandler responseHandler, final String localDataCenter) { + mutation.retain(); + return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION) { + @Override public void runMayThrow() throws OverloadedException, WriteTimeoutException { IMutation processed = SinkManager.processWriteRequest(mutation); @@ -1077,10 +1112,16 @@ public class StorageProxy implements StorageProxyMBean responseHandler.response(null); Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), - ImmutableSet.of(FBUtilities.getBroadcastAddress())); + ImmutableSet.of(FBUtilities.getBroadcastAddress())); if (!remotes.isEmpty()) sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter); } + + @Override + public void cleanup() + { + mutation.release(); + } }; } @@ -2000,22 +2041,32 @@ public class StorageProxy implements StorageProxyMBean public final void run() { - if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb)) - { - MessagingService.instance().incrementDroppedMessages(verb); - return; - } - try { - runMayThrow(); + if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb)) + { + MessagingService.instance().incrementDroppedMessages(verb); + return; + } + + try + { + runMayThrow(); + } catch (Exception e) + { + throw new RuntimeException(e); + } } - catch (Exception e) + finally { - throw new RuntimeException(e); + cleanup(); } } + public void cleanup() + { + } + abstract protected void runMayThrow() throws Exception; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/CBUtil.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index e5e6f05..6cc6d47 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -29,16 +29,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.util.AttributeKey; +import io.netty.buffer.*; import io.netty.util.CharsetUtil; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; @@ -52,7 +47,6 @@ import org.apache.cassandra.utils.UUIDGen; public abstract class CBUtil { public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true); - public static final ByteBufAllocator onHeapAllocator = new PooledByteBufAllocator(false); private CBUtil() {} @@ -303,7 +297,11 @@ public abstract class CBUtil if (length < 0) return null; ByteBuf slice = cb.readSlice(length); - return ByteBuffer.wrap(readRawBytes(slice)); + if (slice.nioBufferCount() == 1) + return slice.nioBuffer(); + else + return ByteBuffer.wrap(readRawBytes(slice)); + } public static void writeValue(byte[] bytes, ByteBuf cb) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 3e66ff7..b29e80c 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -1,3 +1,4 @@ + /* * 
Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -65,9 +66,14 @@ public class Frame this.body = body; } - public void release() + public void retain() + { + body.retain(); + } + + public boolean release() { - body.release(); + return body.release(); } public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body) @@ -226,7 +232,9 @@ public class Frame return; // extract body - ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx, (int) bodyLength)); + ByteBuf body = buffer.slice(idx, (int) bodyLength); + body.retain(); + idx += bodyLength; buffer.readerIndex(idx); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/FrameCompressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java index 9617ec2..8ab735f 100644 --- a/src/java/org/apache/cassandra/transport/FrameCompressor.java +++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java @@ -75,7 +75,7 @@ public interface FrameCompressor public Frame compress(Frame frame) throws IOException { byte[] input = CBUtil.readRawBytes(frame.body); - ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.maxCompressedLength(input.length)); + ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.maxCompressedLength(input.length)); try { @@ -103,7 +103,7 @@ public interface FrameCompressor if (!Snappy.isValidCompressedBuffer(input, 0, input.length)) throw new ProtocolException("Provided frame does not appear to be Snappy compressed"); - ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.uncompressedLength(input)); + ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.uncompressedLength(input)); try { @@ -153,7 +153,7 @@ public interface 
FrameCompressor byte[] input = CBUtil.readRawBytes(frame.body); int maxCompressedLength = compressor.maxCompressedLength(input.length); - ByteBuf outputBuf = CBUtil.onHeapAllocator.buffer(INTEGER_BYTES + maxCompressedLength); + ByteBuf outputBuf = CBUtil.allocator.heapBuffer(INTEGER_BYTES + maxCompressedLength); byte[] output = outputBuf.array(); int outputOffset = outputBuf.arrayOffset(); @@ -191,7 +191,7 @@ public interface FrameCompressor | ((input[2] & 0xFF) << 8) | ((input[3] & 0xFF)); - ByteBuf output = CBUtil.onHeapAllocator.buffer(uncompressedLength); + ByteBuf output = CBUtil.allocator.heapBuffer(uncompressedLength); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 9e8719e..f27d545 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -124,9 +124,9 @@ public abstract class Message } public final Type type; - protected volatile Connection connection; - private volatile int streamId; - private volatile Frame sourceFrame; + protected Connection connection; + private int streamId; + private Frame sourceFrame = null; protected Message(Type type) { @@ -360,10 +360,8 @@ public abstract class Message for (ChannelHandlerContext channel : channels) channel.flush(); for (FlushItem item : flushed) - { - if (item.response.getSourceFrame().body.refCnt() > 0) - item.response.getSourceFrame().release(); - } + item.response.getSourceFrame().release(); + channels.clear(); flushed.clear(); runsSinceFlush = 0; @@ -407,6 +405,7 @@ public abstract class Message assert request.connection() instanceof ServerConnection; connection = (ServerConnection)request.connection(); QueryState qstate = connection.validateNewMessage(request.type, 
connection.getVersion(), request.getStreamId()); + qstate.setSourceFrame(request.getSourceFrame()); logger.debug("Received: {}, v={}", request, connection.getVersion()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/utils/ExpiringMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java index 7eec40e..e7b626c 100644 --- a/src/java/org/apache/cassandra/utils/ExpiringMap.java +++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java @@ -91,10 +91,12 @@ public class ExpiringMap<K, V> { if (entry.getValue().isReadyToDieAt(start)) { - cache.remove(entry.getKey()); - n++; - if (postExpireHook != null) - postExpireHook.apply(Pair.create(entry.getKey(), entry.getValue())); + if (cache.remove(entry.getKey()) != null) + { + n++; + if (postExpireHook != null) + postExpireHook.apply(Pair.create(entry.getKey(), entry.getValue())); + } } } logger.trace("Expired {} entries", n);