lujiajing1126 commented on code in PR #720:
URL: https://github.com/apache/skywalking-java/pull/720#discussion_r1820430428


##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTask.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.asyncprofiler;
+
+import one.profiler.AsyncProfiler;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class AsyncProfilerTask {
+    private static final ILog LOGGER = 
LogManager.getLogger(AsyncProfilerTask.class);
+    private static final String COMMA = ",";
+    /**
+     * task id
+     */
+    private String taskId;
+    /**
+     * User input parameters
+     * @see <a 
href="https://github.com/async-profiler/async-profiler/blob/v1.8.4/src/arguments.cpp#L49";>async-profiler
 argument</a>
+     */
+    private String execArgs;
+    /**
+     * run profiling for duration (second)
+     */
+    private int duration;
+    /**
+     * The time when oap server created this task
+     */
+    private long createTime;
+    /**
+     * tempFile generated by async-profiler execution
+     */
+    private Path tempFile;
+
+    private static String execute(AsyncProfiler asyncProfiler, String args)
+            throws IllegalArgumentException, IOException {
+        LOGGER.info("async profiler execute args:{}", args);
+        String result = asyncProfiler.execute(args);

Review Comment:
   What is the meaning of this return val?



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerTaskExecutionService.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.asyncprofiler;
+
+import one.profiler.AsyncProfiler;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@DefaultImplementor
+public class AsyncProfilerTaskExecutionService implements BootService {
+
+    private static final ILog LOGGER = 
LogManager.getLogger(AsyncProfilerTaskChannelService.class);
+
+    private static final AsyncProfiler ASYNC_PROFILER = 
AsyncProfiler.getInstance();
+
+    private static final String SUCCESS_RESULT = "Profiling started";
+
+    // profile executor thread pool, only running one thread
+    private static final ScheduledExecutorService ASYNC_PROFILER_EXECUTOR = 
Executors.newSingleThreadScheduledExecutor(
+            new DefaultNamedThreadFactory("ASYNC-PROFILING-TASK"));
+
+    // last command create time, use to next query task list
+    private volatile long lastCommandCreateTime = -1;
+
+    // task schedule future
+    private volatile ScheduledFuture<?> scheduledFuture;
+
+    public void processAsyncProfilerTask(AsyncProfilerTask task) {
+        if (task.getCreateTime() <= lastCommandCreateTime) {
+            LOGGER.warn("get repeat task because createTime is less than 
lastCommandCreateTime");
+            return;
+        }
+        lastCommandCreateTime = task.getCreateTime();
+        LOGGER.info("add async profiler task: {}", task.getTaskId());
+        // add task to list
+        ASYNC_PROFILER_EXECUTOR.execute(() -> {
+            try {
+                if (Objects.nonNull(scheduledFuture) && 
!scheduledFuture.isDone()) {
+                    LOGGER.info("AsyncProfilerTask already running");
+                    return;
+                }
+
+                String result = task.start(ASYNC_PROFILER);
+                if (!SUCCESS_RESULT.equals(result)) {
+                    stopWhenError(task, result);
+                    return;
+                }
+                scheduledFuture = ASYNC_PROFILER_EXECUTOR.schedule(
+                        () -> stopWhenSuccess(task), task.getDuration(), 
TimeUnit.SECONDS
+                );
+            } catch (IOException e) {
+                LOGGER.error("AsyncProfilerTask executor error:" + 
e.getMessage(), e);
+            }
+        });
+    }
+
+    private void stopWhenError(AsyncProfilerTask task, String errorMessage) {
+        LOGGER.error("AsyncProfilerTask start fail result:" + errorMessage);
+        AsyncProfilerDataSender dataSender = 
ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class);
+        dataSender.sendError(task, errorMessage);
+    }
+
+    private void stopWhenSuccess(AsyncProfilerTask task) {
+
+        try {
+            File dumpFile = task.stop(ASYNC_PROFILER);
+            // stop task
+            try (FileInputStream fileInputStream = new 
FileInputStream(dumpFile)) {
+                // upload file
+                FileChannel channel = fileInputStream.getChannel();
+
+                AsyncProfilerDataSender dataSender = 
ServiceManager.INSTANCE.findService(AsyncProfilerDataSender.class);
+                dataSender.sendData(task, channel);
+            }
+
+            if (!dumpFile.delete()) {
+                LOGGER.warn("delete async profiler dump file failed");
+            }
+        } catch (Exception e) {
+            LOGGER.error("stop async profiler task error", e);
+            return;
+        }
+    }
+
+    public long getLastCommandCreateTime() {
+        return lastCommandCreateTime;
+    }
+
+    @Override
+    public void prepare() throws Throwable {
+
+    }
+
+    @Override
+    public void boot() throws Throwable {
+
+    }
+
+    @Override
+    public void onComplete() throws Throwable {
+
+    }
+
+    @Override
+    public void shutdown() throws Throwable {
+        ASYNC_PROFILER_EXECUTOR.shutdown();

Review Comment:
   Shall we cancel the future before shutdown?



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.asyncprofiler;
+
+import com.google.protobuf.ByteString;
+import io.grpc.Channel;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
+import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
+import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
+import 
org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerCollectionResponse;
+import 
org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerData;
+import 
org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerMetaData;
+import 
org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilerTaskGrpc;
+import 
org.apache.skywalking.apm.network.language.asyncprofiler.v10.AsyncProfilingStatus;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
+@DefaultImplementor
+public class AsyncProfilerDataSender implements BootService, 
GRPCChannelListener {
+    private static final ILog LOGGER = 
LogManager.getLogger(ProfileSnapshotSender.class);
+    private static final int DATA_CHUNK_SIZE = 1024 * 1024;
+
+    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
+
+    private volatile AsyncProfilerTaskGrpc.AsyncProfilerTaskStub 
asyncProfilerTaskStub;
+
+    @Override
+    public void prepare() throws Throwable {
+        
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
+    }
+
+    @Override
+    public void boot() throws Throwable {
+
+    }
+
+    @Override
+    public void onComplete() throws Throwable {
+
+    }
+
+    @Override
+    public void shutdown() throws Throwable {
+
+    }
+
+    @Override
+    public void statusChanged(GRPCChannelStatus status) {
+        if (GRPCChannelStatus.CONNECTED.equals(status)) {
+            Channel channel = 
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
+            asyncProfilerTaskStub = AsyncProfilerTaskGrpc.newStub(channel);
+        } else {
+            asyncProfilerTaskStub = null;
+        }
+        this.status = status;
+    }
+
+    public void sendData(AsyncProfilerTask task, FileChannel channel) throws 
IOException, InterruptedException {
+        if (status != GRPCChannelStatus.CONNECTED || Objects.isNull(channel) 
|| !channel.isOpen()) {
+            return;
+        }
+
+        int size = Math.toIntExact(channel.size());
+        final GRPCStreamServiceStatus status = new 
GRPCStreamServiceStatus(false);
+        StreamObserver<AsyncProfilerData> dataStreamObserver = 
asyncProfilerTaskStub.withDeadlineAfter(
+                GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
+        ).collect(new ClientResponseObserver<AsyncProfilerData, 
AsyncProfilerCollectionResponse>() {
+            ClientCallStreamObserver<AsyncProfilerData> requestStream;
+
+            @Override
+            public void 
beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
+                this.requestStream = requestStream;
+            }
+
+            @Override
+            public void onNext(AsyncProfilerCollectionResponse value) {
+                if 
(!AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {

Review Comment:
   I suppose positive check is better here. For example,
   
   ```java
   switch (value.getType()) {
   case AsyncProfilingStatus.TERMINATED_BY_OVERSIZE:
       // ???
       break;
   default:
       // ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to