Repository: cassandra
Updated Branches:
  refs/heads/trunk 50ea0182e -> 642546aba


Clean up I*Sink testing hooks

patch by Aleksey Yeschenko; reviewed by Jeremiah Jordan for
CASSANDRA-8936


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/642546ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/642546ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/642546ab

Branch: refs/heads/trunk
Commit: 642546abac7527b640e228cdeb1993c53fc71582
Parents: 50ea018
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Mar 17 13:10:21 2015 -0700
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Mar 17 13:10:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/net/IMessageSink.java  |  37 +++++++
 .../apache/cassandra/net/MessagingService.java  |  54 +++++-----
 .../apache/cassandra/service/StorageProxy.java  |  32 ++----
 .../org/apache/cassandra/sink/IMessageSink.java |  42 --------
 .../org/apache/cassandra/sink/IRequestSink.java |  32 ------
 .../org/apache/cassandra/sink/SinkManager.java  | 104 -------------------
 .../apache/cassandra/repair/ValidatorTest.java  |  31 +++---
 .../apache/cassandra/service/RemoveTest.java    |   3 +-
 8 files changed, 88 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/net/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IMessageSink.java b/src/java/org/apache/cassandra/net/IMessageSink.java
new file mode 100644
index 0000000..5150901
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IMessageSink.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IMessageSink
+{
+    /**
+     * Allow or drop an outgoing message
+     *
+     * @return true if the message is allowed, false if it should be dropped
+     */
+    boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to);
+
+    /**
+     * Allow or drop an incoming message
+     *
+     * @return true if the message is allowed, false if it should be dropped
+     */
+    boolean allowIncomingMessage(MessageIn message, int id);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1f952a3..fb699e4 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -25,6 +25,7 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -60,7 +61,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
-import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
@@ -308,10 +308,24 @@ public final class MessagingService implements MessagingServiceMBean
     // protocol versions of the other nodes in the cluster
     private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>();
 
+    // message sinks are a testing hook
+    private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
+
+    public void addMessageSink(IMessageSink sink)
+    {
+        messageSinks.add(sink);
+    }
+
+    public void clearMessageSinks()
+    {
+        messageSinks.clear();
+    }
+
     private static class MSHandle
     {
         public static final MessagingService instance = new MessagingService();
     }
+
     public static MessagingService instance()
     {
         return MSHandle.instance;
@@ -672,17 +686,15 @@ public final class MessagingService implements MessagingServiceMBean
             logger.trace("Message-to-self {} going over MessagingService", message);
 
         // message sinks are a testing hook
-        MessageOut processedMessage = SinkManager.processOutboundMessage(message, id, to);
-        if (processedMessage == null)
-        {
-            return;
-        }
+        for (IMessageSink ms : messageSinks)
+            if (!ms.allowOutgoingMessage(message, id, to))
+                return;
 
         // get pooled connection (really, connection queue)
-        OutboundTcpConnection connection = getConnection(to, processedMessage);
+        OutboundTcpConnection connection = getConnection(to, message);
 
         // write it
-        connection.enqueue(processedMessage, id);
+        connection.enqueue(message, id);
     }
 
     public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to)
@@ -732,13 +744,10 @@ public final class MessagingService implements MessagingServiceMBean
         if (state != null)
             state.trace("Message received from {}", message.from);
 
-        Verb verb = message.verb;
-        message = SinkManager.processInboundMessage(message, id);
-        if (message == null)
-        {
-            incrementRejectedMessages(verb);
-            return;
-        }
+        // message sinks are a testing hook
+        for (IMessageSink ms : messageSinks)
+            if (!ms.allowIncomingMessage(message, id))
+                return;
 
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
         TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
@@ -855,27 +864,12 @@ public final class MessagingService implements MessagingServiceMBean
         return versions.containsKey(endpoint);
     }
 
-
     public void incrementDroppedMessages(Verb verb)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
         droppedMessages.get(verb).dropped.mark();
     }
 
-    /**
-     * Same as incrementDroppedMessages(), but allows non-droppable verbs. Called for IMessageSink-caused message drops.
-     */
-    private void incrementRejectedMessages(Verb verb)
-    {
-        DroppedMessageMetrics metrics = droppedMessages.get(verb);
-        if (metrics == null)
-        {
-            metrics = new DroppedMessageMetrics(verb);
-            droppedMessages.put(verb, metrics);
-        }
-        metrics.dropped.mark();
-    }
-
     private void logDroppedMessages()
     {
         boolean logTpstats = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d667b1e..45c4c4d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
-import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
@@ -1054,19 +1053,15 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void runMayThrow()
             {
-                IMutation processed = SinkManager.processWriteRequest(mutation);
-                if (processed != null)
+                try
                 {
-                    try 
-                    {
-                        ((Mutation) processed).apply();
-                        responseHandler.response(null);
-                    }
-                    catch (Exception ex)
-                    {
-                        logger.error("Failed to apply mutation locally : {}", ex.getMessage());
-                        responseHandler.onFailure(FBUtilities.getBroadcastAddress());
-                    }
+                    mutation.apply();
+                    responseHandler.response(null);
+                }
+                catch (Exception ex)
+                {
+                    logger.error("Failed to apply mutation locally : {}", ex.getMessage());
+                    responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
         });
@@ -1177,18 +1172,13 @@ public class StorageProxy implements StorageProxyMBean
             @Override
             public void runMayThrow() throws OverloadedException, WriteTimeoutException
             {
-                IMutation processed = SinkManager.processWriteRequest(mutation);
-                if (processed == null)
-                    return;
-
-                assert processed instanceof CounterMutation;
-                CounterMutation cm = (CounterMutation) processed;
+                assert mutation instanceof CounterMutation;
 
-                Mutation result = cm.apply();
+                Mutation result = ((CounterMutation) mutation).apply();
                 responseHandler.response(null);
 
                 Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
-                            ImmutableSet.of(FBUtilities.getBroadcastAddress()));
+                                                           ImmutableSet.of(FBUtilities.getBroadcastAddress()));
                 if (!remotes.isEmpty())
                     sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IMessageSink.java b/src/java/org/apache/cassandra/sink/IMessageSink.java
deleted file mode 100644
index 996e7ff..0000000
--- a/src/java/org/apache/cassandra/sink/IMessageSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.sink;
-
-import java.net.InetAddress;
-
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-
-public interface IMessageSink
-{
-    /**
-     * Transform or drop an outgoing message
-     *
-     * @return null if the message is dropped, or the transformed message to send, which may be just
-     * the original message
-     */
-    MessageOut handleMessage(MessageOut message, int id, InetAddress to);
-
-    /**
-     * Transform or drop an incoming message
-     *
-     * @return null if the message is dropped, or the transformed message to receive, which may be just
-     * the original message
-     */
-    MessageIn handleMessage(MessageIn message, int id, InetAddress to);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/sink/IRequestSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/IRequestSink.java b/src/java/org/apache/cassandra/sink/IRequestSink.java
deleted file mode 100644
index 2873e46..0000000
--- a/src/java/org/apache/cassandra/sink/IRequestSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.sink;
-
-import org.apache.cassandra.db.IMutation;
-
-public interface IRequestSink
-{
-    /**
-     * Transform or drop a write request (represented by a Mutation).
-     *
-     * @param mutation the Mutation to be applied locally.
-     * @return null if the mutation is to be dropped, or the transformed mutation to apply, which may be just
-     * the original mutation.
-     */
-    IMutation handleWriteRequest(IMutation mutation);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/sink/SinkManager.java b/src/java/org/apache/cassandra/sink/SinkManager.java
deleted file mode 100644
index 303c107..0000000
--- a/src/java/org/apache/cassandra/sink/SinkManager.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.sink;
-
-import java.net.InetAddress;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-
-/**
- * a class used only for testing to avoid sending/receiving data from a junit test.
- * needs to be in the source tree as MessagingService calls it directly.
- */
-public class SinkManager
-{
-    private static final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
-    private static final Set<IRequestSink> requestSinks = new CopyOnWriteArraySet<>();
-
-    public static void add(IMessageSink ms)
-    {
-        messageSinks.add(ms);
-    }
-
-    public static void add(IRequestSink rs)
-    {
-        requestSinks.add(rs);
-    }
-
-    public static void remove(IMessageSink ms)
-    {
-        messageSinks.remove(ms);
-    }
-
-    public static void remove(IRequestSink rs)
-    {
-        requestSinks.remove(rs);
-    }
-
-    public static void clear()
-    {
-        messageSinks.clear();
-        requestSinks.clear();
-    }
-
-    public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to)
-    {
-        if (messageSinks.isEmpty())
-            return message;
-
-        for (IMessageSink ms : messageSinks)
-        {
-            message = ms.handleMessage(message, id, to);
-            if (message == null)
-                return null;
-        }
-        return message;
-    }
-
-    public static MessageIn processInboundMessage(MessageIn message, int id)
-    {
-        if (messageSinks.isEmpty())
-            return message;
-
-        for (IMessageSink ms : messageSinks)
-        {
-            message = ms.handleMessage(message, id, null);
-            if (message == null)
-                return null;
-        }
-        return message;
-    }
-
-    public static IMutation processWriteRequest(IMutation mutation)
-    {
-        if (requestSinks.isEmpty())
-            return mutation;
-
-        for (IRequestSink rs : requestSinks)
-        {
-            mutation = rs.handleWriteRequest(mutation);
-            if (mutation == null)
-                return null;
-        }
-        return mutation;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 615167c..a9f18f5 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -43,8 +43,7 @@ import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.sink.IMessageSink;
-import org.apache.cassandra.sink.SinkManager;
+import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.service.StorageService;
@@ -73,7 +72,7 @@ public class ValidatorTest
     @After
     public void tearDown()
     {
-        SinkManager.clear();
+        MessagingService.instance().clearMessageSinks();
     }
 
     @Test
@@ -83,10 +82,10 @@ public class ValidatorTest
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
         final SimpleCondition lock = new SimpleCondition();
-        SinkManager.add(new IMessageSink()
+        MessagingService.instance().addMessageSink(new IMessageSink()
         {
             @SuppressWarnings("unchecked")
-            public MessageOut handleMessage(MessageOut message, int id, InetAddress to)
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
                 try
                 {
@@ -95,20 +94,20 @@ public class ValidatorTest
                         RepairMessage m = (RepairMessage) message.payload;
                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
                         assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete)m).success);
-                        assertNotNull(((ValidationComplete)m).tree);
+                        assertTrue(((ValidationComplete) m).success);
+                        assertNotNull(((ValidationComplete) m).tree);
                     }
                 }
                 finally
                 {
                     lock.signalAll();
                 }
-                return null;
+                return false;
             }
 
-            public MessageIn handleMessage(MessageIn message, int id, InetAddress to)
+            public boolean allowIncomingMessage(MessageIn message, int id)
             {
-                return null;
+                return false;
             }
         });
 
@@ -165,10 +164,10 @@ public class ValidatorTest
         final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
         final SimpleCondition lock = new SimpleCondition();
-        SinkManager.add(new IMessageSink()
+        MessagingService.instance().addMessageSink(new IMessageSink()
         {
             @SuppressWarnings("unchecked")
-            public MessageOut handleMessage(MessageOut message, int id, InetAddress to)
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
             {
                 try
                 {
@@ -178,19 +177,19 @@ public class ValidatorTest
                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
                         assertEquals(desc, m.desc);
                         assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete)m).tree);
+                        assertNull(((ValidationComplete) m).tree);
                     }
                 }
                 finally
                 {
                     lock.signalAll();
                 }
-                return null;
+                return false;
             }
 
-            public MessageIn handleMessage(MessageIn message, int id, InetAddress to)
+            public boolean allowIncomingMessage(MessageIn message, int id)
             {
-                return null;
+                return false;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index e5e4620..6d7cac8 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
@@ -90,7 +89,7 @@ public class RemoveTest
     @After
     public void tearDown()
     {
-        SinkManager.clear();
+        MessagingService.instance().clearMessageSinks();
         MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
     }

Reply via email to