This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-2.2 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push: new 521a6e2 Fixed a NullPointerException when calling nodetool enablethrift 521a6e2 is described below commit 521a6e2aa9f8a4bc95dd13e768ec6de33cf6fa15 Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Oct 12 09:30:41 2020 -0700 Fixed a NullPointerException when calling nodetool enablethrift patch by David Capwell; reviewed by Ekaterina Dimitrova, Jordan West, Yifan Cai for CASSANDRA-16127 --- CHANGES.txt | 1 + build.xml | 3 + .../apache/cassandra/service/CassandraDaemon.java | 139 +++++++++----- .../apache/cassandra/service/StorageService.java | 20 +- .../distributed/impl/AbstractCluster.java | 18 +- .../cassandra/distributed/impl/Instance.java | 32 +++- .../cassandra/distributed/shared/Byteman.java | 207 +++++++++++++++++++++ .../cassandra/distributed/shared/Shared.java | 37 ++++ .../test/BootstrapBinaryDisabledTest.java | 165 ++++++++++++++++ .../test/ClientNetworkStopStartTest.java | 192 +++++++++++++++++++ test/resources/byteman/stream_failure.btm | 14 ++ 11 files changed, 761 insertions(+), 67 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 73e9ba9..1274689 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 2.2.19 * Fix ExceptionInInitializerError when data_file_directories is not set (CASSANDRA-16008) + * Fixed a NullPointerException when calling nodetool enablethrift (CASSANDRA-16127) 2.2.18 * Fix CQL parsing of collections when the column type is reversed (CASSANDRA-15814) diff --git a/build.xml b/build.xml index 657cbbb..1e701f8 100644 --- a/build.xml +++ b/build.xml @@ -397,6 +397,7 @@ <dependency groupId="junit" artifactId="junit" version="4.6" /> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" /> <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.5" /> + <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> @@ -513,6 +514,7 @@ <dependency groupId="junit" artifactId="junit"/> <dependency groupId="org.mockito" artifactId="mockito-core" /> <dependency groupId="org.apache.cassandra" artifactId="dtest-api" /> + <dependency groupId="org.reflections" artifactId="reflections" /> <dependency groupId="org.apache.rat" artifactId="apache-rat"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> @@ -538,6 +540,7 @@ version="${version}"/> <dependency groupId="junit" artifactId="junit"/> <dependency groupId="org.mockito" artifactId="mockito-core" /> + <dependency groupId="org.reflections" artifactId="reflections" /> <dependency groupId="org.apache.pig" artifactId="pig"> <exclusion groupId="xmlenc" artifactId="xmlenc"/> <exclusion groupId="tomcat" artifactId="jasper-runtime"/> diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 86e2464..a67011d 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -29,17 +29,14 @@ import java.rmi.AlreadyBoundException; import java.rmi.NotBoundException; import java.rmi.Remote; import java.rmi.RemoteException; -import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMIServerSocketFactory; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; - import javax.management.ObjectName; import javax.management.StandardMBean; import javax.management.remote.JMXConnectorServer; @@ -47,6 +44,13 @@ import javax.management.remote.JMXServiceURL; import javax.management.remote.rmi.RMIConnectorServer; import 
javax.management.remote.rmi.RMIJRMPServerImpl; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.addthis.metrics3.reporter.config.ReporterConfig; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistryListener; @@ -55,18 +59,18 @@ import com.codahale.metrics.jvm.BufferPoolMetricSet; import com.codahale.metrics.jvm.FileDescriptorRatioGauge; import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Uninterruptibles; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.*; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SizeEstimatesRecorder; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.WindowsFailedSnapshotTracker; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.StartupException; @@ -78,7 +82,14 @@ import org.apache.cassandra.metrics.DefaultNameFactory; import 
org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.thrift.ThriftServer; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.CLibrary; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.Mx4jTool; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.RMIServerSocketFactoryImpl; +import org.apache.cassandra.utils.WindowsTimer; /** * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon @@ -154,8 +165,8 @@ public class CassandraDaemon private static final CassandraDaemon instance = new CassandraDaemon(); - public Server thriftServer; - public Server nativeServer; + private volatile Server thriftServer; + private volatile Server nativeServer; private final boolean runManaged; protected final StartupChecks startupChecks; @@ -377,12 +388,12 @@ public class CassandraDaemon if (sizeRecorderInterval > 0) ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS); - initializeNativeTransport(); + initializeClientTransports(); completeSetup(); } - public void initializeNativeTransport() + public synchronized void initializeClientTransports() { // Thrift InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); @@ -396,22 +407,6 @@ public class CassandraDaemon nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort); } - public void startNativeTransport() - { - validateTransportsCanStart(); - - if (nativeServer == null) - throw new IllegalStateException("native transport should be set up before it can be started"); - - nativeServer.start(); - logger.info("Native server running on {}", new InetSocketAddress(DatabaseDescriptor.getRpcAddress(), DatabaseDescriptor.getNativeTransportPort())); - - if (thriftServer == null) - 
throw new IllegalStateException("thrift transport should be set up before it can be started"); - thriftServer.start(); - logger.info("Thrift server running on {}", new InetSocketAddress(DatabaseDescriptor.getRpcAddress(), DatabaseDescriptor.getRpcPort())); - } - private void validateTransportsCanStart() { // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in @@ -512,6 +507,8 @@ public class CassandraDaemon */ public void start() { + // check to see if transports may start else return without starting. This is needed when in survey mode or + // when bootstrap has not completed. try { validateTransportsCanStart(); @@ -519,10 +516,15 @@ public class CassandraDaemon catch (IllegalStateException isx) { // If there are any errors, we just log and return in this case - logger.info(isx.getMessage()); + logger.warn(isx.getMessage()); return; } + startClientTransports(); + } + + private void startClientTransports() + { String nativeFlag = System.getProperty("cassandra.start_native_transport"); if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) { @@ -533,7 +535,7 @@ public class CassandraDaemon String rpcFlag = System.getProperty("cassandra.start_rpc"); if ((rpcFlag != null && Boolean.parseBoolean(rpcFlag)) || (rpcFlag == null && DatabaseDescriptor.startRpc())) - thriftServer.start(); + startThriftServer(); else logger.info("Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) or nodetool (enablethrift) to start it"); } @@ -548,8 +550,7 @@ public class CassandraDaemon // On linux, this doesn't entirely shut down Cassandra, just the RPC server. 
// jsvc takes care of taking the rest down logger.info("Cassandra shutting down..."); - thriftServer.stop(); - nativeServer.stop(); + destroyClientTransports(); // On windows, we need to stop the entire system as prunsrv doesn't have the jsvc hooks // We rely on the shutdown hook to drain the node @@ -570,24 +571,14 @@ public class CassandraDaemon } @VisibleForTesting - public void destroyNativeTransport() throws InterruptedException + public void destroyClientTransports() { // In 2.2, just stopping the server works. Future versions require `destroy` to be called // so we maintain the name for consistency - if (nativeServer != null) - { - nativeServer.stopAndAwaitTermination(); - nativeServer = null; - } - - if (thriftServer != null) - { - thriftServer.stopAndAwaitTermination(); - thriftServer = null; - } + stopThriftServer(); + stopNativeTransport(); } - /** * Clean up all resources obtained during the lifetime of the daemon. This * is a hook for JSVC. @@ -638,6 +629,8 @@ public class CassandraDaemon } start(); + + logger.info("Startup complete"); } catch (Throwable e) { @@ -665,6 +658,52 @@ public class CassandraDaemon } } + public void startNativeTransport() + { + validateTransportsCanStart(); + + if (nativeServer == null) + throw new IllegalStateException("setup() must be called first for CassandraDaemon"); + + nativeServer.start(); + logger.info("Native server running on {}", new InetSocketAddress(DatabaseDescriptor.getRpcAddress(), DatabaseDescriptor.getNativeTransportPort())); + } + + public void stopNativeTransport() + { + if (nativeServer != null) + { + nativeServer.stopAndAwaitTermination(); + } + } + + public boolean isNativeTransportRunning() + { + return nativeServer != null && nativeServer.isRunning(); + } + + public void startThriftServer() + { + validateTransportsCanStart(); + + if (thriftServer == null) + throw new IllegalStateException("setup() must be called first for CassandraDaemon"); + thriftServer.start(); + } + + public void 
stopThriftServer() + { + if (thriftServer != null) + { + thriftServer.stopAndAwaitTermination(); + } + } + + public boolean isThriftServerRunning() + { + return thriftServer != null && thriftServer.isRunning(); + } + /** * A convenience method to stop and destroy the daemon in one shot. */ diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b1d8e26..7781f2b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -355,8 +355,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } } - - daemon.thriftServer.start(); + + daemon.startThriftServer(); } public void stopRPCServer() @@ -365,17 +365,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { throw new IllegalStateException("No configured daemon"); } - if (daemon.thriftServer != null) - daemon.thriftServer.stop(); + daemon.stopThriftServer(); } public boolean isRPCServerRunning() { - if ((daemon == null) || (daemon.thriftServer == null)) + if (daemon == null) { return false; } - return daemon.thriftServer.isRunning(); + return daemon.isThriftServerRunning(); } public void startNativeTransport() @@ -403,7 +402,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE try { - daemon.nativeServer.start(); + daemon.startNativeTransport(); } catch (Exception e) { @@ -417,17 +416,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { throw new IllegalStateException("No configured daemon"); } - if (daemon.nativeServer != null) - daemon.nativeServer.stop(); + daemon.stopNativeTransport(); } public boolean isNativeTransportRunning() { - if ((daemon == null) || (daemon.nativeServer == null)) + if (daemon == null) { return false; } - return daemon.nativeServer.isRunning(); + return daemon.isNativeTransportRunning(); } public void 
stopTransports() diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index f74078e..cb55d3e 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -65,12 +66,14 @@ import org.apache.cassandra.distributed.shared.AbstractBuilder; import org.apache.cassandra.distributed.shared.InstanceClassLoader; import org.apache.cassandra.distributed.shared.MessageFilters; import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.shared.Shared; import org.apache.cassandra.distributed.shared.ShutdownException; import org.apache.cassandra.distributed.shared.Versions; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.reflections.Reflections; /** * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}. 
@@ -106,6 +109,13 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class); private static final AtomicInteger GENERATION = new AtomicInteger(); + // include byteman so tests can use + private static final Set<String> SHARED_CLASSES = findClassesMarkedForSharedClassLoader(); + private static final Predicate<String> SHARED_PREDICATE = s -> + SHARED_CLASSES.contains(s) || + InstanceClassLoader.getDefaultLoadSharedFilter().test(s) || + s.startsWith("org.jboss.byteman"); + private final UUID clusterId = UUID.randomUUID(); private final File root; private final ClassLoader sharedClassLoader; @@ -163,7 +173,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I private IInvokableInstance newInstance(int generation) { - ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader); + ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE); if (instanceInitializer != null) instanceInitializer.accept(classLoader, config.num()); return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader) @@ -718,5 +728,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I .collect(Collectors.toList()); } + private static Set<String> findClassesMarkedForSharedClassLoader() + { + return new Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream() + .map(Class::getName) + .collect(Collectors.toSet()); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index db97e79..0e07231 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -101,6 +101,7 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throwables; @@ -567,6 +568,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } // TODO: this is more than just gossip + StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting()); if (config.has(GOSSIP)) { StorageService.instance.initServer(); @@ -581,11 +583,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance SystemKeyspace.finishStartup(); + CassandraDaemon.getInstanceForTesting().setupCompleted(); + if (config.has(NATIVE_PROTOCOL)) { - CassandraDaemon.getInstanceForTesting().initializeNativeTransport(); - CassandraDaemon.getInstanceForTesting().startNativeTransport(); - StorageService.instance.setRpcReady(true); + CassandraDaemon.getInstanceForTesting().initializeClientTransports(); + CassandraDaemon.getInstanceForTesting().start(); } if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress())) @@ -680,7 +683,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance error = parallelRun(error, executor, () -> StorageService.instance.setRpcReady(false), - CassandraDaemon.getInstanceForTesting()::destroyNativeTransport); + CassandraDaemon.getInstanceForTesting()::destroyClientTransports); if (config.has(GOSSIP) || config.has(NETWORK)) { @@ -692,7 +695,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } error = parallelRun(error, executor, - MessagingService.instance()::shutdown + (IgnoreThrowingRunnable) 
MessagingService.instance()::shutdown ); error = parallelRun(error, executor, @@ -846,5 +849,24 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance } return accumulate; } + + @FunctionalInterface + private interface IgnoreThrowingRunnable extends ThrowingRunnable + { + void doRun() throws Throwable; + + @Override + default void run() + { + try + { + doRun(); + } + catch (Throwable e) + { + JVMStabilityInspector.inspectThrowable(e); + } + } + } } diff --git a/test/distributed/org/apache/cassandra/distributed/shared/Byteman.java b/test/distributed/org/apache/cassandra/distributed/shared/Byteman.java new file mode 100644 index 0000000..bc27ec7 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/shared/Byteman.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.shared; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import java.nio.charset.StandardCharsets; +import java.security.CodeSigner; +import java.security.CodeSource; +import java.security.ProtectionDomain; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.common.base.StandardSystemProperty; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.jboss.byteman.agent.Transformer; + +public final class Byteman +{ + private static final Logger logger = LoggerFactory.getLogger(Byteman.class); + + private static final boolean DEBUG_TRANSFORMATIONS = Boolean.getBoolean("cassandra.test.byteman.transformations.debug"); + private static final Method METHOD; + private static final URL BYTEMAN; + + static + { + try + { + Method method = ClassLoader.class.getDeclaredMethod("defineClass", + String.class, byte[].class, Integer.TYPE, Integer.TYPE, + ProtectionDomain.class); + method.setAccessible(true); + METHOD = method; + } + catch (NoSuchMethodException e) + { + throw new AssertionError(e); + } + + try + { + // this is just to make it more clear when you inspect a class that it was created by byteman + // the code source will show it came from byteman:// which isn't a valid java URL (hence the stream handler + // override) + BYTEMAN = new URL(null, "byteman://", new URLStreamHandler() { + protected URLConnection openConnection(URL u) + { + throw new UnsupportedOperationException(); + } + }); + } + catch (MalformedURLException e) + 
{ + throw new AssertionError(e); + } + } + + private final Transformer transformer; + private final List<KlassDetails> klasses; + + public static Byteman createFromScripts(String... scripts) + { + List<String> texts = Stream.of(scripts).map(p -> { + try + { + return Files.toString(new File(p), StandardCharsets.UTF_8); + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + + return new Byteman(Arrays.asList(scripts), texts, extractClasses(texts)); + } + + public static Byteman createFromText(String text) + { + return new Byteman(Arrays.asList("invalid"), Arrays.asList(text), extractClasses(Arrays.asList(text))); + } + + private Byteman(List<String> scripts, List<String> texts, Set<String> modifiedClassNames) + { + klasses = modifiedClassNames.stream().map(fullyQualifiedKlass -> { + try + { + Class<?> klass = Class.forName(fullyQualifiedKlass); + String klassPath = fullyQualifiedKlass.replace(".", "/"); + byte[] bytes = ByteStreams.toByteArray(Thread.currentThread().getContextClassLoader().getResourceAsStream(klassPath + ".class")); + + return new KlassDetails(klassPath, klass, klass.getProtectionDomain(), bytes); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + + try + { + this.transformer = new Transformer(null, null, scripts, texts, false); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public void install(ClassLoader cl) + { + try + { + for (KlassDetails details : klasses) + { + byte[] newBytes = transformer.transform(cl, details.klassPath, details.klass, details.protectionDomain, details.bytes); + if (newBytes == null) + throw new AssertionError("Unable to transform bytes for " + details.klassPath); + + // inject the bytes into the classloader + METHOD.invoke(cl, null, newBytes, 0, newBytes.length, + new ProtectionDomain(new CodeSource(BYTEMAN, new CodeSigner[0]), details.protectionDomain.getPermissions())); + if 
(DEBUG_TRANSFORMATIONS) + { + File f = new File(StandardSystemProperty.JAVA_IO_TMPDIR.value(), "byteman/" + details.klassPath + ".class"); + f.getParentFile().mkdirs(); + File original = new File(f.getParentFile(), "original-" + f.getName()); + logger.info("Writing class file for {} to {}", details.klassPath, f.getAbsolutePath()); + Files.asByteSink(f).write(newBytes); + Files.asByteSink(original).write(details.bytes); + } + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + private static Set<String> extractClasses(List<String> texts) + { + Pattern pattern = Pattern.compile("^CLASS (.*)$"); + Set<String> modifiedClassNames = new HashSet<>(); + for (String text : texts) + { + for (String line : text.split("\n")) + { + Matcher matcher = pattern.matcher(line); + if (!matcher.find()) + continue; + modifiedClassNames.add(matcher.group(1)); + } + } + if (modifiedClassNames.isEmpty()) + throw new AssertionError("Unable to find any classes to modify"); + return modifiedClassNames; + } + + private static final class KlassDetails + { + private final String klassPath; + private final Class<?> klass; + private final ProtectionDomain protectionDomain; + private final byte[] bytes; + + public KlassDetails(String klassPath, + Class<?> klass, ProtectionDomain protectionDomain, byte[] bytes) + { + this.klassPath = klassPath; + this.klass = klass; + this.protectionDomain = protectionDomain; + this.bytes = bytes; + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java new file mode 100644 index 0000000..a1047b6 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.shared; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Tells jvm-dtest that a class should be shared across all {@link ClassLoader}s. + * + * Jvm-dtest relies on classloader isolation to run multiple cassandra instances in the same JVM, this makes it + * so some classes do not get shared (outside a blessed set of classes/packages). When the default behavior + * is not desirable, this annotation will tell jvm-dtest to share the class across all class loaders. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.TYPE }) +public @interface Shared +{ +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java new file mode 100644 index 0000000..3ac5028 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.LogResult; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.Byteman; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.shared.Shared; + +/** + * Replaces python dtest bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled + */ +public class BootstrapBinaryDisabledTest extends TestBaseImpl +{ + @Test + public void test() throws IOException, TimeoutException + { + Map<String, Object> config = new HashMap<>(); + config.put("authenticator", "org.apache.cassandra.auth.PasswordAuthenticator"); + config.put("authorizer", "org.apache.cassandra.auth.CassandraAuthorizer"); + config.put("role_manager", "org.apache.cassandra.auth.CassandraRoleManager"); + 
config.put("permissions_validity_in_ms", 0); + config.put("roles_validity_in_ms", 0); + + int originalNodeCount = 1; + int expandedNodeCount = originalNodeCount + 2; + Byteman byteman = Byteman.createFromScripts("test/resources/byteman/stream_failure.btm"); + try (Cluster cluster = init(Cluster.build(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(c -> { + config.forEach(c::set); + c.with(Feature.GOSSIP, Feature.NETWORK, Feature.NATIVE_PROTOCOL); + }) + .withInstanceInitializer((cl, nodeNumber) -> { + switch (nodeNumber) { + case 1: + case 2: + byteman.install(cl); + break; + } + }) + .start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk text primary key)"); + populate(cluster.get(1)); + cluster.forEach(c -> c.flush(KEYSPACE)); + + bootstrap(cluster, config, false); + // Test write survey behaviour + bootstrap(cluster, config, true); + } + } + + private static void bootstrap(Cluster cluster, + Map<String, Object> config, + boolean isWriteSurvey) throws TimeoutException + { + IInstanceConfig nodeConfig = cluster.newInstanceConfig(); + nodeConfig.set("auto_bootstrap", true); + config.forEach(nodeConfig::set); + + //TODO can we make this more isolated? + System.setProperty("cassandra.ring_delay_ms", "5000"); + if (isWriteSurvey) + System.setProperty("cassandra.write_survey", "true"); + + RewriteEnabled.enable(); + cluster.bootstrap(nodeConfig).startup(); + IInvokableInstance node = cluster.get(cluster.size()); + assertLogHas(node, "Some data streaming failed"); + assertLogHas(node, isWriteSurvey ? 
+ "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled" : + "Node is not yet bootstrapped completely"); + + node.nodetoolResult("join").asserts() + .failure() + .errorContains("Cannot join the ring until bootstrap completes"); + + RewriteEnabled.disable(); + node.nodetoolResult("bootstrap", "resume").asserts().success(); + if (isWriteSurvey) + assertLogHas(node, "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled"); + + if (isWriteSurvey) + { + node.nodetoolResult("join").asserts().success(); + assertLogHas(node, "Leaving write survey mode and joining ring at operator request"); + } + + node.logs().watchFor("Starting listening for CQL clients"); + assertBootstrapState(node, "COMPLETED"); + } + + private static void assertBootstrapState(IInvokableInstance node, String expected) + { + SimpleQueryResult qr = node.executeInternalWithResult("SELECT bootstrapped FROM system.local WHERE key='local'"); + Assert.assertTrue("No rows found", qr.hasNext()); + Assert.assertEquals(expected, qr.next().getString("bootstrapped")); + } + + private static void assertLogHas(IInvokableInstance node, String msg) + { + LogResult<List<String>> results = node.logs().grep(msg); + Assert.assertFalse("Unable to find '" + msg + "'", results.getResult().isEmpty()); + } + + private void populate(IInvokableInstance inst) + { + for (int i = 0; i < 10; i++) + inst.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)", Integer.toString(i)); + } + + @Shared + public static final class RewriteEnabled + { + private static volatile boolean enabled = false; + + public static boolean isEnabled() + { + return enabled; + } + + public static void enable() + { + enabled = true; + } + + public static void disable() + { + enabled = false; + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java new file mode 100644 index 0000000..1d23ac7 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Objects; + +import org.junit.Assert; +import org.junit.Test; + +import com.datastax.driver.core.Session; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.QueryResults; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.shared.AssertUtils; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.thrift.TException; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +public class ClientNetworkStopStartTest extends TestBaseImpl +{ + /** + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a> + */ + @Test + public void stopStartThrift() throws IOException, TException + { + try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start())) + { + IInvokableInstance node = cluster.get(1); + assertTransportStatus(node, "binary", true); + assertTransportStatus(node, "thrift", true); + node.nodetoolResult("disablethrift").asserts().success(); + assertTransportStatus(node, "binary", true); + assertTransportStatus(node, "thrift", false); + node.nodetoolResult("enablethrift").asserts().success(); + assertTransportStatus(node, "binary", true); + assertTransportStatus(node, "thrift", true); + + // now use it to make sure it still works! 
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))"); + + ThriftClientUtils.thriftClient(node, thrift -> { + thrift.set_keyspace(KEYSPACE); + Mutation mutation = new Mutation(); + ColumnOrSuperColumn csoc = new ColumnOrSuperColumn(); + Column column = new Column(); + column.setName(CompositeType.build(ByteBufferUtil.bytes("value"))); + column.setValue(ByteBufferUtil.bytes(0)); + column.setTimestamp(System.currentTimeMillis()); + csoc.setColumn(column); + mutation.setColumn_or_supercolumn(csoc); + + thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0), + Collections.singletonMap("tbl", Arrays.asList(mutation))), + org.apache.cassandra.thrift.ConsistencyLevel.ALL); + }); + + SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); + AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build()); + } + } + + /** + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a> + */ + @Test + public void stopStartNative() throws IOException + { + try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start())) + { + IInvokableInstance node = cluster.get(1); + assertTransportStatus(node, "binary", true); + assertTransportStatus(node, "thrift", true); + node.nodetoolResult("disablebinary").asserts().success(); + assertTransportStatus(node, "binary", false); + assertTransportStatus(node, "thrift", true); + node.nodetoolResult("enablebinary").asserts().success(); + assertTransportStatus(node, "binary", true); + assertTransportStatus(node, "thrift", true); + + // now use it to make sure it still works! 
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))"); + + try (com.datastax.driver.core.Cluster client = com.datastax.driver.core.Cluster.builder().addContactPoints(node.broadcastAddress().getAddress()).build(); + Session session = client.connect()) + { + session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, value) VALUES (?, ?)", 0, 0); + } + + SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL); + AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build()); + } + } + + private static void assertTransportStatus(IInvokableInstance node, String transport, boolean running) + { + assertNodetoolStdout(node, running ? "running" : "not running", running ? "not running" : null, "status" + transport); + } + + private static void assertNodetoolStdout(IInvokableInstance node, String expectedStatus, String notExpected, String... nodetool) + { + // without CASSANDRA-16057 need this hack + PrintStream previousStdout = System.out; + try + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream stdout = new PrintStream(out, true); + System.setOut(stdout); + + node.nodetoolResult(nodetool).asserts().success(); + + stdout.flush(); + String output = out.toString(); + Assert.assertThat(output, new StringContains(expectedStatus)); + if (notExpected != null) + Assert.assertThat(output, new StringNotContains(notExpected)); + } + finally + { + System.setOut(previousStdout); + } + } + + private static final class StringContains extends BaseMatcher<String> + { + private final String expected; + + private StringContains(String expected) + { + this.expected = Objects.requireNonNull(expected); + } + + public boolean matches(Object o) + { + return o.toString().contains(expected); + } + + public void describeTo(Description description) + { + description.appendText("Expected to find '" + expected + "', but did not"); + } + } + + private static final 
class StringNotContains extends BaseMatcher<String> + { + private final String notExpected; + + private StringNotContains(String expected) + { + this.notExpected = Objects.requireNonNull(expected); + } + + public boolean matches(Object o) + { + return !o.toString().contains(notExpected); + } + + public void describeTo(Description description) + { + description.appendText("Expected not to find '" + notExpected + "', but did"); + } + } +} diff --git a/test/resources/byteman/stream_failure.btm b/test/resources/byteman/stream_failure.btm new file mode 100644 index 0000000..e40f7fe --- /dev/null +++ b/test/resources/byteman/stream_failure.btm @@ -0,0 +1,14 @@ +# +# Inject streaming failure +# +# Before start streaming files in `StreamSession#prepare()` method, +# interrupt streaming by throwing RuntimeException. +# +RULE inject stream failure +CLASS org.apache.cassandra.streaming.StreamSession +METHOD prepare +AT INVOKE maybeCompleted +IF org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest$RewriteEnabled.isEnabled() +DO + throw new java.lang.RuntimeException("Triggering network failure") +ENDRULE --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org