Repository: cassandra
Updated Branches:
  refs/heads/trunk 50ea0182e -> 642546aba
Clean up I*Sink testing hooks

patch by Aleksey Yeschenko; reviewed by Jeremiah Jordan for
CASSANDRA-8936

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/642546ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/642546ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/642546ab

Branch: refs/heads/trunk
Commit: 642546abac7527b640e228cdeb1993c53fc71582
Parents: 50ea018
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Mar 17 13:10:21 2015 -0700
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Mar 17 13:10:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/net/IMessageSink.java | 37 +++++++
 .../apache/cassandra/net/MessagingService.java | 54 +++++-----
 .../apache/cassandra/service/StorageProxy.java | 32 ++----
 .../org/apache/cassandra/sink/IMessageSink.java | 42 --------
 .../org/apache/cassandra/sink/IRequestSink.java | 32 ------
 .../org/apache/cassandra/sink/SinkManager.java | 104 -------------------
 .../apache/cassandra/repair/ValidatorTest.java | 31 +++---
 .../apache/cassandra/service/RemoveTest.java | 3 +-
 8 files changed, 88 insertions(+), 247 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/net/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IMessageSink.java b/src/java/org/apache/cassandra/net/IMessageSink.java
new file mode 100644
index 0000000..5150901
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IMessageSink.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +public interface IMessageSink +{ + /** + * Allow or drop an outgoing message + * + * @return true if the message is allowed, false if it should be dropped + */ + boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to); + + /** + * Allow or drop an incoming message + * + * @return true if the message is allowed, false if it should be dropped + */ + boolean allowIncomingMessage(MessageIn message, int id); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 1f952a3..fb699e4 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -25,6 +25,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; import java.util.*; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -60,7 +61,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.ILatencySubscriber; 
import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.metrics.DroppedMessageMetrics; -import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; @@ -308,10 +308,24 @@ public final class MessagingService implements MessagingServiceMBean // protocol versions of the other nodes in the cluster private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>(); + // message sinks are a testing hook + private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>(); + + public void addMessageSink(IMessageSink sink) + { + messageSinks.add(sink); + } + + public void clearMessageSinks() + { + messageSinks.clear(); + } + private static class MSHandle { public static final MessagingService instance = new MessagingService(); } + public static MessagingService instance() { return MSHandle.instance; @@ -672,17 +686,15 @@ public final class MessagingService implements MessagingServiceMBean logger.trace("Message-to-self {} going over MessagingService", message); // message sinks are a testing hook - MessageOut processedMessage = SinkManager.processOutboundMessage(message, id, to); - if (processedMessage == null) - { - return; - } + for (IMessageSink ms : messageSinks) + if (!ms.allowOutgoingMessage(message, id, to)) + return; // get pooled connection (really, connection queue) - OutboundTcpConnection connection = getConnection(to, processedMessage); + OutboundTcpConnection connection = getConnection(to, message); // write it - connection.enqueue(processedMessage, id); + connection.enqueue(message, id); } public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to) @@ -732,13 +744,10 @@ public final class MessagingService implements MessagingServiceMBean if (state != null) state.trace("Message received from {}", message.from); - Verb verb = message.verb; - 
message = SinkManager.processInboundMessage(message, id); - if (message == null) - { - incrementRejectedMessages(verb); - return; - } + // message sinks are a testing hook + for (IMessageSink ms : messageSinks) + if (!ms.allowIncomingMessage(message, id)) + return; Runnable runnable = new MessageDeliveryTask(message, id, timestamp); TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType()); @@ -855,27 +864,12 @@ public final class MessagingService implements MessagingServiceMBean return versions.containsKey(endpoint); } - public void incrementDroppedMessages(Verb verb) { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; droppedMessages.get(verb).dropped.mark(); } - /** - * Same as incrementDroppedMessages(), but allows non-droppable verbs. Called for IMessageSink-caused message drops. - */ - private void incrementRejectedMessages(Verb verb) - { - DroppedMessageMetrics metrics = droppedMessages.get(verb); - if (metrics == null) - { - metrics = new DroppedMessageMetrics(verb); - droppedMessages.put(verb, metrics); - } - metrics.dropped.mark(); - } - private void logDroppedMessages() { boolean logTpstats = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index d667b1e..45c4c4d 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -60,7 +60,6 @@ import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.metrics.*; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; -import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; 
import org.apache.cassandra.utils.*; @@ -1054,19 +1053,15 @@ public class StorageProxy implements StorageProxyMBean { public void runMayThrow() { - IMutation processed = SinkManager.processWriteRequest(mutation); - if (processed != null) + try { - try - { - ((Mutation) processed).apply(); - responseHandler.response(null); - } - catch (Exception ex) - { - logger.error("Failed to apply mutation locally : {}", ex.getMessage()); - responseHandler.onFailure(FBUtilities.getBroadcastAddress()); - } + mutation.apply(); + responseHandler.response(null); + } + catch (Exception ex) + { + logger.error("Failed to apply mutation locally : {}", ex.getMessage()); + responseHandler.onFailure(FBUtilities.getBroadcastAddress()); } } }); @@ -1177,18 +1172,13 @@ public class StorageProxy implements StorageProxyMBean @Override public void runMayThrow() throws OverloadedException, WriteTimeoutException { - IMutation processed = SinkManager.processWriteRequest(mutation); - if (processed == null) - return; - - assert processed instanceof CounterMutation; - CounterMutation cm = (CounterMutation) processed; + assert mutation instanceof CounterMutation; - Mutation result = cm.apply(); + Mutation result = ((CounterMutation) mutation).apply(); responseHandler.response(null); Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), - ImmutableSet.of(FBUtilities.getBroadcastAddress())); + ImmutableSet.of(FBUtilities.getBroadcastAddress())); if (!remotes.isEmpty()) sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/sink/IMessageSink.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/sink/IMessageSink.java b/src/java/org/apache/cassandra/sink/IMessageSink.java deleted file mode 100644 index 996e7ff..0000000 --- a/src/java/org/apache/cassandra/sink/IMessageSink.java +++ /dev/null @@ -1,42 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.sink; - -import java.net.InetAddress; - -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; - -public interface IMessageSink -{ - /** - * Transform or drop an outgoing message - * - * @return null if the message is dropped, or the transformed message to send, which may be just - * the original message - */ - MessageOut handleMessage(MessageOut message, int id, InetAddress to); - - /** - * Transform or drop an incoming message - * - * @return null if the message is dropped, or the transformed message to receive, which may be just - * the original message - */ - MessageIn handleMessage(MessageIn message, int id, InetAddress to); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/sink/IRequestSink.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/sink/IRequestSink.java b/src/java/org/apache/cassandra/sink/IRequestSink.java deleted file mode 100644 index 2873e46..0000000 --- a/src/java/org/apache/cassandra/sink/IRequestSink.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.sink; - -import org.apache.cassandra.db.IMutation; - -public interface IRequestSink -{ - /** - * Transform or drop a write request (represented by a Mutation). - * - * @param mutation the Mutation to be applied locally. - * @return null if the mutation is to be dropped, or the transformed mutation to apply, which may be just - * the original mutation. - */ - IMutation handleWriteRequest(IMutation mutation); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/src/java/org/apache/cassandra/sink/SinkManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/sink/SinkManager.java b/src/java/org/apache/cassandra/sink/SinkManager.java deleted file mode 100644 index 303c107..0000000 --- a/src/java/org/apache/cassandra/sink/SinkManager.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.sink; - -import java.net.InetAddress; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -import org.apache.cassandra.db.IMutation; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; - -/** - * a class used only for testing to avoid sending/receiving data from a junit test. - * needs to be in the source tree as MessagingService calls it directly. 
- */ -public class SinkManager -{ - private static final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>(); - private static final Set<IRequestSink> requestSinks = new CopyOnWriteArraySet<>(); - - public static void add(IMessageSink ms) - { - messageSinks.add(ms); - } - - public static void add(IRequestSink rs) - { - requestSinks.add(rs); - } - - public static void remove(IMessageSink ms) - { - messageSinks.remove(ms); - } - - public static void remove(IRequestSink rs) - { - requestSinks.remove(rs); - } - - public static void clear() - { - messageSinks.clear(); - requestSinks.clear(); - } - - public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to) - { - if (messageSinks.isEmpty()) - return message; - - for (IMessageSink ms : messageSinks) - { - message = ms.handleMessage(message, id, to); - if (message == null) - return null; - } - return message; - } - - public static MessageIn processInboundMessage(MessageIn message, int id) - { - if (messageSinks.isEmpty()) - return message; - - for (IMessageSink ms : messageSinks) - { - message = ms.handleMessage(message, id, null); - if (message == null) - return null; - } - return message; - } - - public static IMutation processWriteRequest(IMutation mutation) - { - if (requestSinks.isEmpty()) - return mutation; - - for (IRequestSink rs : requestSinks) - { - mutation = rs.handleWriteRequest(mutation); - if (mutation == null) - return null; - } - return mutation; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 615167c..a9f18f5 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -43,8 +43,7 @@ import 
org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.sink.IMessageSink; -import org.apache.cassandra.sink.SinkManager; +import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.service.StorageService; @@ -73,7 +72,7 @@ public class ValidatorTest @After public void tearDown() { - SinkManager.clear(); + MessagingService.instance().clearMessageSinks(); } @Test @@ -83,10 +82,10 @@ public class ValidatorTest final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); final SimpleCondition lock = new SimpleCondition(); - SinkManager.add(new IMessageSink() + MessagingService.instance().addMessageSink(new IMessageSink() { @SuppressWarnings("unchecked") - public MessageOut handleMessage(MessageOut message, int id, InetAddress to) + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) { try { @@ -95,20 +94,20 @@ public class ValidatorTest RepairMessage m = (RepairMessage) message.payload; assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); assertEquals(desc, m.desc); - assertTrue(((ValidationComplete)m).success); - assertNotNull(((ValidationComplete)m).tree); + assertTrue(((ValidationComplete) m).success); + assertNotNull(((ValidationComplete) m).tree); } } finally { lock.signalAll(); } - return null; + return false; } - public MessageIn handleMessage(MessageIn message, int id, InetAddress to) + public boolean allowIncomingMessage(MessageIn message, int id) { - return null; + return false; } }); @@ -165,10 +164,10 @@ public class ValidatorTest final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); final SimpleCondition lock = 
new SimpleCondition(); - SinkManager.add(new IMessageSink() + MessagingService.instance().addMessageSink(new IMessageSink() { @SuppressWarnings("unchecked") - public MessageOut handleMessage(MessageOut message, int id, InetAddress to) + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) { try { @@ -178,19 +177,19 @@ public class ValidatorTest assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); assertEquals(desc, m.desc); assertFalse(((ValidationComplete) m).success); - assertNull(((ValidationComplete)m).tree); + assertNull(((ValidationComplete) m).tree); } } finally { lock.signalAll(); } - return null; + return false; } - public MessageIn handleMessage(MessageIn message, int id, InetAddress to) + public boolean allowIncomingMessage(MessageIn message, int id) { - return null; + return false; } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/642546ab/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index e5e4620..6d7cac8 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -39,7 +39,6 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; @@ -90,7 +89,7 @@ public class RemoveTest @After public void tearDown() { - SinkManager.clear(); + MessagingService.instance().clearMessageSinks(); MessagingService.instance().clearCallbacksUnsafe(); MessagingService.instance().shutdown(); }