This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 10f680f  Optimize the configurable Shenyu agent log collection  #2907 
(#3013)
10f680f is described below

commit 10f680fd4a6e2b0647340a2d294ed7fec57fee6c
Author: hutaishi <[email protected]>
AuthorDate: Thu Mar 10 15:25:50 2022 +0800

    Optimize the configurable Shenyu agent log collection  #2907 (#3013)
---
 .../shenyu/agent/api/config/LogCollectConfig.java  | 359 +++++++++++++++++++++
 .../shenyu/agent/plugin/logging/LogCollector.java  |   6 +-
 ...LogCollectClient.java => LogConsumeClient.java} |   9 +-
 .../plugin/logging/entity/ShenyuRequestLog.java    |  42 +++
 .../logging/common/AbstractLogCollector.java       |  37 ++-
 .../logging/common/DefaultLogCollector.java}       |  20 +-
 .../plugin/logging/common/body/BodyWriter.java     | 107 ++++++
 .../common/body/LoggingServerHttpRequest.java      |  64 ++++
 .../common/body/LoggingServerHttpResponse.java     | 166 ++++++++++
 .../definition/LoggingAgentPluginDefinition.java   |   2 +
 .../handler/DefaultLoggingPluginHandler.java       |  76 +++++
 .../logging/common/sampler/CountingSampler.java    |  99 ++++++
 .../plugin/logging/common/sampler/Sampler.java     |  63 ++++
 .../common/utils/LogCollectConfigUtils.java        | 170 ++++++++++
 .../logging/common/utils/LogCollectUtils.java      |  73 +++++
 .../logging/rocketmq/RocketMQLogCollectClient.java |  36 +--
 .../boot/RocketMQAgentPluginBootService.java       |   6 +-
 .../handler/RocketMQGlobalPluginHandler.java       | 336 -------------------
 .../tracing/common/constant/TracingConstants.java  |   2 +
 .../jaeger/handler/JaegerGlobalPluginHandler.java  |   1 +
 .../handler/OpenTelemetryGlobalPluginHandler.java  |   1 +
 .../zipkin/handler/ZipkinGlobalPluginHandler.java  |   1 +
 .../conf/{logging-point.yaml => logging-meta.yaml} |  23 +-
 .../src/main/resources/conf/logging-point.yaml     |   2 +-
 24 files changed, 1299 insertions(+), 402 deletions(-)

diff --git 
a/shenyu-agent/shenyu-agent-api/src/main/java/org/apache/shenyu/agent/api/config/LogCollectConfig.java
 
b/shenyu-agent/shenyu-agent-api/src/main/java/org/apache/shenyu/agent/api/config/LogCollectConfig.java
new file mode 100644
index 0000000..e7e9ed9
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-api/src/main/java/org/apache/shenyu/agent/api/config/LogCollectConfig.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.api.config;
+
+import java.util.Map;
+
+/**
+ * log collect config.
+ */
+public class LogCollectConfig {
+
+    private LogFieldSwitchConfig logFieldSwitchConfig;
+
+    private GlobalLogConfig globalLogConfig;
+
+    private Map<String, LogApiConfig> logApiSwitchConfigMap;
+
+    /**
+     * get LogFieldSwitchConfig.
+     *
+     * @return LogFieldSwitchConfig
+     */
+    public LogFieldSwitchConfig getLogFieldSwitchConfig() {
+        return logFieldSwitchConfig;
+    }
+
+    /**
+     * set LogFieldSwitchConfig.
+     *
+     * @param logFieldSwitchConfig LogFieldSwitchConfig
+     */
+    public void setLogFieldSwitchConfig(final LogFieldSwitchConfig 
logFieldSwitchConfig) {
+        this.logFieldSwitchConfig = logFieldSwitchConfig;
+    }
+
+    /**
+     * get global log config.
+     *
+     * @return global log config
+     */
+    public GlobalLogConfig getGlobalLogConfig() {
+        return globalLogConfig;
+    }
+
+    /**
+     * set global log config.
+     *
+     * @param globalLogConfig global log config.
+     */
+    public void setGlobalLogConfig(final GlobalLogConfig globalLogConfig) {
+        this.globalLogConfig = globalLogConfig;
+    }
+
+    /**
+     * get log api switch config map.
+     *
+     * @return log api switch config map
+     */
+    public Map<String, LogApiConfig> getLogApiSwitchConfigMap() {
+        return logApiSwitchConfigMap;
+    }
+
+    /**
+     * set log api switch config.
+     *
+     * @param logApiSwitchConfigMap log api switch config
+     */
+    public void setLogApiSwitchConfigMap(final Map<String, LogApiConfig> 
logApiSwitchConfigMap) {
+        this.logApiSwitchConfigMap = logApiSwitchConfigMap;
+    }
+
+    /**
+     * global log config.
+     */
+    public static class GlobalLogConfig {
+        private String topic;
+
+        private String sampleRate = "1";
+
+        private boolean compress;
+
+        /**
+         * default 512KB.
+         */
+        private int maxResponseBody = 524288;
+
+        /**
+         * default 512KB.
+         */
+        private int maxRequestBody = 524288;
+
+        private int bufferQueueSize = 50000;
+
+        /**
+         * get sample rate.
+         *
+         * @return sample rate
+         */
+        public String getSampleRate() {
+            return sampleRate;
+        }
+
+        /**
+         * set sample rate.
+         *
+         * @param sampleRate rate
+         */
+        public void setSampleRate(final String sampleRate) {
+            this.sampleRate = sampleRate;
+        }
+
+        /**
+         * whether compress.
+         *
+         * @return compress or not
+         */
+        public boolean isCompress() {
+            return compress;
+        }
+
+        /**
+         * set compress.
+         *
+         * @param compress true: compress, false not compress
+         */
+        public void setCompress(final boolean compress) {
+            this.compress = compress;
+        }
+
+        /**
+         * get message queue topic.
+         *
+         * @return message queue topic
+         */
+        public String getTopic() {
+            return topic;
+        }
+
+        /**
+         * set topic, used for message queue.
+         *
+         * @param topic mq topic
+         */
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+
+        /**
+         * get max response body.
+         *
+         * @return max response body
+         */
+        public int getMaxResponseBody() {
+            return maxResponseBody;
+        }
+
+        /**
+         * set max response body.
+         *
+         * @param maxResponseBody max response body
+         */
+        public void setMaxResponseBody(final int maxResponseBody) {
+            this.maxResponseBody = maxResponseBody;
+        }
+
+        /**
+         * get max request body.
+         *
+         * @return max request body
+         */
+        public int getMaxRequestBody() {
+            return maxRequestBody;
+        }
+
+        /**
+         * set max request body.
+         *
+         * @param maxRequestBody max request body
+         */
+        public void setMaxRequestBody(final int maxRequestBody) {
+            this.maxRequestBody = maxRequestBody;
+        }
+
+        /**
+         * get buffer queue size.
+         *
+         * @return buffer queue size
+         */
+        public int getBufferQueueSize() {
+            return bufferQueueSize;
+        }
+
+        /**
+         * set buffer queue size.
+         *
+         * @param bufferQueueSize buffer queue size
+         */
+        public void setBufferQueueSize(final int bufferQueueSize) {
+            this.bufferQueueSize = bufferQueueSize;
+        }
+    }
+
+    /**
+     * api log config.
+     */
+    public static class LogApiConfig {
+
+        /**
+         * 0 means never sample, 1 means always sample. Minimum probability is 0.01, or 1% of logging.
+         */
+        private String sampleRate;
+
+        /**
+         * This topic is useful if you use message queuing to collect logs.
+         */
+        private String topic;
+
+        /**
+         * get sample rate.
+         *
+         * @return sample rate
+         */
+        public String getSampleRate() {
+            return sampleRate;
+        }
+
+        /**
+         * set sample rate.
+         *
+         * @param sampleRate sample rate
+         */
+        public void setSampleRate(final String sampleRate) {
+            this.sampleRate = sampleRate;
+        }
+
+        /**
+         * get mq topic.
+         *
+         * @return mq topic
+         */
+        public String getTopic() {
+            return topic;
+        }
+
+        /**
+         * set mq topic.
+         *
+         * @param topic mq topic
+         */
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+    }
+
+
+    /**
+     * log field switch config, default value is true.
+     */
+    public static class LogFieldSwitchConfig {
+
+        private boolean requestHeader = true;
+
+        private boolean responseHeader = true;
+
+        private boolean requestBody = true;
+
+        private boolean responseBody = true;
+
+        /**
+         * whether to collect request header.
+         *
+         * @return true: collect , false:not collect
+         */
+        public boolean isRequestHeader() {
+            return requestHeader;
+        }
+
+
+        /**
+         * set whether to collect request header.
+         *
+         * @param requestHeader whether collect
+         */
+        public void setRequestHeader(final boolean requestHeader) {
+            this.requestHeader = requestHeader;
+        }
+
+        /**
+         * whether collect response header.
+         *
+         * @return collect or not
+         */
+        public boolean isResponseHeader() {
+            return responseHeader;
+        }
+
+        /**
+         * set whether to collect response header.
+         *
+         * @param responseHeader boolean
+         */
+        public void setResponseHeader(final boolean responseHeader) {
+            this.responseHeader = responseHeader;
+        }
+
+
+        /**
+         * whether collect request body.
+         *
+         * @return collect or not
+         */
+        public boolean isRequestBody() {
+            return requestBody;
+        }
+
+        /**
+         * set whether to collect request body.
+         *
+         * @param requestBody boolean
+         */
+        public void setRequestBody(final boolean requestBody) {
+            this.requestBody = requestBody;
+        }
+
+        /**
+         * whether collect response body.
+         *
+         * @return collect or not
+         */
+        public boolean isResponseBody() {
+            return responseBody;
+        }
+
+        /**
+         * set whether to collect response body.
+         *
+         * @param responseBody boolean
+         */
+        public void setResponseBody(final boolean responseBody) {
+            this.responseBody = responseBody;
+        }
+
+    }
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
index 132dd08..4ff3238 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
@@ -17,8 +17,10 @@
 
 package org.apache.shenyu.agent.plugin.logging;
 
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+
 /**
- * Collect logs and save them.
+ * Collect logs and put them into a buffer queue.
  */
 public interface LogCollector extends AutoCloseable {
 
@@ -27,5 +29,5 @@ public interface LogCollector extends AutoCloseable {
      *
      * @param log access log
      */
-    void collect(Object log);
+    void collect(ShenyuRequestLog log);
 }
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/spi/LogCollectClient.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogConsumeClient.java
similarity index 81%
rename from 
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/spi/LogCollectClient.java
rename to 
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogConsumeClient.java
index 5829d90..ac398c6 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/spi/LogCollectClient.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogConsumeClient.java
@@ -15,14 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.agent.plugin.logging.spi;
+package org.apache.shenyu.agent.plugin.logging;
+
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
 
 import java.util.List;
 
 /**
  * Used to collect logs, which can be stored in remote or local files or databases, or others.
  */
-public interface LogCollectClient extends AutoCloseable {
+
+public interface LogConsumeClient extends AutoCloseable {
 
 
     /**
@@ -31,6 +34,6 @@ public interface LogCollectClient extends AutoCloseable {
      * @param logs list of log
      * @throws Exception produce exception
      */
-    void collect(List<String> logs) throws Exception;
+    void consume(List<ShenyuRequestLog> logs) throws Exception;
 
 }
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
index 8335eb0..ad18524 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
@@ -56,6 +56,13 @@ public class ShenyuRequestLog {
 
     private String module;
 
+    private String traceId;
+
+    /**
+     * path.
+     */
+    private String path;
+
     /**
      * get module.
      *
@@ -362,4 +369,39 @@ public class ShenyuRequestLog {
         this.upstreamResponseTime = upstreamResponseTime;
     }
 
+    /**
+     * get traceId.
+     *
+     * @return traceId
+     */
+    public String getTraceId() {
+        return traceId;
+    }
+
+    /**
+     * set traceId.
+     *
+     * @param traceId tracing id
+     */
+    public void setTraceId(final String traceId) {
+        this.traceId = traceId;
+    }
+
+    /**
+     * get request path.
+     *
+     * @return request path
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * set request path.
+     *
+     * @param path request path
+     */
+    public void setPath(final String path) {
+        this.path = path;
+    }
 }
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
index 1907e10..1128485 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
@@ -18,8 +18,9 @@
 package org.apache.shenyu.agent.plugin.logging.common;
 
 import org.apache.shenyu.agent.plugin.logging.LogCollector;
-import org.apache.shenyu.agent.plugin.logging.spi.LogCollectClient;
-import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.agent.plugin.logging.LogConsumeClient;
+import 
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
 import org.apache.shenyu.common.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,36 +43,36 @@ public abstract class AbstractLogCollector implements 
LogCollector {
 
     private final ExecutorService threadExecutor = 
Executors.newSingleThreadExecutor();
 
-    private final int bufferSize = 50000;
+    private final int bufferSize;
 
     private final int batchSize = 100;
 
     private final int diffTimeMSForPush = 100;
 
-    private final BlockingQueue<String> bufferQueue = new 
LinkedBlockingDeque<>(bufferSize);
+    private final BlockingQueue<ShenyuRequestLog> bufferQueue;
 
     private long lastPushTime;
 
     private final AtomicBoolean started = new AtomicBoolean(true);
     
-    private final LogCollectClient logCollectClient;
+    private final LogConsumeClient logCollectClient;
 
-    public AbstractLogCollector(final LogCollectClient logCollectClient) {
+    public AbstractLogCollector(final LogConsumeClient logCollectClient) {
         this.logCollectClient = logCollectClient;
-        threadExecutor.execute(this::consume);
+        bufferSize = 
LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize();
+        bufferQueue = new LinkedBlockingDeque<>(bufferSize);
+        if (logCollectClient != null) {
+            threadExecutor.execute(this::consume);
+        }
     }
 
     @Override
-    public void collect(final Object log) {
-        if (log == null) {
+    public void collect(final ShenyuRequestLog log) {
+        if (log == null || logCollectClient == null) {
             return;
         }
         if (bufferQueue.size() < bufferSize) {
-            if (log instanceof String) {
-                bufferQueue.add((String) log);
-            } else {
-                bufferQueue.add(JsonUtils.toJson(log));
-            }
+            bufferQueue.add(log);
         }
     }
 
@@ -81,13 +82,13 @@ public abstract class AbstractLogCollector implements 
LogCollector {
     private void consume() {
         while (started.get()) {
             try {
-                List<String> logs = new ArrayList<>();
+                List<ShenyuRequestLog> logs = new ArrayList<>();
                 int size = bufferQueue.size();
                 long time = System.currentTimeMillis();
                 long timeDiffMs = time - lastPushTime;
                 if (size >= batchSize || timeDiffMs > diffTimeMSForPush) {
                     bufferQueue.drainTo(logs, batchSize);
-                    logCollectClient.collect(logs);
+                    logCollectClient.consume(logs);
                     lastPushTime = time;
                 } else {
                     ThreadUtils.sleep(TimeUnit.MILLISECONDS, 
diffTimeMSForPush);
@@ -102,7 +103,9 @@ public abstract class AbstractLogCollector implements 
LogCollector {
     @Override
     public void close() throws Exception {
         started.set(false);
-        logCollectClient.close();
+        if (logCollectClient != null) {
+            logCollectClient.close();
+        }
     }
     
 }
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollector.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/DefaultLogCollector.java
similarity index 66%
rename from 
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollector.java
rename to 
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/DefaultLogCollector.java
index 9f56009..22acc3f 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollector.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/DefaultLogCollector.java
@@ -15,33 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.agent.plugin.logging.rocketmq;
+package org.apache.shenyu.agent.plugin.logging.common;
 
 import org.apache.shenyu.agent.plugin.logging.LogCollector;
-import org.apache.shenyu.agent.plugin.logging.common.AbstractLogCollector;
-import org.apache.shenyu.agent.plugin.logging.spi.LogCollectClient;
+import org.apache.shenyu.agent.plugin.logging.LogConsumeClient;
 
 import java.util.Objects;
 
 /**
- * queue-based logging collector.
+ * default log collector, depends on a LogConsumeClient to consume logs.
  */
-public class RocketMQLogCollector extends AbstractLogCollector {
+public class DefaultLogCollector extends AbstractLogCollector {
 
-    private static RocketMQLogCollector instance;
+    private static LogCollector instance;
 
-    public RocketMQLogCollector(final LogCollectClient logCollectClient) {
-        super(logCollectClient);
+    public DefaultLogCollector(final LogConsumeClient logConsumeClient) {
+        super(logConsumeClient);
         instance = this;
     }
 
     /**
-     * get RocketMQLogCollector instance.
+     * get LogCollector instance.
      *
-     * @return RocketMQLogCollector instance
+     * @return LogCollector instance
      */
     public static LogCollector getInstance() {
         return Objects.requireNonNull(instance);
     }
-
 }
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/BodyWriter.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/BodyWriter.java
new file mode 100644
index 0000000..bdce3ff
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/BodyWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.body;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * BodyWriter buffers the body bytes written to it and outputs them as a UTF-8 string.
+ */
+public class BodyWriter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BodyWriter.class);
+
+    private final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    private final WritableByteChannel channel = Channels.newChannel(stream);
+
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+
+    /**
+     * write ByteBuffer.
+     *
+     * @param buffer byte buffer
+     */
+    public void write(final ByteBuffer buffer) {
+        if (!isClosed.get()) {
+            try {
+                channel.write(buffer);
+            } catch (IOException e) {
+                isClosed.compareAndSet(false, true);
+                LOG.error("write buffer Failed.", e);
+            }
+        }
+    }
+
+    /**
+     * judge whether the stream is empty.
+     *
+     * @return true: stream is empty
+     */
+    boolean isEmpty() {
+        return stream.size() == 0;
+    }
+
+    /**
+     * get stream size.
+     *
+     * @return size of stream
+     */
+    public int size() {
+        return stream.size();
+    }
+
+    /**
+     * output the buffered bytes as a UTF-8 string; a non-empty writer is closed afterwards.
+     *
+     * @return string of stream
+     */
+    public String output() {
+        if (size() == 0) {
+            return "";
+        }
+        try {
+            isClosed.compareAndSet(false, true);
+            return new String(stream.toByteArray(), StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            LOG.error("Write failed: ", e);
+            return "Write failed: " + e.getMessage();
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                LOG.error("Close stream error: ", e);
+            }
+            try {
+                channel.close();
+            } catch (IOException e) {
+                LOG.error("Close channel error: ", e);
+            }
+        }
+    }
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpRequest.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpRequest.java
new file mode 100644
index 0000000..4ca22c4
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.body;
+
+import 
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
+import reactor.core.publisher.Flux;
+import reactor.util.annotation.NonNull;
+
+/**
+ * decorate ServerHttpRequest to capture the request body for logging.
+ */
+public class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
+    private final ShenyuRequestLog logInfo;
+
+    public LoggingServerHttpRequest(final ServerHttpRequest delegate, final 
ShenyuRequestLog logInfo) {
+        super(delegate);
+        this.logInfo = logInfo;
+    }
+
+    /**
+     * get request body.
+     *
+     * @return Flux
+     */
+    @Override
+    @NonNull
+    public Flux<DataBuffer> getBody() {
+        BodyWriter writer = new BodyWriter();
+        boolean collectRequestBody = 
LogCollectConfigUtils.getLogFieldSwitchConfig().isRequestBody();
+        return super.getBody().doOnNext(dataBuffer -> {
+            if (LogCollectUtils.isNotBinaryType(getHeaders()) && 
collectRequestBody) {
+                writer.write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
+            }
+        }).doFinally(signal -> {
+            int size = writer.size();
+            String body = writer.output();
+            boolean requestBodyTooLarge = 
LogCollectConfigUtils.isRequestBodyTooLarge(size);
+            if (size == 0 || requestBodyTooLarge) {
+                return;
+            }
+            logInfo.setRequestBody(body);
+        });
+    }
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpResponse.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpResponse.java
new file mode 100644
index 0000000..aced05e
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.body;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.agent.plugin.logging.LogCollector;
+import 
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.common.utils.DateUtils;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.annotation.NonNull;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Optional;
+
+/**
+ * decorate ServerHttpResponse for read body.
+ */
+public class LoggingServerHttpResponse extends ServerHttpResponseDecorator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingServerHttpResponse.class);
+
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+    private static final String SHENYU_AGENT_TRACE_ID = "shenyu-agent-trace-id";
+
+    private final ShenyuRequestLog logInfo;
+
+    private ServerWebExchange exchange;
+
+    private final LogCollector logCollector;
+
+    /**
+     * Wraps a response so that its body and metadata can be copied into a log record.
+     *
+     * @param delegate     the response being decorated
+     * @param logInfo      the log record to complete while the response is written
+     * @param logCollector receiver of the completed record; may be null, in which case nothing is collected
+     */
+    public LoggingServerHttpResponse(final ServerHttpResponse delegate, final ShenyuRequestLog logInfo,
+                                     final LogCollector logCollector) {
+        super(delegate);
+        this.logInfo = logInfo;
+        this.logCollector = logCollector;
+    }
+
+    /**
+     * set relevant ServerWebExchange.
+     * Must be called before {@link #writeWith(Publisher)}, which reads attributes from the exchange.
+     *
+     * @param exchange ServerWebExchange
+     */
+    public void setExchange(final ServerWebExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /**
+     * write with a publisher.
+     *
+     * @param body response body
+     * @return Mono
+     */
+    @Override
+    @NonNull
+    public Mono<Void> writeWith(@NonNull final Publisher<? extends DataBuffer> body) {
+        return super.writeWith(appendResponse(body));
+    }
+
+    /**
+     * Records status, headers and trace id on the log record, then wraps the body so that
+     * each buffer is copied into a {@link BodyWriter} and the record is finished when the
+     * stream terminates.
+     *
+     * @param body publisher
+     * @return wrap Flux
+     */
+    @NonNull
+    private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends DataBuffer> body) {
+        // May be null if the response is written before the context attribute was stored;
+        // logResponse tolerates that instead of relying on an assert that is a no-op without -ea.
+        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+        if (getStatusCode() != null) {
+            logInfo.setStatus(getStatusCode().value());
+        }
+        logInfo.setResponseHeader(LogCollectUtils.getResponseHeaders(getHeaders()));
+        BodyWriter writer = new BodyWriter();
+        String traceId = (String) exchange.getAttributes().get(SHENYU_AGENT_TRACE_ID);
+        logInfo.setTraceId(traceId);
+        boolean collectResponseBody = LogCollectConfigUtils.getLogFieldSwitchConfig().isResponseBody();
+        return Flux.from(body).doOnNext(buffer -> {
+            if (LogCollectUtils.isNotBinaryType(getHeaders()) && collectResponseBody) {
+                writer.write(buffer.asByteBuffer().asReadOnlyBuffer());
+            }
+        }).doFinally(signal -> logResponse(shenyuContext, writer));
+    }
+
+    /**
+     * Completes the log record and hands it to the collector.
+     * The record is collected even when the captured body is empty or exceeds the
+     * configured limit; in those cases only the response body field is left unset.
+     *
+     * @param shenyuContext request context, may be null
+     * @param writer        bodyWriter
+     */
+    private void logResponse(final ShenyuContext shenyuContext, final BodyWriter writer) {
+        String contentLength = getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH);
+        if (StringUtils.isNotBlank(contentLength)) {
+            logInfo.setResponseContentLength(contentLength);
+        } else {
+            // No Content-Length header: fall back to the number of bytes captured by the writer.
+            logInfo.setResponseContentLength(writer.size() + "");
+        }
+        if (shenyuContext != null) {
+            fillContextInfo(shenyuContext);
+        }
+        int size = writer.size();
+        // output() is always called, as before, even when the body is not stored.
+        String body = writer.output();
+        if (size > 0 && !LogCollectConfigUtils.isResponseBodyTooLarge(size)) {
+            logInfo.setResponseBody(body);
+        }
+        // collect log
+        if (logCollector != null) {
+            logCollector.collect(logInfo);
+        }
+    }
+
+    /**
+     * Copies start time, elapsed time, rpc type and upstream address from the request context.
+     *
+     * @param shenyuContext request context, never null here
+     */
+    private void fillContextInfo(final ShenyuContext shenyuContext) {
+        logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
+        long costTime = DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(), LocalDateTime.now());
+        logInfo.setUpstreamResponseTime(costTime);
+        if (StringUtils.isBlank(shenyuContext.getRpcType())) {
+            return;
+        }
+        logInfo.setRpcType(shenyuContext.getRpcType());
+        if (RpcTypeEnum.HTTP.getName().equals(shenyuContext.getRpcType())) {
+            fillHttpUpstreamIp();
+        } else {
+            Optional.ofNullable(exchange.getRequest().getRemoteAddress())
+                    .map(InetSocketAddress::getAddress)
+                    .ifPresent(v -> logInfo.setUpstreamIp(v.getHostAddress()));
+            logInfo.setMethod(shenyuContext.getMethod());
+        }
+    }
+
+    /**
+     * Resolves the upstream host for HTTP calls from the HTTP_URI attribute, falling back
+     * to parsing the HTTP_DOMAIN attribute as a URL.
+     */
+    private void fillHttpUpstreamIp() {
+        URI uri = exchange.getAttribute(Constants.HTTP_URI);
+        if (uri != null) {
+            logInfo.setUpstreamIp(uri.getHost());
+            return;
+        }
+        String domain = (String) exchange.getAttributes().get(Constants.HTTP_DOMAIN);
+        try {
+            URL url = new URL(domain);
+            logInfo.setUpstreamIp(url.getHost());
+        } catch (Exception e) {
+            // Best effort: a missing or malformed domain must not prevent the log from being collected.
+            LOG.error("get upstream ip error", e);
+        }
+    }
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
index 423eea0..ddfa0b9 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
@@ -23,6 +23,7 @@ import 
org.apache.shenyu.agent.api.spi.AbstractAgentPluginDefinition;
 import org.apache.shenyu.agent.core.builder.JoinPointBuilderFactory;
 import org.apache.shenyu.agent.core.locator.ShenyuAgentLocator;
 import org.apache.shenyu.agent.core.yaml.ShenyuYamlEngine;
+import 
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
 import org.apache.shenyu.spi.Join;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class LoggingAgentPluginDefinition extends 
AbstractAgentPluginDefinition
      */
     @Override
     protected Collection<JoinPointBuilder> joinPointBuilder() {
+        LogCollectConfigUtils.init();
         PointCutConfig config = null;
         try {
             config = 
ShenyuYamlEngine.unmarshal(ShenyuAgentLocator.locatorConf("logging-point.yaml"),
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/handler/DefaultLoggingPluginHandler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/handler/DefaultLoggingPluginHandler.java
new file mode 100644
index 0000000..e64e5fb
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/handler/DefaultLoggingPluginHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.handler;
+
+import org.apache.shenyu.agent.api.entity.MethodResult;
+import org.apache.shenyu.agent.api.entity.TargetObject;
+import org.apache.shenyu.agent.api.handler.InstanceMethodHandler;
+import org.apache.shenyu.agent.plugin.logging.common.DefaultLogCollector;
+import 
org.apache.shenyu.agent.plugin.logging.common.body.LoggingServerHttpRequest;
+import 
org.apache.shenyu.agent.plugin.logging.common.body.LoggingServerHttpResponse;
+import 
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.web.server.ServerWebExchange;
+
+import java.lang.reflect.Method;
+
+
+/**
+ * default logging plugin handler.
+ */
+public class DefaultLoggingPluginHandler implements InstanceMethodHandler {
+
+    private static final String USER_AGENT = "User-Agent";
+
+    private static final String HOST = "Host";
+
+    /**
+     * Replaces the {@link ServerWebExchange} returned by the intercepted method with one whose
+     * request and response are decorated for log collection.
+     * If the result is not a {@link ServerWebExchange} (including null), or the request is not
+     * sampled, the original result is returned untouched.
+     *
+     * @param target       intercepted target object
+     * @param method       intercepted method
+     * @param args         arguments of the intercepted call
+     * @param methodResult holder of the intercepted method's result
+     * @return the decorated exchange, or the original result when no decoration applies
+     */
+    @Override
+    public Object after(final TargetObject target, final Method method, final Object[] args,
+                        final MethodResult methodResult) {
+        Object result = methodResult.getResult();
+        // Never let the advice break the instrumented call: pass through anything unexpected.
+        if (!(result instanceof ServerWebExchange)) {
+            return result;
+        }
+        ServerWebExchange exchange = (ServerWebExchange) result;
+        ServerHttpRequest request = exchange.getRequest();
+
+        // control sampling
+        if (!LogCollectConfigUtils.isSampled(request)) {
+            return result;
+        }
+
+        ShenyuRequestLog requestInfo = new ShenyuRequestLog();
+        requestInfo.setRequestUri(request.getURI().toString());
+        requestInfo.setMethod(request.getMethodValue());
+        requestInfo.setRequestHeader(LogCollectUtils.getRequestHeaders(request.getHeaders()));
+        requestInfo.setQueryParams(request.getURI().getQuery());
+        requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
+        requestInfo.setUserAgent(request.getHeaders().getFirst(USER_AGENT));
+        requestInfo.setHost(request.getHeaders().getFirst(HOST));
+        requestInfo.setPath(request.getURI().getPath());
+
+        LoggingServerHttpRequest loggingServerHttpRequest = new LoggingServerHttpRequest(request, requestInfo);
+        LoggingServerHttpResponse loggingServerHttpResponse = new LoggingServerHttpResponse(exchange.getResponse(),
+                requestInfo, DefaultLogCollector.getInstance());
+        ServerWebExchange webExchange = exchange.mutate().request(loggingServerHttpRequest)
+                .response(loggingServerHttpResponse).build();
+        // The response decorator reads attributes from the mutated exchange while the body is written.
+        loggingServerHttpResponse.setExchange(webExchange);
+        return webExchange;
+    }
+
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/CountingSampler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/CountingSampler.java
new file mode 100644
index 0000000..f5ca345
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/CountingSampler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.sampler;
+
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+import java.util.BitSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * used for sample log.
+ */
+public class CountingSampler implements Sampler {
+
+    private final AtomicInteger counter;
+
+    private final BitSet sampleDecisions;
+
+    /**
+     * Builds the decision table for the given probability with a fresh {@link Random}.
+     */
+    CountingSampler(final float probability) {
+        this(probability, new Random());
+    }
+
+    /**
+     * Builds a 100-slot decision table in which {@code (int) (probability * 100)} slots,
+     * chosen with the supplied {@link Random}, are marked as sampled.
+     */
+    CountingSampler(final float probability, final Random random) {
+        this.counter = new AtomicInteger();
+        final int sampledSlots = (int) (probability * 100.0f);
+        this.sampleDecisions = randomBitSet(100, sampledSlots, random);
+    }
+
+    /**
+     * Answers from the pre-computed table, cycling through its 100 slots; the request itself is not inspected.
+     */
+    @Override
+    public boolean isSampled(final ServerHttpRequest request) {
+        // floorMod keeps the slot non-negative after the counter overflows into negative values.
+        final int slot = Math.floorMod(counter.getAndIncrement(), 100);
+        return sampleDecisions.get(slot);
+    }
+
+    /**
+     * Reservoir sampling algorithm borrowed from Stack Overflow.
+     * http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+     *
+     * @param size        bitmap size
+     * @param cardinality number of bits to set
+     * @param rnd         random
+     * @return bitSet
+     */
+    private BitSet randomBitSet(final int size, final int cardinality, final Random rnd) {
+        final BitSet bits = new BitSet(size);
+        final int[] reservoir = new int[cardinality];
+        // Start with the first `cardinality` positions selected.
+        for (int slot = 0; slot < cardinality; slot++) {
+            reservoir[slot] = slot;
+        }
+        bits.set(0, cardinality);
+        // Each later position may evict one reservoir entry and take its place.
+        for (int position = cardinality; position < size; position++) {
+            final int victim = rnd.nextInt(position + 1);
+            if (victim >= cardinality) {
+                continue;
+            }
+            bits.clear(reservoir[victim]);
+            bits.set(position);
+            reservoir[victim] = position;
+        }
+        return bits;
+    }
+
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/Sampler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/Sampler.java
new file mode 100644
index 0000000..278d510
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/Sampler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.sampler;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+/**
+ * sampler interface.
+ */
+public interface Sampler {
+
+    /**
+     * Sampler that accepts every request.
+     */
+    Sampler ALWAYS_SAMPLE = request -> true;
+
+    /**
+     * Sampler that rejects every request.
+     */
+    Sampler NEVER_SAMPLE = request -> false;
+
+    /**
+     * decide whether the log of the given request should be collected.
+     *
+     * @param request request
+     * @return whether sample
+     */
+    boolean isSampled(ServerHttpRequest request);
+
+    /**
+     * create a sampler instance.
+     * A blank or unparsable value means "always sample"; any textual form of zero
+     * ({@code "0"}, {@code "0.0"}, ...) means "never sample"; a value of one means
+     * "always sample"; anything else must lie between 0.01 and 1.
+     *
+     * @param probability probability
+     * @return sampler instance
+     * @throws IllegalArgumentException if the parsed value is not zero and lies outside [0.01, 1]
+     */
+    static Sampler create(String probability) {
+        if (StringUtils.isBlank(probability)) {
+            return ALWAYS_SAMPLE;
+        }
+        // Unparsable input (including the legacy "1.0.0" spelling) falls back to 1, i.e. always sample.
+        float parseProbability = NumberUtils.toFloat(probability, 1);
+        if (parseProbability == 0) {
+            return NEVER_SAMPLE;
+        }
+        if (parseProbability == 1) {
+            return ALWAYS_SAMPLE;
+        }
+        // Written as a negated range test so that NaN is rejected as well.
+        if (!(parseProbability >= 0.01f && parseProbability <= 1)) {
+            throw new IllegalArgumentException(
+                    "probability should be between 0.01 and 1: was " + probability);
+        }
+        return new CountingSampler(parseProbability);
+    }
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectConfigUtils.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectConfigUtils.java
new file mode 100644
index 0000000..d180cda
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectConfigUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.utils;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.agent.api.config.LogCollectConfig;
+import org.apache.shenyu.agent.core.locator.ShenyuAgentLocator;
+import org.apache.shenyu.agent.core.yaml.ShenyuYamlEngine;
+import org.apache.shenyu.agent.plugin.logging.common.sampler.Sampler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.util.AntPathMatcher;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * log collect config Utils.
+ */
+public final class LogCollectConfigUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogCollectConfigUtils.class);
+
+    private static final AntPathMatcher MATCHER = new AntPathMatcher();
+
+    private static LogCollectConfig logCollectConfig;
+
+    private static final LogCollectConfig.LogFieldSwitchConfig DEFAULT_LOG_FIELD_SWITCH_CONFIG =
+            new LogCollectConfig.LogFieldSwitchConfig();
+
+    private static final LogCollectConfig.GlobalLogConfig DEFAULT_GLOBAL_LOG_CONFIG =
+            new LogCollectConfig.GlobalLogConfig();
+
+    private static final Map<String, Sampler> API_SAMPLER_MAP = new HashMap<>();
+
+    private static Sampler globalSampler = Sampler.ALWAYS_SAMPLE;
+
+    private LogCollectConfigUtils() {
+    }
+
+    /**
+     * init logging config.
+     * Loads {@code logging-meta.yaml} and builds the global and per-API samplers.
+     * Any failure is logged and leaves whatever was configured so far in place.
+     */
+    public static void init() {
+        try {
+            logCollectConfig = ShenyuYamlEngine.unmarshal(ShenyuAgentLocator.locatorConf("logging-meta.yaml"),
+                    LogCollectConfig.class);
+            // The global sampler must be resolved first: API entries without their own
+            // sample rate inherit it below.
+            if (Objects.nonNull(logCollectConfig.getGlobalLogConfig())) {
+                globalSampler = Sampler.create(logCollectConfig.getGlobalLogConfig().getSampleRate());
+            }
+            if (MapUtils.isNotEmpty(logCollectConfig.getLogApiSwitchConfigMap())) {
+                logCollectConfig.getLogApiSwitchConfigMap().forEach((path, config) -> {
+                    if (StringUtils.isBlank(config.getSampleRate())) {
+                        API_SAMPLER_MAP.put(path, globalSampler);
+                    } else {
+                        API_SAMPLER_MAP.put(path, Sampler.create(config.getSampleRate()));
+                    }
+                });
+            }
+        } catch (Exception e) {
+            LOG.error("Exception loader logging meta data config error", e);
+        }
+    }
+
+    /**
+     * judge whether sample.
+     * The sampler of the first API pattern matching the request path decides; when no
+     * pattern matches (or no API config exists) the global sampler decides, which
+     * samples everything unless a global sample rate was configured.
+     *
+     * @param request request
+     * @return whether sample
+     */
+    public static boolean isSampled(final ServerHttpRequest request) {
+        if (Objects.nonNull(logCollectConfig) && MapUtils.isNotEmpty(logCollectConfig.getLogApiSwitchConfigMap())) {
+            String path = request.getURI().getPath();
+            for (String pattern : logCollectConfig.getLogApiSwitchConfigMap().keySet()) {
+                if (MATCHER.match(pattern, path)) {
+                    // Consult exactly one sampler so that a counting sampler's position is
+                    // not advanced by requests it did not decide on.
+                    Sampler sampler = API_SAMPLER_MAP.get(pattern);
+                    return Objects.nonNull(sampler) ? sampler.isSampled(request) : globalSampler.isSampled(request);
+                }
+            }
+        }
+        return globalSampler.isSampled(request);
+    }
+
+    /**
+     * judge whether request body too large.
+     *
+     * @param bodySize body size
+     * @return whether request body too large; false when no global config is loaded
+     */
+    public static boolean isRequestBodyTooLarge(final int bodySize) {
+        if (Objects.isNull(logCollectConfig) || Objects.isNull(logCollectConfig.getGlobalLogConfig())) {
+            return false;
+        }
+        return bodySize > logCollectConfig.getGlobalLogConfig().getMaxRequestBody();
+    }
+
+    /**
+     * judge whether response body too large.
+     *
+     * @param bodySize body size.
+     * @return whether response body too large; false when no global config is loaded
+     */
+    public static boolean isResponseBodyTooLarge(final int bodySize) {
+        if (Objects.isNull(logCollectConfig) || Objects.isNull(logCollectConfig.getGlobalLogConfig())) {
+            return false;
+        }
+        return bodySize > logCollectConfig.getGlobalLogConfig().getMaxResponseBody();
+    }
+
+    /**
+     * get log field switch config.
+     *
+     * @return log field switch config, or a default instance when none is loaded
+     */
+    public static LogCollectConfig.LogFieldSwitchConfig getLogFieldSwitchConfig() {
+        return Optional.ofNullable(logCollectConfig).map(LogCollectConfig::getLogFieldSwitchConfig)
+                .orElse(DEFAULT_LOG_FIELD_SWITCH_CONFIG);
+    }
+
+    /**
+     * get global log config.
+     *
+     * @return global log config, or a default instance when none is loaded
+     */
+    public static LogCollectConfig.GlobalLogConfig getGlobalLogConfig() {
+        return Optional.ofNullable(logCollectConfig).map(LogCollectConfig::getGlobalLogConfig)
+                .orElse(DEFAULT_GLOBAL_LOG_CONFIG);
+    }
+
+    /**
+     * get message queue topic.
+     *
+     * @param path request path
+     * @return topic of the first API pattern matching the path, or an empty string when
+     *         the path is blank, no config is loaded, or nothing matches
+     */
+    public static String getTopic(final String path) {
+        // logCollectConfig stays null when init() failed or was never called.
+        if (StringUtils.isBlank(path) || Objects.isNull(logCollectConfig)) {
+            return "";
+        }
+        Map<String, LogCollectConfig.LogApiConfig> apiConfigMap = logCollectConfig.getLogApiSwitchConfigMap();
+        if (MapUtils.isEmpty(apiConfigMap)) {
+            return "";
+        }
+        for (Map.Entry<String, LogCollectConfig.LogApiConfig> entry : apiConfigMap.entrySet()) {
+            // Square brackets are stripped from the configured key before matching.
+            String pattern = entry.getKey().replace("[", "").replace("]", "");
+            if (MATCHER.match(pattern, path)) {
+                return entry.getValue().getTopic();
+            }
+        }
+        return "";
+    }
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectUtils.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectUtils.java
new file mode 100644
index 0000000..0bd600e
--- /dev/null
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.utils;
+
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.springframework.http.HttpHeaders;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * log collect utils.
+ */
+public final class LogCollectUtils {
+
+    // Content types (matched against either the type or the subtype) whose bodies are not logged.
+    private static final List<String> BINARY_TYPE_LIST = Arrays.asList("image", "multipart", "cbor",
+            "octet-stream", "pdf", "javascript", "css", "html");
+
+    private LogCollectUtils() {
+    }
+
+    /**
+     * judge whether the content type is absent from the list of types excluded from body logging.
+     *
+     * @param headers request or response header
+     * @return true when headers are null, carry no content type, or the content type is not excluded
+     */
+    public static boolean isNotBinaryType(final HttpHeaders headers) {
+        return Optional.ofNullable(headers).map(HttpHeaders::getContentType)
+                .map(contentType -> !BINARY_TYPE_LIST.contains(contentType.getType())
+                        && !BINARY_TYPE_LIST.contains(contentType.getSubtype()))
+                .orElse(true);
+    }
+
+    /**
+     * get request header string.
+     *
+     * @param headers request headers
+     * @return headers serialized as JSON, or null when request header collection is switched off
+     */
+    public static String getRequestHeaders(final HttpHeaders headers) {
+        return toJsonIfEnabled(LogCollectConfigUtils.getLogFieldSwitchConfig().isRequestHeader(), headers);
+    }
+
+    /**
+     * get response header string.
+     *
+     * @param headers response headers
+     * @return headers serialized as JSON, or null when response header collection is switched off
+     */
+    public static String getResponseHeaders(final HttpHeaders headers) {
+        return toJsonIfEnabled(LogCollectConfigUtils.getLogFieldSwitchConfig().isResponseHeader(), headers);
+    }
+
+    /**
+     * serialize the header entries when the corresponding switch is on.
+     *
+     * @param enabled whether collection of these headers is enabled
+     * @param headers headers to serialize
+     * @return JSON string of the header entries, or null when disabled
+     */
+    private static String toJsonIfEnabled(final boolean enabled, final HttpHeaders headers) {
+        if (!enabled) {
+            return null;
+        }
+        return JsonUtils.toJson(headers.entrySet());
+    }
+
+}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
index 293121f..d83a6cd 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
@@ -20,12 +20,13 @@ package org.apache.shenyu.agent.plugin.logging.rocketmq;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.shenyu.agent.api.config.AgentPluginConfig;
+import org.apache.shenyu.agent.plugin.logging.LogConsumeClient;
+import 
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
 import org.apache.shenyu.agent.plugin.logging.constant.LoggingConstant;
-import org.apache.shenyu.agent.plugin.logging.spi.LogCollectClient;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.apache.shenyu.common.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,17 +34,16 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * queue-based logging collector.
  */
-public class RocketMQLogCollectClient implements LogCollectClient {
-    
+public class RocketMQLogCollectClient implements LogConsumeClient {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQLogCollectClient.class);
-    
+
     private DefaultMQProducer producer;
-    
+
     private String topic;
 
     public RocketMQLogCollectClient(final AgentPluginConfig agentPluginConfig) 
{
@@ -85,25 +85,21 @@ public class RocketMQLogCollectClient implements 
LogCollectClient {
      * @throws Exception produce exception
      */
     @Override
-    public void collect(final List<String> logs) throws Exception {
+    public void consume(final List<ShenyuRequestLog> logs) throws Exception {
         if (CollectionUtils.isEmpty(logs)) {
             return;
         }
-        List<Message> messageList = logs.stream().map(log -> new 
Message(topic, log.getBytes(StandardCharsets.UTF_8)))
-                .collect(Collectors.toList());
-        producer.send(messageList, new SendCallback() {
-            @Override
-            public void onSuccess(final SendResult sendResult) {
-                LOG.info("rocketmq push logs success");
-            }
-
-            @Override
-            public void onException(final Throwable e) {
+        // Iterate the list directly; no intermediate stream is needed for a plain forEach.
+        logs.forEach(log -> {
+            // Topic is looked up per request path; a blank result falls back to this client's topic.
+            String logTopic = StringUtils.defaultIfBlank(LogCollectConfigUtils.getTopic(log.getPath()), topic);
+            Message message = new Message(logTopic, JsonUtils.toJson(log).getBytes(StandardCharsets.UTF_8));
+            try {
+                // One-way send; a failure is logged per message so the remaining logs are still attempted.
+                producer.sendOneway(message);
+            } catch (Exception e) {
                 LOG.error("rocketmq push logs error", e);
             }
         });
     }
-    
+
     /**
      * close producer.
      *
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
index 391de38..707b493 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
@@ -19,8 +19,8 @@ package org.apache.shenyu.agent.plugin.logging.rocketmq.boot;
 
 import org.apache.shenyu.agent.api.config.AgentPluginConfig;
 import org.apache.shenyu.agent.api.spi.AgentPluginBootService;
+import org.apache.shenyu.agent.plugin.logging.common.DefaultLogCollector;
 import 
org.apache.shenyu.agent.plugin.logging.rocketmq.RocketMQLogCollectClient;
-import org.apache.shenyu.agent.plugin.logging.rocketmq.RocketMQLogCollector;
 import org.apache.shenyu.common.utils.JsonUtils;
 import org.apache.shenyu.spi.Join;
 import org.slf4j.Logger;
@@ -41,7 +41,7 @@ public class RocketMQAgentPluginBootService implements 
AgentPluginBootService {
     public void start(final AgentPluginConfig agentPluginConfig) {
         LOG.info("start RocketMQAgentPluginBootService, config:{}", 
JsonUtils.toJson(agentPluginConfig));
         RocketMQLogCollectClient client = new 
RocketMQLogCollectClient(agentPluginConfig);
-        new RocketMQLogCollector(client);
+        new DefaultLogCollector(client);
     }
 
     /**
@@ -50,7 +50,7 @@ public class RocketMQAgentPluginBootService implements 
AgentPluginBootService {
     @Override
     public void close() {
         try {
-            RocketMQLogCollector.getInstance().close();
+            DefaultLogCollector.getInstance().close();
         } catch (Exception e) {
             LOG.error("close RocketMQLogCollector error", e);
         }
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/handler/RocketMQGlobalPluginHandler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/handler/RocketMQGlobalPluginHandler.java
deleted file mode 100644
index bcd9df6..0000000
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/handler/RocketMQGlobalPluginHandler.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.agent.plugin.logging.rocketmq.handler;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.agent.api.entity.MethodResult;
-import org.apache.shenyu.agent.api.entity.TargetObject;
-import org.apache.shenyu.agent.api.handler.InstanceMethodHandler;
-import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
-import org.apache.shenyu.agent.plugin.logging.rocketmq.RocketMQLogCollector;
-import org.apache.shenyu.common.constant.Constants;
-import org.apache.shenyu.common.enums.RpcTypeEnum;
-import org.apache.shenyu.common.utils.DateUtils;
-import org.apache.shenyu.plugin.api.context.ShenyuContext;
-import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
-import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
-import org.springframework.http.server.reactive.ServerHttpResponse;
-import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.annotation.NonNull;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Method interceptor is used to  collect log.
- */
-public class RocketMQGlobalPluginHandler implements InstanceMethodHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQGlobalPluginHandler.class);
-
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-
-    private static final List<String> BINARY_TYPE_LIST = 
Arrays.asList("image", "multipart", "cbor",
-            "octet-stream", "pdf");
-
-    @Override
-    public Object after(final TargetObject target, final Method method, final 
Object[] args,
-                        final MethodResult methodResult) {
-        Object result = methodResult.getResult();
-        ServerWebExchange exchange = (ServerWebExchange) result;
-        ServerHttpRequest request = exchange.getRequest();
-        ServerHttpResponse response = exchange.getResponse();
-
-        ShenyuRequestLog requestInfo = new ShenyuRequestLog();
-        requestInfo.setRequestUri(request.getURI().toString());
-        requestInfo.setMethod(request.getMethodValue());
-        requestInfo.setRequestHeader(getHeaders(request.getHeaders()));
-        requestInfo.setQueryParams(request.getURI().getQuery());
-        requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
-        requestInfo.setUserAgent(request.getHeaders().getFirst("User-Agent"));
-        requestInfo.setHost(request.getHeaders().getFirst("Host"));
-
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(response, requestInfo);
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return webExchange;
-    }
-    
-
-    /**
-     * getHeader string.
-     *
-     * @return header
-     */
-    private static String getHeaders(final HttpHeaders headers) {
-        StringBuilder sb = new StringBuilder();
-        Set<Map.Entry<String, List<String>>> entrySet = headers.entrySet();
-        entrySet.forEach(entry -> {
-            String key = entry.getKey();
-            List<String> value = entry.getValue();
-            sb.append(key).append(": ").append(StringUtils.join(value, 
",")).append(System.lineSeparator());
-        });
-        return sb.toString();
-    }
-
-    /**
-     * judge this header is binary type.
-     *
-     * @param headers httpHeaders
-     * @return true: is not binary type
-     */
-    private static boolean isNotBinaryType(final HttpHeaders headers) {
-        return Optional.ofNullable(headers).map(HttpHeaders::getContentType)
-                .map(contentType -> 
!BINARY_TYPE_LIST.contains(contentType.getType())
-                        && 
!BINARY_TYPE_LIST.contains(contentType.getSubtype()))
-                .orElse(true);
-    }
-
-
-    /**
-     * wrap ServerHttpRequest.
-     */
-    static class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
-        private final ShenyuRequestLog logInfo;
-
-        LoggingServerHttpRequest(final ServerHttpRequest delegate, final 
ShenyuRequestLog logInfo) {
-            super(delegate);
-            this.logInfo = logInfo;
-        }
-
-        /**
-         * get request body.
-         *
-         * @return Flux
-         */
-        @Override
-        @NonNull
-        public Flux<DataBuffer> getBody() {
-            BodyWriter writer = new BodyWriter();
-            return super.getBody().doOnNext(dataBuffer -> 
writer.write(dataBuffer.asByteBuffer().asReadOnlyBuffer()))
-                    .doFinally(signal -> {
-                        if (!writer.isEmpty() && 
isNotBinaryType(getHeaders())) {
-                            logInfo.setRequestBody(writer.output());
-                        } else {
-                            writer.output();
-                        }
-                    });
-        }
-    }
-
-    /**
-     * wrap serverHttpResponse.
-     */
-    static class LoggingServerHttpResponse extends ServerHttpResponseDecorator 
{
-
-        private final ShenyuRequestLog logInfo;
-
-        private ServerWebExchange exchange;
-
-        LoggingServerHttpResponse(final ServerHttpResponse delegate, final 
ShenyuRequestLog logInfo) {
-            super(delegate);
-            this.logInfo = logInfo;
-        }
-
-        /**
-         * set relevant ServerWebExchange.
-         *
-         * @param exchange ServerWebExchange
-         */
-        public void setExchange(final ServerWebExchange exchange) {
-            this.exchange = exchange;
-        }
-
-        /**
-         * write with a publisher.
-         *
-         * @param body response body
-         * @return Mono
-         */
-        @Override
-        @NonNull
-        public Mono<Void> writeWith(@NonNull final Publisher<? extends 
DataBuffer> body) {
-            return super.writeWith(appendResponse(body));
-        }
-
-        /**
-         * append response.
-         *
-         * @param body publisher
-         * @return wrap Flux
-         */
-        @NonNull
-        private Flux<? extends DataBuffer> appendResponse(final Publisher<? 
extends DataBuffer> body) {
-            ShenyuContext shenyuContext = 
exchange.getAttribute(Constants.CONTEXT);
-            assert shenyuContext != null;
-            if (getStatusCode() != null) {
-                logInfo.setStatus(getStatusCode().value());
-            }
-            
logInfo.setResponseHeader(RocketMQGlobalPluginHandler.getHeaders(getHeaders()));
-            BodyWriter writer = new BodyWriter();
-            return Flux.from(body).doOnNext(buffer -> 
writer.write(buffer.asByteBuffer().asReadOnlyBuffer()))
-                    .doFinally(signal -> logResponse(shenyuContext, writer));
-        }
-
-        /**
-         * record response log.
-         *
-         * @param shenyuContext request context
-         * @param writer        bodyWriter
-         */
-        private void logResponse(final ShenyuContext shenyuContext, final 
BodyWriter writer) {
-            if 
(StringUtils.isNotBlank(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH))) {
-                
logInfo.setResponseContentLength(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH));
-            } else {
-                logInfo.setResponseContentLength(writer.size() + "");
-            }
-            
logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
-            long costTime = 
DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(),
-                    LocalDateTime.now());
-            logInfo.setUpstreamResponseTime(costTime);
-            if (StringUtils.isNotBlank(shenyuContext.getRpcType())) {
-                logInfo.setRpcType(shenyuContext.getRpcType());
-                if 
(RpcTypeEnum.HTTP.getName().equals(shenyuContext.getRpcType())) {
-                    URI uri = exchange.getAttribute(Constants.HTTP_URI);
-                    if (uri != null) {
-                        logInfo.setUpstreamIp(uri.getHost());
-                    } else {
-                        String domain = (String) 
exchange.getAttributes().get(Constants.HTTP_DOMAIN);
-                        try {
-                            URL url = new URL(domain);
-                            logInfo.setUpstreamIp(url.getHost());
-                        } catch (Exception e) {
-                            LOG.error("get upstream ip error");
-                        }
-                    }
-                } else {
-                    
Optional.ofNullable(exchange.getRequest().getRemoteAddress())
-                            .map(InetSocketAddress::getAddress)
-                            .ifPresent(v -> 
logInfo.setUpstreamIp(v.getHostAddress()));
-                    logInfo.setMethod(shenyuContext.getMethod());
-                }
-            }
-            String body = writer.output();
-            if (isNotBinaryType(getHeaders())) {
-                logInfo.setResponseBody(body);
-            }
-            // collect log
-            RocketMQLogCollector.getInstance().collect(logInfo);
-        }
-    }
-
-    /**
-     * bodyWriter is used to read Body.
-     */
-    static class BodyWriter {
-
-        private final ByteArrayOutputStream stream = new 
ByteArrayOutputStream();
-
-        private final WritableByteChannel channel = 
Channels.newChannel(stream);
-
-        private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-
-        /**
-         * write ByteBuffer.
-         *
-         * @param buffer byte buffer
-         */
-        void write(final ByteBuffer buffer) {
-            if (!isClosed.get()) {
-                try {
-                    channel.write(buffer);
-                } catch (IOException e) {
-                    isClosed.compareAndSet(false, true);
-                    LOG.error("Parse Failed.", e);
-                }
-            }
-        }
-
-        /**
-         * judge stream is empty.
-         *
-         * @return true: stream is empty
-         */
-        boolean isEmpty() {
-            return stream.size() == 0;
-        }
-
-        /**
-         * get stream size.
-         *
-         * @return size of stream
-         */
-        int size() {
-            return stream.size();
-        }
-
-        /**
-         * output stream value.
-         *
-         * @return string of stream
-         */
-        String output() {
-            try {
-                isClosed.compareAndSet(false, true);
-                return new String(stream.toByteArray(), 
StandardCharsets.UTF_8);
-            } catch (Exception e) {
-                LOG.error("Write failed: ", e);
-                return "Write failed: " + e.getMessage();
-            } finally {
-                try {
-                    stream.close();
-                } catch (IOException e) {
-                    LOG.error("Close stream error: ", e);
-                }
-                try {
-                    channel.close();
-                } catch (IOException e) {
-                    LOG.error("Close channel error: ", e);
-                }
-            }
-        }
-    }
-}
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
index b09464e..7267be7 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
@@ -44,6 +44,8 @@ public final class TracingConstants {
 
     public static final String SHENYU_AGENT_TRACE_ZIPKIN = 
"shenyu-agent-trace-zipkin";
 
+    public static final String SHENYU_AGENT_TRACE_ID = "shenyu-agent-trace-id";
+
 
     /**
      * The type Error log tags.
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
index 32a57b6..9b45cc7 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
@@ -50,6 +50,7 @@ public final class JaegerGlobalPluginHandler implements 
InstanceMethodHandler {
 
         Span span = jaegerSpanManager.add(TracingConstants.ROOT_SPAN, tagMap);
         
exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_JAEGER, 
jaegerSpanManager);
+        exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ID, 
span.context().toTraceId());
         target.setContext(span);
     }
 
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
index 934a4df..f39fe5b 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
@@ -51,6 +51,7 @@ public final class OpenTelemetryGlobalPluginHandler 
implements InstanceMethodHan
 
         Span span = spanManager.startAndRecord(TracingConstants.ROOT_SPAN, 
attributesMap);
         
exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_OPENTELEMETRY, 
spanManager);
+        exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ID, 
span.getSpanContext().getTraceId());
         target.setContext(span);
     }
 
diff --git 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
index f745b01..549f035 100644
--- 
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
+++ 
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
@@ -50,6 +50,7 @@ public final class ZipkinGlobalPluginHandler implements 
InstanceMethodHandler {
 
         Span span = zipkinSpanManager.start(TracingConstants.ROOT_SPAN, 
tagMap);
         
exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ZIPKIN, 
zipkinSpanManager);
+        exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ID, 
span.context().traceIdString());
         target.setContext(span);
     }
 
diff --git 
a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml 
b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-meta.yaml
similarity index 73%
copy from 
shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
copy to shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-meta.yaml
index 698f2b3..a0ebaf2 100644
--- a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
+++ b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-meta.yaml
@@ -15,12 +15,17 @@
 # limitations under the License.
 #
 
-pointCuts:
-  - targetClass: org.springframework.web.server.adapter.HttpWebHandlerAdapter
-    points:
-      - type: instanceMethod
-        name: createExchange
-    handlers:
-      rocketmq:
-        - 
org.apache.shenyu.agent.plugin.logging.rocketmq.handler.RocketMQGlobalPluginHandler
-    
\ No newline at end of file
+logFieldSwitchConfig:
+  requestHeader: true
+  responseHeader: true
+  requestBody: true
+  responseBody: true
+
+globalLogConfig:
+  sampleRate: "1"
+  bufferQueueSize: 50000
+
+logApiSwitchConfigMap:
+  "[/http/test/**]":
+    sampleRate: "0.5"
+    topic: "shenyu-agent-logging-test"
diff --git 
a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml 
b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
index 698f2b3..ed3c3a5 100644
--- a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
+++ b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
@@ -22,5 +22,5 @@ pointCuts:
         name: createExchange
     handlers:
       rocketmq:
-        - 
org.apache.shenyu.agent.plugin.logging.rocketmq.handler.RocketMQGlobalPluginHandler
+        - 
org.apache.shenyu.agent.plugin.logging.common.handler.DefaultLoggingPluginHandler
     
\ No newline at end of file

Reply via email to