This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 10f680f Optimize the configurable Shenyu agent log collection #2907
(#3013)
10f680f is described below
commit 10f680fd4a6e2b0647340a2d294ed7fec57fee6c
Author: hutaishi <[email protected]>
AuthorDate: Thu Mar 10 15:25:50 2022 +0800
Optimize the configurable Shenyu agent log collection #2907 (#3013)
---
.../shenyu/agent/api/config/LogCollectConfig.java | 359 +++++++++++++++++++++
.../shenyu/agent/plugin/logging/LogCollector.java | 6 +-
...LogCollectClient.java => LogConsumeClient.java} | 9 +-
.../plugin/logging/entity/ShenyuRequestLog.java | 42 +++
.../logging/common/AbstractLogCollector.java | 37 ++-
.../logging/common/DefaultLogCollector.java} | 20 +-
.../plugin/logging/common/body/BodyWriter.java | 107 ++++++
.../common/body/LoggingServerHttpRequest.java | 64 ++++
.../common/body/LoggingServerHttpResponse.java | 166 ++++++++++
.../definition/LoggingAgentPluginDefinition.java | 2 +
.../handler/DefaultLoggingPluginHandler.java | 76 +++++
.../logging/common/sampler/CountingSampler.java | 99 ++++++
.../plugin/logging/common/sampler/Sampler.java | 63 ++++
.../common/utils/LogCollectConfigUtils.java | 170 ++++++++++
.../logging/common/utils/LogCollectUtils.java | 73 +++++
.../logging/rocketmq/RocketMQLogCollectClient.java | 36 +--
.../boot/RocketMQAgentPluginBootService.java | 6 +-
.../handler/RocketMQGlobalPluginHandler.java | 336 -------------------
.../tracing/common/constant/TracingConstants.java | 2 +
.../jaeger/handler/JaegerGlobalPluginHandler.java | 1 +
.../handler/OpenTelemetryGlobalPluginHandler.java | 1 +
.../zipkin/handler/ZipkinGlobalPluginHandler.java | 1 +
.../conf/{logging-point.yaml => logging-meta.yaml} | 23 +-
.../src/main/resources/conf/logging-point.yaml | 2 +-
24 files changed, 1299 insertions(+), 402 deletions(-)
diff --git
a/shenyu-agent/shenyu-agent-api/src/main/java/org/apache/shenyu/agent/api/config/LogCollectConfig.java
b/shenyu-agent/shenyu-agent-api/src/main/java/org/apache/shenyu/agent/api/config/LogCollectConfig.java
new file mode 100644
index 0000000..e7e9ed9
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-api/src/main/java/org/apache/shenyu/agent/api/config/LogCollectConfig.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.api.config;
+
+import java.util.Map;
+
+/**
+ * log collect config.
+ */
+public class LogCollectConfig {
+
+ private LogFieldSwitchConfig logFieldSwitchConfig;
+
+ private GlobalLogConfig globalLogConfig;
+
+ private Map<String, LogApiConfig> logApiSwitchConfigMap;
+
+ /**
+ * get LogFieldSwitchConfig.
+ *
+ * @return LogFieldSwitchConfig
+ */
+ public LogFieldSwitchConfig getLogFieldSwitchConfig() {
+ return logFieldSwitchConfig;
+ }
+
+ /**
+ * set LogFieldSwitchConfig.
+ *
+ * @param logFieldSwitchConfig LogFieldSwitchConfig
+ */
+ public void setLogFieldSwitchConfig(final LogFieldSwitchConfig
logFieldSwitchConfig) {
+ this.logFieldSwitchConfig = logFieldSwitchConfig;
+ }
+
+ /**
+ * get global log config.
+ *
+ * @return global log config
+ */
+ public GlobalLogConfig getGlobalLogConfig() {
+ return globalLogConfig;
+ }
+
+ /**
+ * set global log config.
+ *
+ * @param globalLogConfig global log config.
+ */
+ public void setGlobalLogConfig(final GlobalLogConfig globalLogConfig) {
+ this.globalLogConfig = globalLogConfig;
+ }
+
+ /**
+ * get log api switch config map.
+ *
+ * @return log api switch config map
+ */
+ public Map<String, LogApiConfig> getLogApiSwitchConfigMap() {
+ return logApiSwitchConfigMap;
+ }
+
+ /**
+ * set log api switch config.
+ *
+ * @param logApiSwitchConfigMap log api switch config
+ */
+ public void setLogApiSwitchConfigMap(final Map<String, LogApiConfig>
logApiSwitchConfigMap) {
+ this.logApiSwitchConfigMap = logApiSwitchConfigMap;
+ }
+
+ /**
+ * global log config.
+ */
+ public static class GlobalLogConfig {
+ private String topic;
+
+ private String sampleRate = "1";
+
+ private boolean compress;
+
+ /**
+ * default 512KB.
+ */
+ private int maxResponseBody = 524288;
+
+ /**
+ * default 512kb.
+ */
+ private int maxRequestBody = 524288;
+
+ private int bufferQueueSize = 50000;
+
+ /**
+ * get sample rate.
+ *
+ * @return sample
+ */
+ public String getSampleRate() {
+ return sampleRate;
+ }
+
+ /**
+ * set sample rate.
+ *
+ * @param sampleRate rate
+ */
+ public void setSampleRate(final String sampleRate) {
+ this.sampleRate = sampleRate;
+ }
+
+ /**
+ * whether compress.
+ *
+ * @return compress or not
+ */
+ public boolean isCompress() {
+ return compress;
+ }
+
+ /**
+ * set compress.
+ *
+ * @param compress true: compress, false not compress
+ */
+ public void setCompress(final boolean compress) {
+ this.compress = compress;
+ }
+
+ /**
+ * get message queue topic.
+ *
+ * @return message queue topic
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * topic,used for message queue.
+ *
+ * @param topic mq topic
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * get max response body.
+ *
+ * @return get max response body
+ */
+ public int getMaxResponseBody() {
+ return maxResponseBody;
+ }
+
+ /**
+ * set max response body.
+ *
+ * @param maxResponseBody max response body
+ */
+ public void setMaxResponseBody(final int maxResponseBody) {
+ this.maxResponseBody = maxResponseBody;
+ }
+
+ /**
+ * get max request body.
+ *
+ * @return max request body
+ */
+ public int getMaxRequestBody() {
+ return maxRequestBody;
+ }
+
+ /**
+ * set max request body.
+ *
+ * @param maxRequestBody max request body
+ */
+ public void setMaxRequestBody(final int maxRequestBody) {
+ this.maxRequestBody = maxRequestBody;
+ }
+
+ /**
+ * get buffer queue size.
+ *
+ * @return buffer queue size
+ */
+ public int getBufferQueueSize() {
+ return bufferQueueSize;
+ }
+
+ /**
+ * set buffer queue size.
+ *
+ * @param bufferQueueSize buffer queue size
+ */
+ public void setBufferQueueSize(final int bufferQueueSize) {
+ this.bufferQueueSize = bufferQueueSize;
+ }
+ }
+
+ /**
+ * api log config.
+ */
+ public static class LogApiConfig {
+
+ /**
+ * 0 means never sample, 1 means always sample. Minimum probability is
0.01, or 1% of logging
+ */
+ private String sampleRate;
+
+ /**
+ * This topic is useful if you use message queuing to collect logs.
+ */
+ private String topic;
+
+ /**
+ * get sample rate.
+ *
+ * @return sample rate
+ */
+ public String getSampleRate() {
+ return sampleRate;
+ }
+
+ /**
+ * set sample rate.
+ *
+ * @param sampleRate sample rate
+ */
+ public void setSampleRate(final String sampleRate) {
+ this.sampleRate = sampleRate;
+ }
+
+ /**
+ * get mq topic.
+ *
+ * @return mq topic
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * set mq topic.
+ *
+ * @param topic mq topic
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+ }
+
+
+ /**
+ * log field switch config, default value is true.
+ */
+ public static class LogFieldSwitchConfig {
+
+ private boolean requestHeader = true;
+
+ private boolean responseHeader = true;
+
+ private boolean requestBody = true;
+
+ private boolean responseBody = true;
+
+ /**
+ * whether to collect request header.
+ *
+ * @return true: collect , false:not collect
+ */
+ public boolean isRequestHeader() {
+ return requestHeader;
+ }
+
+
+ /**
+ * set whether to collect request header.
+ *
+ * @param requestHeader whether collect
+ */
+ public void setRequestHeader(final boolean requestHeader) {
+ this.requestHeader = requestHeader;
+ }
+
+ /**
+ * whether collect response header.
+ *
+ * @return collect or not
+ */
+ public boolean isResponseHeader() {
+ return responseHeader;
+ }
+
+ /**
+ * whether collect response header.
+ *
+ * @param responseHeader boolean
+ */
+ public void setResponseHeader(final boolean responseHeader) {
+ this.responseHeader = responseHeader;
+ }
+
+
+ /**
+ * whether collect request body.
+ *
+ * @return collect or not
+ */
+ public boolean isRequestBody() {
+ return requestBody;
+ }
+
+ /**
+ * whether collect request body.
+ *
+ * @param requestBody boolean
+ */
+ public void setRequestBody(final boolean requestBody) {
+ this.requestBody = requestBody;
+ }
+
+ /**
+ * whether collect response body.
+ *
+ * @return collect or not
+ */
+ public boolean isResponseBody() {
+ return responseBody;
+ }
+
+ /**
+ * whether collect response body.
+ *
+ * @param responseBody boolean
+ */
+ public void setResponseBody(final boolean responseBody) {
+ this.responseBody = responseBody;
+ }
+
+ }
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
index 132dd08..4ff3238 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogCollector.java
@@ -17,8 +17,10 @@
package org.apache.shenyu.agent.plugin.logging;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+
/**
- * Collect logs and save them.
+ * Collect logs and put into buffer queue.
*/
public interface LogCollector extends AutoCloseable {
@@ -27,5 +29,5 @@ public interface LogCollector extends AutoCloseable {
*
* @param log access log
*/
- void collect(Object log);
+ void collect(ShenyuRequestLog log);
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/spi/LogCollectClient.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogConsumeClient.java
similarity index 81%
rename from
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/spi/LogCollectClient.java
rename to
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogConsumeClient.java
index 5829d90..ac398c6 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/spi/LogCollectClient.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/LogConsumeClient.java
@@ -15,14 +15,17 @@
* limitations under the License.
*/
-package org.apache.shenyu.agent.plugin.logging.spi;
+package org.apache.shenyu.agent.plugin.logging;
+
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
import java.util.List;
/**
* Used to collect logs, which can be stored in remote or local files or
databases, or others.
*/
-public interface LogCollectClient extends AutoCloseable {
+
+public interface LogConsumeClient extends AutoCloseable {
/**
@@ -31,6 +34,6 @@ public interface LogCollectClient extends AutoCloseable {
* @param logs list of log
* @throws Exception produce exception
*/
- void collect(List<String> logs) throws Exception;
+ void consume(List<ShenyuRequestLog> logs) throws Exception;
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
index 8335eb0..ad18524 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-api/src/main/java/org/apache/shenyu/agent/plugin/logging/entity/ShenyuRequestLog.java
@@ -56,6 +56,13 @@ public class ShenyuRequestLog {
private String module;
+ private String traceId;
+
+ /**
+ * path.
+ */
+ private String path;
+
/**
* get module.
*
@@ -362,4 +369,39 @@ public class ShenyuRequestLog {
this.upstreamResponseTime = upstreamResponseTime;
}
+ /**
+ * get traceId.
+ *
+ * @return traceId
+ */
+ public String getTraceId() {
+ return traceId;
+ }
+
+ /**
+ * set traceId.
+ *
+ * @param traceId tracing id
+ */
+ public void setTraceId(final String traceId) {
+ this.traceId = traceId;
+ }
+
+ /**
+ * get request path.
+ *
+ * @return request path
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * request path.
+ *
+ * @param path request path
+ */
+ public void setPath(final String path) {
+ this.path = path;
+ }
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
index 1907e10..1128485 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/AbstractLogCollector.java
@@ -18,8 +18,9 @@
package org.apache.shenyu.agent.plugin.logging.common;
import org.apache.shenyu.agent.plugin.logging.LogCollector;
-import org.apache.shenyu.agent.plugin.logging.spi.LogCollectClient;
-import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.agent.plugin.logging.LogConsumeClient;
+import
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
import org.apache.shenyu.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,36 +43,36 @@ public abstract class AbstractLogCollector implements
LogCollector {
private final ExecutorService threadExecutor =
Executors.newSingleThreadExecutor();
- private final int bufferSize = 50000;
+ private final int bufferSize;
private final int batchSize = 100;
private final int diffTimeMSForPush = 100;
- private final BlockingQueue<String> bufferQueue = new
LinkedBlockingDeque<>(bufferSize);
+ private final BlockingQueue<ShenyuRequestLog> bufferQueue;
private long lastPushTime;
private final AtomicBoolean started = new AtomicBoolean(true);
- private final LogCollectClient logCollectClient;
+ private final LogConsumeClient logCollectClient;
- public AbstractLogCollector(final LogCollectClient logCollectClient) {
+ public AbstractLogCollector(final LogConsumeClient logCollectClient) {
this.logCollectClient = logCollectClient;
- threadExecutor.execute(this::consume);
+ bufferSize =
LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize();
+ bufferQueue = new LinkedBlockingDeque<>(bufferSize);
+ if (logCollectClient != null) {
+ threadExecutor.execute(this::consume);
+ }
}
@Override
- public void collect(final Object log) {
- if (log == null) {
+ public void collect(final ShenyuRequestLog log) {
+ if (log == null || logCollectClient == null) {
return;
}
if (bufferQueue.size() < bufferSize) {
- if (log instanceof String) {
- bufferQueue.add((String) log);
- } else {
- bufferQueue.add(JsonUtils.toJson(log));
- }
+ bufferQueue.add(log);
}
}
@@ -81,13 +82,13 @@ public abstract class AbstractLogCollector implements
LogCollector {
private void consume() {
while (started.get()) {
try {
- List<String> logs = new ArrayList<>();
+ List<ShenyuRequestLog> logs = new ArrayList<>();
int size = bufferQueue.size();
long time = System.currentTimeMillis();
long timeDiffMs = time - lastPushTime;
if (size >= batchSize || timeDiffMs > diffTimeMSForPush) {
bufferQueue.drainTo(logs, batchSize);
- logCollectClient.collect(logs);
+ logCollectClient.consume(logs);
lastPushTime = time;
} else {
ThreadUtils.sleep(TimeUnit.MILLISECONDS,
diffTimeMSForPush);
@@ -102,7 +103,9 @@ public abstract class AbstractLogCollector implements
LogCollector {
@Override
public void close() throws Exception {
started.set(false);
- logCollectClient.close();
+ if (logCollectClient != null) {
+ logCollectClient.close();
+ }
}
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollector.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/DefaultLogCollector.java
similarity index 66%
rename from
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollector.java
rename to
shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/DefaultLogCollector.java
index 9f56009..22acc3f 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollector.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/DefaultLogCollector.java
@@ -15,33 +15,31 @@
* limitations under the License.
*/
-package org.apache.shenyu.agent.plugin.logging.rocketmq;
+package org.apache.shenyu.agent.plugin.logging.common;
import org.apache.shenyu.agent.plugin.logging.LogCollector;
-import org.apache.shenyu.agent.plugin.logging.common.AbstractLogCollector;
-import org.apache.shenyu.agent.plugin.logging.spi.LogCollectClient;
+import org.apache.shenyu.agent.plugin.logging.LogConsumeClient;
import java.util.Objects;
/**
- * queue-based logging collector.
+ * default log collector,depend a LogConsumeClient for consume logs.
*/
-public class RocketMQLogCollector extends AbstractLogCollector {
+public class DefaultLogCollector extends AbstractLogCollector {
- private static RocketMQLogCollector instance;
+ private static LogCollector instance;
- public RocketMQLogCollector(final LogCollectClient logCollectClient) {
- super(logCollectClient);
+ public DefaultLogCollector(final LogConsumeClient logConsumeClient) {
+ super(logConsumeClient);
instance = this;
}
/**
- * get RocketMQLogCollector instance.
+ * get LogCollector instance.
*
- * @return RocketMQLogCollector instance
+ * @return LogCollector instance
*/
public static LogCollector getInstance() {
return Objects.requireNonNull(instance);
}
-
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/BodyWriter.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/BodyWriter.java
new file mode 100644
index 0000000..bdce3ff
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/BodyWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.body;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * bodyWriter is used to read Body.
+ */
+public class BodyWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BodyWriter.class);
+
+ private final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+ private final WritableByteChannel channel = Channels.newChannel(stream);
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+
+ /**
+ * write ByteBuffer.
+ *
+ * @param buffer byte buffer
+ */
+ public void write(final ByteBuffer buffer) {
+ if (!isClosed.get()) {
+ try {
+ channel.write(buffer);
+ } catch (IOException e) {
+ isClosed.compareAndSet(false, true);
+ LOG.error("write buffer Failed.", e);
+ }
+ }
+ }
+
+ /**
+ * judge stream is empty.
+ *
+ * @return true: stream is empty
+ */
+ boolean isEmpty() {
+ return stream.size() == 0;
+ }
+
+ /**
+ * get stream size.
+ *
+ * @return size of stream
+ */
+ public int size() {
+ return stream.size();
+ }
+
+ /**
+ * output stream value.
+ *
+ * @return string of stream
+ */
+ public String output() {
+ if (size() == 0) {
+ return "";
+ }
+ try {
+ isClosed.compareAndSet(false, true);
+ return new String(stream.toByteArray(), StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ LOG.error("Write failed: ", e);
+ return "Write failed: " + e.getMessage();
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ LOG.error("Close stream error: ", e);
+ }
+ try {
+ channel.close();
+ } catch (IOException e) {
+ LOG.error("Close channel error: ", e);
+ }
+ }
+ }
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpRequest.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpRequest.java
new file mode 100644
index 0000000..4ca22c4
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.body;
+
+import
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
+import reactor.core.publisher.Flux;
+import reactor.util.annotation.NonNull;
+
+/**
+ * decorate ServerHttpRequest for read body.
+ */
+public class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
+ private final ShenyuRequestLog logInfo;
+
+ public LoggingServerHttpRequest(final ServerHttpRequest delegate, final
ShenyuRequestLog logInfo) {
+ super(delegate);
+ this.logInfo = logInfo;
+ }
+
+ /**
+ * get request body.
+ *
+ * @return Flux
+ */
+ @Override
+ @NonNull
+ public Flux<DataBuffer> getBody() {
+ BodyWriter writer = new BodyWriter();
+ boolean collectRequestBody =
LogCollectConfigUtils.getLogFieldSwitchConfig().isRequestBody();
+ return super.getBody().doOnNext(dataBuffer -> {
+ if (LogCollectUtils.isNotBinaryType(getHeaders()) &&
collectRequestBody) {
+ writer.write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
+ }
+ }).doFinally(signal -> {
+ int size = writer.size();
+ String body = writer.output();
+ boolean requestBodyTooLarge =
LogCollectConfigUtils.isRequestBodyTooLarge(size);
+ if (size == 0 || requestBodyTooLarge) {
+ return;
+ }
+ logInfo.setRequestBody(body);
+ });
+ }
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpResponse.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpResponse.java
new file mode 100644
index 0000000..aced05e
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/body/LoggingServerHttpResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.body;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.agent.plugin.logging.LogCollector;
+import
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.common.utils.DateUtils;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.annotation.NonNull;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Optional;
+
+/**
+ * decorate ServerHttpResponse for read body.
+ */
+public class LoggingServerHttpResponse extends ServerHttpResponseDecorator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LoggingServerHttpResponse.class);
+
+ private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+ private static final String SHENYU_AGENT_TRACE_ID =
"shenyu-agent-trace-id";
+
+ private final ShenyuRequestLog logInfo;
+
+ private ServerWebExchange exchange;
+
+ private final LogCollector logCollector;
+
+ public LoggingServerHttpResponse(final ServerHttpResponse delegate, final
ShenyuRequestLog logInfo,
+ final LogCollector logCollector) {
+ super(delegate);
+ this.logInfo = logInfo;
+ this.logCollector = logCollector;
+ }
+
+ /**
+ * set relevant ServerWebExchange.
+ *
+ * @param exchange ServerWebExchange
+ */
+ public void setExchange(final ServerWebExchange exchange) {
+ this.exchange = exchange;
+ }
+
+ /**
+ * write with a publisher.
+ *
+ * @param body response body
+ * @return Mono
+ */
+ @Override
+ @NonNull
+ public Mono<Void> writeWith(@NonNull final Publisher<? extends DataBuffer>
body) {
+ return super.writeWith(appendResponse(body));
+ }
+
+ /**
+ * append response.
+ *
+ * @param body publisher
+ * @return wrap Flux
+ */
+ @NonNull
+ private Flux<? extends DataBuffer> appendResponse(final Publisher<?
extends DataBuffer> body) {
+ ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+ assert shenyuContext != null;
+ if (getStatusCode() != null) {
+ logInfo.setStatus(getStatusCode().value());
+ }
+
logInfo.setResponseHeader(LogCollectUtils.getResponseHeaders(getHeaders()));
+ BodyWriter writer = new BodyWriter();
+ String traceId = (String)
exchange.getAttributes().get(SHENYU_AGENT_TRACE_ID);
+ logInfo.setTraceId(traceId);
+ boolean collectResponseBody =
LogCollectConfigUtils.getLogFieldSwitchConfig().isResponseBody();
+ return Flux.from(body).doOnNext(buffer -> {
+ if (LogCollectUtils.isNotBinaryType(getHeaders()) &&
collectResponseBody) {
+ writer.write(buffer.asByteBuffer().asReadOnlyBuffer());
+ }
+ }).doFinally(signal -> logResponse(shenyuContext, writer));
+ }
+
+ /**
+ * record response log.
+ *
+ * @param shenyuContext request context
+ * @param writer bodyWriter
+ */
+ private void logResponse(final ShenyuContext shenyuContext, final
BodyWriter writer) {
+ if
(StringUtils.isNotBlank(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH))) {
+
logInfo.setResponseContentLength(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH));
+ } else {
+ logInfo.setResponseContentLength(writer.size() + "");
+ }
+
logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
+ long costTime =
DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(),
LocalDateTime.now());
+ logInfo.setUpstreamResponseTime(costTime);
+ if (StringUtils.isNotBlank(shenyuContext.getRpcType())) {
+ logInfo.setRpcType(shenyuContext.getRpcType());
+ if (RpcTypeEnum.HTTP.getName().equals(shenyuContext.getRpcType()))
{
+ URI uri = exchange.getAttribute(Constants.HTTP_URI);
+ if (uri != null) {
+ logInfo.setUpstreamIp(uri.getHost());
+ } else {
+ String domain = (String)
exchange.getAttributes().get(Constants.HTTP_DOMAIN);
+ try {
+ URL url = new URL(domain);
+ logInfo.setUpstreamIp(url.getHost());
+ } catch (Exception e) {
+ LOG.error("get upstream ip error");
+ }
+ }
+ } else {
+ Optional.ofNullable(exchange.getRequest().getRemoteAddress())
+ .map(InetSocketAddress::getAddress)
+ .ifPresent(v ->
logInfo.setUpstreamIp(v.getHostAddress()));
+ logInfo.setMethod(shenyuContext.getMethod());
+ }
+ }
+ int size = writer.size();
+ String body = writer.output();
+ if (size == 0 || LogCollectConfigUtils.isResponseBodyTooLarge(size)) {
+ return;
+ }
+ logInfo.setResponseBody(body);
+ // collect log
+ if (logCollector != null) {
+ logCollector.collect(logInfo);
+ }
+ }
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
index 423eea0..ddfa0b9 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/definition/LoggingAgentPluginDefinition.java
@@ -23,6 +23,7 @@ import
org.apache.shenyu.agent.api.spi.AbstractAgentPluginDefinition;
import org.apache.shenyu.agent.core.builder.JoinPointBuilderFactory;
import org.apache.shenyu.agent.core.locator.ShenyuAgentLocator;
import org.apache.shenyu.agent.core.yaml.ShenyuYamlEngine;
+import
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
import org.apache.shenyu.spi.Join;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class LoggingAgentPluginDefinition extends
AbstractAgentPluginDefinition
*/
@Override
protected Collection<JoinPointBuilder> joinPointBuilder() {
+ LogCollectConfigUtils.init();
PointCutConfig config = null;
try {
config =
ShenyuYamlEngine.unmarshal(ShenyuAgentLocator.locatorConf("logging-point.yaml"),
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/handler/DefaultLoggingPluginHandler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/handler/DefaultLoggingPluginHandler.java
new file mode 100644
index 0000000..e64e5fb
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/handler/DefaultLoggingPluginHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.handler;
+
+import org.apache.shenyu.agent.api.entity.MethodResult;
+import org.apache.shenyu.agent.api.entity.TargetObject;
+import org.apache.shenyu.agent.api.handler.InstanceMethodHandler;
+import org.apache.shenyu.agent.plugin.logging.common.DefaultLogCollector;
+import
org.apache.shenyu.agent.plugin.logging.common.body.LoggingServerHttpRequest;
+import
org.apache.shenyu.agent.plugin.logging.common.body.LoggingServerHttpResponse;
+import
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
+import org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectUtils;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.web.server.ServerWebExchange;
+
+import java.lang.reflect.Method;
+
+
+/**
+ * default logging plugin handler.
+ */
+public class DefaultLoggingPluginHandler implements InstanceMethodHandler {
+
+ private static final String USER_AGENT = "User-Agent";
+
+ private static final String HOST = "Host";
+
+ @Override
+ public Object after(final TargetObject target, final Method method, final
Object[] args,
+ final MethodResult methodResult) {
+ Object result = methodResult.getResult();
+ ServerWebExchange exchange = (ServerWebExchange) result;
+ ServerHttpRequest request = exchange.getRequest();
+
+ // control sampling
+ if (!LogCollectConfigUtils.isSampled(request)) {
+ return methodResult.getResult();
+ }
+
+ ShenyuRequestLog requestInfo = new ShenyuRequestLog();
+ requestInfo.setRequestUri(request.getURI().toString());
+ requestInfo.setMethod(request.getMethodValue());
+
requestInfo.setRequestHeader(LogCollectUtils.getRequestHeaders(request.getHeaders()));
+ requestInfo.setQueryParams(request.getURI().getQuery());
+ requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
+ requestInfo.setUserAgent(request.getHeaders().getFirst(USER_AGENT));
+ requestInfo.setHost(request.getHeaders().getFirst(HOST));
+ requestInfo.setPath(request.getURI().getPath());
+
+ LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
+ LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
+ requestInfo, DefaultLogCollector.getInstance());
+ ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
+ .response(loggingServerHttpResponse).build();
+ loggingServerHttpResponse.setExchange(webExchange);
+ return webExchange;
+ }
+
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/CountingSampler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/CountingSampler.java
new file mode 100644
index 0000000..f5ca345
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/CountingSampler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.sampler;
+
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+import java.util.BitSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * used for sample log.
+ */
+public class CountingSampler implements Sampler {
+
+ private final AtomicInteger counter;
+
+ private final BitSet sampleDecisions;
+
+ /**
+ * Fills a bitset with decisions according to the supplied probability.
+ */
+ CountingSampler(final float probability) {
+ this(probability, new Random());
+ }
+
+ /**
+ * Fills a bitset with decisions according to the probability using the
supplied {@link Random}.
+ */
+ CountingSampler(final float probability, final Random random) {
+ counter = new AtomicInteger();
+ int outOf100 = (int) (probability * 100.0f);
+ this.sampleDecisions = randomBitSet(100, outOf100, random);
+ }
+
+ /**
+ * loops over the pre-canned decisions, resetting to zero when it gets to
the end.
+ */
+ @Override
+ public boolean isSampled(final ServerHttpRequest request) {
+ return sampleDecisions.get(mod(counter.getAndIncrement(), 100));
+ }
+
+ /**
+ * Returns a non-negative mod.
+ */
+ private int mod(final int dividend, final int divisor) {
+ int result = dividend % divisor;
+ return result >= 0 ? result : divisor + result;
+ }
+
+ /**
+ * Reservoir sampling algorithm borrowed from Stack Overflow.
+ *
http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+ */
+ /**
+ * Reservoir sampling algorithm borrowed from Stack Overflow.
+ *
http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+ *
+ * @param size bitmap size
+ * @param cardinality cardinality
+ * @param rnd random
+ * @return bitSet
+ */
+ private BitSet randomBitSet(final int size, final int cardinality, final
Random rnd) {
+ BitSet result = new BitSet(size);
+ int[] chosen = new int[cardinality];
+ int i;
+ for (i = 0; i < cardinality; ++i) {
+ chosen[i] = i;
+ result.set(i);
+ }
+ for (; i < size; ++i) {
+ int j = rnd.nextInt(i + 1);
+ if (j < cardinality) {
+ result.clear(chosen[j]);
+ result.set(i);
+ chosen[j] = i;
+ }
+ }
+ return result;
+ }
+
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/Sampler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/Sampler.java
new file mode 100644
index 0000000..278d510
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/sampler/Sampler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.sampler;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+/**
+ * sampler interface.
+ */
+public interface Sampler {
+
+ Sampler ALWAYS_SAMPLE = request -> true;
+ Sampler NEVER_SAMPLE = request -> false;
+
+ /**
+ * loops over the pre-canned decisions, resetting to zero when it gets to
the end.
+ *
+ * @param request request
+ * @return whether sample
+ */
+ boolean isSampled(ServerHttpRequest request);
+
+ /**
+ * create a sampler instance.
+ *
+ * @param probability probability
+ * @return sampler instance
+ */
+ static Sampler create(String probability) {
+ if (StringUtils.isBlank(probability)) {
+ return ALWAYS_SAMPLE;
+ }
+ if ("0".equals(probability)) {
+ return NEVER_SAMPLE;
+ }
+ if ("1".equals(probability) || "1.0".equals(probability) ||
"1.0.0".equals(probability)) {
+ return ALWAYS_SAMPLE;
+ }
+ float parseProbability = NumberUtils.toFloat(probability, 1);
+ if (parseProbability < 0.01f || parseProbability > 1) {
+ throw new IllegalArgumentException(
+ "probability should be between 0.01 and 1: was " +
probability);
+ }
+ return new CountingSampler(parseProbability);
+ }
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectConfigUtils.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectConfigUtils.java
new file mode 100644
index 0000000..d180cda
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectConfigUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.utils;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.agent.api.config.LogCollectConfig;
+import org.apache.shenyu.agent.core.locator.ShenyuAgentLocator;
+import org.apache.shenyu.agent.core.yaml.ShenyuYamlEngine;
+import org.apache.shenyu.agent.plugin.logging.common.sampler.Sampler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.util.AntPathMatcher;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * log collect config Utils.
+ */
+public final class LogCollectConfigUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LogCollectConfigUtils.class);
+
+ private static final AntPathMatcher MATCHER = new AntPathMatcher();
+
+ private static LogCollectConfig logCollectConfig;
+
+ private static final LogCollectConfig.LogFieldSwitchConfig
DEFAULT_LOG_FIELD_SWITCH_CONFIG =
+ new LogCollectConfig.LogFieldSwitchConfig();
+
+ private static final LogCollectConfig.GlobalLogConfig
DEFAULT_GLOBAL_LOG_CONFIG =
+ new LogCollectConfig.GlobalLogConfig();
+
+ private static final Map<String, Sampler> API_SAMPLER_MAP = new
HashMap<>();
+
+ private static Sampler globalSampler = Sampler.ALWAYS_SAMPLE;
+
+ private LogCollectConfigUtils() {
+ }
+
+ /**
+ * init logging config.
+ */
+ public static void init() {
+ try {
+ logCollectConfig =
ShenyuYamlEngine.unmarshal(ShenyuAgentLocator.locatorConf("logging-meta.yaml"),
+ LogCollectConfig.class);
+ if
(MapUtils.isNotEmpty(logCollectConfig.getLogApiSwitchConfigMap())) {
+ logCollectConfig.getLogApiSwitchConfigMap().forEach((path,
config) -> {
+ if (StringUtils.isBlank(config.getSampleRate())) {
+ API_SAMPLER_MAP.put(path, globalSampler);
+ } else {
+ API_SAMPLER_MAP.put(path,
Sampler.create(config.getSampleRate()));
+ }
+ });
+ }
+ if (Objects.nonNull(logCollectConfig.getGlobalLogConfig())) {
+ globalSampler =
Sampler.create(logCollectConfig.getGlobalLogConfig().getSampleRate());
+ }
+
+ } catch (Exception e) {
+ LOG.error("Exception loader logging meta data config error", e);
+ }
+ }
+
+ /**
+ * judge whether sample.
+ * @param request request
+ * @return whether sample
+ */
+ public static boolean isSampled(final ServerHttpRequest request) {
+ if (Objects.isNull(logCollectConfig) ||
MapUtils.isEmpty(logCollectConfig.getLogApiSwitchConfigMap())) {
+ return true;
+ }
+ String path = request.getURI().getPath();
+ Map<String, LogCollectConfig.LogApiConfig> apiConfigMap =
logCollectConfig.getLogApiSwitchConfigMap();
+ for (Map.Entry<String, LogCollectConfig.LogApiConfig> entry :
apiConfigMap.entrySet()) {
+ String pattern = entry.getKey();
+ if (MATCHER.match(pattern, path)) {
+ return Optional.ofNullable(API_SAMPLER_MAP.get(pattern))
+ .map(sampler -> sampler.isSampled(request))
+ .orElse(globalSampler.isSampled(request));
+ }
+ }
+ return true;
+ }
+
+ /**
+ * judge whether request body too large.
+ * @param bodySize body size
+ * @return whether request body too large
+ */
+ public static boolean isRequestBodyTooLarge(final int bodySize) {
+ if (Objects.isNull(logCollectConfig) ||
Objects.isNull(logCollectConfig.getGlobalLogConfig())) {
+ return false;
+ }
+ return bodySize >
logCollectConfig.getGlobalLogConfig().getMaxRequestBody();
+ }
+
+ /**
+ * judge whether response body too large.
+ * @param bodySize body size.
+ * @return whether response body too large
+ */
+ public static boolean isResponseBodyTooLarge(final int bodySize) {
+ if (Objects.isNull(logCollectConfig) ||
Objects.isNull(logCollectConfig.getGlobalLogConfig())) {
+ return false;
+ }
+ return bodySize >
logCollectConfig.getGlobalLogConfig().getMaxResponseBody();
+ }
+
+ /**
+ * get log field switch config.
+ * @return log field switch config
+ */
+ public static LogCollectConfig.LogFieldSwitchConfig
getLogFieldSwitchConfig() {
+ return
Optional.ofNullable(logCollectConfig).map(LogCollectConfig::getLogFieldSwitchConfig)
+ .orElse(DEFAULT_LOG_FIELD_SWITCH_CONFIG);
+ }
+
+ /**
+ * get global log config.
+ * @return global log config
+ */
+ public static LogCollectConfig.GlobalLogConfig getGlobalLogConfig() {
+ return
Optional.ofNullable(logCollectConfig).map(LogCollectConfig::getGlobalLogConfig)
+ .orElse(DEFAULT_GLOBAL_LOG_CONFIG);
+ }
+
+ /**
+ * get message queue topic.
+ * @param path request path
+ * @return topic
+ */
+ public static String getTopic(final String path) {
+ if (StringUtils.isBlank(path)) {
+ return "";
+ }
+ Map<String, LogCollectConfig.LogApiConfig> apiConfigMap =
logCollectConfig.getLogApiSwitchConfigMap();
+ if (MapUtils.isEmpty(apiConfigMap)) {
+ return "";
+ }
+ for (Map.Entry<String, LogCollectConfig.LogApiConfig> entry :
apiConfigMap.entrySet()) {
+ String pattern = entry.getKey().replace("[", "").replace("]", "");
+ if (MATCHER.match(pattern, path)) {
+ return entry.getValue().getTopic();
+ }
+ }
+ return "";
+ }
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectUtils.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectUtils.java
new file mode 100644
index 0000000..0bd600e
--- /dev/null
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-common/src/main/java/org/apache/shenyu/agent/plugin/logging/common/utils/LogCollectUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.agent.plugin.logging.common.utils;
+
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.springframework.http.HttpHeaders;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * log collect utils.
+ */
+public class LogCollectUtils {
+
+ private static final List<String> BINARY_TYPE_LIST =
Arrays.asList("image", "multipart", "cbor",
+ "octet-stream", "pdf", "javascript", "css", "html");
+
+ /**
+ * judge whether is binary type.
+ * @param headers request or response header
+ * @return whether binary type
+ */
+ public static boolean isNotBinaryType(final HttpHeaders headers) {
+ return Optional.ofNullable(headers).map(HttpHeaders::getContentType)
+ .map(contentType ->
!BINARY_TYPE_LIST.contains(contentType.getType())
+ &&
!BINARY_TYPE_LIST.contains(contentType.getSubtype()))
+ .orElse(true);
+ }
+
+ /**
+ * get request header string.
+ * @param headers request headers
+ * @return header string
+ */
+ public static String getRequestHeaders(final HttpHeaders headers) {
+ boolean requestHeader =
LogCollectConfigUtils.getLogFieldSwitchConfig().isRequestHeader();
+ if (!requestHeader) {
+ return null;
+ }
+ return JsonUtils.toJson(headers.entrySet());
+ }
+
+ /**
+ * get response header string.
+ * @param headers response headers
+ * @return response headers
+ */
+ public static String getResponseHeaders(final HttpHeaders headers) {
+ boolean responseHeader =
LogCollectConfigUtils.getLogFieldSwitchConfig().isResponseHeader();
+ if (!responseHeader) {
+ return null;
+ }
+ return JsonUtils.toJson(headers.entrySet());
+ }
+
+}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
index 293121f..d83a6cd 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/RocketMQLogCollectClient.java
@@ -20,12 +20,13 @@ package org.apache.shenyu.agent.plugin.logging.rocketmq;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.shenyu.agent.api.config.AgentPluginConfig;
+import org.apache.shenyu.agent.plugin.logging.LogConsumeClient;
+import
org.apache.shenyu.agent.plugin.logging.common.utils.LogCollectConfigUtils;
import org.apache.shenyu.agent.plugin.logging.constant.LoggingConstant;
-import org.apache.shenyu.agent.plugin.logging.spi.LogCollectClient;
+import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
+import org.apache.shenyu.common.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,17 +34,16 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
-import java.util.stream.Collectors;
/**
* queue-based logging collector.
*/
-public class RocketMQLogCollectClient implements LogCollectClient {
-
+public class RocketMQLogCollectClient implements LogConsumeClient {
+
private static final Logger LOG =
LoggerFactory.getLogger(RocketMQLogCollectClient.class);
-
+
private DefaultMQProducer producer;
-
+
private String topic;
public RocketMQLogCollectClient(final AgentPluginConfig agentPluginConfig)
{
@@ -85,25 +85,21 @@ public class RocketMQLogCollectClient implements
LogCollectClient {
* @throws Exception produce exception
*/
@Override
- public void collect(final List<String> logs) throws Exception {
+ public void consume(final List<ShenyuRequestLog> logs) throws Exception {
if (CollectionUtils.isEmpty(logs)) {
return;
}
- List<Message> messageList = logs.stream().map(log -> new
Message(topic, log.getBytes(StandardCharsets.UTF_8)))
- .collect(Collectors.toList());
- producer.send(messageList, new SendCallback() {
- @Override
- public void onSuccess(final SendResult sendResult) {
- LOG.info("rocketmq push logs success");
- }
-
- @Override
- public void onException(final Throwable e) {
+ logs.stream().forEach(log -> {
+ String logTopic =
StringUtils.defaultIfBlank(LogCollectConfigUtils.getTopic(log.getPath()),
topic);
+ Message message = new Message(logTopic,
JsonUtils.toJson(log).getBytes(StandardCharsets.UTF_8));
+ try {
+ producer.sendOneway(message);
+ } catch (Exception e) {
LOG.error("rocketmq push logs error", e);
}
});
}
-
+
/**
* close producer.
*
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
index 391de38..707b493 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/boot/RocketMQAgentPluginBootService.java
@@ -19,8 +19,8 @@ package org.apache.shenyu.agent.plugin.logging.rocketmq.boot;
import org.apache.shenyu.agent.api.config.AgentPluginConfig;
import org.apache.shenyu.agent.api.spi.AgentPluginBootService;
+import org.apache.shenyu.agent.plugin.logging.common.DefaultLogCollector;
import
org.apache.shenyu.agent.plugin.logging.rocketmq.RocketMQLogCollectClient;
-import org.apache.shenyu.agent.plugin.logging.rocketmq.RocketMQLogCollector;
import org.apache.shenyu.common.utils.JsonUtils;
import org.apache.shenyu.spi.Join;
import org.slf4j.Logger;
@@ -41,7 +41,7 @@ public class RocketMQAgentPluginBootService implements
AgentPluginBootService {
public void start(final AgentPluginConfig agentPluginConfig) {
LOG.info("start RocketMQAgentPluginBootService, config:{}",
JsonUtils.toJson(agentPluginConfig));
RocketMQLogCollectClient client = new
RocketMQLogCollectClient(agentPluginConfig);
- new RocketMQLogCollector(client);
+ new DefaultLogCollector(client);
}
/**
@@ -50,7 +50,7 @@ public class RocketMQAgentPluginBootService implements
AgentPluginBootService {
@Override
public void close() {
try {
- RocketMQLogCollector.getInstance().close();
+ DefaultLogCollector.getInstance().close();
} catch (Exception e) {
LOG.error("close RocketMQLogCollector error", e);
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/handler/RocketMQGlobalPluginHandler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/handler/RocketMQGlobalPluginHandler.java
deleted file mode 100644
index bcd9df6..0000000
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-logging/shenyu-agent-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/agent/plugin/logging/rocketmq/handler/RocketMQGlobalPluginHandler.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shenyu.agent.plugin.logging.rocketmq.handler;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.agent.api.entity.MethodResult;
-import org.apache.shenyu.agent.api.entity.TargetObject;
-import org.apache.shenyu.agent.api.handler.InstanceMethodHandler;
-import org.apache.shenyu.agent.plugin.logging.entity.ShenyuRequestLog;
-import org.apache.shenyu.agent.plugin.logging.rocketmq.RocketMQLogCollector;
-import org.apache.shenyu.common.constant.Constants;
-import org.apache.shenyu.common.enums.RpcTypeEnum;
-import org.apache.shenyu.common.utils.DateUtils;
-import org.apache.shenyu.plugin.api.context.ShenyuContext;
-import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
-import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
-import org.springframework.http.server.reactive.ServerHttpResponse;
-import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.annotation.NonNull;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Method interceptor is used to collect log.
- */
-public class RocketMQGlobalPluginHandler implements InstanceMethodHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RocketMQGlobalPluginHandler.class);
-
- private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-
- private static final List<String> BINARY_TYPE_LIST =
Arrays.asList("image", "multipart", "cbor",
- "octet-stream", "pdf");
-
- @Override
- public Object after(final TargetObject target, final Method method, final
Object[] args,
- final MethodResult methodResult) {
- Object result = methodResult.getResult();
- ServerWebExchange exchange = (ServerWebExchange) result;
- ServerHttpRequest request = exchange.getRequest();
- ServerHttpResponse response = exchange.getResponse();
-
- ShenyuRequestLog requestInfo = new ShenyuRequestLog();
- requestInfo.setRequestUri(request.getURI().toString());
- requestInfo.setMethod(request.getMethodValue());
- requestInfo.setRequestHeader(getHeaders(request.getHeaders()));
- requestInfo.setQueryParams(request.getURI().getQuery());
- requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
- requestInfo.setUserAgent(request.getHeaders().getFirst("User-Agent"));
- requestInfo.setHost(request.getHeaders().getFirst("Host"));
-
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(response, requestInfo);
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return webExchange;
- }
-
-
- /**
- * getHeader string.
- *
- * @return header
- */
- private static String getHeaders(final HttpHeaders headers) {
- StringBuilder sb = new StringBuilder();
- Set<Map.Entry<String, List<String>>> entrySet = headers.entrySet();
- entrySet.forEach(entry -> {
- String key = entry.getKey();
- List<String> value = entry.getValue();
- sb.append(key).append(": ").append(StringUtils.join(value,
",")).append(System.lineSeparator());
- });
- return sb.toString();
- }
-
- /**
- * judge this header is binary type.
- *
- * @param headers httpHeaders
- * @return true: is not binary type
- */
- private static boolean isNotBinaryType(final HttpHeaders headers) {
- return Optional.ofNullable(headers).map(HttpHeaders::getContentType)
- .map(contentType ->
!BINARY_TYPE_LIST.contains(contentType.getType())
- &&
!BINARY_TYPE_LIST.contains(contentType.getSubtype()))
- .orElse(true);
- }
-
-
- /**
- * wrap ServerHttpRequest.
- */
- static class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
- private final ShenyuRequestLog logInfo;
-
- LoggingServerHttpRequest(final ServerHttpRequest delegate, final
ShenyuRequestLog logInfo) {
- super(delegate);
- this.logInfo = logInfo;
- }
-
- /**
- * get request body.
- *
- * @return Flux
- */
- @Override
- @NonNull
- public Flux<DataBuffer> getBody() {
- BodyWriter writer = new BodyWriter();
- return super.getBody().doOnNext(dataBuffer ->
writer.write(dataBuffer.asByteBuffer().asReadOnlyBuffer()))
- .doFinally(signal -> {
- if (!writer.isEmpty() &&
isNotBinaryType(getHeaders())) {
- logInfo.setRequestBody(writer.output());
- } else {
- writer.output();
- }
- });
- }
- }
-
- /**
- * wrap serverHttpResponse.
- */
- static class LoggingServerHttpResponse extends ServerHttpResponseDecorator
{
-
- private final ShenyuRequestLog logInfo;
-
- private ServerWebExchange exchange;
-
- LoggingServerHttpResponse(final ServerHttpResponse delegate, final
ShenyuRequestLog logInfo) {
- super(delegate);
- this.logInfo = logInfo;
- }
-
- /**
- * set relevant ServerWebExchange.
- *
- * @param exchange ServerWebExchange
- */
- public void setExchange(final ServerWebExchange exchange) {
- this.exchange = exchange;
- }
-
- /**
- * write with a publisher.
- *
- * @param body response body
- * @return Mono
- */
- @Override
- @NonNull
- public Mono<Void> writeWith(@NonNull final Publisher<? extends
DataBuffer> body) {
- return super.writeWith(appendResponse(body));
- }
-
- /**
- * append response.
- *
- * @param body publisher
- * @return wrap Flux
- */
- @NonNull
- private Flux<? extends DataBuffer> appendResponse(final Publisher<?
extends DataBuffer> body) {
- ShenyuContext shenyuContext =
exchange.getAttribute(Constants.CONTEXT);
- assert shenyuContext != null;
- if (getStatusCode() != null) {
- logInfo.setStatus(getStatusCode().value());
- }
-
logInfo.setResponseHeader(RocketMQGlobalPluginHandler.getHeaders(getHeaders()));
- BodyWriter writer = new BodyWriter();
- return Flux.from(body).doOnNext(buffer ->
writer.write(buffer.asByteBuffer().asReadOnlyBuffer()))
- .doFinally(signal -> logResponse(shenyuContext, writer));
- }
-
- /**
- * record response log.
- *
- * @param shenyuContext request context
- * @param writer bodyWriter
- */
- private void logResponse(final ShenyuContext shenyuContext, final
BodyWriter writer) {
- if
(StringUtils.isNotBlank(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH))) {
-
logInfo.setResponseContentLength(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH));
- } else {
- logInfo.setResponseContentLength(writer.size() + "");
- }
-
logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
- long costTime =
DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(),
- LocalDateTime.now());
- logInfo.setUpstreamResponseTime(costTime);
- if (StringUtils.isNotBlank(shenyuContext.getRpcType())) {
- logInfo.setRpcType(shenyuContext.getRpcType());
- if
(RpcTypeEnum.HTTP.getName().equals(shenyuContext.getRpcType())) {
- URI uri = exchange.getAttribute(Constants.HTTP_URI);
- if (uri != null) {
- logInfo.setUpstreamIp(uri.getHost());
- } else {
- String domain = (String)
exchange.getAttributes().get(Constants.HTTP_DOMAIN);
- try {
- URL url = new URL(domain);
- logInfo.setUpstreamIp(url.getHost());
- } catch (Exception e) {
- LOG.error("get upstream ip error");
- }
- }
- } else {
-
Optional.ofNullable(exchange.getRequest().getRemoteAddress())
- .map(InetSocketAddress::getAddress)
- .ifPresent(v ->
logInfo.setUpstreamIp(v.getHostAddress()));
- logInfo.setMethod(shenyuContext.getMethod());
- }
- }
- String body = writer.output();
- if (isNotBinaryType(getHeaders())) {
- logInfo.setResponseBody(body);
- }
- // collect log
- RocketMQLogCollector.getInstance().collect(logInfo);
- }
- }
-
- /**
- * bodyWriter is used to read Body.
- */
- static class BodyWriter {
-
- private final ByteArrayOutputStream stream = new
ByteArrayOutputStream();
-
- private final WritableByteChannel channel =
Channels.newChannel(stream);
-
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-
- /**
- * write ByteBuffer.
- *
- * @param buffer byte buffer
- */
- void write(final ByteBuffer buffer) {
- if (!isClosed.get()) {
- try {
- channel.write(buffer);
- } catch (IOException e) {
- isClosed.compareAndSet(false, true);
- LOG.error("Parse Failed.", e);
- }
- }
- }
-
- /**
- * judge stream is empty.
- *
- * @return true: stream is empty
- */
- boolean isEmpty() {
- return stream.size() == 0;
- }
-
- /**
- * get stream size.
- *
- * @return size of stream
- */
- int size() {
- return stream.size();
- }
-
- /**
- * output stream value.
- *
- * @return string of stream
- */
- String output() {
- try {
- isClosed.compareAndSet(false, true);
- return new String(stream.toByteArray(),
StandardCharsets.UTF_8);
- } catch (Exception e) {
- LOG.error("Write failed: ", e);
- return "Write failed: " + e.getMessage();
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- LOG.error("Close stream error: ", e);
- }
- try {
- channel.close();
- } catch (IOException e) {
- LOG.error("Close channel error: ", e);
- }
- }
- }
- }
-}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
index b09464e..7267be7 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-common/src/main/java/org/apache/shenyu/agent/plugin/tracing/common/constant/TracingConstants.java
@@ -44,6 +44,8 @@ public final class TracingConstants {
public static final String SHENYU_AGENT_TRACE_ZIPKIN =
"shenyu-agent-trace-zipkin";
+ public static final String SHENYU_AGENT_TRACE_ID = "shenyu-agent-trace-id";
+
/**
* The type Error log tags.
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
index 32a57b6..9b45cc7 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-jaeger/src/main/java/org/apache/shenyu/agent/plugin/tracing/jaeger/handler/JaegerGlobalPluginHandler.java
@@ -50,6 +50,7 @@ public final class JaegerGlobalPluginHandler implements
InstanceMethodHandler {
Span span = jaegerSpanManager.add(TracingConstants.ROOT_SPAN, tagMap);
exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_JAEGER,
jaegerSpanManager);
+ exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ID,
span.context().toTraceId());
target.setContext(span);
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
index 934a4df..f39fe5b 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-opentelemetry/src/main/java/org/apache/shenyu/agent/plugin/tracing/opentelemetry/handler/OpenTelemetryGlobalPluginHandler.java
@@ -51,6 +51,7 @@ public final class OpenTelemetryGlobalPluginHandler
implements InstanceMethodHan
Span span = spanManager.startAndRecord(TracingConstants.ROOT_SPAN,
attributesMap);
exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_OPENTELEMETRY,
spanManager);
+ exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ID,
span.getSpanContext().getTraceId());
target.setContext(span);
}
diff --git
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
index f745b01..549f035 100644
---
a/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
+++
b/shenyu-agent/shenyu-agent-plugins/shenyu-agent-plugin-tracing/shenyu-agent-plugin-tracing-zipkin/src/main/java/org/apache/shenyu/agent/plugin/tracing/zipkin/handler/ZipkinGlobalPluginHandler.java
@@ -50,6 +50,7 @@ public final class ZipkinGlobalPluginHandler implements
InstanceMethodHandler {
Span span = zipkinSpanManager.start(TracingConstants.ROOT_SPAN,
tagMap);
exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ZIPKIN,
zipkinSpanManager);
+ exchange.getAttributes().put(TracingConstants.SHENYU_AGENT_TRACE_ID,
span.context().traceIdString());
target.setContext(span);
}
diff --git
a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-meta.yaml
similarity index 73%
copy from
shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
copy to shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-meta.yaml
index 698f2b3..a0ebaf2 100644
--- a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
+++ b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-meta.yaml
@@ -15,12 +15,17 @@
# limitations under the License.
#
-pointCuts:
- - targetClass: org.springframework.web.server.adapter.HttpWebHandlerAdapter
- points:
- - type: instanceMethod
- name: createExchange
- handlers:
- rocketmq:
- -
org.apache.shenyu.agent.plugin.logging.rocketmq.handler.RocketMQGlobalPluginHandler
-
\ No newline at end of file
+logFieldSwitchConfig:
+ requestHeader: true
+ responseHeader: true
+ requestBody: true
+ responseBody: true
+
+globalLogConfig:
+ sampleRate: "1"
+ bufferQueueSize: 50000
+
+logApiSwitchConfigMap:
+ "[/http/test/**]":
+ sampleRate: "0.5"
+ topic: "shenyu-agent-logging-test"
diff --git
a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
index 698f2b3..ed3c3a5 100644
--- a/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
+++ b/shenyu-dist/shenyu-agent-dist/src/main/resources/conf/logging-point.yaml
@@ -22,5 +22,5 @@ pointCuts:
name: createExchange
handlers:
rocketmq:
- -
org.apache.shenyu.agent.plugin.logging.rocketmq.handler.RocketMQGlobalPluginHandler
+ -
org.apache.shenyu.agent.plugin.logging.common.handler.DefaultLoggingPluginHandler
\ No newline at end of file