http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 630dc5d..439d8cf 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.File;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.*;
@@ -26,9 +25,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.CacheLoader;
@@ -123,16 +119,7 @@ public class StorageProxy implements StorageProxyMBean
 
     static
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
         HintsService.instance.registerMBean();
         HintedHandOffManager.instance.registerMBean();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index e7ca4be..68dba93 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -231,7 +231,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = 
new CopyOnWriteArrayList<>();
 
-    private final ObjectName jmxObjectName;
+    private final String jmxObjectName;
 
     private Collection<Token> bootstrapTokens = null;
 
@@ -270,17 +270,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         // use dedicated executor for handling JMX notifications
         super(JMXBroadcastExecutor.executor);
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            jmxObjectName = new 
ObjectName("org.apache.cassandra.db:type=StorageService");
-            mbs.registerMBean(this, jmxObjectName);
-            mbs.registerMBean(StreamManager.instance, new 
ObjectName(StreamManager.OBJECT_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        jmxObjectName = "org.apache.cassandra.db:type=StorageService";
+        MBeanWrapper.instance.registerMBean(this, jmxObjectName);
+        MBeanWrapper.instance.registerMBean(StreamManager.instance, 
StreamManager.OBJECT_NAME);
 
         ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java 
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 151eb18..50d35bc 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -471,6 +471,32 @@ public class ByteBufferUtil
         return bytes.getDouble(bytes.position());
     }
 
+    /**
+     * Serializes a Java value into a ByteBuffer by dispatching on its runtime type to the
+     * matching {@code ByteBufferUtil.bytes(...)} overload.
+     *
+     * Supported types: Integer, Byte, Short, Long, Float, Double, UUID, InetAddress and String.
+     *
+     * @param obj the value to serialize
+     * @return the buffer produced by the {@code bytes} overload for the runtime type of {@code obj}
+     * @throws IllegalArgumentException if {@code obj} is null or of a type not listed above
+     */
+    public static ByteBuffer objectToBytes(Object obj)
+    {
+        if (obj instanceof Integer)
+            return ByteBufferUtil.bytes((int) obj);
+        else if (obj instanceof Byte)
+            return ByteBufferUtil.bytes((byte) obj);
+        else if (obj instanceof Short)
+            return ByteBufferUtil.bytes((short) obj);
+        else if (obj instanceof Long)
+            return ByteBufferUtil.bytes((long) obj);
+        else if (obj instanceof Float)
+            return ByteBufferUtil.bytes((float) obj);
+        else if (obj instanceof Double)
+            return ByteBufferUtil.bytes((double) obj);
+        else if (obj instanceof UUID)
+            return ByteBufferUtil.bytes((UUID) obj);
+        else if (obj instanceof InetAddress)
+            return ByteBufferUtil.bytes((InetAddress) obj);
+        else if (obj instanceof String)
+            return ByteBufferUtil.bytes((String) obj);
+        else
+            // instanceof is false for null, so a null obj ends up here; guard getClass() so the
+            // caller gets the intended IllegalArgumentException rather than a NullPointerException
+            throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                             obj,
+                                                             obj == null ? null : obj.getClass()));
+    }
+
     public static ByteBuffer bytes(byte b)
     {
         return ByteBuffer.allocate(1).put(0, b);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/MBeanWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java 
b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
new file mode 100644
index 0000000..edee6af
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.management.ManagementFactory;
+import java.util.function.Consumer;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to avoid catching and rethrowing checked exceptions on MBean
+ * operations, and to allow turning off MBean registration for test purposes.
+ */
+public interface MBeanWrapper
+{
+    static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+
+    /**
+     * Shared wrapper instance. It is a no-op implementation when the boolean system property
+     * org.apache.cassandra.disable_mbean_registration is true, and otherwise delegates to the
+     * platform MBean server.
+     */
+    static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ?
+                                         new NoOpMBeanWrapper() :
+                                         new PlatformMBeanWrapper();
+
+    // For every operation below, onException selects what happens when the operation fails:
+    // THROW rethrows the failure wrapped in a RuntimeException, LOG logs it at error level and
+    // IGNORE discards it. The overloads that take no OnException use THROW.
+    public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+    default void registerMBean(Object obj, ObjectName mbeanName)
+    {
+        registerMBean(obj, mbeanName, OnException.THROW);
+    }
+
+    public void registerMBean(Object obj, String mbeanName, OnException onException);
+    default void registerMBean(Object obj, String mbeanName)
+    {
+        registerMBean(obj, mbeanName, OnException.THROW);
+    }
+
+    public boolean isRegistered(ObjectName mbeanName, OnException onException);
+    default boolean isRegistered(ObjectName mbeanName)
+    {
+        return isRegistered(mbeanName, OnException.THROW);
+    }
+
+    public boolean isRegistered(String mbeanName, OnException onException);
+    default boolean isRegistered(String mbeanName)
+    {
+        return isRegistered(mbeanName, OnException.THROW);
+    }
+
+    public void unregisterMBean(ObjectName mbeanName, OnException onException);
+    default void unregisterMBean(ObjectName mbeanName)
+    {
+        unregisterMBean(mbeanName, OnException.THROW);
+    }
+
+    public void unregisterMBean(String mbeanName, OnException onException);
+    default void unregisterMBean(String mbeanName)
+    {
+        unregisterMBean(mbeanName, OnException.THROW);
+    }
+
+    /**
+     * Implementation that performs no registration at all; isRegistered always reports false.
+     */
+    static class NoOpMBeanWrapper implements MBeanWrapper
+    {
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
+        public void registerMBean(Object obj, String mbeanName, OnException onException) {}
+        public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; }
+        public boolean isRegistered(String mbeanName, OnException onException) { return false; }
+        public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
+        public void unregisterMBean(String mbeanName, OnException onException) {}
+    }
+
+    /**
+     * Implementation backed by the platform MBean server. Any exception raised by an operation,
+     * including a malformed name in the String overloads, is passed to the handler of the
+     * supplied OnException.
+     */
+    static class PlatformMBeanWrapper implements MBeanWrapper
+    {
+        private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public void registerMBean(Object obj, String mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            // only reached when the handler returned normally (LOG or IGNORE)
+            return false;
+        }
+
+        public boolean isRegistered(String mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            // only reached when the handler returned normally (LOG or IGNORE)
+            return false;
+        }
+
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public void unregisterMBean(String mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+    }
+
+    /**
+     * Failure-handling strategies for the MBean operations above.
+     */
+    public enum OnException
+    {
+        /** Rethrow the failure wrapped in a RuntimeException. */
+        THROW(e -> { throw new RuntimeException(e); }),
+        /** Log the failure at error level and continue. */
+        LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
+        /** Discard the failure silently. */
+        IGNORE(e -> {});
+
+        // assigned once in the constructor; final so a strategy cannot be swapped at runtime
+        private final Consumer<Exception> handler;
+        OnException(Consumer<Exception> handler)
+        {
+            this.handler = handler;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/Mx4jTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Mx4jTool.java 
b/src/java/org/apache/cassandra/utils/Mx4jTool.java
index 5baaea2..eda6354 100644
--- a/src/java/org/apache/cassandra/utils/Mx4jTool.java
+++ b/src/java/org/apache/cassandra/utils/Mx4jTool.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,7 +42,7 @@ public class Mx4jTool
         try
         {
             logger.trace("Will try to load mx4j now, if it's in the 
classpath");
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            MBeanWrapper mbs = MBeanWrapper.instance;
             ObjectName processorName = new 
ObjectName("Server:name=XSLTProcessor");
 
             Class<?> httpAdaptorClass = 
Class.forName("mx4j.tools.adaptor.http.HttpAdaptor");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java 
b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index b69e6bf..1a17a1f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -31,6 +31,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -354,11 +355,10 @@ public final class Ref<T> implements RefCounted<T>
     static final Set<Class<?>> concurrentIterables = 
Collections.newSetFromMap(new IdentityHashMap<>());
     private static final Set<GlobalState> globallyExtant = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     static final ReferenceQueue<Object> referenceQueue = new 
ReferenceQueue<>();
-    private static final ExecutorService EXEC = 
Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+    private static final InfiniteLoopExecutor EXEC = new 
InfiniteLoopExecutor("Reference-Reaper", Ref::reapOneReference).start();
     static final ScheduledExecutorService STRONG_LEAK_DETECTOR = 
!DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Strong-Reference-Leak-Detector"));
     static
     {
-        EXEC.execute(new ReferenceReaper());
         if (DEBUG_ENABLED)
         {
             STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new Visitor(), 1, 15, 
TimeUnit.MINUTES);
@@ -367,28 +367,12 @@ public final class Ref<T> implements RefCounted<T>
         concurrentIterables.addAll(Arrays.asList(concurrentIterableClasses));
     }
 
-    static final class ReferenceReaper implements Runnable
+    private static void reapOneReference() throws InterruptedException
     {
-        public void run()
+        Object obj = referenceQueue.remove(100);
+        if (obj instanceof Ref.State)
         {
-            try
-            {
-                while (true)
-                {
-                    Object obj = referenceQueue.remove();
-                    if (obj instanceof Ref.State)
-                    {
-                        ((Ref.State) obj).release(true);
-                    }
-                }
-            }
-            catch (InterruptedException e)
-            {
-            }
-            finally
-            {
-                EXEC.execute(this);
-            }
+            ((Ref.State) obj).release(true);
         }
     }
 
@@ -719,4 +703,11 @@ public final class Ref<T> implements RefCounted<T>
             }
         }
     }
+
+    @VisibleForTesting
+    public static void shutdownReferenceReaper() throws InterruptedException
+    {
+        EXEC.shutdown();
+        EXEC.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java 
b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index f9ec40c..c8ad078 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -27,11 +27,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.FileUtils;
@@ -493,34 +493,16 @@ public class BufferPool
     private static final ConcurrentLinkedQueue<LocalPoolRef> 
localPoolReferences = new ConcurrentLinkedQueue<>();
 
     private static final ReferenceQueue<Object> localPoolRefQueue = new 
ReferenceQueue<>();
-    private static final ExecutorService EXEC = 
Executors.newFixedThreadPool(1, new NamedThreadFactory("LocalPool-Cleaner"));
-    static
+    private static final InfiniteLoopExecutor EXEC = new 
InfiniteLoopExecutor("LocalPool-Cleaner", 
BufferPool::cleanupOneReference).start();
+
+    private static void cleanupOneReference() throws InterruptedException
     {
-        EXEC.execute(new Runnable()
+        Object obj = localPoolRefQueue.remove(100);
+        if (obj instanceof LocalPoolRef)
         {
-            public void run()
-            {
-                try
-                {
-                    while (true)
-                    {
-                        Object obj = localPoolRefQueue.remove();
-                        if (obj instanceof LocalPoolRef)
-                        {
-                            ((LocalPoolRef) obj).release();
-                            localPoolReferences.remove(obj);
-                        }
-                    }
-                }
-                catch (InterruptedException e)
-                {
-                }
-                finally
-                {
-                    EXEC.execute(this);
-                }
-            }
-        });
+            ((LocalPoolRef) obj).release();
+            localPoolReferences.remove(obj);
+        }
     }
 
     private static ByteBuffer allocateDirectAligned(int capacity)
@@ -872,4 +854,11 @@ public class BufferPool
         int mask = unit - 1;
         return (size + mask) & ~mask;
     }
+
+    @VisibleForTesting
+    public static void shutdownLocalCleaner() throws InterruptedException
+    {
+        EXEC.shutdown();
+        EXEC.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java 
b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index 5a90463..b905d2c 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -18,57 +18,70 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 /**
  * A thread that reclaims memory from a MemtablePool on demand.  The actual 
reclaiming work is delegated to the
  * cleaner Runnable, e.g., FlushLargestColumnFamily
  */
-class MemtableCleanerThread<P extends MemtablePool> extends Thread
+public class MemtableCleanerThread<P extends MemtablePool> extends 
InfiniteLoopExecutor
 {
-    /** The pool we're cleaning */
-    final P pool;
-
-    /** should ensure that at least some memory has been marked reclaiming 
after completion */
-    final Runnable cleaner;
+    private static class Clean<P extends MemtablePool> implements 
InterruptibleRunnable
+    {
+        /** The pool we're cleaning */
+        final P pool;
 
-    /** signalled whenever needsCleaning() may return true */
-    final WaitQueue wait = new WaitQueue();
+        /** should ensure that at least some memory has been marked reclaiming 
after completion */
+        final Runnable cleaner;
 
-    MemtableCleanerThread(P pool, Runnable cleaner)
-    {
-        super(pool.getClass().getSimpleName() + "Cleaner");
-        this.pool = pool;
-        this.cleaner = cleaner;
-        setDaemon(true);
-    }
+        /** signalled whenever needsCleaning() may return true */
+        final WaitQueue wait = new WaitQueue();
 
-    boolean needsCleaning()
-    {
-        return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
-    }
+        private Clean(P pool, Runnable cleaner)
+        {
+            this.pool = pool;
+            this.cleaner = cleaner;
+        }
 
-    // should ONLY be called when we really think it already needs cleaning
-    void trigger()
-    {
-        wait.signal();
-    }
+        boolean needsCleaning()
+        {
+            return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
+        }
 
-    @Override
-    public void run()
-    {
-        while (true)
+        @Override
+        public void run() throws InterruptedException
         {
-            while (!needsCleaning())
+            if (needsCleaning())
+            {
+                cleaner.run();
+            }
+            else
             {
                 final WaitQueue.Signal signal = wait.register();
                 if (!needsCleaning())
-                    signal.awaitUninterruptibly();
+                    signal.await();
                 else
                     signal.cancel();
             }
-
-            cleaner.run();
         }
     }
+
+    private final Runnable trigger;
+    private MemtableCleanerThread(Clean<P> clean)
+    {
+        super(clean.pool.getClass().getSimpleName() + "Cleaner", clean);
+        this.trigger = clean.wait::signal;
+    }
+
+    MemtableCleanerThread(P pool, Runnable cleaner)
+    {
+        this(new Clean<>(pool, cleaner));
+    }
+
+    // should ONLY be called when we really think it already needs cleaning
+    public void trigger()
+    {
+        trigger.run();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java 
b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index c082856..684db93 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -18,8 +18,11 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -63,6 +66,12 @@ public abstract class MemtablePool
         return cleaner == null ? null : new MemtableCleanerThread<>(this, 
cleaner);
     }
 
+    /**
+     * Shuts down this pool's cleaner, if it has one, and then calls awaitTermination on it
+     * with a 60 second timeout.
+     *
+     * @throws InterruptedException propagated from the cleaner's awaitTermination
+     */
+    public void shutdown() throws InterruptedException
+    {
+        // getCleaner(...) returns null when no cleaner Runnable was supplied, in which case
+        // there is nothing to stop.
+        // NOTE(review): assumes the cleaner field holds getCleaner's result -- confirm
+        if (cleaner == null)
+            return;
+
+        cleaner.shutdown();
+        cleaner.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     public abstract MemtableAllocator newAllocator();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/conf/logback-dtest.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
new file mode 100644
index 0000000..b8019f6
--- /dev/null
+++ b/test/conf/logback-dtest.xml
@@ -0,0 +1,79 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+  <define name="instance_id" 
class="org.apache.cassandra.distributed.InstanceIDDefiner" />
+
+  <!-- Shutdown hook ensures that async appender flushes -->
+  <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+  <!-- Status listener is used to wrap stdout/stderr and tee to log file -->
+  <statusListener class="org.apache.cassandra.LogbackStatusListener" />
+
+  <appender name="FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+    <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+    <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      
<fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
+      <minIndex>1</minIndex>
+      <maxIndex>20</maxIndex>
+    </rollingPolicy>
+
+    <triggeringPolicy 
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>20MB</maxFileSize>
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} 
%msg%n</pattern>
+      <immediateFlush>false</immediateFlush>
+    </encoder>
+  </appender>
+
+  <appender name="STDOUT" target="System.out" 
class="org.apache.cassandra.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - 
%msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>DEBUG</level>
+    </filter>
+  </appender>
+
+  <appender name="TEE" class="org.apache.cassandra.TeeingAppender">
+      <appender-ref ref="FILE"/>
+      <appender-ref ref="STDOUT"/>
+  </appender>
+
+  <logger name="org.apache.hadoop" level="WARN"/>
+
+  <logger name="org.apache.cassandra.db.monitoring" level="DEBUG"/>
+
+  <!-- Do not change the name of this appender. LogbackStatusListener uses the 
thread name
+       tied to the appender name to know when to write to real stdout/stderr 
vs forwarding to logback -->
+  <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+      <discardingThreshold>0</discardingThreshold>
+      <maxFlushTime>0</maxFlushTime>
+      <queueSize>1024</queueSize>
+      <appender-ref ref="TEE"/>
+      <includeCallerData>true</includeCallerData>
+  </appender>
+
+  <root level="DEBUG">
+    <appender-ref ref="ASYNC" />
+  </root>
+</configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Coordinator.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Coordinator.java 
b/test/distributed/org/apache/cassandra/distributed/Coordinator.java
new file mode 100644
index 0000000..91ab480
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Coordinator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public class Coordinator
+{
+    final Instance instance;
+
+    public Coordinator(Instance instance)
+    {
+        this.instance = instance;
+    }
+
+    /**
+     * Runs a CQL statement with the given consistency level code, converting every bind value
+     * with ByteBufferUtil.objectToBytes.
+     *
+     * @return the rows converted by RowUtil.toObjects when the result is of kind ROWS,
+     *         otherwise an empty array
+     */
+    private static Object[][] coordinatorExecute(String query, int consistencyLevel, Object[] bindings)
+    {
+        CQLStatement statement = QueryProcessor.getStatement(query, ClientState.forInternalCalls());
+
+        List<ByteBuffer> serialized = new ArrayList<>();
+        for (int i = 0; i < bindings.length; i++)
+            serialized.add(ByteBufferUtil.objectToBytes(bindings[i]));
+
+        // built in the same order in which the arguments of the execute call are evaluated
+        QueryState state = QueryState.forInternalCalls();
+        QueryOptions options = QueryOptions.create(ConsistencyLevel.fromCode(consistencyLevel),
+                                                   serialized,
+                                                   false,
+                                                   10,
+                                                   null,
+                                                   null,
+                                                   ProtocolVersion.V4,
+                                                   null);
+        ResultMessage result = statement.execute(state, options, System.nanoTime());
+
+        boolean hasRows = result != null && result.kind == ResultMessage.Kind.ROWS;
+        if (!hasRows)
+            return new Object[][]{};
+
+        return RowUtil.toObjects((ResultMessage.Rows) result);
+    }
+
+    /**
+     * Executes the query by applying coordinatorExecute through instance.appliesOnInstance,
+     * passing the consistency level as its integer code.
+     */
+    public Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+    {
+        return instance.appliesOnInstance(Coordinator::coordinatorExecute)
+                       .apply(query, consistencyLevel.code, boundValues);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
----------------------------------------------------------------------
diff --git 
a/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
 
b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
new file mode 100644
index 0000000..a61c8af
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+import static org.apache.cassandra.net.MessagingService.Verb.READ_REPAIR;
+
+public class DistributedReadWritePathTest extends DistributedTestBase
+{
+    /**
+     * Writes one distinct row of partition 1 locally on each of the three nodes and checks
+     * that a coordinator read at {@code ConsistencyLevel.ALL} returns all three rows.
+     */
+    @Test
+    public void coordinatorRead() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            // Node n locally stores only the row (1, n, n)
+            for (int node = 1; node <= 3; node++)
+                cluster.get(node).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, " + node + ", " + node + ")");
+
+            Object[][] merged = cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                              ConsistencyLevel.ALL,
+                                                              1);
+            assertRows(merged,
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+        }
+    }
+
+    /**
+     * Writes a single row through the coordinator at QUORUM, then checks that the row is
+     * visible through a node-local read and through a QUORUM coordinator read.
+     */
+    @Test
+    public void coordinatorWrite() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1)",
+                                          ConsistencyLevel.QUORUM);
+
+            // NOTE(review): the loop variable i is never used - node 1 is read three times.
+            // If the intent was to check every replica this should probably be cluster.get(i)
+            // for i in 1..3; confirm that a QUORUM write is guaranteed to have reached all
+            // three nodes in this harness before changing it.
+            for (int i = 0; i < 3; i++)
+            {
+                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1"),
+                           row(1, 1, 1));
+            }
+
+            // The same row must also be readable through the coordinator path
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+        }
+    }
+
+    /**
+     * With {@code read_repair='blocking'}, a QUORUM read over a partition that node 3 is
+     * missing must leave node 3 holding the row afterwards.
+     */
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            final String insertRow = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
+            final String selectPartition = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1";
+
+            // Only nodes 1 and 2 get the row; node 3 starts out empty
+            cluster.get(1).executeInternal(insertRow);
+            cluster.get(2).executeInternal(insertRow);
+
+            Object[][] onThirdNodeBefore = cluster.get(3).executeInternal(selectPartition);
+            assertRows(onThirdNodeBefore);
+
+            Object[][] viaCoordinator = cluster.coordinator().execute(selectPartition, ConsistencyLevel.QUORUM);
+            assertRows(viaCoordinator,
+                       row(1, 1, 1));
+
+            // Verify that data got repaired to the third node
+            Object[][] onThirdNodeAfter = cluster.get(3).executeInternal(selectPartition);
+            assertRows(onThirdNodeAfter,
+                       row(1, 1, 1));
+        }
+    }
+
+    /**
+     * Same setup as the read-repair test, but READ_REPAIR messages addressed to node 3 are
+     * dropped: the QUORUM read still returns the row, and node 3 stays empty.
+     */
+    @Test
+    public void failingReadRepairTest() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            final String insertRow = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)";
+            final String selectPartition = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1";
+
+            // Only nodes 1 and 2 get the row; node 3 starts out empty
+            cluster.get(1).executeInternal(insertRow);
+            cluster.get(2).executeInternal(insertRow);
+
+            Object[][] onThirdNodeBefore = cluster.get(3).executeInternal(selectPartition);
+            assertRows(onThirdNodeBefore);
+
+            // Install the message filter before issuing the coordinator read
+            cluster.verbs(READ_REPAIR).to(3).drop();
+            Object[][] viaCoordinator = cluster.coordinator().execute(selectPartition, ConsistencyLevel.QUORUM);
+            assertRows(viaCoordinator,
+                       row(1, 1, 1));
+
+            // Data was not repaired
+            Object[][] onThirdNodeAfter = cluster.get(3).executeInternal(selectPartition);
+            assertRows(onThirdNodeAfter);
+        }
+    }
+
+    /**
+     * Adds column v2 through {@code schemaChange(..., 1)} only, then writes a row that sets
+     * v2 through the coordinator. The write is expected to fail with an exception raised on
+     * a node that cannot deserialize the unknown column.
+     */
+    @Test
+    public void writeWithSchemaDisagreement() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+                                              ConsistencyLevel.QUORUM);
+            }
+            catch (RuntimeException e)
+            {
+                thrown = e;
+            }
+
+            // Fail with a readable message, not a NullPointerException, if the write unexpectedly succeeds
+            Assert.assertNotNull("Expected the write to fail because of schema disagreement", thrown);
+            Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node"));
+            Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization"));
+        }
+    }
+
+    /**
+     * Adds column v2 through {@code schemaChange(..., 1)} only, then reads the partition
+     * through the coordinator at ALL. The read is expected to fail with an exception raised
+     * on a node that cannot deserialize the unknown column.
+     */
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                         ConsistencyLevel.ALL),
+                           row(1, 1, 1, null));
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            // Fail with a readable message, not a NullPointerException, if the read unexpectedly succeeds
+            Assert.assertNotNull("Expected the read to fail because of schema disagreement", thrown);
+            Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node"));
+            Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization"));
+        }
+    }
+
+    /**
+     * Drops regular column v1 and re-adds it as a static column, with every schema change
+     * issued through the one-argument {@code schemaChange}. Checks what coordinator reads at
+     * ALL return after the drop, after the re-add, and after writing a static value.
+     */
+    @Test
+    public void reAddColumnAsStatic() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            // Three rows in partition 1: (1, i, i) for i = 1..3
+            for (int i = 1; i <= 3; i++)
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, i, i);
+            }
+
+            // Drop column
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1");
+
+            // Only the primary key columns remain visible
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1),
+                       row(1, 2),
+                       row(1, 3));
+
+            // Re-add the column, this time as static
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int 
static");
+
+            // The previously written regular values must not resurface in the static column
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, null),
+                       row(1, 2, null),
+                       row(1, 3, null));
+
+            // Write a static value for the partition
+            cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, v1) VALUES (?, ?)",
+                                          ConsistencyLevel.ALL,
+                                          1, 1);
+
+            // The static value is reported on every row of the partition
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, 1),
+                       row(1, 2, 1),
+                       row(1, 3, 1));
+        }
+    }
+
+    /**
+     * Drops v1, and later re-adds it as static, through {@code schemaChange(..., 1)} only.
+     * After each of the two schema changes a coordinator read at ALL is expected to fail
+     * with a "[v1] is not a subset of" error.
+     *
+     * Each read is checked against its own exception: {@code thrown} is reset before the
+     * second attempt, so the second assertion cannot pass on the exception left over from
+     * the first.
+     */
+    @Test
+    public void reAddColumnAsStaticDisagreementCoordinatorSide() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            for (int i = 1; i <= 3; i++)
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, i, i);
+            }
+
+            // Drop column
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 1);
+
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                              ConsistencyLevel.ALL);
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertNotNull("Expected the read after DROP to fail", thrown);
+            Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of"));
+
+            // Re-add the column as static
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 1);
+
+            // Forget the first failure so the next assertions only see the second read's outcome
+            thrown = null;
+            try
+            {
+                cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                              ConsistencyLevel.ALL);
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertNotNull("Expected the read after re-adding v1 as static to fail", thrown);
+            Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of"));
+        }
+    }
+
+    /**
+     * Two-node scenario in which v1 is dropped, and later re-added as static, through
+     * {@code schemaChange(..., 2)} only. Checks coordinator reads at ALL and node 2's local
+     * view at each stage, then expects both a coordinator write and a coordinator read of
+     * locally injected data to fail.
+     */
+    @Test
+    public void reAddColumnAsStaticDisagreementReplicaSide() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(2))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            // Three rows in partition 1: (1, i, i) for i = 1..3
+            for (int i = 1; i <= 3; i++)
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, i, i);
+            }
+
+            // Drop column on the replica
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 
2);
+
+            // Columns are going to be read and read-repaired as long as
+            // they're available
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+
+            // Node 2 itself no longer exposes v1
+            assertRows(cluster.get(2).executeInternal("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1),
+                       row(1, 2),
+                       row(1, 3));
+
+            // Re-add as static on the replica
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int 
static", 2);
+
+            // Try reading
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+
+            // Make sure read-repair did not corrupt the data
+            assertRows(cluster.get(2).executeInternal("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, null),
+                       row(1, 2, null),
+                       row(1, 3, null));
+
+            // Writing to the replica with disagreeing schema should not work
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, 1, 5);
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertNotNull(thrown);
+
+            // Reset so the next check cannot be satisfied by the write failure above
+            thrown = null;
+
+            // If somehow replica got new data, reading that data should not
+            // be possible, either
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1) VALUES (?, ?, ?)",
+                                           1, 1, 100);
+
+            try
+            {
+                assertRows(cluster.coordinator().execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                                                         ConsistencyLevel.ALL),
+                           row(1, 1, 1),
+                           row(1, 2, 2),
+                           row(1, 3, 3));
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertNotNull(thrown);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
----------------------------------------------------------------------
diff --git 
a/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
new file mode 100644
index 0000000..f873fce
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Shared base for the distributed tests: sets the system property that disables MBean
+ * registration, creates a cluster together with the test keyspace, and offers helpers for
+ * comparing query results against expected rows.
+ */
+public class DistributedTestBase
+{
+    static String KEYSPACE = "distributed_test_keyspace";
+
+    /**
+     * Sets {@code org.apache.cassandra.disable_mbean_registration} to {@code "true"} once
+     * per test class, before any test runs.
+     */
+    @BeforeClass
+    public static void setup()
+    {
+        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+    }
+
+    /**
+     * Creates a cluster of {@code nodeCount} nodes and the {@link #KEYSPACE} keyspace with
+     * SimpleStrategy and a replication factor equal to {@code nodeCount}.
+     *
+     * @param nodeCount number of nodes, also used as replication factor
+     * @return the new cluster; the caller is responsible for closing it
+     */
+    TestCluster createCluster(int nodeCount) throws Throwable
+    {
+        TestCluster cluster = TestCluster.create(nodeCount);
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + nodeCount + "};");
+
+        return cluster;
+    }
+
+    /**
+     * Asserts that {@code actual} has exactly the rows in {@code expected}, in the same
+     * order, comparing each row with {@link Arrays#equals(Object[], Object[])}.
+     *
+     * @param actual   rows returned by a query
+     * @param expected rows the query should have returned; none means "no rows"
+     */
+    public static void assertRows(Object[][] actual, Object[]... expected)
+    {
+        // Argument order is (actual, expected) so the failure message labels both sides correctly
+        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
+                            expected.length, actual.length);
+
+        for (int i = 0; i < expected.length; i++)
+        {
+            Object[] expectedRow = expected[i];
+            Object[] actualRow = actual[i];
+            Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
+                              Arrays.equals(expectedRow, actualRow));
+        }
+    }
+
+    /**
+     * Builds the failure message used by {@link #assertRows}.
+     *
+     * @param actual   rows that were returned
+     * @param expected rows that were expected
+     * @return a two-line message listing the expected rows first, then the actual rows
+     */
+    public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+    {
+        return String.format("Expected: %s\nActual:%s\n",
+                             rowsToString(expected),
+                             rowsToString(actual));
+    }
+
+    /**
+     * Renders rows as {@code [[a, b],[c, d]]}: each row via {@link Arrays#toString(Object[])},
+     * rows separated by a comma without a space.
+     */
+    public static String rowsToString(Object[][] rows)
+    {
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        boolean isFirst = true;
+        for (Object[] row : rows)
+        {
+            if (isFirst)
+                isFirst = false;
+            else
+                builder.append(",");
+            builder.append(Arrays.toString(row));
+        }
+        builder.append("]");
+        return builder.toString();
+    }
+
+    /**
+     * Convenience for writing an expected row inline; returns the varargs array itself.
+     */
+    public static Object[] row(Object... expected)
+    {
+        return expected;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Instance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/Instance.java
new file mode 100644
index 0000000..f9ee5bb
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.SharedExecutorPool;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageDeliveryTask;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.MessageInHandler;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public class Instance extends InvokableInstance
+{
+    public final InstanceConfig config;
+
+    /**
+     * @param config      configuration of this node; kept in the public {@code config} field
+     * @param classLoader class loader handed to the {@code InvokableInstance} superclass
+     */
+    public Instance(InstanceConfig config, ClassLoader classLoader)
+    {
+        super(classLoader);
+        this.config = config;
+    }
+
+    /**
+     * Returns the result of {@code FBUtilities.getBroadcastAddressAndPort()} evaluated
+     * through {@code callOnInstance}.
+     */
+    public InetAddressAndPort getBroadcastAddress() { return 
callOnInstance(FBUtilities::getBroadcastAddressAndPort); }
+
+    /**
+     * Prepares {@code query} with {@code QueryProcessor.prepareInternal} and executes it
+     * locally on this instance with the given bind values.
+     *
+     * @param query CQL text to execute
+     * @param args  values for the bind markers of {@code query}, in order
+     * @return the result rows, or {@code null} when the statement did not produce a row set
+     */
+    public Object[][] executeInternal(String query, Object... args)
+    {
+        return callOnInstance(() ->
+        {
+            QueryHandler.Prepared preparedQuery = QueryProcessor.prepareInternal(query);
+            QueryOptions internalOptions = QueryProcessor.makeInternalOptions(preparedQuery.statement, args);
+            ResultMessage outcome = preparedQuery.statement.executeLocally(QueryProcessor.internalQueryState(),
+                                                                           internalOptions);
+
+            // Anything other than a row set is reported to the caller as null
+            if (!(outcome instanceof ResultMessage.Rows))
+                return null;
+
+            return RowUtil.toObjects((ResultMessage.Rows) outcome);
+        });
+    }
+
+    /**
+     * Returns {@code Schema.instance.getVersion()} as evaluated through {@code callOnInstance}.
+     */
+    public UUID getSchemaVersion()
+    {
+        // we do not use method reference syntax here, because we need to
+        // invoke on the node-local schema instance
+        //noinspection Convert2MethodRef
+        return callOnInstance(() -> Schema.instance.getVersion());
+    }
+
+    /**
+     * Parses, validates and locally executes a schema-altering statement on this instance,
+     * using an internal client state bound to the system keyspace.
+     *
+     * @param query the DDL statement to apply
+     * @throws RuntimeException wrapping any exception raised while applying the statement;
+     *                          the message includes the query text
+     */
+    public void schemaChange(String query)
+    {
+        runOnInstance(() ->
+        {
+            try
+            {
+                ClientState clientState = ClientState.forInternalCalls(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+                QueryState internalState = new QueryState(clientState);
+
+                CQLStatement parsed = QueryProcessor.parseStatement(query, internalState.getClientState());
+                parsed.validate(clientState);
+
+                // Schema statements take no bind values
+                parsed.executeLocally(internalState, QueryOptions.forInternalCalls(Collections.emptyList()));
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error setting schema for test (query was: " + query + ")", e);
+            }
+        });
+    }
+
+    /**
+     * Installs a {@code MessageDeliverySink} on this instance's {@code MessagingService}.
+     * The sink hands outgoing messages to the target {@code Instance} looked up in
+     * {@code cluster}, after passing them through {@code cluster.filters()}.
+     *
+     * @param cluster cluster used to look up the receiving instance and the message filters
+     */
+    private void registerMockMessaging(TestCluster cluster)
+    {
+        // Unfiltered delivery: find the target instance by address and let it receive the message
+        BiConsumer<InetAddressAndPort, Message> deliverToInstance = (to, 
message) -> cluster.get(to).receiveMessage(message);
+        // Wrap delivery with the cluster's filters
+        BiConsumer<InetAddressAndPort, Message> deliverToInstanceIfNotFiltered 
= cluster.filters().filter(deliverToInstance);
+
+        // The sink is created and registered via acceptsOnInstance; the delivery callback is
+        // passed in as the argument
+        acceptsOnInstance((BiConsumer<InetAddressAndPort, Message> deliver) ->
+                MessagingService.instance().addMessageSink(new 
MessageDeliverySink(deliver))
+        ).accept(deliverToInstanceIfNotFiltered);
+    }
+
+    /**
+     * Message sink that serializes every outgoing message and passes it to a delivery
+     * callback, then returns {@code false} from {@code allowOutgoingMessage}.
+     * NOTE(review): presumably returning false stops MessagingService from sending the
+     * message over the network itself - confirm against IMessageSink.
+     */
+    private static class MessageDeliverySink implements IMessageSink
+    {
+        // Callback taking the destination address and the serialized message
+        private final BiConsumer<InetAddressAndPort, Message> deliver;
+        MessageDeliverySink(BiConsumer<InetAddressAndPort, Message> deliver)
+        {
+            this.deliver = deliver;
+        }
+
+        /**
+         * Serializes {@code messageOut} at {@code MessagingService.current_version} and
+         * passes it, with the verb id, message id and this node's broadcast address as
+         * sender, to the delivery callback.
+         *
+         * @return always {@code false}
+         * @throws RuntimeException wrapping an {@code IOException} from serialization
+         */
+        public boolean allowOutgoingMessage(MessageOut messageOut, int id, 
InetAddressAndPort to)
+        {
+            try (DataOutputBuffer out = new DataOutputBuffer(1024))
+            {
+                InetAddressAndPort from = 
FBUtilities.getBroadcastAddressAndPort();
+                messageOut.serialize(out, MessagingService.current_version);
+                deliver.accept(to, new Message(messageOut.verb.getId(), 
out.toByteArray(), id, MessagingService.current_version, from));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return false;
+        }
+
+        /**
+         * @return always {@code true}; incoming messages are never filtered here
+         */
+        public boolean allowIncomingMessage(MessageIn message, int id)
+        {
+            // we can filter to our heart's content on the outgoing message;
+            // no need to worry about incoming
+            return true;
+        }
+    }
+
+    /**
+     * Deserializes {@code message} on this instance and runs a {@code MessageDeliveryTask}
+     * for it directly via {@code run()}.
+     *
+     * @param message serialized message with its id, version and sender
+     * @throws RuntimeException wrapping anything thrown during deserialization or handling;
+     *                          the message names this node's broadcast address
+     */
+    private void receiveMessage(Message message)
+    {
+        acceptsOnInstance((Message m) ->
+        {
+            try (DataInputBuffer in = new DataInputBuffer(m.bytes))
+            {
+                MessageIn<?> messageIn = MessageInHandler.deserialize(in, 
m.id, m.version, m.from);
+                // run() is called directly on the task rather than submitting it to an executor
+                Runnable deliver = new MessageDeliveryTask(messageIn, m.id);
+                deliver.run();
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException("Exception occurred on the node " + 
FBUtilities.getBroadcastAddressAndPort(), t);
+            }
+
+        }).accept(message);
+    }
+
+    /**
+     * Brings this instance up: creates its directories, records the instance number for
+     * logging, runs startup, initializes the ring from {@code cluster} and installs the
+     * mock messaging sink.
+     *
+     * @param cluster the cluster this instance belongs to
+     * @throws RuntimeException any runtime exception is rethrown as is; every other
+     *                          throwable is wrapped in a RuntimeException
+     */
+    void launch(TestCluster cluster)
+    {
+        try
+        {
+            mkdirs();
+
+            // Read the number into a local first; the lambda then captures only the int
+            final int instanceNumber = config.num;
+            runOnInstance(() -> InstanceIDDefiner.instanceId = instanceNumber); // for logging
+
+            startup();
+            initializeRing(cluster);
+            registerMockMessaging(cluster);
+        }
+        catch (RuntimeException e)
+        {
+            throw e;
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * Creates the saved-caches, hints, commit-log and data directories named in
+     * {@code config}. The result of each {@code File.mkdirs()} call is ignored.
+     */
+    private void mkdirs()
+    {
+        String[] singleDirectories = { config.saved_caches_directory, config.hints_directory, config.commitlog_directory };
+        for (String path : singleDirectories)
+            new File(path).mkdirs();
+
+        for (String dataDirectory : config.data_file_directories)
+            new File(dataDirectory).mkdirs();
+    }
+
+    /**
+     * Initializes the node: loads the configuration built by {@link #loadConfig} into
+     * {@code DatabaseDescriptor}, creates all directories, marks keyspaces as initialized
+     * and persists the local metadata of the system keyspace.
+     */
+    private void startup()
+    {
+        // config is passed as an argument of the lambda, not captured from the enclosing instance
+        acceptsOnInstance((InstanceConfig config) ->
+        {
+            DatabaseDescriptor.daemonInitialization(() -> loadConfig(config));
+
+            DatabaseDescriptor.createAllDirectories();
+            Keyspace.setInitialized();
+            SystemKeyspace.persistLocalMetadata();
+        }).accept(config);
+    }
+
+
+    /**
+     * Builds a {@code Config} for one test node: a few fixed defaults, then the per-node
+     * values copied from {@code overrides}.
+     *
+     * @param overrides per-instance settings (addresses, directories, concurrency, token)
+     * @return a new {@code Config} populated from the defaults and {@code overrides}
+     */
+    public static Config loadConfig(InstanceConfig overrides)
+    {
+        Config config = new Config();
+        // Defaults
+        config.commitlog_sync = Config.CommitLogSync.batch;
+        config.endpoint_snitch = SimpleSnitch.class.getName();
+        config.seed_provider = new 
ParameterizedClass(SimpleSeedProvider.class.getName(),
+                                                      
Collections.singletonMap("seeds", "127.0.0.1:7010"));
+        // necessary for schema change monitoring
+        config.diagnostic_events_enabled = true; // necessary for schema 
change monitoring
+
+        // Overrides
+        config.partitioner = overrides.partitioner;
+        config.broadcast_address = overrides.broadcast_address;
+        config.listen_address = overrides.listen_address;
+        config.broadcast_rpc_address = overrides.broadcast_rpc_address;
+        config.rpc_address = overrides.rpc_address;
+        config.saved_caches_directory = overrides.saved_caches_directory;
+        config.data_file_directories = overrides.data_file_directories;
+        config.commitlog_directory = overrides.commitlog_directory;
+        config.hints_directory = overrides.hints_directory;
+        // note the differing field names: cdc_raw_directory is filled from cdc_directory
+        config.cdc_raw_directory = overrides.cdc_directory;
+        config.concurrent_writes = overrides.concurrent_writes;
+        config.concurrent_counter_writes = overrides.concurrent_counter_writes;
+        config.concurrent_materialized_view_writes = 
overrides.concurrent_materialized_view_writes;
+        config.concurrent_reads = overrides.concurrent_reads;
+        config.memtable_flush_writers = overrides.memtable_flush_writers;
+        config.concurrent_compactors = overrides.concurrent_compactors;
+        config.memtable_heap_space_in_mb = overrides.memtable_heap_space_in_mb;
+        config.initial_token = overrides.initial_token;
+        return config;
+    }
+
+    /**
+     * Makes this instance aware of every node in {@code cluster} without running real
+     * gossip: for each node it injects token and status state into {@code Gossiper} and
+     * {@code StorageService}, marks the endpoint alive and sets its messaging version.
+     *
+     * @param cluster cluster whose instances (numbered 1..size) supply token, broadcast
+     *                address and host id
+     * @throws RuntimeException if a broadcast address cannot be resolved, or wrapping any
+     *                          throwable raised while injecting the state
+     */
+    private void initializeRing(TestCluster cluster)
+    {
+        // This should be done outside instance in order to avoid serializing
+        // config
+        String partitionerName = config.partitioner;
+        List<String> initialTokens = new ArrayList<>();
+        List<InetAddressAndPort> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+        // The three lists are index-aligned: position i - 1 describes cluster.get(i)
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+        {
+            InstanceConfig config = cluster.get(i).config;
+            initialTokens.add(config.initial_token);
+            try
+            {
+                
hosts.add(InetAddressAndPort.getByName(config.broadcast_address));
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+            hostIds.add(config.hostId);
+        }
+
+        runOnInstance(() ->
+        {
+            try
+            {
+                IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerName);
+                StorageService storageService = StorageService.instance;
+                // Parse the token strings with this instance's partitioner
+                List<Token> tokens = new ArrayList<>();
+                for (String token : initialTokens)
+                    
tokens.add(partitioner.getTokenFactory().fromString(token));
+
+                for (int i = 0; i < tokens.size(); i++)
+                {
+                    InetAddressAndPort ep = hosts.get(i);
+                    Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 
1);
+                    Gossiper.instance.injectApplicationState(ep,
+                            ApplicationState.TOKENS,
+                            new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
+                    // The "normal" status is announced under both STATUS_WITH_PORT and STATUS
+                    storageService.onChange(ep,
+                            ApplicationState.STATUS_WITH_PORT,
+                            new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
+                    storageService.onChange(ep,
+                            ApplicationState.STATUS,
+                            new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
+                    Gossiper.instance.realMarkAlive(ep, 
Gossiper.instance.getEndpointStateForEndpoint(ep));
+                    MessagingService.instance().setVersion(ep, 
MessagingService.current_version);
+                }
+
+                // check that all nodes are in token metadata
+                for (int i = 0; i < tokens.size(); ++i)
+                    assert 
storageService.getTokenMetadata().isMember(hosts.get(i));
+            }
+            catch (Throwable e) // UnknownHostException
+            {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Runs, from within the instance, every shutdown hook listed below in the order given, then
+     * stops the repair command executor. Failures are merged by {@code runAndMergeThrowable} /
+     * {@code shutdownAndWait} and the merged result is finally passed to
+     * {@code Throwables.maybeFail}.
+     */
+    void shutdown()
+    {
+        runOnInstance(() -> {
+            Throwable failures = runAndMergeThrowable(null,
+                    BatchlogManager.instance::shutdown,
+                    HintsService.instance::shutdownBlocking,
+                    CommitLog.instance::shutdownBlocking,
+                    CompactionManager.instance::forceShutdown,
+                    Gossiper.instance::stop,
+                    SecondaryIndexManager::shutdownExecutors,
+                    MessagingService.instance()::shutdown,
+                    ColumnFamilyStore::shutdownFlushExecutor,
+                    ColumnFamilyStore::shutdownPostFlushExecutor,
+                    ColumnFamilyStore::shutdownReclaimExecutor,
+                    ColumnFamilyStore::shutdownPerDiskFlushExecutors,
+                    PendingRangeCalculatorService.instance::shutdownExecutor,
+                    BufferPool::shutdownLocalCleaner,
+                    Ref::shutdownReferenceReaper,
+                    StageManager::shutdownAndWait,
+                    SharedExecutorPool::shutdownSharedPool,
+                    Memtable.MEMORY_POOL::shutdown,
+                    ScheduledExecutors::shutdownAndWait);
+            // the repair executor is handled last, through the executor-specific helper
+            failures = shutdownAndWait(failures, ActiveRepairService.repairCommandExecutor);
+            Throwables.maybeFail(failures);
+        });
+    }
+
+    /**
+     * Calls {@code shutdownNow()} on the executor, waits up to five seconds for termination and
+     * asserts that it is both terminated and shut down.
+     *
+     * @param existing failure accumulated so far, may be {@code null}
+     * @param executor the executor to stop
+     * @return {@code existing}, merged with anything thrown while stopping the executor
+     */
+    private static Throwable shutdownAndWait(Throwable existing, ExecutorService executor)
+    {
+        ThrowingRunnable stopAndVerify = () -> {
+            executor.shutdownNow();
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+            assert executor.isTerminated() && executor.isShutdown() : executor;
+        };
+        return runAndMergeThrowable(existing, stopAndVerify);
+    }
+
+    /**
+     * Runs a single action and folds anything it throws into the failure accumulated so far.
+     *
+     * @param existing failure accumulated so far, may be {@code null}
+     * @param runnable the action to run
+     * @return {@code existing} when the action completes normally, otherwise the result of
+     *         {@code Throwables.merge(existing, thrown)}
+     */
+    private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable runnable)
+    {
+        Throwable merged = existing;
+        try
+        {
+            runnable.run();
+        }
+        catch (Throwable failure)
+        {
+            merged = Throwables.merge(existing, failure);
+        }
+        return merged;
+    }
+
+    /**
+     * Runs every action in order; an action that throws does not stop the remaining ones, its
+     * throwable is merged into the accumulated failure instead.
+     *
+     * @param existing  failure accumulated so far, may be {@code null}
+     * @param runnables the actions to run, in order
+     * @return the accumulated failure after all actions have run
+     */
+    private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable ... runnables)
+    {
+        Throwable merged = existing;
+        for (int i = 0; i < runnables.length; i++)
+        {
+            try
+            {
+                runnables[i].run();
+            }
+            catch (Throwable failure)
+            {
+                merged = Throwables.merge(merged, failure);
+            }
+        }
+        return merged;
+    }
+
+    /**
+     * An action without arguments or result that, unlike {@link Runnable}, is allowed to throw
+     * any {@link Throwable}, checked ones included.
+     */
+    @FunctionalInterface
+    public static interface ThrowingRunnable
+    {
+        void run() throws Throwable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
----------------------------------------------------------------------
diff --git 
a/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java 
b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
new file mode 100644
index 0000000..6349d5a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import com.google.common.base.Predicate;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.Pair;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.IntFunction;
+
+/**
+ * Class loader used for one test instance. It is created without a parent ({@code super(urls, null)}):
+ * slf4j, logback, yaml and all non-shared {@code org.apache.cassandra} classes are defined by this
+ * loader itself from the given URLs, while every other class is delegated to the common class loader.
+ */
+public class InstanceClassLoader extends URLClassLoader
+{
+    // Classes that have to be shared between instances, for configuration or returning values
+    private final static Class<?>[] commonClasses = new Class[]
+            {
+                    Pair.class,
+                    InstanceConfig.class,
+                    Message.class,
+                    InetAddressAndPort.class,
+                    InvokableInstance.SerializableCallable.class,
+                    InvokableInstance.SerializableRunnable.class,
+                    InvokableInstance.SerializableConsumer.class,
+                    InvokableInstance.SerializableBiConsumer.class,
+                    InvokableInstance.SerializableFunction.class,
+                    InvokableInstance.SerializableBiFunction.class,
+                    InvokableInstance.SerializableTriFunction.class,
+                    InvokableInstance.InstanceFunction.class
+            };
+
+    private final int id; // for debug purposes
+    private final ClassLoader commonClassLoader;
+    private final Predicate<String> isCommonClassName;
+
+    /**
+     * @param id                 instance number, kept for debugging only
+     * @param urls               class path this loader defines its own classes from
+     * @param isCommonClassName  tells whether a class name must be shared with the other instances
+     * @param commonClassLoader  loader that every shared (or non-isolated) class is delegated to
+     */
+    InstanceClassLoader(int id, URL[] urls, Predicate<String> isCommonClassName, ClassLoader commonClassLoader)
+    {
+        super(urls, null);
+        this.id = id;
+        this.commonClassLoader = commonClassLoader;
+        this.isCommonClassName = isCommonClassName;
+    }
+
+    /**
+     * Loads isolated classes through {@link #loadClassInternal(String)} and delegates every other
+     * class to the common class loader.
+     */
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException
+    {
+        // Do not share:
+        //  * yaml, which is a rare exception because it does mess with loading org.cassandra...Config class instances
+        //  * most of the rest of Cassandra classes (unless they were explicitly shared)
+        if (name.startsWith("org.slf4j") ||
+                name.startsWith("ch.qos.logback") ||
+                name.startsWith("org.yaml") ||
+                (name.startsWith("org.apache.cassandra") && !isCommonClassName.test(name)))
+            return loadClassInternal(name);
+
+        return commonClassLoader.loadClass(name);
+    }
+
+    /**
+     * Returns the class already loaded by this loader under {@code name}, or finds it in this
+     * loader's own URLs; no other loader is consulted.
+     */
+    Class<?> loadClassInternal(String name) throws ClassNotFoundException
+    {
+        synchronized (getClassLoadingLock(name))
+        {
+            // First, check if the class has already been loaded
+            Class<?> c = findLoadedClass(name);
+
+            if (c == null)
+                c = findClass(name);
+
+            return c;
+        }
+    }
+
+    /**
+     * Builds a factory that maps an instance id to a fresh {@code InstanceClassLoader}. Each loader
+     * reuses the URLs of {@code contextClassLoader}, treats the names of {@code commonClasses} as
+     * shared, and delegates to {@code contextClassLoader} for everything it does not isolate.
+     */
+    public static IntFunction<ClassLoader> createFactory(URLClassLoader contextClassLoader)
+    {
+        Set<String> commonClassNames = new HashSet<>();
+        for (Class<?> k : commonClasses)
+            commonClassNames.add(k.getName());
+
+        URL[] urls = contextClassLoader.getURLs();
+        return id -> new InstanceClassLoader(id, urls, commonClassNames::contains, contextClassLoader);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
----------------------------------------------------------------------
diff --git 
a/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java 
b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
new file mode 100644
index 0000000..49c2e1f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.UUID;
+
+public class InstanceConfig implements Serializable
+{
+    public final int num;
+    public final UUID hostId =java.util.UUID.randomUUID();
+    public final String partitioner = 
"org.apache.cassandra.dht.Murmur3Partitioner";
+    public final String broadcast_address;
+    public final String listen_address;
+    public final String broadcast_rpc_address;
+    public final String rpc_address;
+    public final String saved_caches_directory;
+    public final String[] data_file_directories;
+    public final String commitlog_directory;
+    public final String hints_directory;
+    public final String cdc_directory;
+    public final int concurrent_writes = 2;
+    public final int concurrent_counter_writes = 2;
+    public final int concurrent_materialized_view_writes = 2;
+    public final int concurrent_reads = 2;
+    public final int memtable_flush_writers = 1;
+    public final int concurrent_compactors = 1;
+    public final int memtable_heap_space_in_mb = 10;
+    public final String initial_token;
+
+    private InstanceConfig(int num,
+                           String broadcast_address,
+                           String listen_address,
+                           String broadcast_rpc_address,
+                           String rpc_address,
+                           String saved_caches_directory,
+                           String[] data_file_directories,
+                           String commitlog_directory,
+                           String hints_directory,
+                           String cdc_directory,
+                           String initial_token)
+    {
+        this.num = num;
+        this.broadcast_address = broadcast_address;
+        this.listen_address = listen_address;
+        this.broadcast_rpc_address = broadcast_rpc_address;
+        this.rpc_address = rpc_address;
+        this.saved_caches_directory = saved_caches_directory;
+        this.data_file_directories = data_file_directories;
+        this.commitlog_directory = commitlog_directory;
+        this.hints_directory = hints_directory;
+        this.cdc_directory = cdc_directory;
+        this.initial_token = initial_token;
+    }
+
+    public static InstanceConfig generate(int nodeNum, File root, String token)
+    {
+        return new InstanceConfig(nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  String.format("%s/node%d/saved_caches", 
root, nodeNum),
+                                  new String[] { 
String.format("%s/node%d/data", root, nodeNum) },
+                                  String.format("%s/node%d/commitlog", root, 
nodeNum),
+                                  String.format("%s/node%d/hints", root, 
nodeNum),
+                                  String.format("%s/node%d/cdc", root, 
nodeNum),
+                                  token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
----------------------------------------------------------------------
diff --git 
a/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java 
b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
new file mode 100644
index 0000000..1167748
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import ch.qos.logback.core.PropertyDefinerBase;
+
+/**
+ * Logback property definer (see logback-dtest.xml) that yields a label for the current
+ * instance: {@code "<main>"} while {@link #instanceId} is still -1, {@code "INSTANCE<id>"} otherwise.
+ */
+public class InstanceIDDefiner extends PropertyDefinerBase
+{
+    // Instantiated per classloader, set by Instance
+    public static int instanceId = -1;
+
+    public String getPropertyValue()
+    {
+        return instanceId == -1 ? "<main>" : "INSTANCE" + instanceId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
----------------------------------------------------------------------
diff --git 
a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java 
b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
new file mode 100644
index 0000000..f646ae1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class InvokableInstance
+{
+    private final ClassLoader classLoader;
+    private final Method deserializeOnInstance;
+
+    public InvokableInstance(ClassLoader classLoader)
+    {
+        this.classLoader = classLoader;
+        try
+        {
+            this.deserializeOnInstance = 
classLoader.loadClass(InvokableInstance.class.getName()).getDeclaredMethod("deserializeOneObject",
 byte[].class);
+        }
+        catch (ClassNotFoundException | NoSuchMethodException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public interface SerializableCallable<T> extends Callable<T>, Serializable 
{ public T call(); }
+    public <T> SerializableCallable<T> callsOnInstance(SerializableCallable<T> 
call) { return (SerializableCallable<T>) transferOneObject(call); }
+    public <T> T callOnInstance(SerializableCallable<T> call) { return 
callsOnInstance(call).call(); }
+
+    public interface SerializableRunnable extends Runnable, Serializable {}
+    public SerializableRunnable runsOnInstance(SerializableRunnable run) { 
return (SerializableRunnable) transferOneObject(run); }
+    public void runOnInstance(SerializableRunnable run) { 
runsOnInstance(run).run(); }
+
+    public interface SerializableConsumer<T> extends Consumer<T>, Serializable 
{}
+    public <T> SerializableConsumer<T> 
acceptsOnInstance(SerializableConsumer<T> consumer) { return 
(SerializableConsumer<T>) transferOneObject(consumer); }
+
+    public interface SerializableBiConsumer<T1, T2> extends BiConsumer<T1, 
T2>, Serializable {}
+    public <T1, T2> SerializableBiConsumer<T1, T2> 
acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return 
(SerializableBiConsumer<T1, T2>) transferOneObject(consumer); }
+
+    public interface SerializableFunction<I, O> extends Function<I, O>, 
Serializable {}
+    public <I, O> SerializableFunction<I, O> 
appliesOnInstance(SerializableFunction<I, O> f) { return 
(SerializableFunction<I, O>) transferOneObject(f); }
+
+    public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, 
I2, O>, Serializable {}
+    public <I1, I2, O> SerializableBiFunction<I1, I2, O> 
appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return 
(SerializableBiFunction<I1, I2, O>) transferOneObject(f); }
+
+    public interface SerializableTriFunction<I1, I2, I3, O> extends 
Serializable
+    {
+        O apply(I1 i1, I2 i2, I3 i3);
+    }
+    public <I1, I2, I3, O> SerializableTriFunction<I1, I2, I3, O> 
appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return 
(SerializableTriFunction<I1, I2, I3, O>) transferOneObject(f); }
+
+    public interface InstanceFunction<I, O> extends 
SerializableBiFunction<Instance, I, O> {}
+
+    // E must be a functional interface, and lambda must be implemented by a 
lambda function
+    public <E extends Serializable> E invokesOnInstance(E lambda)
+    {
+        return (E) transferOneObject(lambda);
+    }
+
+    public Object transferOneObject(Object object)
+    {
+        byte[] bytes = serializeOneObject(object);
+        try
+        {
+            Object onInstance = deserializeOnInstance.invoke(null, bytes);
+            if (onInstance.getClass().getClassLoader() != classLoader)
+                throw new IllegalStateException(onInstance + " seemingly from 
wrong class loader: " + onInstance.getClass().getClassLoader() + ", but 
expected " + classLoader);
+
+            return onInstance;
+        }
+        catch (IllegalAccessException | InvocationTargetException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private byte[] serializeOneObject(Object object)
+    {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos))
+        {
+            oos.writeObject(object);
+            oos.close();
+            return baos.toByteArray();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @SuppressWarnings("unused") // called through method invocation
+    public static Object deserializeOneObject(byte[] bytes)
+    {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+             ObjectInputStream ois = new ObjectInputStream(bais);)
+        {
+            return ois.readObject();
+        }
+        catch (IOException | ClassNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Message.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Message.java 
b/test/distributed/org/apache/cassandra/distributed/Message.java
new file mode 100644
index 0000000..b5492a2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Message.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * A container for simplifying the method signature for per-instance message handling/delivery.
+ * It only holds the values passed to its constructor; the byte array is stored as given, not copied.
+ */
+public class Message
+{
+    public final int verb;
+    public final byte[] bytes;
+    public final int id;
+    public final int version;
+    public final InetAddressAndPort from;
+
+    /**
+     * @param verb    numeric verb of the message
+     * @param bytes   message bytes (kept by reference)
+     * @param id      message id
+     * @param version version associated with the message
+     * @param from    address the message comes from
+     */
+    public Message(int verb, byte[] bytes, int id, int version, InetAddressAndPort from)
+    {
+        this.verb = verb;
+        this.bytes = bytes;
+        this.id = id;
+        this.version = version;
+        this.from = from;
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to