http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 630dc5d..439d8cf 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service; import java.io.File; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.nio.file.Paths; import java.util.*; @@ -26,9 +25,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.cache.CacheLoader; @@ -123,16 +119,7 @@ public class StorageProxy implements StorageProxyMBean static { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(instance, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - + MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME); HintsService.instance.registerMBean(); HintedHandOffManager.instance.registerMBean();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e7ca4be..68dba93 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -231,7 +231,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>(); - private final ObjectName jmxObjectName; + private final String jmxObjectName; private Collection<Token> bootstrapTokens = null; @@ -270,17 +270,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // use dedicated executor for handling JMX notifications super(JMXBroadcastExecutor.executor); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService"); - mbs.registerMBean(this, jmxObjectName); - mbs.registerMBean(StreamManager.instance, new ObjectName(StreamManager.OBJECT_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + jmxObjectName = "org.apache.cassandra.db:type=StorageService"; + MBeanWrapper.instance.registerMBean(this, jmxObjectName); + MBeanWrapper.instance.registerMBean(StreamManager.instance, StreamManager.OBJECT_NAME); ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 151eb18..50d35bc 100644 --- 
a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -471,6 +471,32 @@ public class ByteBufferUtil return bytes.getDouble(bytes.position()); } + public static ByteBuffer objectToBytes(Object obj) + { + if (obj instanceof Integer) + return ByteBufferUtil.bytes((int) obj); + else if (obj instanceof Byte) + return ByteBufferUtil.bytes((byte) obj); + else if (obj instanceof Short) + return ByteBufferUtil.bytes((short) obj); + else if (obj instanceof Long) + return ByteBufferUtil.bytes((long) obj); + else if (obj instanceof Float) + return ByteBufferUtil.bytes((float) obj); + else if (obj instanceof Double) + return ByteBufferUtil.bytes((double) obj); + else if (obj instanceof UUID) + return ByteBufferUtil.bytes((UUID) obj); + else if (obj instanceof InetAddress) + return ByteBufferUtil.bytes((InetAddress) obj); + else if (obj instanceof String) + return ByteBufferUtil.bytes((String) obj); + else + throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s", + obj, + obj.getClass())); + } + public static ByteBuffer bytes(byte b) { return ByteBuffer.allocate(1).put(0, b); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/MBeanWrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java new file mode 100644 index 0000000..edee6af --- /dev/null +++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.lang.management.ManagementFactory; +import java.util.function.Consumer; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class to avoid catching and rethrowing checked exceptions on MBean and + * allow turning of MBean registration for test purposes. + */ +public interface MBeanWrapper +{ + static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class); + + static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ? 
+ new NoOpMBeanWrapper() : + new PlatformMBeanWrapper(); + + // Passing true for graceful will log exceptions instead of rethrowing them + public void registerMBean(Object obj, ObjectName mbeanName, OnException onException); + default void registerMBean(Object obj, ObjectName mbeanName) + { + registerMBean(obj, mbeanName, OnException.THROW); + } + + public void registerMBean(Object obj, String mbeanName, OnException onException); + default void registerMBean(Object obj, String mbeanName) + { + registerMBean(obj, mbeanName, OnException.THROW); + } + + public boolean isRegistered(ObjectName mbeanName, OnException onException); + default boolean isRegistered(ObjectName mbeanName) + { + return isRegistered(mbeanName, OnException.THROW); + } + + public boolean isRegistered(String mbeanName, OnException onException); + default boolean isRegistered(String mbeanName) + { + return isRegistered(mbeanName, OnException.THROW); + } + + public void unregisterMBean(ObjectName mbeanName, OnException onException); + default void unregisterMBean(ObjectName mbeanName) + { + unregisterMBean(mbeanName, OnException.THROW); + } + + public void unregisterMBean(String mbeanName, OnException onException); + default void unregisterMBean(String mbeanName) + { + unregisterMBean(mbeanName, OnException.THROW); + } + + static class NoOpMBeanWrapper implements MBeanWrapper + { + public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {} + public void registerMBean(Object obj, String mbeanName, OnException onException) {} + public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; } + public boolean isRegistered(String mbeanName, OnException onException) { return false; } + public void unregisterMBean(ObjectName mbeanName, OnException onException) {} + public void unregisterMBean(String mbeanName, OnException onException) {} + } + + static class PlatformMBeanWrapper implements MBeanWrapper + { + private final MBeanServer mbs = 
ManagementFactory.getPlatformMBeanServer(); + public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) + { + try + { + mbs.registerMBean(obj, mbeanName); + } + catch (Exception e) + { + onException.handler.accept(e); + } + } + + public void registerMBean(Object obj, String mbeanName, OnException onException) + { + try + { + mbs.registerMBean(obj, new ObjectName(mbeanName)); + } + catch (Exception e) + { + onException.handler.accept(e); + } + } + + public boolean isRegistered(ObjectName mbeanName, OnException onException) + { + try + { + return mbs.isRegistered(mbeanName); + } + catch (Exception e) + { + onException.handler.accept(e); + } + return false; + } + + public boolean isRegistered(String mbeanName, OnException onException) + { + try + { + return mbs.isRegistered(new ObjectName(mbeanName)); + } + catch (Exception e) + { + onException.handler.accept(e); + } + return false; + } + + public void unregisterMBean(ObjectName mbeanName, OnException onException) + { + try + { + mbs.unregisterMBean(mbeanName); + } + catch (Exception e) + { + onException.handler.accept(e); + } + } + + public void unregisterMBean(String mbeanName, OnException onException) + { + try + { + mbs.unregisterMBean(new ObjectName(mbeanName)); + } + catch (Exception e) + { + onException.handler.accept(e); + } + } + } + + public enum OnException + { + THROW(e -> { throw new RuntimeException(e); }), + LOG(e -> { logger.error("Error in MBean wrapper: ", e); }), + IGNORE(e -> {}); + + private Consumer<Exception> handler; + OnException(Consumer<Exception> handler) + { + this.handler = handler; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/Mx4jTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Mx4jTool.java b/src/java/org/apache/cassandra/utils/Mx4jTool.java index 5baaea2..eda6354 100644 --- 
a/src/java/org/apache/cassandra/utils/Mx4jTool.java +++ b/src/java/org/apache/cassandra/utils/Mx4jTool.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.utils; -import java.lang.management.ManagementFactory; -import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.commons.lang3.StringUtils; @@ -44,7 +42,7 @@ public class Mx4jTool try { logger.trace("Will try to load mx4j now, if it's in the classpath"); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + MBeanWrapper mbs = MBeanWrapper.instance; ObjectName processorName = new ObjectName("Server:name=XSLTProcessor"); Class<?> httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/concurrent/Ref.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java index b69e6bf..1a17a1f 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java @@ -31,6 +31,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.cassandra.concurrent.InfiniteLoopExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -354,11 +355,10 @@ public final class Ref<T> implements RefCounted<T> static final Set<Class<?>> concurrentIterables = Collections.newSetFromMap(new IdentityHashMap<>()); private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>()); static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>(); - private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper")); + private static final InfiniteLoopExecutor EXEC = new 
InfiniteLoopExecutor("Reference-Reaper", Ref::reapOneReference).start(); static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new NamedThreadFactory("Strong-Reference-Leak-Detector")); static { - EXEC.execute(new ReferenceReaper()); if (DEBUG_ENABLED) { STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new Visitor(), 1, 15, TimeUnit.MINUTES); @@ -367,28 +367,12 @@ public final class Ref<T> implements RefCounted<T> concurrentIterables.addAll(Arrays.asList(concurrentIterableClasses)); } - static final class ReferenceReaper implements Runnable + private static void reapOneReference() throws InterruptedException { - public void run() + Object obj = referenceQueue.remove(100); + if (obj instanceof Ref.State) { - try - { - while (true) - { - Object obj = referenceQueue.remove(); - if (obj instanceof Ref.State) - { - ((Ref.State) obj).release(true); - } - } - } - catch (InterruptedException e) - { - } - finally - { - EXEC.execute(this); - } + ((Ref.State) obj).release(true); } } @@ -719,4 +703,11 @@ public final class Ref<T> implements RefCounted<T> } } } + + @VisibleForTesting + public static void shutdownReferenceReaper() throws InterruptedException + { + EXEC.shutdown(); + EXEC.awaitTermination(60, TimeUnit.SECONDS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/BufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index f9ec40c..c8ad078 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -27,11 +27,11 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import com.google.common.annotations.VisibleForTesting; +import 
org.apache.cassandra.concurrent.InfiniteLoopExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.FastThreadLocal; -import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.FileUtils; @@ -493,34 +493,16 @@ public class BufferPool private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>(); private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>(); - private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("LocalPool-Cleaner")); - static + private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start(); + + private static void cleanupOneReference() throws InterruptedException { - EXEC.execute(new Runnable() + Object obj = localPoolRefQueue.remove(100); + if (obj instanceof LocalPoolRef) { - public void run() - { - try - { - while (true) - { - Object obj = localPoolRefQueue.remove(); - if (obj instanceof LocalPoolRef) - { - ((LocalPoolRef) obj).release(); - localPoolReferences.remove(obj); - } - } - } - catch (InterruptedException e) - { - } - finally - { - EXEC.execute(this); - } - } - }); + ((LocalPoolRef) obj).release(); + localPoolReferences.remove(obj); + } } private static ByteBuffer allocateDirectAligned(int capacity) @@ -872,4 +854,11 @@ public class BufferPool int mask = unit - 1; return (size + mask) & ~mask; } + + @VisibleForTesting + public static void shutdownLocalCleaner() throws InterruptedException + { + EXEC.shutdown(); + EXEC.awaitTermination(60, TimeUnit.SECONDS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java ---------------------------------------------------------------------- 
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java index 5a90463..b905d2c 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java @@ -18,57 +18,70 @@ */ package org.apache.cassandra.utils.memory; +import org.apache.cassandra.concurrent.InfiniteLoopExecutor; import org.apache.cassandra.utils.concurrent.WaitQueue; /** * A thread that reclaims memory from a MemtablePool on demand. The actual reclaiming work is delegated to the * cleaner Runnable, e.g., FlushLargestColumnFamily */ -class MemtableCleanerThread<P extends MemtablePool> extends Thread +public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopExecutor { - /** The pool we're cleaning */ - final P pool; - - /** should ensure that at least some memory has been marked reclaiming after completion */ - final Runnable cleaner; + private static class Clean<P extends MemtablePool> implements InterruptibleRunnable + { + /** The pool we're cleaning */ + final P pool; - /** signalled whenever needsCleaning() may return true */ - final WaitQueue wait = new WaitQueue(); + /** should ensure that at least some memory has been marked reclaiming after completion */ + final Runnable cleaner; - MemtableCleanerThread(P pool, Runnable cleaner) - { - super(pool.getClass().getSimpleName() + "Cleaner"); - this.pool = pool; - this.cleaner = cleaner; - setDaemon(true); - } + /** signalled whenever needsCleaning() may return true */ + final WaitQueue wait = new WaitQueue(); - boolean needsCleaning() - { - return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning(); - } + private Clean(P pool, Runnable cleaner) + { + this.pool = pool; + this.cleaner = cleaner; + } - // should ONLY be called when we really think it already needs cleaning - void trigger() - { - wait.signal(); - } + boolean needsCleaning() 
+ { + return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning(); + } - @Override - public void run() - { - while (true) + @Override + public void run() throws InterruptedException { - while (!needsCleaning()) + if (needsCleaning()) + { + cleaner.run(); + } + else { final WaitQueue.Signal signal = wait.register(); if (!needsCleaning()) - signal.awaitUninterruptibly(); + signal.await(); else signal.cancel(); } - - cleaner.run(); } } + + private final Runnable trigger; + private MemtableCleanerThread(Clean<P> clean) + { + super(clean.pool.getClass().getSimpleName() + "Cleaner", clean); + this.trigger = clean.wait::signal; + } + + MemtableCleanerThread(P pool, Runnable cleaner) + { + this(new Clean<>(pool, cleaner)); + } + + // should ONLY be called when we really think it already needs cleaning + public void trigger() + { + trigger.run(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtablePool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java index c082856..684db93 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@ -18,8 +18,11 @@ */ package org.apache.cassandra.utils.memory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import com.google.common.annotations.VisibleForTesting; + import com.codahale.metrics.Timer; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; @@ -63,6 +66,12 @@ public abstract class MemtablePool return cleaner == null ? 
null : new MemtableCleanerThread<>(this, cleaner); } + public void shutdown() throws InterruptedException + { + cleaner.shutdown(); + cleaner.awaitTermination(60, TimeUnit.SECONDS); + } + public abstract MemtableAllocator newAllocator(); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/conf/logback-dtest.xml ---------------------------------------------------------------------- diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml new file mode 100644 index 0000000..b8019f6 --- /dev/null +++ b/test/conf/logback-dtest.xml @@ -0,0 +1,79 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="instance_id" class="org.apache.cassandra.distributed.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <!-- Status listener is used to wrap stdout/stderr and tee to log file --> + <statusListener class="org.apache.cassandra.LogbackStatusListener" /> + + <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + + <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>20</maxIndex> + </rollingPolicy> + + <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>20MB</maxFileSize> + </triggeringPolicy> + + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> + <immediateFlush>false</immediateFlush> + </encoder> + </appender> + + <appender name="STDOUT" target="System.out" class="org.apache.cassandra.ConsoleAppender"> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + </appender> + + <appender name="TEE" class="org.apache.cassandra.TeeingAppender"> + <appender-ref ref="FILE"/> + <appender-ref ref="STDOUT"/> + </appender> + + <logger name="org.apache.hadoop" level="WARN"/> + + <logger name="org.apache.cassandra.db.monitoring" level="DEBUG"/> + + <!-- Do not change the name of this appender. 
LogbackStatusListener uses the thread name + tied to the appender name to know when to write to real stdout/stderr vs forwarding to logback --> + <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> + <discardingThreshold>0</discardingThreshold> + <maxFlushTime>0</maxFlushTime> + <queueSize>1024</queueSize> + <appender-ref ref="TEE"/> + <includeCallerData>true</includeCallerData> + </appender> + + <root level="DEBUG"> + <appender-ref ref="ASYNC" /> + </root> +</configuration> http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Coordinator.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/Coordinator.java new file mode 100644 index 0000000..91ab480 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/Coordinator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed; + + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; + + +public class Coordinator +{ + final Instance instance; + + public Coordinator(Instance instance) + { + this.instance = instance; + } + + private static Object[][] coordinatorExecute(String query, int consistencyLevel, Object[] bindings) + { + CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()); + List<ByteBuffer> boundValues = new ArrayList<>(); + for (Object binding : bindings) + { + boundValues.add(ByteBufferUtil.objectToBytes(binding)); + } + + ResultMessage res = prepared.execute(QueryState.forInternalCalls(), + QueryOptions.create(ConsistencyLevel.fromCode(consistencyLevel), + boundValues, + false, + 10, + null, + null, + ProtocolVersion.V4, + null), + System.nanoTime()); + + if (res != null && res.kind == ResultMessage.Kind.ROWS) + { + return RowUtil.toObjects((ResultMessage.Rows) res); + } + else + { + return new Object[][]{}; + } + } + + public Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... 
boundValues) + { + return instance.appliesOnInstance(Coordinator::coordinatorExecute).apply(query, consistencyLevel.code, boundValues); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java new file mode 100644 index 0000000..a61c8af --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.ConsistencyLevel; + +import static org.apache.cassandra.net.MessagingService.Verb.READ_REPAIR; + +public class DistributedReadWritePathTest extends DistributedTestBase +{ + @Test + public void coordinatorRead() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); + + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + 1), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + } + } + + @Test + public void coordinatorWrite() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'"); + + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)", + ConsistencyLevel.QUORUM); + + for (int i = 0; i < 3; i++) + { + assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + } + } + + @Test + public void readRepairTest() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + cluster.get(1).executeInternal("INSERT 
INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + + // Verify that data got repaired to the third node + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, 1)); + } + } + + @Test + public void failingReadRepairTest() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + + cluster.verbs(READ_REPAIR).to(3).drop(); + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.QUORUM), + row(1, 1, 1)); + + // Data was not repaired + assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1")); + } + } + + @Test + public void writeWithSchemaDisagreement() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce 
schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); + + Exception thrown = null; + try + { + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", + ConsistencyLevel.QUORUM); + } + catch (RuntimeException e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node")); + Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization")); + } + } + + @Test + public void readWithSchemaDisagreement() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); + + Exception thrown = null; + try + { + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1, null)); + } + catch (Exception e) + { + thrown = e; + } + Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node")); + Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization")); + } + } + + @Test + public void reAddColumnAsStatic() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + for (int i = 1; i <= 3; i++) + { + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + 1, i, i); + } + 
+ // Drop column + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1"); + + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1), + row(1, 2), + row(1, 3)); + + // Drop column + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static"); + + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, null), + row(1, 2, null), + row(1, 3, null)); + + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v1) VALUES (?, ?)", + ConsistencyLevel.ALL, + 1, 1); + + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1), + row(1, 2, 1), + row(1, 3, 1)); + } + } + + @Test + public void reAddColumnAsStaticDisagreementCoordinatorSide() throws Throwable + { + try (TestCluster cluster = createCluster(3)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + for (int i = 1; i <= 3; i++) + { + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + 1, i, i); + } + + // Drop column + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 1); + + Exception thrown = null; + try + { + cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL); + } + catch (Exception e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of")); + + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 1); + + try + { + cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL); + } + catch (Exception e) + { + thrown = e; + } + + Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of")); + } + } + + @Test + public 
void reAddColumnAsStaticDisagreementReplicaSide() throws Throwable + { + try (TestCluster cluster = createCluster(2)) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"); + + for (int i = 1; i <= 3; i++) + { + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + 1, i, i); + } + + // Drop column on the replica + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 2); + + // Columns are going to be read and read-repaired as long as they're available + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + + assertRows(cluster.get(2).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1), + row(1, 2), + row(1, 3)); + + // Re-add as static on the replica + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 2); + + // Try reading + assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + + // Make sure read-repair did not corrupt the data + assertRows(cluster.get(2).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"), + row(1, 1, null), + row(1, 2, null), + row(1, 3, null)); + + // Writing to the replica with disagreeing schema should not work + Exception thrown = null; + try + { + cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)", + ConsistencyLevel.ALL, + 1, 1, 5); + } + catch (Exception e) + { + thrown = e; + } + + Assert.assertNotNull(thrown); + + thrown = null; + + // If somehow replica got new data, reading that data should not be possible, either + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)", + 1, 1, 100); + + try + { + 
assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", + ConsistencyLevel.ALL), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + } + catch (Exception e) + { + thrown = e; + } + + Assert.assertNotNull(thrown); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java new file mode 100644 index 0000000..f873fce --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed; + +import java.util.Arrays; + +import org.junit.Assert; +import org.junit.BeforeClass; + +public class DistributedTestBase +{ + static String KEYSPACE = "distributed_test_keyspace"; + + @BeforeClass + public static void setup() + { + System.setProperty("org.apache.cassandra.disable_mbean_registration", "true"); + } + + TestCluster createCluster(int nodeCount) throws Throwable + { + TestCluster cluster = TestCluster.create(nodeCount); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + nodeCount + "};"); + + return cluster; + } + + public static void assertRows(Object[][] actual, Object[]... expected) + { + Assert.assertEquals(rowsNotEqualErrorMessage(expected, actual), + expected.length, actual.length); + + for (int i = 0; i < expected.length; i++) + { + Object[] expectedRow = expected[i]; + Object[] actualRow = actual[i]; + Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected), + Arrays.equals(expectedRow, actualRow)); + } + } + + public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected) + { + return String.format("Expected: %s\nActual:%s\n", + rowsToString(expected), + rowsToString(actual)); + } + + public static String rowsToString(Object[][] rows) + { + StringBuilder builder = new StringBuilder(); + builder.append("["); + boolean isFirst = true; + for (Object[] row : rows) + { + if (isFirst) + isFirst = false; + else + builder.append(","); + builder.append(Arrays.toString(row)); + } + builder.append("]"); + return builder.toString(); + } + + public static Object[] row(Object... 
expected) + { + return expected; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Instance.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java b/test/distributed/org/apache/cassandra/distributed/Instance.java new file mode 100644 index 0000000..f9ee5bb --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/Instance.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed; + +import java.io.File; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import org.apache.cassandra.batchlog.BatchlogManager; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.SharedExecutorPool; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.SimpleSeedProvider; +import org.apache.cassandra.locator.SimpleSnitch; +import org.apache.cassandra.net.IMessageSink; +import org.apache.cassandra.net.MessageDeliveryTask; +import 
org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.async.MessageInHandler; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.memory.BufferPool; + +public class Instance extends InvokableInstance +{ + public final InstanceConfig config; + + public Instance(InstanceConfig config, ClassLoader classLoader) + { + super(classLoader); + this.config = config; + } + + public InetAddressAndPort getBroadcastAddress() { return callOnInstance(FBUtilities::getBroadcastAddressAndPort); } + + public Object[][] executeInternal(String query, Object... 
args) + { + return callOnInstance(() -> + { + QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query); + ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(), + QueryProcessor.makeInternalOptions(prepared.statement, args)); + + if (result instanceof ResultMessage.Rows) + return RowUtil.toObjects((ResultMessage.Rows)result); + else + return null; + }); + } + + public UUID getSchemaVersion() + { + // we do not use method reference syntax here, because we need to invoke on the node-local schema instance + //noinspection Convert2MethodRef + return callOnInstance(() -> Schema.instance.getVersion()); + } + + public void schemaChange(String query) + { + runOnInstance(() -> + { + try + { + ClientState state = ClientState.forInternalCalls(SchemaConstants.SYSTEM_KEYSPACE_NAME); + QueryState queryState = new QueryState(state); + + CQLStatement statement = QueryProcessor.parseStatement(query, queryState.getClientState()); + statement.validate(state); + + QueryOptions options = QueryOptions.forInternalCalls(Collections.emptyList()); + statement.executeLocally(queryState, options); + } + catch (Exception e) + { + throw new RuntimeException("Error setting schema for test (query was: " + query + ")", e); + } + }); + } + + private void registerMockMessaging(TestCluster cluster) + { + BiConsumer<InetAddressAndPort, Message> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message); + BiConsumer<InetAddressAndPort, Message> deliverToInstanceIfNotFiltered = cluster.filters().filter(deliverToInstance); + + acceptsOnInstance((BiConsumer<InetAddressAndPort, Message> deliver) -> + MessagingService.instance().addMessageSink(new MessageDeliverySink(deliver)) + ).accept(deliverToInstanceIfNotFiltered); + } + + private static class MessageDeliverySink implements IMessageSink + { + private final BiConsumer<InetAddressAndPort, Message> deliver; + MessageDeliverySink(BiConsumer<InetAddressAndPort, Message> deliver) + { + 
this.deliver = deliver; + } + + public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddressAndPort to) + { + try (DataOutputBuffer out = new DataOutputBuffer(1024)) + { + InetAddressAndPort from = FBUtilities.getBroadcastAddressAndPort(); + messageOut.serialize(out, MessagingService.current_version); + deliver.accept(to, new Message(messageOut.verb.getId(), out.toByteArray(), id, MessagingService.current_version, from)); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + return false; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + // we can filter to our heart's content on the outgoing message; no need to worry about incoming + return true; + } + } + + private void receiveMessage(Message message) + { + acceptsOnInstance((Message m) -> + { + try (DataInputBuffer in = new DataInputBuffer(m.bytes)) + { + MessageIn<?> messageIn = MessageInHandler.deserialize(in, m.id, m.version, m.from); + Runnable deliver = new MessageDeliveryTask(messageIn, m.id); + deliver.run(); + } + catch (Throwable t) + { + throw new RuntimeException("Exception occurred on the node " + FBUtilities.getBroadcastAddressAndPort(), t); + } + + }).accept(message); + } + + void launch(TestCluster cluster) + { + try + { + mkdirs(); + int id = config.num; + runOnInstance(() -> InstanceIDDefiner.instanceId = id); // for logging + + startup(); + initializeRing(cluster); + registerMockMessaging(cluster); + } + catch (Throwable t) + { + if (t instanceof RuntimeException) + throw (RuntimeException) t; + throw new RuntimeException(t); + } + } + + private void mkdirs() + { + new File(config.saved_caches_directory).mkdirs(); + new File(config.hints_directory).mkdirs(); + new File(config.commitlog_directory).mkdirs(); + for (String dir : config.data_file_directories) + new File(dir).mkdirs(); + } + + private void startup() + { + acceptsOnInstance((InstanceConfig config) -> + { + DatabaseDescriptor.daemonInitialization(() -> loadConfig(config)); 
+ + DatabaseDescriptor.createAllDirectories(); + Keyspace.setInitialized(); + SystemKeyspace.persistLocalMetadata(); + }).accept(config); + } + + + public static Config loadConfig(InstanceConfig overrides) + { + Config config = new Config(); + // Defaults + config.commitlog_sync = Config.CommitLogSync.batch; + config.endpoint_snitch = SimpleSnitch.class.getName(); + config.seed_provider = new ParameterizedClass(SimpleSeedProvider.class.getName(), + Collections.singletonMap("seeds", "127.0.0.1:7010")); + config.diagnostic_events_enabled = true; // necessary for schema change monitoring + + // Overrides + config.partitioner = overrides.partitioner; + config.broadcast_address = overrides.broadcast_address; + config.listen_address = overrides.listen_address; + config.broadcast_rpc_address = overrides.broadcast_rpc_address; + config.rpc_address = overrides.rpc_address; + config.saved_caches_directory = overrides.saved_caches_directory; + config.data_file_directories = overrides.data_file_directories; + config.commitlog_directory = overrides.commitlog_directory; + config.hints_directory = overrides.hints_directory; + config.cdc_raw_directory = overrides.cdc_directory; + config.concurrent_writes = overrides.concurrent_writes; + config.concurrent_counter_writes = overrides.concurrent_counter_writes; + config.concurrent_materialized_view_writes = overrides.concurrent_materialized_view_writes; + config.concurrent_reads = overrides.concurrent_reads; + config.memtable_flush_writers = overrides.memtable_flush_writers; + config.concurrent_compactors = overrides.concurrent_compactors; + config.memtable_heap_space_in_mb = overrides.memtable_heap_space_in_mb; + config.initial_token = overrides.initial_token; + return config; + } + + private void initializeRing(TestCluster cluster) + { + // This should be done outside instance in order to avoid serializing config + String partitionerName = config.partitioner; + List<String> initialTokens = new ArrayList<>(); + 
List<InetAddressAndPort> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + for (int i = 1 ; i <= cluster.size() ; ++i) + { + InstanceConfig config = cluster.get(i).config; + initialTokens.add(config.initial_token); + try + { + hosts.add(InetAddressAndPort.getByName(config.broadcast_address)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + hostIds.add(config.hostId); + } + + runOnInstance(() -> + { + try + { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName); + StorageService storageService = StorageService.instance; + List<Token> tokens = new ArrayList<>(); + for (String token : initialTokens) + tokens.add(partitioner.getTokenFactory().fromString(token)); + + for (int i = 0; i < tokens.size(); i++) + { + InetAddressAndPort ep = hosts.get(i); + Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1); + Gossiper.instance.injectApplicationState(ep, + ApplicationState.TOKENS, + new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i)))); + storageService.onChange(ep, + ApplicationState.STATUS_WITH_PORT, + new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i)))); + storageService.onChange(ep, + ApplicationState.STATUS, + new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i)))); + Gossiper.instance.realMarkAlive(ep, Gossiper.instance.getEndpointStateForEndpoint(ep)); + MessagingService.instance().setVersion(ep, MessagingService.current_version); + } + + // check that all nodes are in token metadata + for (int i = 0; i < tokens.size(); ++i) + assert storageService.getTokenMetadata().isMember(hosts.get(i)); + } + catch (Throwable e) // UnknownHostException + { + throw new RuntimeException(e); + } + }); + } + + void shutdown() + { + runOnInstance(() -> { + Throwable error = null; + error = runAndMergeThrowable(error, + BatchlogManager.instance::shutdown, + 
HintsService.instance::shutdownBlocking, + CommitLog.instance::shutdownBlocking, + CompactionManager.instance::forceShutdown, + Gossiper.instance::stop, + SecondaryIndexManager::shutdownExecutors, + MessagingService.instance()::shutdown, + ColumnFamilyStore::shutdownFlushExecutor, + ColumnFamilyStore::shutdownPostFlushExecutor, + ColumnFamilyStore::shutdownReclaimExecutor, + ColumnFamilyStore::shutdownPerDiskFlushExecutors, + PendingRangeCalculatorService.instance::shutdownExecutor, + BufferPool::shutdownLocalCleaner, + Ref::shutdownReferenceReaper, + StageManager::shutdownAndWait, + SharedExecutorPool::shutdownSharedPool, + Memtable.MEMORY_POOL::shutdown, + ScheduledExecutors::shutdownAndWait); + error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor); + Throwables.maybeFail(error); + }); + } + + private static Throwable shutdownAndWait(Throwable existing, ExecutorService executor) + { + return runAndMergeThrowable(existing, () -> { + executor.shutdownNow(); + executor.awaitTermination(5, TimeUnit.SECONDS); + assert executor.isTerminated() && executor.isShutdown() : executor; + }); + } + + private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable runnable) + { + try + { + runnable.run(); + } + catch (Throwable t) + { + return Throwables.merge(existing, t); + } + + return existing; + } + + private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable ... 
runnables) + { + for (ThrowingRunnable runnable : runnables) + { + try + { + runnable.run(); + } + catch (Throwable t) + { + existing = Throwables.merge(existing, t); + } + } + return existing; + } + + public static interface ThrowingRunnable + { + public void run() throws Throwable; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java new file mode 100644 index 0000000..6349d5a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed; + +import com.google.common.base.Predicate; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.utils.Pair; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.HashSet; +import java.util.Set; +import java.util.function.IntFunction; + +public class InstanceClassLoader extends URLClassLoader +{ + // Classes that have to be shared between instances, for configuration or returning values + private final static Class<?>[] commonClasses = new Class[] + { + Pair.class, + InstanceConfig.class, + Message.class, + InetAddressAndPort.class, + InvokableInstance.SerializableCallable.class, + InvokableInstance.SerializableRunnable.class, + InvokableInstance.SerializableConsumer.class, + InvokableInstance.SerializableBiConsumer.class, + InvokableInstance.SerializableFunction.class, + InvokableInstance.SerializableBiFunction.class, + InvokableInstance.SerializableTriFunction.class, + InvokableInstance.InstanceFunction.class + }; + + private final int id; // for debug purposes + private final ClassLoader commonClassLoader; + private final Predicate<String> isCommonClassName; + + InstanceClassLoader(int id, URL[] urls, Predicate<String> isCommonClassName, ClassLoader commonClassLoader) + { + super(urls, null); + this.id = id; + this.commonClassLoader = commonClassLoader; + this.isCommonClassName = isCommonClassName; + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException + { + // Do not share: + // * yaml, which is a rare exception because it does mess with loading org.cassandra...Config class instances + // * most of the rest of Cassandra classes (unless they were explicitly shared) g + if (name.startsWith("org.slf4j") || + name.startsWith("ch.qos.logback") || + name.startsWith("org.yaml") || + (name.startsWith("org.apache.cassandra") && !isCommonClassName.test(name))) + return loadClassInternal(name); + + return 
commonClassLoader.loadClass(name); + } + + Class<?> loadClassInternal(String name) throws ClassNotFoundException + { + synchronized (getClassLoadingLock(name)) + { + // First, check if the class has already been loaded + Class<?> c = findLoadedClass(name); + + if (c == null) + c = findClass(name); + + return c; + } + } + + public static IntFunction<ClassLoader> createFactory(URLClassLoader contextClassLoader) + { + Set<String> commonClassNames = new HashSet<>(); + for (Class<?> k : commonClasses) + commonClassNames.add(k.getName()); + + URL[] urls = contextClassLoader.getURLs(); + return id -> new InstanceClassLoader(id, urls, commonClassNames::contains, contextClassLoader); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java new file mode 100644 index 0000000..49c2e1f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed; + +import java.io.File; +import java.io.Serializable; +import java.util.UUID; +/** Serializable holder of the settings for one numbered node. The node number, addresses, directories and initial token are supplied through the private constructor (see {@code generate}); {@code hostId} is a random UUID created per object, and {@code partitioner} plus the concurrency and memtable fields are fixed values. All fields are public and final. NOTE(review): the field names look like cassandra.yaml option names, presumably so they can be copied into a node's configuration; confirm against the consumer. */ +public class InstanceConfig implements Serializable +{ + public final int num; + public final UUID hostId =java.util.UUID.randomUUID(); + public final String partitioner = "org.apache.cassandra.dht.Murmur3Partitioner"; + public final String broadcast_address; + public final String listen_address; + public final String broadcast_rpc_address; + public final String rpc_address; + public final String saved_caches_directory; + public final String[] data_file_directories; + public final String commitlog_directory; + public final String hints_directory; + public final String cdc_directory; + public final int concurrent_writes = 2; + public final int concurrent_counter_writes = 2; + public final int concurrent_materialized_view_writes = 2; + public final int concurrent_reads = 2; + public final int memtable_flush_writers = 1; + public final int concurrent_compactors = 1; + public final int memtable_heap_space_in_mb = 10; + public final String initial_token; + /** Plain field-by-field assignment: nothing is validated or copied, so {@code data_file_directories} is stored by reference. */ + private InstanceConfig(int num, + String broadcast_address, + String listen_address, + String broadcast_rpc_address, + String rpc_address, + String saved_caches_directory, + String[] data_file_directories, + String commitlog_directory, + String hints_directory, + String cdc_directory, + String initial_token) + { + this.num = num; + this.broadcast_address = broadcast_address; + this.listen_address = listen_address; + this.broadcast_rpc_address = broadcast_rpc_address; + this.rpc_address = rpc_address; + this.saved_caches_directory = saved_caches_directory; + this.data_file_directories = data_file_directories; + this.commitlog_directory = commitlog_directory; + this.hints_directory = hints_directory; + this.cdc_directory = cdc_directory; + this.initial_token = initial_token; + } + /** Builds the config for node {@code nodeNum}. All four address fields are set to "127.0.0." followed by {@code nodeNum}; the saved_caches, data (a single entry), commitlog, hints and cdc directories are {@code root}/node{@code nodeNum}/ plus the directory name; {@code token} becomes {@code initial_token}. NOTE(review): {@code nodeNum} is not range-checked here, so it presumably must be a valid last octet for the address to be usable; confirm with callers. */ +
public static InstanceConfig generate(int nodeNum, File root, String token) + { + return new InstanceConfig(nodeNum, + "127.0.0." + nodeNum, + "127.0.0." + nodeNum, + "127.0.0." + nodeNum, + "127.0.0." + nodeNum, + String.format("%s/node%d/saved_caches", root, nodeNum), + new String[] { String.format("%s/node%d/data", root, nodeNum) }, + String.format("%s/node%d/commitlog", root, nodeNum), + String.format("%s/node%d/hints", root, nodeNum), + String.format("%s/node%d/cdc", root, nodeNum), + token); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java new file mode 100644 index 0000000..1167748 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.distributed; + +import ch.qos.logback.core.PropertyDefinerBase; + +/** + * Used by logback to find/define property value, see logback-dtest.xml. The value is derived from the static {@code instanceId} field: a fixed marker while the field still holds its initial value -1, otherwise a name built from the id. + */ +public class InstanceIDDefiner extends PropertyDefinerBase +{ + // Instantiated per classloader, set by Instance + public static int instanceId = -1; + /** Returns {@code "<main>"} while {@code instanceId} is -1, otherwise {@code "INSTANCE"} followed by the id. */ + public String getPropertyValue() + { + if (instanceId == -1) + return "<main>"; + else + return "INSTANCE" + instanceId; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java new file mode 100644 index 0000000..f646ae1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.distributed; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +/** Base class for objects that move serializable values (typically lambdas) into the class loader given at construction. A value is Java-serialized by this copy of the class and deserialized by reflectively invoking the static {@code deserializeOneObject} of the {@code InvokableInstance} class as loaded through that class loader; the result is rejected unless its class was defined by that same loader. */ +public abstract class InvokableInstance +{ + private final ClassLoader classLoader; + private final Method deserializeOnInstance; + /** Remembers {@code classLoader} and looks up {@code deserializeOneObject(byte[])} on this class as loaded through it. A failed class or method lookup is rethrown wrapped in {@code RuntimeException}. */ + public InvokableInstance(ClassLoader classLoader) + { + this.classLoader = classLoader; + try + { + this.deserializeOnInstance = classLoader.loadClass(InvokableInstance.class.getName()).getDeclaredMethod("deserializeOneObject", byte[].class); + } + catch (ClassNotFoundException | NoSuchMethodException e) + { + throw new RuntimeException(e); + } + } + /** Pattern shared by the declarations that follow: each Serializable* interface adds {@code Serializable} to a functional interface, and the matching *sOnInstance method returns the copy produced by {@code transferOneObject}, cast back to the same interface. {@code callOnInstance} and {@code runOnInstance} additionally invoke that copy immediately. {@code SerializableCallable} redeclares {@code call()} without a throws clause. */ + public interface SerializableCallable<T> extends Callable<T>, Serializable { public T call(); } + public <T> SerializableCallable<T> callsOnInstance(SerializableCallable<T> call) { return (SerializableCallable<T>) transferOneObject(call); } + public <T> T callOnInstance(SerializableCallable<T> call) { return callsOnInstance(call).call(); } + + public interface SerializableRunnable extends Runnable, Serializable {} + public SerializableRunnable runsOnInstance(SerializableRunnable run) { return (SerializableRunnable) transferOneObject(run); } + public void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); } + + public interface SerializableConsumer<T> extends Consumer<T>, Serializable {} + public <T> SerializableConsumer<T> acceptsOnInstance(SerializableConsumer<T> consumer) { return (SerializableConsumer<T>) transferOneObject(consumer); } + + public interface SerializableBiConsumer<T1, T2> extends BiConsumer<T1, T2>, Serializable {} +
public <T1, T2> SerializableBiConsumer<T1, T2> acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return (SerializableBiConsumer<T1, T2>) transferOneObject(consumer); } + + public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {} + public <I, O> SerializableFunction<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return (SerializableFunction<I, O>) transferOneObject(f); } + + public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {} + public <I1, I2, O> SerializableBiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return (SerializableBiFunction<I1, I2, O>) transferOneObject(f); } + /** Three-argument serializable function; declared from scratch here rather than by extending a java.util.function interface. */ + public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable + { + O apply(I1 i1, I2 i2, I3 i3); + } + public <I1, I2, I3, O> SerializableTriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return (SerializableTriFunction<I1, I2, I3, O>) transferOneObject(f); } + /** A serializable two-argument function whose first argument is an {@code Instance}. */ + public interface InstanceFunction<I, O> extends SerializableBiFunction<Instance, I, O> {} + + // E must be a functional interface, and lambda must be implemented by a lambda function + public <E extends Serializable> E invokesOnInstance(E lambda) + { + return (E) transferOneObject(lambda); + } + /** Serializes {@code object}, passes the bytes to the reflectively obtained {@code deserializeOneObject}, and returns the resulting copy. Throws {@code IllegalStateException} if the copy's class was not defined by the class loader given at construction; reflection failures are rethrown wrapped in {@code RuntimeException}. NOTE(review): the result is dereferenced without a null check, so transferring {@code null} would presumably end in a NullPointerException; confirm whether callers can pass null. */ + public Object transferOneObject(Object object) + { + byte[] bytes = serializeOneObject(object); + try + { + Object onInstance = deserializeOnInstance.invoke(null, bytes); + if (onInstance.getClass().getClassLoader() != classLoader) + throw new IllegalStateException(onInstance + " seemingly from wrong class loader: " + onInstance.getClass().getClassLoader() + ", but expected " + classLoader); + + return onInstance; + } + catch (IllegalAccessException | InvocationTargetException e) + { + throw new RuntimeException(e); + } + } + /** Writes {@code object} with an {@code ObjectOutputStream} over an in-memory buffer and returns the buffer's bytes; an {@code IOException} is rethrown wrapped in {@code RuntimeException}. */ + private byte[] serializeOneObject(Object object) + { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos
= new ObjectOutputStream(baos)) + { + oos.writeObject(object); + /* closed explicitly before the bytes are read, presumably so that anything the object stream still buffers reaches baos first */ oos.close(); + return baos.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + /** Reads one object from {@code bytes} with an {@code ObjectInputStream}. The constructor looks this method up by name on the copy of this class loaded through the target class loader, which is why it is public, static and otherwise unreferenced. {@code IOException} and {@code ClassNotFoundException} are rethrown wrapped in {@code RuntimeException}. NOTE(review): classes named in the stream are presumably resolved relative to the loader that defined this copy of the class; confirm against ObjectInputStream.resolveClass. */ + @SuppressWarnings("unused") // called through method invocation + public static Object deserializeOneObject(byte[] bytes) + { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais);) + { + return ois.readObject(); + } + catch (IOException | ClassNotFoundException e) + { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Message.java ---------------------------------------------------------------------- diff --git a/test/distributed/org/apache/cassandra/distributed/Message.java b/test/distributed/org/apache/cassandra/distributed/Message.java new file mode 100644 index 0000000..b5492a2 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/Message.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.distributed; + +import org.apache.cassandra.locator.InetAddressAndPort; + +// a container for simplifying the method signature for per-instance message handling/delivery; every field is public, final and assigned once in the constructor +public class Message +{ + public final int verb; + public final byte[] bytes; + public final int id; + public final int version; + public final InetAddressAndPort from; + /** Stores the five arguments as given; {@code bytes} is kept by reference, not copied. */ + public Message(int verb, byte[] bytes, int id, int version, InetAddressAndPort from) + { + this.verb = verb; + this.bytes = bytes; + this.id = id; + this.version = version; + this.from = from; + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org