This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 521a6e2  Fixed a NullPointerException when calling nodetool 
enablethrift
521a6e2 is described below

commit 521a6e2aa9f8a4bc95dd13e768ec6de33cf6fa15
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Oct 12 09:30:41 2020 -0700

    Fixed a NullPointerException when calling nodetool enablethrift
    
    patch by David Capwell; reviewed by Ekaterina Dimitrova, Jordan West, Yifan 
Cai for CASSANDRA-16127
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |   3 +
 .../apache/cassandra/service/CassandraDaemon.java  | 139 +++++++++-----
 .../apache/cassandra/service/StorageService.java   |  20 +-
 .../distributed/impl/AbstractCluster.java          |  18 +-
 .../cassandra/distributed/impl/Instance.java       |  32 +++-
 .../cassandra/distributed/shared/Byteman.java      | 207 +++++++++++++++++++++
 .../cassandra/distributed/shared/Shared.java       |  37 ++++
 .../test/BootstrapBinaryDisabledTest.java          | 165 ++++++++++++++++
 .../test/ClientNetworkStopStartTest.java           | 192 +++++++++++++++++++
 test/resources/byteman/stream_failure.btm          |  14 ++
 11 files changed, 761 insertions(+), 67 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 73e9ba9..1274689 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2.19
  * Fix ExceptionInInitializerError when data_file_directories is not set 
(CASSANDRA-16008)
+ * Fixed a NullPointerException when calling nodetool enablethrift 
(CASSANDRA-16127)
 
 2.2.18
  * Fix CQL parsing of collections when the column type is reversed 
(CASSANDRA-15814)
diff --git a/build.xml b/build.xml
index 657cbbb..1e701f8 100644
--- a/build.xml
+++ b/build.xml
@@ -397,6 +397,7 @@
           <dependency groupId="junit" artifactId="junit" version="4.6" />
           <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" />
           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.5" />
+          <dependency groupId="org.reflections" artifactId="reflections" 
version="0.9.12" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
@@ -513,6 +514,7 @@
         <dependency groupId="junit" artifactId="junit"/>
         <dependency groupId="org.mockito" artifactId="mockito-core" />
         <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
+        <dependency groupId="org.reflections" artifactId="reflections" />
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
        <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster"/>
@@ -538,6 +540,7 @@
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
         <dependency groupId="org.mockito" artifactId="mockito-core" />
+        <dependency groupId="org.reflections" artifactId="reflections" />
         <dependency groupId="org.apache.pig" artifactId="pig">
           <exclusion groupId="xmlenc" artifactId="xmlenc"/>
           <exclusion groupId="tomcat" artifactId="jasper-runtime"/>
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 86e2464..a67011d 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -29,17 +29,14 @@ import java.rmi.AlreadyBoundException;
 import java.rmi.NotBoundException;
 import java.rmi.Remote;
 import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.rmi.server.RMIClientSocketFactory;
 import java.rmi.server.RMIServerSocketFactory;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 import javax.management.remote.JMXConnectorServer;
@@ -47,6 +44,13 @@ import javax.management.remote.JMXServiceURL;
 import javax.management.remote.rmi.RMIConnectorServer;
 import javax.management.remote.rmi.RMIJRMPServerImpl;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.addthis.metrics3.reporter.config.ReporterConfig;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistryListener;
@@ -55,18 +59,18 @@ import com.codahale.metrics.jvm.BufferPoolMetricSet;
 import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
 import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SizeEstimatesRecorder;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.StartupException;
@@ -78,7 +82,14 @@ import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.thrift.ThriftServer;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.Mx4jTool;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.RMIServerSocketFactoryImpl;
+import org.apache.cassandra.utils.WindowsTimer;
 
 /**
  * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
@@ -154,8 +165,8 @@ public class CassandraDaemon
 
     private static final CassandraDaemon instance = new CassandraDaemon();
 
-    public Server thriftServer;
-    public Server nativeServer;
+    private volatile Server thriftServer;
+    private volatile Server nativeServer;
 
     private final boolean runManaged;
     protected final StartupChecks startupChecks;
@@ -377,12 +388,12 @@ public class CassandraDaemon
         if (sizeRecorderInterval > 0)
             
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
 30, sizeRecorderInterval, TimeUnit.SECONDS);
 
-        initializeNativeTransport();
+        initializeClientTransports();
 
         completeSetup();
     }
 
-    public void initializeNativeTransport()
+    public synchronized void initializeClientTransports()
     {
         // Thrift
         InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
@@ -396,22 +407,6 @@ public class CassandraDaemon
         nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, 
nativePort);
     }
 
-    public void startNativeTransport()
-    {
-        validateTransportsCanStart();
-
-        if (nativeServer == null)
-            throw new IllegalStateException("native transport should be set up 
before it can be started");
-
-        nativeServer.start();
-        logger.info("Native server running on {}", new 
InetSocketAddress(DatabaseDescriptor.getRpcAddress(), 
DatabaseDescriptor.getNativeTransportPort()));
-
-        if (thriftServer == null)
-            throw new IllegalStateException("thrift transport should be set up 
before it can be started");
-        thriftServer.start();
-        logger.info("Thrift server running on {}", new 
InetSocketAddress(DatabaseDescriptor.getRpcAddress(), 
DatabaseDescriptor.getRpcPort()));
-    }
-
     private void validateTransportsCanStart()
     {
         // We only start transports if bootstrap has completed and we're not 
in survey mode, OR if we are in
@@ -512,6 +507,8 @@ public class CassandraDaemon
      */
     public void start()
     {
+        // check to see if transports may start else return without starting.  
This is needed when in survey mode or
+        // when bootstrap has not completed.
         try
         {
             validateTransportsCanStart();
@@ -519,10 +516,15 @@ public class CassandraDaemon
         catch (IllegalStateException isx)
         {
             // If there are any errors, we just log and return in this case
-            logger.info(isx.getMessage());
+            logger.warn(isx.getMessage());
             return;
         }
 
+        startClientTransports();
+    }
+
+    private void startClientTransports()
+    {
         String nativeFlag = 
System.getProperty("cassandra.start_native_transport");
         if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || 
(nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
         {
@@ -533,7 +535,7 @@ public class CassandraDaemon
 
         String rpcFlag = System.getProperty("cassandra.start_rpc");
         if ((rpcFlag != null && Boolean.parseBoolean(rpcFlag)) || (rpcFlag == 
null && DatabaseDescriptor.startRpc()))
-            thriftServer.start();
+            startThriftServer();
         else
             logger.info("Not starting RPC server as requested. Use JMX 
(StorageService->startRPCServer()) or nodetool (enablethrift) to start it");
     }
@@ -548,8 +550,7 @@ public class CassandraDaemon
         // On linux, this doesn't entirely shut down Cassandra, just the RPC 
server.
         // jsvc takes care of taking the rest down
         logger.info("Cassandra shutting down...");
-        thriftServer.stop();
-        nativeServer.stop();
+        destroyClientTransports();
 
         // On windows, we need to stop the entire system as prunsrv doesn't 
have the jsvc hooks
         // We rely on the shutdown hook to drain the node
@@ -570,24 +571,14 @@ public class CassandraDaemon
     }
 
     @VisibleForTesting
-    public void destroyNativeTransport() throws InterruptedException
+    public void destroyClientTransports()
     {
         // In 2.2, just stopping the server works. Future versions require 
`destroy` to be called
         // so we maintain the name for consistency
-        if (nativeServer != null)
-        {
-            nativeServer.stopAndAwaitTermination();
-            nativeServer = null;
-        }
-
-        if (thriftServer != null)
-        {
-            thriftServer.stopAndAwaitTermination();
-            thriftServer = null;
-        }
+        stopThriftServer();
+        stopNativeTransport();
     }
 
-
     /**
      * Clean up all resources obtained during the lifetime of the daemon. This
      * is a hook for JSVC.
@@ -638,6 +629,8 @@ public class CassandraDaemon
             }
 
             start();
+
+            logger.info("Startup complete");
         }
         catch (Throwable e)
         {
@@ -665,6 +658,52 @@ public class CassandraDaemon
         }
     }
 
+    public void startNativeTransport()
+    {
+        validateTransportsCanStart();
+
+        if (nativeServer == null)
+            throw new IllegalStateException("setup() must be called first for 
CassandraDaemon");
+
+        nativeServer.start();
+        logger.info("Native server running on {}", new 
InetSocketAddress(DatabaseDescriptor.getRpcAddress(), 
DatabaseDescriptor.getNativeTransportPort()));
+    }
+
+    public void stopNativeTransport()
+    {
+        if (nativeServer != null)
+        {
+            nativeServer.stopAndAwaitTermination();
+        }
+    }
+
+    public boolean isNativeTransportRunning()
+    {
+        return nativeServer != null && nativeServer.isRunning();
+    }
+
+    public void startThriftServer()
+    {
+        validateTransportsCanStart();
+
+        if (thriftServer == null)
+            throw new IllegalStateException("setup() must be called first for 
CassandraDaemon");
+        thriftServer.start();
+    }
+
+    public void stopThriftServer()
+    {
+        if (thriftServer != null)
+        {
+            thriftServer.stopAndAwaitTermination();
+        }
+    }
+
+    public boolean isThriftServerRunning()
+    {
+        return thriftServer != null && thriftServer.isRunning();
+    }
+
     /**
      * A convenience method to stop and destroy the daemon in one shot.
      */
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index b1d8e26..7781f2b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -355,8 +355,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 }
             }
         }
-        
-        daemon.thriftServer.start();
+
+        daemon.startThriftServer();
     }
 
     public void stopRPCServer()
@@ -365,17 +365,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-        if (daemon.thriftServer != null)
-            daemon.thriftServer.stop();
+        daemon.stopThriftServer();
     }
 
     public boolean isRPCServerRunning()
     {
-        if ((daemon == null) || (daemon.thriftServer == null))
+        if (daemon == null)
         {
             return false;
         }
-        return daemon.thriftServer.isRunning();
+        return daemon.isThriftServerRunning();
     }
 
     public void startNativeTransport()
@@ -403,7 +402,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
         try
         {
-            daemon.nativeServer.start();
+            daemon.startNativeTransport();
         }
         catch (Exception e)
         {
@@ -417,17 +416,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-        if (daemon.nativeServer != null)
-            daemon.nativeServer.stop();
+        daemon.stopNativeTransport();
     }
 
     public boolean isNativeTransportRunning()
     {
-        if ((daemon == null) || (daemon.nativeServer == null))
+        if (daemon == null)
         {
             return false;
         }
-        return daemon.nativeServer.isRunning();
+        return daemon.isNativeTransportRunning();
     }
 
     public void stopTransports()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index f74078e..cb55d3e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -65,12 +66,14 @@ import 
org.apache.cassandra.distributed.shared.AbstractBuilder;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.shared.ShutdownException;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.reflections.Reflections;
 
 /**
  * AbstractCluster creates, initializes and manages Cassandra instances 
({@link Instance}.
@@ -106,6 +109,13 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractCluster.class);
     private static final AtomicInteger GENERATION = new AtomicInteger();
 
+    // include byteman so tests can use
+    private static final Set<String> SHARED_CLASSES = 
findClassesMarkedForSharedClassLoader();
+    private static final Predicate<String> SHARED_PREDICATE = s ->
+                                                              
SHARED_CLASSES.contains(s) ||
+                                                              
InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
+                                                              
s.startsWith("org.jboss.byteman");
+
     private final UUID clusterId = UUID.randomUUID();
     private final File root;
     private final ClassLoader sharedClassLoader;
@@ -163,7 +173,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
 
         private IInvokableInstance newInstance(int generation)
         {
-            ClassLoader classLoader = new InstanceClassLoader(generation, 
config.num(), version.classpath, sharedClassLoader);
+            ClassLoader classLoader = new InstanceClassLoader(generation, 
config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE);
             if (instanceInitializer != null)
                 instanceInitializer.accept(classLoader, config.num());
             return 
Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, 
IInvokableInstance>)Instance::new, classLoader)
@@ -718,5 +728,11 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
                .collect(Collectors.toList());
     }
 
+    private static Set<String> findClassesMarkedForSharedClassLoader()
+    {
+        return new 
Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream()
+                                                      .map(Class::getName)
+                                                      
.collect(Collectors.toSet());
+    }
 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index db97e79..0e07231 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -101,6 +101,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throwables;
@@ -567,6 +568,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 }
 
                 // TODO: this is more than just gossip
+                
StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
                 if (config.has(GOSSIP))
                 {
                     StorageService.instance.initServer();
@@ -581,11 +583,12 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
 
                 SystemKeyspace.finishStartup();
 
+                CassandraDaemon.getInstanceForTesting().setupCompleted();
+
                 if (config.has(NATIVE_PROTOCOL))
                 {
-                    
CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
-                    
CassandraDaemon.getInstanceForTesting().startNativeTransport();
-                    StorageService.instance.setRpcReady(true);
+                    
CassandraDaemon.getInstanceForTesting().initializeClientTransports();
+                    CassandraDaemon.getInstanceForTesting().start();
                 }
 
                 if 
(!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
@@ -680,7 +683,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
 
             error = parallelRun(error, executor,
                     () -> StorageService.instance.setRpcReady(false),
-                    
CassandraDaemon.getInstanceForTesting()::destroyNativeTransport);
+                    
CassandraDaemon.getInstanceForTesting()::destroyClientTransports);
 
             if (config.has(GOSSIP) || config.has(NETWORK))
             {
@@ -692,7 +695,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
             }
 
             error = parallelRun(error, executor,
-                                MessagingService.instance()::shutdown
+                                (IgnoreThrowingRunnable) 
MessagingService.instance()::shutdown
             );
 
             error = parallelRun(error, executor,
@@ -846,5 +849,24 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         }
         return accumulate;
     }
+
+    @FunctionalInterface
+    private interface IgnoreThrowingRunnable extends ThrowingRunnable
+    {
+        void doRun() throws Throwable;
+
+        @Override
+        default void run()
+        {
+            try
+            {
+                doRun();
+            }
+            catch (Throwable e)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+            }
+        }
+    }
 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/Byteman.java 
b/test/distributed/org/apache/cassandra/distributed/shared/Byteman.java
new file mode 100644
index 0000000..bc27ec7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/Byteman.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.nio.charset.StandardCharsets;
+import java.security.CodeSigner;
+import java.security.CodeSource;
+import java.security.ProtectionDomain;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.StandardSystemProperty;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.jboss.byteman.agent.Transformer;
+
+public final class Byteman
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(Byteman.class);
+
+    private static final boolean DEBUG_TRANSFORMATIONS = 
Boolean.getBoolean("cassandra.test.byteman.transformations.debug");
+    private static final Method METHOD;
+    private static final URL BYTEMAN;
+
+    static
+    {
+        try
+        {
+            Method method = ClassLoader.class.getDeclaredMethod("defineClass",
+                                                                String.class, 
byte[].class, Integer.TYPE, Integer.TYPE,
+                                                                
ProtectionDomain.class);
+            method.setAccessible(true);
+            METHOD = method;
+        }
+        catch (NoSuchMethodException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        try
+        {
+            // this is just to make it more clear when you inspect a class 
that it was created by byteman
+            // the code source will show it came from byteman:// which isn't a 
valid java URL (hence the stream handler
+            // override)
+            BYTEMAN = new URL(null, "byteman://", new URLStreamHandler() {
+                protected URLConnection openConnection(URL u)
+                {
+                    throw new UnsupportedOperationException();
+                }
+            });
+        }
+        catch (MalformedURLException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    private final Transformer transformer;
+    private final List<KlassDetails> klasses;
+
+    public static Byteman createFromScripts(String... scripts)
+    {
+        List<String> texts = Stream.of(scripts).map(p -> {
+            try
+            {
+                return Files.toString(new File(p), StandardCharsets.UTF_8);
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException(e);
+            }
+        }).collect(Collectors.toList());
+
+        return new Byteman(Arrays.asList(scripts), texts, 
extractClasses(texts));
+    }
+
+    public static Byteman createFromText(String text)
+    {
+        return new Byteman(Arrays.asList("invalid"), Arrays.asList(text), 
extractClasses(Arrays.asList(text)));
+    }
+
+    private Byteman(List<String> scripts, List<String> texts, Set<String> 
modifiedClassNames)
+    {
+        klasses = modifiedClassNames.stream().map(fullyQualifiedKlass -> {
+            try
+            {
+                Class<?> klass = Class.forName(fullyQualifiedKlass);
+                String klassPath = fullyQualifiedKlass.replace(".", "/");
+                byte[] bytes = 
ByteStreams.toByteArray(Thread.currentThread().getContextClassLoader().getResourceAsStream(klassPath
 + ".class"));
+
+                return new KlassDetails(klassPath, klass, 
klass.getProtectionDomain(), bytes);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }).collect(Collectors.toList());
+
+        try
+        {
+            this.transformer = new Transformer(null, null, scripts, texts, 
false);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void install(ClassLoader cl)
+    {
+        try
+        {
+            for (KlassDetails details : klasses)
+            {
+                byte[] newBytes = transformer.transform(cl, details.klassPath, 
details.klass, details.protectionDomain, details.bytes);
+                if (newBytes == null)
+                    throw new AssertionError("Unable to transform bytes for " 
+ details.klassPath);
+
+                // inject the bytes into the classloader
+                METHOD.invoke(cl, null, newBytes, 0, newBytes.length,
+                              new ProtectionDomain(new CodeSource(BYTEMAN, new 
CodeSigner[0]), details.protectionDomain.getPermissions()));
+                if (DEBUG_TRANSFORMATIONS)
+                {
+                    File f = new 
File(StandardSystemProperty.JAVA_IO_TMPDIR.value(), "byteman/" + 
details.klassPath + ".class");
+                    f.getParentFile().mkdirs();
+                    File original = new File(f.getParentFile(), "original-" + 
f.getName());
+                    logger.info("Writing class file for {} to {}", 
details.klassPath, f.getAbsolutePath());
+                    Files.asByteSink(f).write(newBytes);
+                    Files.asByteSink(original).write(details.bytes);
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<String> extractClasses(List<String> texts)
+    {
+        Pattern pattern = Pattern.compile("^CLASS (.*)$");
+        Set<String> modifiedClassNames = new HashSet<>();
+        for (String text : texts)
+        {
+            for (String line : text.split("\n"))
+            {
+                Matcher matcher = pattern.matcher(line);
+                if (!matcher.find())
+                    continue;
+                modifiedClassNames.add(matcher.group(1));
+            }
+        }
+        if (modifiedClassNames.isEmpty())
+            throw new AssertionError("Unable to find any classes to modify");
+        return modifiedClassNames;
+    }
+
+    private static final class KlassDetails
+    {
+        private final String klassPath;
+        private final Class<?> klass;
+        private final ProtectionDomain protectionDomain;
+        private final byte[] bytes;
+
+        public KlassDetails(String klassPath,
+                            Class<?> klass, ProtectionDomain protectionDomain, 
byte[] bytes)
+        {
+            this.klassPath = klassPath;
+            this.klass = klass;
+            this.protectionDomain = protectionDomain;
+            this.bytes = bytes;
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java 
b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
new file mode 100644
index 0000000..a1047b6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Tells jvm-dtest that a class should be shared accross all {@link 
ClassLoader}s.
+ *
+ * Jvm-dtest relies on classloader isolation to run multiple cassandra 
instances in the same JVM, this makes it
+ * so some classes do not get shared (outside a blesssed set of 
classes/packages). When the default behavior
+ * is not desirable, this annotation will tell jvm-dtest to share the class 
accross all class loaders.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE })
+public @interface Shared
+{
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
new file mode 100644
index 0000000..3ac5028
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.Byteman;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Shared;
+
+/**
+ * Replaces python dtest 
bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled
+ */
+public class BootstrapBinaryDisabledTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException, TimeoutException
+    {
+        Map<String, Object> config = new HashMap<>();
+        config.put("authenticator", 
"org.apache.cassandra.auth.PasswordAuthenticator");
+        config.put("authorizer", 
"org.apache.cassandra.auth.CassandraAuthorizer");
+        config.put("role_manager", 
"org.apache.cassandra.auth.CassandraRoleManager");
+        config.put("permissions_validity_in_ms", 0);
+        config.put("roles_validity_in_ms", 0);
+
+        int originalNodeCount = 1;
+        int expandedNodeCount = originalNodeCount + 2;
+        Byteman byteman = 
Byteman.createFromScripts("test/resources/byteman/stream_failure.btm");
+        try (Cluster cluster = init(Cluster.build(originalNodeCount)
+                                           
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                           
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                           .withConfig(c -> {
+                                               config.forEach(c::set);
+                                               c.with(Feature.GOSSIP, 
Feature.NETWORK, Feature.NATIVE_PROTOCOL);
+                                           })
+                                           .withInstanceInitializer((cl, 
nodeNumber) -> {
+                                               switch (nodeNumber) {
+                                                   case 1:
+                                                   case 2:
+                                                       byteman.install(cl);
+                                                       break;
+                                               }
+                                           })
+                                           .start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk text 
primary key)");
+            populate(cluster.get(1));
+            cluster.forEach(c -> c.flush(KEYSPACE));
+
+            bootstrap(cluster, config, false);
+            // Test write survey behaviour
+            bootstrap(cluster, config, true);
+        }
+    }
+
+    private static void bootstrap(Cluster cluster,
+                                  Map<String, Object> config,
+                                  boolean isWriteSurvey) throws 
TimeoutException
+    {
+        IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+        nodeConfig.set("auto_bootstrap", true);
+        config.forEach(nodeConfig::set);
+
+        //TODO can we make this more isolated?
+        System.setProperty("cassandra.ring_delay_ms", "5000");
+        if (isWriteSurvey)
+            System.setProperty("cassandra.write_survey", "true");
+
+        RewriteEnabled.enable();
+        cluster.bootstrap(nodeConfig).startup();
+        IInvokableInstance node = cluster.get(cluster.size());
+        assertLogHas(node, "Some data streaming failed");
+        assertLogHas(node, isWriteSurvey ?
+                           "Not starting client transports in write_survey 
mode as it's bootstrapping or auth is enabled" :
+                           "Node is not yet bootstrapped completely");
+
+        node.nodetoolResult("join").asserts()
+             .failure()
+             .errorContains("Cannot join the ring until bootstrap completes");
+
+        RewriteEnabled.disable();
+        node.nodetoolResult("bootstrap", "resume").asserts().success();
+        if (isWriteSurvey)
+            assertLogHas(node, "Not starting client transports in write_survey 
mode as it's bootstrapping or auth is enabled");
+
+        if (isWriteSurvey)
+        {
+            node.nodetoolResult("join").asserts().success();
+            assertLogHas(node, "Leaving write survey mode and joining ring at 
operator request");
+        }
+
+        node.logs().watchFor("Starting listening for CQL clients");
+        assertBootstrapState(node, "COMPLETED");
+    }
+
+    private static void assertBootstrapState(IInvokableInstance node, String 
expected)
+    {
+        SimpleQueryResult qr = node.executeInternalWithResult("SELECT 
bootstrapped FROM system.local WHERE key='local'");
+        Assert.assertTrue("No rows found", qr.hasNext());
+        Assert.assertEquals(expected, qr.next().getString("bootstrapped"));
+    }
+
+    private static void assertLogHas(IInvokableInstance node, String msg)
+    {
+        LogResult<List<String>> results = node.logs().grep(msg);
+        Assert.assertFalse("Unable to find '" + msg + "'", 
results.getResult().isEmpty());
+    }
+
+    private void populate(IInvokableInstance inst)
+    {
+        for (int i = 0; i < 10; i++)
+            inst.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES 
(?)", Integer.toString(i));
+    }
+
+    @Shared
+    public static final class RewriteEnabled
+    {
+        private static volatile boolean enabled = false;
+
+        public static boolean isEnabled()
+        {
+            return enabled;
+        }
+
+        public static void enable()
+        {
+            enabled = true;
+        }
+
+        public static void disable()
+        {
+            enabled = false;
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
new file mode 100644
index 0000000..1d23ac7
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
+public class ClientNetworkStopStartTest extends TestBaseImpl
+{
+    /**
+     * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-16127";>CASSANDRA-16127</a>
+     */
+    @Test
+    public void stopStartThrift() throws IOException, TException
+    {
+        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL)).start()))
+        {
+            IInvokableInstance node = cluster.get(1);
+            assertTransportStatus(node, "binary", true);
+            assertTransportStatus(node, "thrift", true);
+            node.nodetoolResult("disablethrift").asserts().success();
+            assertTransportStatus(node, "binary", true);
+            assertTransportStatus(node, "thrift", false);
+            node.nodetoolResult("enablethrift").asserts().success();
+            assertTransportStatus(node, "binary", true);
+            assertTransportStatus(node, "thrift", true);
+
+            // now use it to make sure it still works!
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
value int, PRIMARY KEY (pk))");
+
+            ThriftClientUtils.thriftClient(node, thrift -> {
+                thrift.set_keyspace(KEYSPACE);
+                Mutation mutation = new Mutation();
+                ColumnOrSuperColumn csoc = new ColumnOrSuperColumn();
+                Column column = new Column();
+                
column.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
+                column.setValue(ByteBufferUtil.bytes(0));
+                column.setTimestamp(System.currentTimeMillis());
+                csoc.setColumn(column);
+                mutation.setColumn_or_supercolumn(csoc);
+
+                
thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
+                                                             
Collections.singletonMap("tbl", Arrays.asList(mutation))),
+                                    
org.apache.cassandra.thrift.ConsistencyLevel.ALL);
+            });
+
+            SimpleQueryResult qr = 
cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", 
ConsistencyLevel.ALL);
+            AssertUtils.assertRows(qr, QueryResults.builder().row(0, 
0).build());
+        }
+    }
+
+    /**
+     * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-16127";>CASSANDRA-16127</a>
+     */
+    @Test
+    public void stopStartNative() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL)).start()))
+        {
+            IInvokableInstance node = cluster.get(1);
+            assertTransportStatus(node, "binary", true);
+            assertTransportStatus(node, "thrift", true);
+            node.nodetoolResult("disablebinary").asserts().success();
+            assertTransportStatus(node, "binary", false);
+            assertTransportStatus(node, "thrift", true);
+            node.nodetoolResult("enablebinary").asserts().success();
+            assertTransportStatus(node, "binary", true);
+            assertTransportStatus(node, "thrift", true);
+
+            // now use it to make sure it still works!
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
value int, PRIMARY KEY (pk))");
+
+            try (com.datastax.driver.core.Cluster client = 
com.datastax.driver.core.Cluster.builder().addContactPoints(node.broadcastAddress().getAddress()).build();
+                 Session session = client.connect())
+            {
+                session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, value) 
VALUES (?, ?)", 0, 0);
+            }
+
+            SimpleQueryResult qr = 
cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", 
ConsistencyLevel.ALL);
+            AssertUtils.assertRows(qr, QueryResults.builder().row(0, 
0).build());
+        }
+    }
+
+    private static void assertTransportStatus(IInvokableInstance node, String 
transport, boolean running)
+    {
+        assertNodetoolStdout(node, running ? "running" : "not running", 
running ? "not running" : null, "status" + transport);
+    }
+
+    private static void assertNodetoolStdout(IInvokableInstance node, String 
expectedStatus, String notExpected, String... nodetool)
+    {
+        // without CASSANDRA-16057 need this hack
+        PrintStream previousStdout = System.out;
+        try
+        {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            PrintStream stdout = new PrintStream(out, true);
+            System.setOut(stdout);
+
+            node.nodetoolResult(nodetool).asserts().success();
+
+            stdout.flush();
+            String output = out.toString();
+            Assert.assertThat(output, new StringContains(expectedStatus));
+            if (notExpected != null)
+                Assert.assertThat(output, new StringNotContains(notExpected));
+        }
+        finally
+        {
+            System.setOut(previousStdout);
+        }
+    }
+
+    private static final class StringContains extends BaseMatcher<String>
+    {
+        private final String expected;
+
+        private StringContains(String expected)
+        {
+            this.expected = Objects.requireNonNull(expected);
+        }
+
+        public boolean matches(Object o)
+        {
+            return o.toString().contains(expected);
+        }
+
+        public void describeTo(Description description)
+        {
+            description.appendText("Expected to find '" + expected + "', but 
did not");
+        }
+    }
+
+    private static final class StringNotContains extends BaseMatcher<String>
+    {
+        private final String notExpected;
+
+        private StringNotContains(String expected)
+        {
+            this.notExpected = Objects.requireNonNull(expected);
+        }
+
+        public boolean matches(Object o)
+        {
+            return !o.toString().contains(notExpected);
+        }
+
+        public void describeTo(Description description)
+        {
+            description.appendText("Expected not to find '" + notExpected + 
"', but did");
+        }
+    }
+}
diff --git a/test/resources/byteman/stream_failure.btm 
b/test/resources/byteman/stream_failure.btm
new file mode 100644
index 0000000..e40f7fe
--- /dev/null
+++ b/test/resources/byteman/stream_failure.btm
@@ -0,0 +1,14 @@
+#
+# Inject streaming failure
+#
+# Before start streaming files in `StreamSession#prepare()` method,
+# interrupt streaming by throwing RuntimeException.
+#
+RULE inject stream failure
+CLASS org.apache.cassandra.streaming.StreamSession
+METHOD prepare
+AT INVOKE maybeCompleted
+IF 
org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest$RewriteEnabled.isEnabled()
+DO
+   throw new java.lang.RuntimeException("Triggering network failure")
+ENDRULE


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to