smiklosovic commented on code in PR #4487:
URL: https://github.com/apache/cassandra/pull/4487#discussion_r2599681406


##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)

Review Comment:
   Can you please use the `@VisibleForTesting` annotation here? Maybe reducing 
visibility to package-private would be OK as well?



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())

Review Comment:
   I am not completely sure we should do this. If we do not register the 
MBean, then you will not be able to call any `nodetool profile` command. We 
still want to be able to reach it; it is just that we will say that it is not 
enabled. That is different from not registering the MBean at all. Also, if we 
do this _after_ we call `instance.getProfiler()`, then if that fails, we will 
never register it. We want to register it even if it fails. Then we will say 
that it is not possible to execute the commands because it was not possible to 
initialize it. 



##########
src/java/org/apache/cassandra/tools/nodetool/AsyncProfileCommandGroup.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.nio.file.Files;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.service.AsyncProfilerService.AsyncProfilerEvent;
+import org.apache.cassandra.service.AsyncProfilerService.AsyncProfilerFormat;
+import org.apache.cassandra.utils.FBUtilities;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Parameters;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.stream.Collectors.joining;
+import static org.apache.cassandra.service.AsyncProfilerService.parseDuration;
+import static 
org.apache.cassandra.service.AsyncProfilerService.validateCommand;
+import static 
org.apache.cassandra.service.AsyncProfilerService.validateOutputFileName;
+
+@Command(name = "profile", description = "Manage Async-Profiler on a Cassandra 
process",
+subcommands = {
+AsyncProfileCommandGroup.AsyncProfileStartCommand.class,
+AsyncProfileCommandGroup.AsyncProfileStopCommand.class,
+AsyncProfileCommandGroup.AsyncProfileExecuteCommand.class,
+AsyncProfileCommandGroup.AsyncProfilePurgeCommand.class,
+AsyncProfileCommandGroup.AsyncProfileListCommand.class,
+AsyncProfileCommandGroup.AsyncProfileFetchCommand.class,
+AsyncProfileCommandGroup.AsyncProfileStatusCommand.class
+})
+public class AsyncProfileCommandGroup extends AbstractCommand
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        AbstractCommand cmd = new AsyncProfileStartCommand();
+        cmd.probe(probe);
+        cmd.logger(output);
+        cmd.run();
+    }
+
+    private static void doWithProfiler(NodeProbe probe, 
Consumer<AsyncProfilerMBean> consumer, boolean requiresEnabledProfiler)
+    {
+        AsyncProfilerMBean profiler = probe.getAsyncProfilerProxy();
+
+        if (requiresEnabledProfiler && !profiler.isEnabled())
+        {
+            probe.output().err.println("Async-profiler native library is not 
enabled or not possible to load.");
+            System.exit(1);
+        }
+
+        consumer.accept(profiler);
+    }
+
+    private static String getOutputFileName(AsyncProfilerFormat outputFormat)
+    {
+        String filename = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss")
+                                           
.withZone(ZoneId.systemDefault()).format(FBUtilities.now());
+
+        if (outputFormat == AsyncProfilerFormat.jfr)
+            filename += ".jfr";
+        else
+            filename += ".html";
+
+        return filename;
+    }
+
+    public static void doWithProfiler(NodeProbe probe, 
Consumer<AsyncProfilerMBean> consumer)
+    {
+        doWithProfiler(probe, consumer, true);
+    }
+
+    @Command(name = "start", description = "Run Async-Profiler on a Cassandra 
process")
+    public static class AsyncProfileStartCommand extends AbstractCommand
+    {
+        @Option(names = { "-e", "--event" },
+        description = "Event(s) to profile, one of or combination of 'cpu', 
'alloc', " +
+                      "'lock', 'wall', 'nativemem', 'cache_misses', delimited 
by comma, defaults to 'cpu'")
+        public List<AsyncProfilerEvent> event = 
List.of(AsyncProfilerEvent.cpu);
+
+        @Option(names = { "-o", "--output" }, description = "File name to save 
profiling results into, defaults to a " +
+                                                            "file of name 
'yyyy-MM-dd-HH-mm-ss.html'")
+        public String filename;
+
+        @Option(names = { "-d", "--duration" }, description = "Duration of 
profiling, defaults to '60s'. Accepts string values " +
+                                                              "in the form of 
'5m', '30s' and similar.")
+        public String duration = "60s";
+
+        @Option(names = { "-f", "--format" },
+        description = "Output format, one of 'flat', 'traces', 'collapsed', 
'flamegraph', 'tree', 'jfr', defaults to 'flamegraph'")
+        public AsyncProfilerFormat outputFormat = 
AsyncProfilerFormat.flamegraph;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            // make sure it is valid
+            parseDuration(duration);
+
+            if (filename == null)
+                filename = 
AsyncProfileCommandGroup.getOutputFileName(outputFormat);
+
+            doWithProfiler(probe, profiler -> {
+                if 
(!profiler.start(event.stream().map(Enum::name).collect(joining(",")),
+                                    outputFormat.name(),
+                                    duration,
+                                    validateOutputFileName(filename)))
+                {
+                    output.err.println("Profiler has already started or there 
was a failure to start it.");
+                    System.exit(1);
+                }
+            });
+        }
+    }
+
+    @Command(name = "stop", description = "Stop Async-Profiler on a Cassandra 
process")
+    public static class AsyncProfileStopCommand extends AbstractCommand
+    {
+        @Option(names = { "-o", "--output" }, description = "File name to save 
profiling results into, defaults to a " +
+                                                            "file of name 
'yyyy-MM-dd-HH-mm-ss.html'")
+        public String filename = 
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss")
+                                                  
.withZone(ZoneId.systemDefault()).format(FBUtilities.now()) + ".html";
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            doWithProfiler(probe, profiler -> {
+                if (!profiler.stop(validateOutputFileName(filename)))
+                {
+                    output.err.println("Profiler has already stopped or there 
was a failure to stop it.");
+                    System.exit(1);
+                }
+            });
+        }
+    }
+
+    @Command(name = "execute", description = "Execute an arbitrary command on 
Async-Profiler on a Cassandra process.")
+    public static class AsyncProfileExecuteCommand extends AbstractCommand
+    {
+        @Parameters(index = "0", description = "Raw command to execute. There 
has to be 'unsafe' profiler configured " +
+                                               "in Cassandra, driven by 
cassandra.async_profiler.unsafe_mode property set" +
+                                               " to true, to be able to do 
this.", arity = "1")
+        public String command;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            doWithProfiler(probe, profiler -> {
+                try
+                {
+                    
output.out.print(profiler.execute(validateCommand(command)));
+                }
+                catch (SecurityException ex)
+                {
+                    output.err.print(ex.getMessage());
+                    System.exit(1);
+                }
+            });
+        }
+    }
+
+    @Command(name = "purge", description = "Remove all profiling results from 
node's disk")
+    public static class AsyncProfilePurgeCommand extends AbstractCommand
+    {
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            doWithProfiler(probe, AsyncProfilerMBean::purge, false);
+        }
+    }
+
+    @Command(name = "list", description = "List profiling result files of a 
node")
+    public static class AsyncProfileListCommand extends AbstractCommand
+    {
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            doWithProfiler(probe, profiler -> {
+                for (String resultFile : profiler.list())
+                    output.out.println(resultFile);
+            }, false);
+        }
+    }
+
+    @Command(name = "fetch", description = "Copy profiler result file from a 
node to a local file")
+    public static class AsyncProfileFetchCommand extends AbstractCommand
+    {
+        @Option(names = { "-b", "--binary" }, description = "treat file to be 
downloaded having binary content, not string one.")
+        private boolean binary;
+
+        @Parameters(index = "0", description = "Remote profiler file name", 
arity = "1")
+        private String remoteFile;
+
+        @Parameters(index = "1", description = "Local file name", arity = "1")
+        private String localFile;
+
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            doWithProfiler(probe, profiler -> {
+                if (binary)

Review Comment:
   Could we investigate how to consolidate this? Maybe always writing the 
   `byte[]` content via
   
   Files.write(new File(localFile).toPath(), content ...
   
   in every situation would make this branching go away?



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())
+                {
+                    // register mbean first, before initialisation, which 
might fail (e.g. profiler functionality is disabled)
+                    MBeanWrapper.instance.registerMBean(new 
StandardMBean(AsyncProfilerService.instance, AsyncProfilerMBean.class),
+                                                        
AsyncProfilerService.MBEAN_NAME,
+                                                        
MBeanWrapper.OnException.LOG);
+                }
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        return AsyncProfilerService.instance;
+    }
+
+    public static synchronized AsyncProfilerService instance()
+    {
+        if (instance == null)
+            return instance(ASYNC_PROFILER_LOG_DIR);
+        else
+            return instance;
+    }
+
+    public AsyncProfilerService(boolean unsafeMode)
+    {
+        this.unsafeMode = unsafeMode;
+    }
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                        
AsyncProfilerFormat.parseFormat(outputFormat),
+                                        AsyncProfilerEvent.parseEvents(events),
+                                        parseDuration(duration),
+                                        new File(logDir, 
validateOutputFileName(outputFileName)));
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Started Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = "stop";
+                    if (outputFileName != null)
+                    {
+                        File outputFile = new File(logDir, 
validateOutputFileName(outputFileName));
+                        cmd += ",file=" + outputFile.absolutePath();
+                    }
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Stopped Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException e)
+        {
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop Async-Profiler", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String execute(String command)
+    {
+        if (!unsafeMode)
+        {
+            throw new SecurityException(String.format("The arbitrary command 
execution is not permitted " +
+                                                      "with %s MBean. If 
unsafe command execution is required, " +
+                                                      "start Cassandra with %s 
property set to true. " +
+                                                      "Rejected command: %s",
+                                                      
AsyncProfilerService.MBEAN_NAME,
+                                                      
CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE.name(), command));
+        }
+
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler profiler) throws Throwable
+            {
+                return profiler.execute(validateCommand(command));
+            }
+        });
+    }
+
+    @Override
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();
+            return Arrays.stream(new 
File(logDir).list()).map(File::name).sorted().collect(toList());
+        }
+        catch (Throwable t)
+        {
+            return List.of();
+        }
+    }
+
+    @Override
+    public byte[] fetch(String resultFile)
+    {
+        try
+        {
+            createLogDir();
+            return Files.readAllBytes(new File(logDir, resultFile).toPath());
+        }
+        catch (Throwable t)
+        {
+            logger.error("Result file " + resultFile + " not found or error 
occurred while returning it.", t);
+            throw new RuntimeException(t);
+        }
+    }
+
+    @Override
+    public void purge()
+    {
+        createLogDir();
+        new File(logDir).deleteRecursive();
+    }
+
+    @Override
+    public String status()
+    {
+        return run(new ThrowingFunction<>()
+        {
+            @Override
+            public String apply(AsyncProfiler asyncProfiler) throws Throwable
+            {
+                return asyncProfiler.execute("status");
+            }
+        });
+    }
+
+    @Override
+    public boolean isEnabled()
+    {
+        return instance != null;
+    }
+
+    public static String validateOutputFileName(String outputFile)
+    {
+        if (outputFile == null || outputFile.trim().isEmpty())
+            throw new IllegalArgumentException("Output file name must not be 
null or empty.");
+
+        if (!VALID_FILENAME_REGEX_PATTERN.matcher(outputFile).matches())
+            throw new IllegalArgumentException(format("Output file name must 
match pattern %s", VALID_FILENAME_REGEX_PATTERN));
+
+        return outputFile;
+    }
+
+    public static String validateCommand(String command)
+    {
+        if (command == null || command.isBlank())
+            throw new IllegalArgumentException("Command can not be null or 
blank string");
+
+        return command;
+    }
+
+    /**
+     * @param duration duration of profiling
+     * @return converted string representation of duration to seconds
+     */
+    public static int parseDuration(String duration)
+    {
+        int durationSeconds = new 
DurationSpec.IntSecondsBound(duration).toSeconds();
+        if (durationSeconds > MAX_SAFE_PROFILING_DURATION)
+            throw new IllegalArgumentException(format("Max profiling duration 
is %s seconds. If you need longer profiling, use execute command instead",
+                                                      
MAX_SAFE_PROFILING_DURATION));
+        return new DurationSpec.IntSecondsBound(duration).toSeconds();
+    }
+
+    /**
+     * @throws ConfigurationException in case it is not possible to configure 
directory for logs.
+     */
+    private void createLogDir() throws ConfigurationException
+    {
+        String dir = new File(logDir).toAbsolute().toString();
+
+        if ((DatabaseDescriptor.getCommitLogLocation() != null && 
dir.startsWith(DatabaseDescriptor.getCommitLogLocation())) ||
+            (DatabaseDescriptor.getAccordJournalDirectory() != null && 
dir.startsWith(DatabaseDescriptor.getAccordJournalDirectory())) ||
+            
dir.startsWith(DatabaseDescriptor.getHintsDirectory().absolutePath()) ||
+            (DatabaseDescriptor.getCDCLogLocation() != null && 
dir.startsWith(DatabaseDescriptor.getCDCLogLocation())) ||
+            (DatabaseDescriptor.getSavedCachesLocation() != null && 
dir.startsWith(DatabaseDescriptor.getSavedCachesLocation())))
+        {
+            throw new ConfigurationException("You can not store Async-Profiler 
results into system Cassandra directory.");
+        }
+
+        for (String location : 
StorageService.instance.getAllDataFileLocations())
+        {
+            if (dir.startsWith(location))
+            {
+                throw new ConfigurationException("You can not store 
Async-Profiler results into a data directory of Cassandra.");
+            }
+        }
+
+        try
+        {
+            new File(logDir).createDirectoriesIfNotExists();
+        }
+        catch (Throwable t)
+        {
+            throw new ConfigurationException("Unable to create directory " + 
logDir);
+        }
+    }
+
+    private boolean isRunning()
+    {
+        if (!isEnabled())
+            return false;
+
+        try
+        {
+            String status = status();
+            return status != null && status.contains("Profiling is running");
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(t);
+        }
+    }
+
+    private <T> T run(ThrowingFunction<AsyncProfiler, T> f)
+    {
+        if (asyncProfiler != null)
+        {
+            try
+            {
+                return f.apply(asyncProfiler);
+            }
+            catch (IllegalStateException | IllegalArgumentException t)
+            {
+                throw t;
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        else
+            throw new IllegalStateException("Async profiler not available.");
+    }
+
+    public abstract static class ThrowingFunction<A, B>
+    {
+        public abstract B apply(AsyncProfiler a) throws Throwable;
+    }
+
+    private Optional<AsyncProfiler> getProfiler()

Review Comment:
   this method is used only once in production code:
   
       asyncProfiler = instance.getProfiler().orElse(null);
   
   Maybe we can convert it to return `AsyncProfiler` only, without `Optional`. 
We have rewritten the logic in the `run` method, which now checks whether it is 
null or not; it does not work with `Optional` anymore, so the wrapper is no 
longer needed. 



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_LOG_DIR;
+
+public class AsyncProfilerService
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr, otlp;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    private AsyncProfiler profilerInstance;
+
+    private String logDir;
+
+    public synchronized void enable()
+    {
+        if (isEnabled())
+            return;
+
+        ASYNC_PROFILER_ENABLED.setBoolean(true);
+        maybeInitialize();
+    }
+
+    public synchronized void disable()
+    {
+        if (!isEnabled())
+            return;
+
+        if (isRunning())
+            stop(null);
+
+        ASYNC_PROFILER_ENABLED.setBoolean(false);
+        profilerInstance = null;
+    }
+
+    public synchronized AsyncProfiler maybeInitialize()
+    {
+        if (!ASYNC_PROFILER_ENABLED.getBoolean())
+            throw new IllegalStateException("Async-Profiler is not enabled.");
+
+        // if somebody removes dir while a node runs, just recreate it
+        createLogDir();
+
+        if (profilerInstance == null)
+        {
+            try
+            {
+                profilerInstance = one.profiler.AsyncProfiler.getInstance();
+            }
+            catch (ConfigurationException ex)
+            {
+                throw ex;
+            }
+            catch (Throwable t)
+            {
+                throw new IllegalStateException("Unable to get an instance of 
Async-Profiler", t);
+            }
+        }
+
+        return profilerInstance;
+    }
+
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                AsyncProfilerFormat.parseFormat(outputFormat),
+                                AsyncProfilerEvent.parseEvents(events),
+                                parseDuration(duration),
+                                new File(logDir, 
validateOutputFileName(outputFileName)));
+
+            String result = maybeInitialize().execute(cmd);
+            logger.debug("Started Async-Profiler: result={}, cmd={}", result, 
cmd);
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            String cmd = "stop";
+            if (outputFileName != null)
+            {
+                File outputFile = new File(logDir, 
validateOutputFileName(outputFileName));
+                cmd += ",file=" + outputFile.absolutePath();
+            }
+
+            String result = maybeInitialize().execute(cmd);
+            logger.debug("Stopped Async-Profiler: result={}, cmd={}", result, 
cmd);
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException e)
+        {
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop Async-Profiler", e);
+            return false;
+        }
+    }
+
+    public String execute(String command)
+    {
+        try
+        {
+            String result = 
maybeInitialize().execute(validateCommand(command));
+            logger.debug("Executed raw command in Async-Profiler: result={}, 
cmd={}", result, command);
+            return result;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to execute raw Async-Profiler command {}", 
command, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();

Review Comment:
   We still want to return the list of files even if it is not enabled. There 
is no harm in creating that directory really ... 



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())
+                {
+                    // register mbean first, before initialisation, which 
might fail (e.g. profiler functionality is disabled)
+                    MBeanWrapper.instance.registerMBean(new 
StandardMBean(AsyncProfilerService.instance, AsyncProfilerMBean.class),
+                                                        
AsyncProfilerService.MBEAN_NAME,
+                                                        
MBeanWrapper.OnException.LOG);
+                }
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        return AsyncProfilerService.instance;
+    }
+
+    public static synchronized AsyncProfilerService instance()
+    {
+        if (instance == null)
+            return instance(ASYNC_PROFILER_LOG_DIR);
+        else
+            return instance;
+    }
+
+    public AsyncProfilerService(boolean unsafeMode)
+    {
+        this.unsafeMode = unsafeMode;
+    }
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                        
AsyncProfilerFormat.parseFormat(outputFormat),
+                                        AsyncProfilerEvent.parseEvents(events),
+                                        parseDuration(duration),
+                                        new File(logDir, 
validateOutputFileName(outputFileName)));
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Started Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = "stop";
+                    if (outputFileName != null)
+                    {
+                        File outputFile = new File(logDir, 
validateOutputFileName(outputFileName));
+                        cmd += ",file=" + outputFile.absolutePath();
+                    }
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Stopped Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException e)
+        {
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop Async-Profiler", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String execute(String command)
+    {
+        if (!unsafeMode)
+        {
+            throw new SecurityException(String.format("The arbitrary command 
execution is not permitted " +
+                                                      "with %s MBean. If 
unsafe command execution is required, " +
+                                                      "start Cassandra with %s 
property set to true. " +
+                                                      "Rejected command: %s",
+                                                      
AsyncProfilerService.MBEAN_NAME,
+                                                      
CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE.name(), command));
+        }
+
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler profiler) throws Throwable
+            {
+                return profiler.execute(validateCommand(command));
+            }
+        });
+    }
+
+    @Override
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();
+            return Arrays.stream(new 
File(logDir).list()).map(File::name).sorted().collect(toList());
+        }
+        catch (Throwable t)
+        {
+            return List.of();
+        }
+    }
+
+    @Override
+    public byte[] fetch(String resultFile)
+    {
+        try
+        {
+            createLogDir();
+            return Files.readAllBytes(new File(logDir, resultFile).toPath());
+        }
+        catch (Throwable t)
+        {
+            logger.error("Result file " + resultFile + " not found or error 
occurred while returning it.", t);
+            throw new RuntimeException(t);
+        }
+    }
+
+    @Override
+    public void purge()
+    {
+        createLogDir();
+        new File(logDir).deleteRecursive();
+    }
+
+    @Override
+    public String status()
+    {
+        return run(new ThrowingFunction<>()
+        {
+            @Override
+            public String apply(AsyncProfiler asyncProfiler) throws Throwable
+            {
+                return asyncProfiler.execute("status");
+            }
+        });
+    }
+
+    @Override
+    public boolean isEnabled()

Review Comment:
   this should probably be synchronized too



##########
src/java/org/apache/cassandra/service/CassandraDaemon.java:
##########
@@ -39,6 +39,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+

Review Comment:
   remove this empty line



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())
+                {
+                    // register mbean first, before initialisation, which 
might fail (e.g. profiler functionality is disabled)
+                    MBeanWrapper.instance.registerMBean(new 
StandardMBean(AsyncProfilerService.instance, AsyncProfilerMBean.class),
+                                                        
AsyncProfilerService.MBEAN_NAME,
+                                                        
MBeanWrapper.OnException.LOG);
+                }
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        return AsyncProfilerService.instance;
+    }
+
+    public static synchronized AsyncProfilerService instance()
+    {
+        if (instance == null)
+            return instance(ASYNC_PROFILER_LOG_DIR);
+        else
+            return instance;
+    }
+
+    public AsyncProfilerService(boolean unsafeMode)
+    {
+        this.unsafeMode = unsafeMode;
+    }
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                        
AsyncProfilerFormat.parseFormat(outputFormat),
+                                        AsyncProfilerEvent.parseEvents(events),
+                                        parseDuration(duration),
+                                        new File(logDir, 
validateOutputFileName(outputFileName)));
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Started Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = "stop";
+                    if (outputFileName != null)
+                    {
+                        File outputFile = new File(logDir, 
validateOutputFileName(outputFileName));
+                        cmd += ",file=" + outputFile.absolutePath();
+                    }
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Stopped Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException e)
+        {
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop Async-Profiler", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String execute(String command)
+    {
+        if (!unsafeMode)
+        {
+            throw new SecurityException(String.format("The arbitrary command 
execution is not permitted " +
+                                                      "with %s MBean. If 
unsafe command execution is required, " +
+                                                      "start Cassandra with %s 
property set to true. " +
+                                                      "Rejected command: %s",
+                                                      
AsyncProfilerService.MBEAN_NAME,
+                                                      
CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE.name(), command));
+        }
+
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler profiler) throws Throwable
+            {
+                return profiler.execute(validateCommand(command));
+            }
+        });
+    }
+
+    @Override
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();
+            return Arrays.stream(new 
File(logDir).list()).map(File::name).sorted().collect(toList());
+        }
+        catch (Throwable t)
+        {
+            return List.of();
+        }
+    }
+
+    @Override
+    public byte[] fetch(String resultFile)
+    {
+        try
+        {
+            createLogDir();
+            return Files.readAllBytes(new File(logDir, resultFile).toPath());
+        }
+        catch (Throwable t)
+        {
+            logger.error("Result file " + resultFile + " not found or error 
occurred while returning it.", t);
+            throw new RuntimeException(t);
+        }
+    }
+
+    @Override
+    public void purge()
+    {
+        createLogDir();
+        new File(logDir).deleteRecursive();
+    }
+
+    @Override
+    public String status()
+    {
+        return run(new ThrowingFunction<>()
+        {
+            @Override
+            public String apply(AsyncProfiler asyncProfiler) throws Throwable
+            {
+                return asyncProfiler.execute("status");
+            }
+        });
+    }
+
+    @Override
+    public boolean isEnabled()
+    {
+        return instance != null;
+    }
+
+    public static String validateOutputFileName(String outputFile)
+    {
+        if (outputFile == null || outputFile.trim().isEmpty())
+            throw new IllegalArgumentException("Output file name must not be 
null or empty.");
+
+        if (!VALID_FILENAME_REGEX_PATTERN.matcher(outputFile).matches())
+            throw new IllegalArgumentException(format("Output file name must 
match pattern %s", VALID_FILENAME_REGEX_PATTERN));
+
+        return outputFile;
+    }
+
+    public static String validateCommand(String command)
+    {
+        if (command == null || command.isBlank())
+            throw new IllegalArgumentException("Command can not be null or 
blank string");
+
+        return command;
+    }
+
+    /**
+     * @param duration duration of profiling
+     * @return converted string representation of duration to seconds
+     */
+    public static int parseDuration(String duration)
+    {
+        int durationSeconds = new 
DurationSpec.IntSecondsBound(duration).toSeconds();
+        if (durationSeconds > MAX_SAFE_PROFILING_DURATION)
+            throw new IllegalArgumentException(format("Max profiling duration 
is %s seconds. If you need longer profiling, use execute command instead",
+                                                      
MAX_SAFE_PROFILING_DURATION));
+        return new DurationSpec.IntSecondsBound(duration).toSeconds();
+    }
+
+    /**
+     * @throws ConfigurationException in case it is not possible to configure 
directory for logs.
+     */
+    private void createLogDir() throws ConfigurationException

Review Comment:
   yeah, rename to `maybeCreateProfilesLogDir` please



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())
+                {
+                    // register mbean first, before initialisation, which 
might fail (e.g. profiler functionality is disabled)
+                    MBeanWrapper.instance.registerMBean(new 
StandardMBean(AsyncProfilerService.instance, AsyncProfilerMBean.class),
+                                                        
AsyncProfilerService.MBEAN_NAME,
+                                                        
MBeanWrapper.OnException.LOG);
+                }
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        return AsyncProfilerService.instance;
+    }
+
+    public static synchronized AsyncProfilerService instance()
+    {
+        if (instance == null)
+            return instance(ASYNC_PROFILER_LOG_DIR);
+        else
+            return instance;
+    }
+
+    public AsyncProfilerService(boolean unsafeMode)
+    {
+        this.unsafeMode = unsafeMode;
+    }
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                        
AsyncProfilerFormat.parseFormat(outputFormat),
+                                        AsyncProfilerEvent.parseEvents(events),
+                                        parseDuration(duration),
+                                        new File(logDir, 
validateOutputFileName(outputFileName)));
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Started Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = "stop";
+                    if (outputFileName != null)
+                    {
+                        File outputFile = new File(logDir, 
validateOutputFileName(outputFileName));
+                        cmd += ",file=" + outputFile.absolutePath();
+                    }
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Stopped Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException e)
+        {
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop Async-Profiler", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String execute(String command)
+    {
+        if (!unsafeMode)
+        {
+            throw new SecurityException(String.format("The arbitrary command 
execution is not permitted " +
+                                                      "with %s MBean. If 
unsafe command execution is required, " +
+                                                      "start Cassandra with %s 
property set to true. " +
+                                                      "Rejected command: %s",
+                                                      
AsyncProfilerService.MBEAN_NAME,
+                                                      
CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE.name(), command));
+        }
+
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler profiler) throws Throwable
+            {
+                return profiler.execute(validateCommand(command));
+            }
+        });
+    }
+
+    @Override
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();
+            return Arrays.stream(new 
File(logDir).list()).map(File::name).sorted().collect(toList());
+        }
+        catch (Throwable t)
+        {
+            return List.of();
+        }
+    }
+
+    @Override
+    public byte[] fetch(String resultFile)
+    {
+        try
+        {
+            createLogDir();
+            return Files.readAllBytes(new File(logDir, resultFile).toPath());
+        }
+        catch (Throwable t)
+        {
+            logger.error("Result file " + resultFile + " not found or error 
occurred while returning it.", t);
+            throw new RuntimeException(t);
+        }
+    }
+
+    @Override
+    public void purge()
+    {
+        createLogDir();
+        new File(logDir).deleteRecursive();
+    }
+
+    @Override
+    public String status()
+    {
+        return run(new ThrowingFunction<>()
+        {
+            @Override
+            public String apply(AsyncProfiler asyncProfiler) throws Throwable
+            {
+                return asyncProfiler.execute("status");
+            }
+        });
+    }
+
+    @Override
+    public boolean isEnabled()
+    {
+        return instance != null;
+    }
+
+    public static String validateOutputFileName(String outputFile)
+    {
+        if (outputFile == null || outputFile.trim().isEmpty())
+            throw new IllegalArgumentException("Output file name must not be 
null or empty.");
+
+        if (!VALID_FILENAME_REGEX_PATTERN.matcher(outputFile).matches())
+            throw new IllegalArgumentException(format("Output file name must 
match pattern %s", VALID_FILENAME_REGEX_PATTERN));
+
+        return outputFile;
+    }
+
+    public static String validateCommand(String command)
+    {
+        if (command == null || command.isBlank())
+            throw new IllegalArgumentException("Command can not be null or 
blank string");
+
+        return command;
+    }
+
+    /**
+     * @param duration duration of profiling
+     * @return converted string representation of duration to seconds
+     */
+    public static int parseDuration(String duration)
+    {
+        int durationSeconds = new 
DurationSpec.IntSecondsBound(duration).toSeconds();

Review Comment:
   What does `IntSecondsBound` throw when an invalid duration string is passed 
to it? How is that handled from nodetool, or when the JMX method is called 
directly?



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())
+                {
+                    // register mbean first, before initialisation, which 
might fail (e.g. profiler functionality is disabled)
+                    MBeanWrapper.instance.registerMBean(new 
StandardMBean(AsyncProfilerService.instance, AsyncProfilerMBean.class),
+                                                        
AsyncProfilerService.MBEAN_NAME,
+                                                        
MBeanWrapper.OnException.LOG);
+                }
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        return AsyncProfilerService.instance;
+    }
+
+    public static synchronized AsyncProfilerService instance()
+    {
+        if (instance == null)
+            return instance(ASYNC_PROFILER_LOG_DIR);
+        else
+            return instance;
+    }
+
+    public AsyncProfilerService(boolean unsafeMode)
+    {
+        this.unsafeMode = unsafeMode;
+    }
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                        
AsyncProfilerFormat.parseFormat(outputFormat),
+                                        AsyncProfilerEvent.parseEvents(events),
+                                        parseDuration(duration),
+                                        new File(logDir, 
validateOutputFileName(outputFileName)));
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Started Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = "stop";
+                    if (outputFileName != null)
+                    {
+                        File outputFile = new File(logDir, 
validateOutputFileName(outputFileName));
+                        cmd += ",file=" + outputFile.absolutePath();
+                    }
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Stopped Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException e)
+        {
+            throw e;
+        }
+        catch (Throwable e)
+        {
+            logger.error("Failed to stop Async-Profiler", e);
+            return false;
+        }
+    }
+
+    @Override
+    public String execute(String command)
+    {
+        if (!unsafeMode)
+        {
+            throw new SecurityException(String.format("The arbitrary command 
execution is not permitted " +
+                                                      "with %s MBean. If 
unsafe command execution is required, " +
+                                                      "start Cassandra with %s 
property set to true. " +
+                                                      "Rejected command: %s",
+                                                      
AsyncProfilerService.MBEAN_NAME,
+                                                      
CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE.name(), command));
+        }
+
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler profiler) throws Throwable
+            {
+                return profiler.execute(validateCommand(command));
+            }
+        });
+    }
+
+    @Override
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();
+            return Arrays.stream(new 
File(logDir).list()).map(File::name).sorted().collect(toList());
+        }
+        catch (Throwable t)
+        {
+            return List.of();
+        }
+    }
+
+    @Override
+    public byte[] fetch(String resultFile)
+    {
+        try
+        {
+            createLogDir();
+            return Files.readAllBytes(new File(logDir, resultFile).toPath());
+        }
+        catch (Throwable t)
+        {
+            logger.error("Result file " + resultFile + " not found or error 
occurred while returning it.", t);
+            throw new RuntimeException(t);
+        }
+    }
+
+    @Override
+    public void purge()
+    {
+        createLogDir();
+        new File(logDir).deleteRecursive();
+    }
+
+    @Override
+    public String status()
+    {
+        return run(new ThrowingFunction<>()
+        {
+            @Override
+            public String apply(AsyncProfiler asyncProfiler) throws Throwable
+            {
+                return asyncProfiler.execute("status");
+            }
+        });
+    }
+
+    @Override
+    public boolean isEnabled()
+    {
+        return instance != null;
+    }
+
+    public static String validateOutputFileName(String outputFile)
+    {
+        if (outputFile == null || outputFile.trim().isEmpty())
+            throw new IllegalArgumentException("Output file name must not be 
null or empty.");
+
+        if (!VALID_FILENAME_REGEX_PATTERN.matcher(outputFile).matches())
+            throw new IllegalArgumentException(format("Output file name must 
match pattern %s", VALID_FILENAME_REGEX_PATTERN));
+
+        return outputFile;
+    }
+
+    public static String validateCommand(String command)
+    {
+        if (command == null || command.isBlank())
+            throw new IllegalArgumentException("Command can not be null or 
blank string");
+
+        return command;
+    }
+
+    /**
+     * @param duration duration of profiling
+     * @return converted string representation of duration to seconds
+     */
+    public static int parseDuration(String duration)
+    {
+        int durationSeconds = new 
DurationSpec.IntSecondsBound(duration).toSeconds();
+        if (durationSeconds > MAX_SAFE_PROFILING_DURATION)
+            throw new IllegalArgumentException(format("Max profiling duration 
is %s seconds. If you need longer profiling, use execute command instead",
+                                                      
MAX_SAFE_PROFILING_DURATION));
+        return new DurationSpec.IntSecondsBound(duration).toSeconds();

Review Comment:
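   You can just return `durationSeconds` at this point, no? The duration has 
already been parsed into that variable above, so a second `IntSecondsBound` 
does not need to be constructed.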



##########
src/java/org/apache/cassandra/service/AsyncProfilerService.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import javax.management.StandardMBean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import one.profiler.AsyncProfiler;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_ENABLED;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.LOG_DIR;
+
+public class AsyncProfilerService implements AsyncProfilerMBean
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AsyncProfilerService.class);
+
+    private static final EnumSet<AsyncProfilerEvent> VALID_EVENTS = 
EnumSet.allOf(AsyncProfilerEvent.class);
+    private static final EnumSet<AsyncProfilerFormat> VALID_FORMATS = 
EnumSet.allOf(AsyncProfilerFormat.class);
+    private static final Pattern VALID_FILENAME_REGEX_PATTERN = 
Pattern.compile("^[a-zA-Z0-9-]*\\.?[a-zA-Z0-9-]*$");
+    private static final int MAX_SAFE_PROFILING_DURATION = 43200; // 12 hours
+    private static final String ASYNC_PROFILER_LOG_DIR = 
Path.of(LOG_DIR.getString(), "profiler").toString();
+
+    private static AsyncProfilerService instance;
+    private static AsyncProfiler asyncProfiler;
+    private final boolean unsafeMode;
+    private static String logDir;
+
+    // logDir as a parameter to be used by tests.
+    public static synchronized AsyncProfilerService instance(String logDir)
+    {
+        AsyncProfilerService.logDir = logDir;
+        if (instance == null)
+        {
+            try
+            {
+                instance = new 
AsyncProfilerService(ASYNC_PROFILER_UNSAFE_MODE.getBoolean());
+                asyncProfiler = instance.getProfiler().orElse(null);
+                if (ASYNC_PROFILER_ENABLED.getBoolean())
+                {
+                    // register mbean first, before initialisation, which 
might fail (e.g. profiler functionality is disabled)
+                    MBeanWrapper.instance.registerMBean(new 
StandardMBean(AsyncProfilerService.instance, AsyncProfilerMBean.class),
+                                                        
AsyncProfilerService.MBEAN_NAME,
+                                                        
MBeanWrapper.OnException.LOG);
+                }
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(t);
+            }
+        }
+        return AsyncProfilerService.instance;
+    }
+
+    public static synchronized AsyncProfilerService instance()
+    {
+        if (instance == null)
+            return instance(ASYNC_PROFILER_LOG_DIR);
+        else
+            return instance;
+    }
+
+    public AsyncProfilerService(boolean unsafeMode)
+    {
+        this.unsafeMode = unsafeMode;
+    }
+
+    public enum AsyncProfilerEvent
+    {
+        cpu("cpu"),
+        alloc("alloc"),
+        lock("lock"),
+        wall("wall"),
+        nativemem("nativemem"),
+        cache_misses("cache-misses");
+
+        private final String name;
+
+        AsyncProfilerEvent(String name)
+        {
+            this.name = name;
+        }
+
+        public String getEvent()
+        {
+            return name;
+        }
+
+        public static String parseEvents(String rawString)
+        {
+            if (rawString == null || rawString.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                List<String> processedEvents = new ArrayList<>();
+                for (String rawEvent : rawString.split(","))
+                    
processedEvents.add(AsyncProfilerEvent.valueOf(rawEvent).getEvent());
+
+                return String.join(",", processedEvents);
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Event must be one 
or a combination of %s", VALID_EVENTS));
+            }
+        }
+    }
+
+    public enum AsyncProfilerFormat
+    {
+        flat, traces, collapsed, flamegraph, tree, jfr;
+
+        public static String parseFormat(String rawFormat)
+        {
+            if (rawFormat == null || rawFormat.isBlank())
+                throw new IllegalArgumentException("Event can not be null nor 
blank string.");
+
+            try
+            {
+                return AsyncProfilerFormat.valueOf(rawFormat).name();
+            }
+            catch (IllegalArgumentException ex)
+            {
+                throw new IllegalArgumentException(format("Format must be one 
of %s", VALID_FORMATS));
+            }
+        }
+    }
+
+    @Override
+    public synchronized boolean start(String events, String outputFormat, 
String duration, String outputFileName)
+    {
+        if (isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    String cmd = format("start,%s,event=%s,timeout=%s,file=%s",
+                                        
AsyncProfilerFormat.parseFormat(outputFormat),
+                                        AsyncProfilerEvent.parseEvents(events),
+                                        parseDuration(duration),
+                                        new File(logDir, 
validateOutputFileName(outputFileName)));
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Started Async-Profiler: result={}, cmd={}", 
result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to start Async-Profiler", t);
+            return false;
+        }
+    }
+
+    /**
+     * Executes a {@code stop} command on the profiler, optionally pointing it to an output file.
+     *
+     * @param outputFileName when not null, it is validated and appended to the command as {@code file=}
+     * @return {@code false} when {@link #isRunning()} reports false or when executing the command
+     * failed with anything other than the rethrown exceptions below, {@code true} otherwise
+     * @throws IllegalArgumentException rethrown as is, e.g. from file name validation
+     * @throws IllegalStateException rethrown as is
+     */
+    @Override
+    public synchronized boolean stop(String outputFileName)
+    {
+        if (!isRunning())
+            return false;
+
+        try
+        {
+            run(new ThrowingFunction<>()
+            {
+                @Override
+                public Object apply(AsyncProfiler profiler) throws Throwable
+                {
+                    // the file argument is added only when a file name was supplied
+                    String cmd = outputFileName == null
+                                 ? "stop"
+                                 : "stop,file=" + new File(logDir, validateOutputFileName(outputFileName)).absolutePath();
+
+                    String result = profiler.execute(cmd);
+                    logger.debug("Stopped Async-Profiler: result={}, cmd={}", result, cmd);
+
+                    return null;
+                }
+            });
+            return true;
+        }
+        catch (IllegalStateException | IllegalArgumentException rethrown)
+        {
+            // these are propagated to the caller untouched
+            throw rethrown;
+        }
+        catch (Throwable failure)
+        {
+            logger.error("Failed to stop Async-Profiler", failure);
+            return false;
+        }
+    }
+
+    /**
+     * Passes an arbitrary command to the profiler. It is rejected unless {@code unsafeMode} is set.
+     *
+     * @param command command to execute, validated by {@link #validateCommand(String)}
+     * @return whatever the profiler returned for the command
+     * @throws SecurityException when {@code unsafeMode} is not set
+     */
+    @Override
+    public String execute(String command)
+    {
+        if (!unsafeMode)
+        {
+            String rejection = format("The arbitrary command execution is not permitted " +
+                                      "with %s MBean. If unsafe command execution is required, " +
+                                      "start Cassandra with %s property set to true. " +
+                                      "Rejected command: %s",
+                                      AsyncProfilerService.MBEAN_NAME,
+                                      CassandraRelevantProperties.ASYNC_PROFILER_UNSAFE_MODE.name(),
+                                      command);
+            throw new SecurityException(rejection);
+        }
+
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler asyncProfiler) throws Throwable
+            {
+                String validated = validateCommand(command);
+                return asyncProfiler.execute(validated);
+            }
+        });
+    }
+
+    /**
+     * Lists names of the entries in the profiler log directory, sorted.
+     *
+     * @return sorted names, or an empty list when listing failed for any reason
+     */
+    @Override
+    public List<String> list()
+    {
+        try
+        {
+            createLogDir();
+            return Arrays.stream(new File(logDir).list()).map(File::name).sorted().collect(toList());
+        }
+        catch (Throwable t)
+        {
+            // still best effort for the caller, but do not lose the reason why nothing was listed
+            logger.warn("Failed to list Async-Profiler result files", t);
+            return List.of();
+        }
+    }
+
+    /**
+     * Reads a result file from the profiler log directory.
+     *
+     * @param resultFile name of the file, relative to the log directory
+     * @return content of the file
+     * @throws IllegalArgumentException when {@code resultFile} is null, blank, or resolves to a path
+     * outside of the log directory (e.g. by using {@code ..} segments)
+     * @throws RuntimeException wrapping any failure to read the file
+     */
+    @Override
+    public byte[] fetch(String resultFile)
+    {
+        if (resultFile == null || resultFile.isBlank())
+            throw new IllegalArgumentException("Result file name must not be null or blank.");
+
+        // The name comes from the caller, so normalize it and make sure it can not escape the log directory.
+        // This is a lexical check only, symbolic links are not resolved.
+        Path baseDir = new File(logDir).toPath().toAbsolutePath().normalize();
+        Path requested = new File(logDir, resultFile).toPath().toAbsolutePath().normalize();
+        if (!requested.startsWith(baseDir))
+            throw new IllegalArgumentException("Result file must be located in the profiler log directory: " + resultFile);
+
+        try
+        {
+            createLogDir();
+            return Files.readAllBytes(requested);
+        }
+        catch (Throwable t)
+        {
+            logger.error("Result file {} not found or error occurred while returning it.", resultFile, t);
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * Calls {@code createLogDir()} and then {@code deleteRecursive()} on the log directory.
+     */
+    @Override
+    public void purge()
+    {
+        createLogDir();
+        File resultsDir = new File(logDir);
+        resultsDir.deleteRecursive();
+    }
+
+    /**
+     * @return result of executing the {@code status} command on the profiler
+     */
+    @Override
+    public String status()
+    {
+        return run(new ThrowingFunction<AsyncProfiler, String>()
+        {
+            @Override
+            public String apply(AsyncProfiler profiler) throws Throwable
+            {
+                String statusReport = profiler.execute("status");
+                return statusReport;
+            }
+        });
+    }
+
+    @Override
+    public boolean isEnabled()
+    {
+        return instance != null;
+    }
+
+    public static String validateOutputFileName(String outputFile)
+    {
+        if (outputFile == null || outputFile.trim().isEmpty())
+            throw new IllegalArgumentException("Output file name must not be 
null or empty.");
+
+        if (!VALID_FILENAME_REGEX_PATTERN.matcher(outputFile).matches())
+            throw new IllegalArgumentException(format("Output file name must 
match pattern %s", VALID_FILENAME_REGEX_PATTERN));
+
+        return outputFile;
+    }
+
+    public static String validateCommand(String command)
+    {
+        if (command == null || command.isBlank())
+            throw new IllegalArgumentException("Command can not be null or 
blank string");
+
+        return command;
+    }
+
+    /**
+     * @param duration duration of profiling
+     * @return converted string representation of duration to seconds
+     */
+    public static int parseDuration(String duration)
+    {
+        int durationSeconds = new 
DurationSpec.IntSecondsBound(duration).toSeconds();
+        if (durationSeconds > MAX_SAFE_PROFILING_DURATION)
+            throw new IllegalArgumentException(format("Max profiling duration 
is %s seconds. If you need longer profiling, use execute command instead",

Review Comment:
   Finish the sentence with a dot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to