This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 128b055ac [type:refactor] refactor shenyu-plugin-logging. (#3910)
128b055ac is described below
commit 128b055ac9ad8412dd7f5487f2213d1c34f459f6
Author: yunlongn <[email protected]>
AuthorDate: Sat Sep 3 00:35:05 2022 +0800
[type:refactor] refactor shenyu-plugin-logging. (#3910)
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
* [type:refactor] refactor shenyu-plugin-logging.
---
.../plugin/aliyun/sls/LoggingAliyunSlsPlugin.java | 40 ++------
.../sls/client/AliyunSlsLogCollectClient.java | 39 ++------
.../aliyun/sls/config/AliyunLogCollectConfig.java | 30 ++++++
.../handler/LoggingAliyunSlsPluginDataHandler.java | 61 ++++++------
.../aliyun/sls/LoggingAliyunSlsPluginTest.java | 2 +-
.../clickhouse/LoggingClickHousePlugin.java | 40 ++------
.../client/ClickHouseLogCollectClient.java | 37 ++------
.../config/ClickHouseLogCollectConfig.java | 3 +-
.../LoggingClickHousePluginDataHandler.java | 52 +++++------
.../clickhouse/LoggingClickHousePluginTest.java | 2 +-
.../logging/common/AbstractLoggingPlugin.java | 58 +++++++++---
.../common/client/AbstractLogConsumeClient.java | 101 ++++++++++++++++++++
...dler.java => AbstractLogPluginDataHandler.java} | 78 ++++++++++++----
.../logging/common/sampler/CountSampler.java | 1 +
.../elasticsearch/LoggingElasticSearchPlugin.java | 40 ++------
.../client/ElasticSearchLogCollectClient.java | 20 ++--
.../config/ElasticSearchLogCollectConfig.java | 32 ++++++-
.../LoggingElasticSearchPluginDataHandler.java | 103 +++------------------
.../LoggingElasticSearchPluginTest.java | 2 +-
.../LoggingElasticSearchPluginDataHandlerTest.java | 14 +--
.../plugin/logging/kafka/LoggingKafkaPlugin.java | 40 ++------
.../kafka/client/KafkaLogCollectClient.java | 39 +++-----
.../kafka/config/KafkaLogCollectConfig.java | 30 +++++-
.../handler/LoggingKafkaPluginDataHandler.java | 57 +++++-------
.../logging/kafka/LoggingKafkaPluginTest.java | 2 +-
.../handler/LoggingKafkaPluginDataHandlerTest.java | 23 ++---
.../plugin/logging/pulsar/LoggingPulsarPlugin.java | 40 ++------
.../pulsar/client/PulsarLogCollectClient.java | 26 ++----
.../pulsar/config/PulsarLogCollectConfig.java | 29 +++++-
.../handler/LoggingPulsarPluginDataHandler.java | 56 ++++-------
.../logging/pulsar/LoggingPulsarPluginTest.java | 2 +-
.../pulsar/client/PulsarLogCollectClientTest.java | 2 +-
.../logging/rocketmq/LoggingRocketMQPlugin.java | 40 ++------
.../rocketmq/client/RocketMQLogCollectClient.java | 26 ++----
.../rocketmq/config/RocketMQLogCollectConfig.java | 32 ++++++-
.../handler/LoggingRocketMQPluginDataHandler.java | 43 ++++-----
.../rocketmq/LoggingRocketMQPluginTest.java | 2 +-
.../LoggingRocketMQPluginDataHandlerTest.java | 23 ++---
.../tencent/cls/LoggingTencentClsPlugin.java | 42 ++-------
.../cls/client/TencentClsLogCollectClient.java | 28 ++----
.../cls/collector/TencentClsSlsLogCollector.java | 2 +-
.../LoggingTencentClsPluginDataHandler.java | 55 ++++-------
.../tencent/cls/LoggingTencentClsPluginTest.java | 2 +-
43 files changed, 648 insertions(+), 748 deletions(-)
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
index 2737b0dc0..bd9ceaeb2 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.aliyun.sls;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.plugin.aliyun.sls.collector.AliyunSlsLogCollector;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
/**
* LoggingAliYunSlsPlugin send log to aliyun sls service.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingAliyunSlsPlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, AliyunSlsLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ protected LogCollector logCollector() {
+ return AliyunSlsLogCollector.getInstance();
}
/**
- * get plugin order.
+ * pluginEnum.
*
- * @return order
+ * @return plugin
*/
@Override
- public int getOrder() {
- return PluginEnum.LOGGING_ALIYUN_SLS.getCode();
- }
-
- /**
- * get plugin name.
- *
- * @return plugin name
- */
- @Override
- public String named() {
- return PluginEnum.LOGGING_ALIYUN_SLS.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_ALIYUN_SLS;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
index 949e26b52..ad95eedb7 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
@@ -24,7 +24,6 @@ import
com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import
com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
import
com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
-import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import
com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
@@ -38,7 +37,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.plugin.aliyun.sls.config.AliyunLogCollectConfig;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.slf4j.Logger;
@@ -51,14 +50,11 @@ import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Aliyun sls log Collect client.
*/
-public class AliyunSlsLogCollectClient implements
LogConsumeClient<AliyunLogCollectConfig.AliyunSlsLogConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AliyunSlsLogCollectClient.class);
+public class AliyunSlsLogCollectClient extends
AbstractLogConsumeClient<AliyunLogCollectConfig.AliyunSlsLogConfig> {
private Client client;
@@ -70,8 +66,6 @@ public class AliyunSlsLogCollectClient implements
LogConsumeClient<AliyunLogColl
private Producer producer;
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
private ThreadPoolExecutor threadExecutor;
/**
@@ -80,17 +74,7 @@ public class AliyunSlsLogCollectClient implements
LogConsumeClient<AliyunLogColl
* @param config config
*/
@Override
- public void initClient(final AliyunLogCollectConfig.AliyunSlsLogConfig
config) {
- if (Objects.isNull(config)
- || StringUtils.isBlank(config.getHost())
- || StringUtils.isBlank(config.getAccessId())
- || StringUtils.isBlank(config.getAccessKey())) {
- LOG.error("aliyun sls props is empty. failed init aliyun sls
producer");
- return;
- }
- if (isStarted.get()) {
- close();
- }
+ public void initClient0(final AliyunLogCollectConfig.AliyunSlsLogConfig
config) {
String accessId = config.getAccessId();
String accessKey = config.getAccessKey();
String host = config.getHost();
@@ -111,8 +95,6 @@ public class AliyunSlsLogCollectClient implements
LogConsumeClient<AliyunLogColl
LogStore store = new LogStore(logStore, ttlInDay, shardCount);
threadExecutor = createThreadPoolExecutor(config);
try {
- isStarted.set(true);
- Runtime.getRuntime().addShutdownHook(new Thread(this::close));
client.CreateLogStore(projectName, store);
} catch (LogException e) {
LOG.warn("error code:{}, error message:{}", e.GetErrorCode(),
e.GetErrorMessage());
@@ -125,23 +107,18 @@ public class AliyunSlsLogCollectClient implements
LogConsumeClient<AliyunLogColl
* @param logs list of log
*/
@Override
- public void consume(final List<ShenyuRequestLog> logs) {
- if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ public void consume0(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs)) {
return;
}
logs.forEach(this::sendLog);
}
@Override
- public void close() {
- if (Objects.nonNull(client) && isStarted.get()) {
- isStarted.set(false);
+ public void close0() throws Exception {
+ if (Objects.nonNull(client)) {
client.shutdown();
- try {
- producer.close();
- } catch (InterruptedException | ProducerException e) {
- LOG.error("Close producer error.");
- }
+ producer.close();
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
index faa1729f7..6ffdd37d7 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
@@ -19,6 +19,7 @@ package org.apache.shenyu.plugin.aliyun.sls.config;
import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -284,5 +285,34 @@ public class AliyunLogCollectConfig {
public void setIoThreadCount(final Integer ioThreadCount) {
this.ioThreadCount = ioThreadCount;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return Boolean.TRUE;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return Boolean.FALSE;
+ }
+
+ AliyunSlsLogConfig that = (AliyunSlsLogConfig) o;
+ return Objects.equals(getAccessId(), that.getAccessId())
+ && Objects.equals(getAccessKey(), that.getAccessKey())
+ && Objects.equals(getHost(), that.getHost())
+ && Objects.equals(getIoThreadCount(),
that.getIoThreadCount())
+ && Objects.equals(getLogStoreName(),
that.getLogStoreName())
+ && Objects.equals(getProjectName(), that.getProjectName())
+ && Objects.equals(getSendThreadCount(),
that.getSendThreadCount())
+ && Objects.equals(getShardCount(), that.getShardCount())
+ && Objects.equals(getTopic(), that.getTopic())
+ && Objects.equals(getTtlInDay(), that.getTtlInDay());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(accessId, accessKey, host, ioThreadCount,
logStoreName,
+ projectName, sendThreadCount, shardCount, topic, ttlInDay);
+ }
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
index 6054b5d68..cdc8339e7 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
@@ -17,55 +17,48 @@
package org.apache.shenyu.plugin.aliyun.sls.handler;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.convert.plugin.MotanRegisterConfig;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.plugin.aliyun.sls.client.AliyunSlsLogCollectClient;
import org.apache.shenyu.plugin.aliyun.sls.collector.AliyunSlsLogCollector;
import org.apache.shenyu.plugin.aliyun.sls.config.AliyunLogCollectConfig;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import java.util.Objects;
/**
* LoggingAliYunSlsPluginDataHandler aliyun sls plugin data handler.
*/
-public class LoggingAliyunSlsPluginDataHandler implements PluginDataHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingAliyunSlsPluginDataHandler.class);
+public class LoggingAliyunSlsPluginDataHandler extends
AbstractLogPluginDataHandler<AliyunLogCollectConfig.AliyunSlsLogConfig,
GenericApiConfig> {
private static final AliyunSlsLogCollectClient
ALIYUN_SLS_LOG_COLLECT_CLIENT = new AliyunSlsLogCollectClient();
+ /**
+ * logCollector.
+ */
+ @Override
+ protected LogCollector logCollector() {
+ return AliyunSlsLogCollector.getInstance();
+ }
+
+ /**
+ * doRefreshConfig.
+ *
+ * @param globalLogConfig globalLogConfig
+ */
@Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("AliYun sls plugin data: {}",
GsonUtils.getGson().toJson(pluginData));
- if (Objects.nonNull(pluginData) &&
Boolean.TRUE.equals(pluginData.getEnabled())) {
- AliyunLogCollectConfig.AliyunSlsLogConfig globalLogConfig =
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
- AliyunLogCollectConfig.AliyunSlsLogConfig.class);
- AliyunLogCollectConfig.AliyunSlsLogConfig exist =
Singleton.INST.get(AliyunLogCollectConfig.AliyunSlsLogConfig.class);
- if (Objects.isNull(globalLogConfig)) {
- return;
- }
- if (Objects.isNull(exist) || !globalLogConfig.equals(exist)) {
- // no data, init client
-
AliyunLogCollectConfig.INSTANCE.setAliyunSlsLogConfig(globalLogConfig);
-
- // init aliyun sls client
- ALIYUN_SLS_LOG_COLLECT_CLIENT.initClient(exist);
- AliyunSlsLogCollector.getInstance().start();
- }
- Singleton.INST.single(MotanRegisterConfig.class, globalLogConfig);
- } else {
- try {
- AliyunSlsLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
+ protected void doRefreshConfig(final
AliyunLogCollectConfig.AliyunSlsLogConfig globalLogConfig) {
+ AliyunLogCollectConfig.INSTANCE.setAliyunSlsLogConfig(globalLogConfig);
+ if (Objects.isNull(globalLogConfig)
+ || StringUtils.isBlank(globalLogConfig.getHost())
+ || StringUtils.isBlank(globalLogConfig.getAccessId())
+ || StringUtils.isBlank(globalLogConfig.getAccessKey())) {
+ LOG.error("aliyun sls props is empty. failed init aliyun sls
producer");
+ return;
}
+ ALIYUN_SLS_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
index 9cfec412e..f275d0284 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingAliyunSlsPluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingAliYunSlsPlugin.doLogExecute(exchange,
chain, selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingAliYunSlsPlugin.doExecute(exchange, chain,
selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
index e02e3ad1d..11d3f10d0 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.logging.clickhouse;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import
org.apache.shenyu.plugin.logging.clickhouse.collector.ClickHouseLogCollector;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
/**
* LoggingClickHousePlugin.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingClickHousePlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, ClickHouseLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ protected LogCollector logCollector() {
+ return ClickHouseLogCollector.getInstance();
}
/**
- * get plugin order.
+ * pluginEnum.
*
- * @return order
+ * @return plugin
*/
@Override
- public int getOrder() {
- return PluginEnum.LOGGING_CLICK_HOUSE.getCode();
- }
-
- /**
- * get plugin name.
- *
- * @return plugin name
- */
- @Override
- public String named() {
- return PluginEnum.LOGGING_CLICK_HOUSE.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_CLICK_HOUSE;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
index 3935a63ac..363195408 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
@@ -28,34 +28,28 @@ import com.clickhouse.client.data.ClickHouseIntegerValue;
import com.clickhouse.client.data.ClickHouseLongValue;
import com.clickhouse.client.data.ClickHouseOffsetDateTimeValue;
import com.clickhouse.client.data.ClickHouseStringValue;
-import java.util.List;
-import java.util.Objects;
-import java.util.TimeZone;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.common.utils.DateUtils;
import
org.apache.shenyu.plugin.logging.clickhouse.config.ClickHouseLogCollectConfig;
import
org.apache.shenyu.plugin.logging.clickhouse.constant.ClickHouseLoggingConstant;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.TimeZone;
/**
* queue-based logging collector.
*/
-public class ClickHouseLogCollectClient implements
LogConsumeClient<ClickHouseLogCollectConfig.ClickHouseLogConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ClickHouseLogCollectClient.class);
-
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
+public class ClickHouseLogCollectClient extends
AbstractLogConsumeClient<ClickHouseLogCollectConfig.ClickHouseLogConfig> {
private ClickHouseClient client;
private ClickHouseNode endpoint;
@Override
- public void consume(final List<ShenyuRequestLog> logs) throws Exception {
+ public void consume0(final List<ShenyuRequestLog> logs) throws Exception {
if (CollectionUtils.isNotEmpty(logs)) {
Object[][] datas = new Object[logs.size()][];
for (int i = 0; i < logs.size(); i++) {
@@ -109,10 +103,9 @@ public class ClickHouseLogCollectClient implements
LogConsumeClient<ClickHouseLo
}
@Override
- public void close() {
- if (Objects.nonNull(client) && isStarted.get()) {
+ public void close0() {
+ if (Objects.nonNull(client)) {
client.close();
- isStarted.set(false);
}
}
@@ -122,14 +115,7 @@ public class ClickHouseLogCollectClient implements
LogConsumeClient<ClickHouseLo
* @param config properties.
*/
@Override
- public void initClient(final
ClickHouseLogCollectConfig.ClickHouseLogConfig config) {
- if (Objects.isNull(config)) {
- LOG.error("clickhouse properties is empty. failed init clickhouse
client");
- return;
- }
- if (isStarted.get()) {
- close();
- }
+ public void initClient0(final
ClickHouseLogCollectConfig.ClickHouseLogConfig config) {
final String username = config.getUsername();
final String password = config.getPassword();
endpoint = ClickHouseNode.builder()
@@ -146,8 +132,5 @@ public class ClickHouseLogCollectClient implements
LogConsumeClient<ClickHouseLo
} catch (Exception e) {
LOG.error("inti ClickHouseLogClient error" + e);
}
- isStarted.set(true);
- Runtime.getRuntime().addShutdownHook(new Thread(this::close));
-
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
index 4bdd11249..671bb364b 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
@@ -17,9 +17,10 @@
package org.apache.shenyu.plugin.logging.clickhouse.config;
-import java.util.Optional;
import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import java.util.Optional;
+
/**
* ClickHouseLogCollectConfig.
*/
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
index c94f6c769..2b5ddab57 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
@@ -17,50 +17,39 @@
package org.apache.shenyu.plugin.logging.clickhouse.handler;
-import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
import
org.apache.shenyu.plugin.logging.clickhouse.client.ClickHouseLogCollectClient;
import
org.apache.shenyu.plugin.logging.clickhouse.collector.ClickHouseLogCollector;
import
org.apache.shenyu.plugin.logging.clickhouse.config.ClickHouseLogCollectConfig;
import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
/**
* The type logging pulsar plugin data handler.
*/
-public class LoggingClickHousePluginDataHandler implements PluginDataHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingClickHousePluginDataHandler.class);
+public class LoggingClickHousePluginDataHandler extends
AbstractLogPluginDataHandler<ClickHouseLogCollectConfig.ClickHouseLogConfig,
GenericApiConfig> {
private static final ClickHouseLogCollectClient
CLICK_HOUSE_LOG_COLLECT_CLIENT = new ClickHouseLogCollectClient();
/**
- * getClickHouseLogCollectClient.
- *
- * @return LogConsumeClient
+ * logCollector.
*/
- public static LogConsumeClient getClickHouseLogCollectClient() {
- return CLICK_HOUSE_LOG_COLLECT_CLIENT;
+ @Override
+ protected LogCollector logCollector() {
+ return ClickHouseLogCollector.getInstance();
}
+ /**
+ * doRefreshConfig.
+ *
+ * @param globalLogConfig globalLogConfig
+ */
@Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("handler loggingClickHouse Plugin data:{}",
GsonUtils.getGson().toJson(pluginData));
- if (pluginData.getEnabled()) {
- ClickHouseLogCollectConfig.ClickHouseLogConfig globalLogConfig =
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
ClickHouseLogCollectConfig.ClickHouseLogConfig.class);
-
ClickHouseLogCollectConfig.INSTANCE.setClickHouseLogConfig(globalLogConfig);
- CLICK_HOUSE_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
- ClickHouseLogCollector.getInstance().start();
- } else {
- try {
- ClickHouseLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
- }
+ protected void doRefreshConfig(final
ClickHouseLogCollectConfig.ClickHouseLogConfig globalLogConfig) {
+
ClickHouseLogCollectConfig.INSTANCE.setClickHouseLogConfig(globalLogConfig);
+ CLICK_HOUSE_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
@@ -68,4 +57,13 @@ public class LoggingClickHousePluginDataHandler implements
PluginDataHandler {
return PluginEnum.LOGGING_CLICK_HOUSE.getName();
}
+ /**
+ * getClickHouseLogCollectClient.
+ *
+ * @return LogConsumeClient
+ */
+ public static LogConsumeClient getClickHouseLogCollectClient() {
+ return CLICK_HOUSE_LOG_COLLECT_CLIENT;
+ }
+
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
index f3e80bc9f..849346b66 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
@@ -90,7 +90,7 @@ public class LoggingClickHousePluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingClickHousePlugin.doLogExecute(exchange,
chain, selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingClickHousePlugin.doExecute(exchange, chain,
selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
index 5752f33dd..d74ca13ba 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
@@ -19,9 +19,13 @@ package org.apache.shenyu.plugin.logging.common;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectUtils;
@@ -36,24 +40,23 @@ import static
org.apache.shenyu.plugin.logging.common.constant.GenericLoggingCon
* abstract logging plugin.
*/
public abstract class AbstractLoggingPlugin extends AbstractShenyuPlugin {
-
+
+ /**
+ * Supplies the log collector of the concrete plugin; doExecute passes it
+ * to the LoggingServerHttpResponse that wraps the exchange's response.
+ *
+ * @return LogCollector
+ */
+ protected abstract LogCollector logCollector();
+
/**
- * log collector execute.
+ * pluginEnum.
*
- * @param exchange web exchange
- * @param chain shenyu plugin chain
- * @param selector selector data
- * @param rule rule data
- * @param request server http request
- * @param requestInfo request information
- * @return mono
+ * @return PluginEnum
*/
- protected abstract Mono<Void> doLogExecute(ServerWebExchange exchange,
ShenyuPluginChain chain,
- SelectorData selector, RuleData
rule,
- ServerHttpRequest request,
ShenyuRequestLog requestInfo);
+ protected abstract PluginEnum pluginEnum();
@Override
- protected Mono<Void> doExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
+ public Mono<Void> doExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
final SelectorData selector, final RuleData
rule) {
ServerHttpRequest request = exchange.getRequest();
// control sampling
@@ -69,6 +72,33 @@ public abstract class AbstractLoggingPlugin extends
AbstractShenyuPlugin {
requestInfo.setUserAgent(request.getHeaders().getFirst(USER_AGENT));
requestInfo.setHost(request.getHeaders().getFirst(HOST));
requestInfo.setPath(request.getURI().getPath());
- return this.doLogExecute(exchange, chain, selector, rule, request,
requestInfo);
+ LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
+ LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
+ requestInfo, this.logCollector());
+ ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
+ .response(loggingServerHttpResponse).build();
+ loggingServerHttpResponse.setExchange(webExchange);
+ return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ }
+
+
+ /**
+  * Reports the order of this plugin, read from the code of the
+  * {@link PluginEnum} supplied by {@link #pluginEnum()}.
+  *
+  * @return order
+  */
+ @Override
+ public int getOrder() {
+     final PluginEnum plugin = this.pluginEnum();
+     return plugin.getCode();
+ }
+
+ /**
+  * Reports the name of this plugin, read from the {@link PluginEnum}
+  * supplied by {@link #pluginEnum()}.
+  *
+  * @return plugin name
+  */
+ @Override
+ public String named() {
+     final PluginEnum plugin = this.pluginEnum();
+     return plugin.getName();
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/client/AbstractLogConsumeClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/client/AbstractLogConsumeClient.java
new file mode 100644
index 000000000..da8b2aa31
--- /dev/null
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/client/AbstractLogConsumeClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.common.client;
+
+import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.lang.NonNull;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * AbstractLogConsumeClient.
+ *
+ * <p>Template for log consume clients: subclasses implement {@code initClient0},
+ * {@code consume0} and {@code close0}; this class guards them with a started flag
+ * and registers a JVM shutdown hook that closes the client.
+ *
+ * @param <T> type of the global config used to initialise the client
+ */
+public abstract class AbstractLogConsumeClient<T extends GenericGlobalConfig> implements LogConsumeClient<T> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractLogConsumeClient.class);
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    private final AtomicReference<Thread> closeThread = new AtomicReference<>();
+
+    /**
+     * initClient0.
+     *
+     * @param config config
+     */
+    public abstract void initClient0(@NonNull T config);
+
+    /**
+     * consume0.
+     *
+     * @param logs logs
+     * @throws Exception error
+     */
+    public abstract void consume0(List<ShenyuRequestLog> logs) throws Exception;
+
+    /**
+     * close0.
+     *
+     * @throws Exception error
+     */
+    public abstract void close0() throws Exception;
+
+    /**
+     * Closes a previously started client, then initialises a new one from the given config
+     * and registers a shutdown hook that closes it. An empty config is logged and ignored,
+     * leaving the client stopped.
+     *
+     * @param config config
+     */
+    @Override
+    public void initClient(final T config) {
+        if (isStarted.get()) {
+            this.close();
+        }
+        if (ObjectUtils.isEmpty(config)) {
+            LOG.error("{} config is null.", this.getClass().getSimpleName());
+            return;
+        }
+        this.initClient0(config);
+        isStarted.set(true);
+        final Thread hook = new Thread(this::close);
+        closeThread.set(hook);
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Unregisters the shutdown hook (when possible) and closes the client if it is started.
+     * Errors raised by {@code close0} are logged, not propagated.
+     */
+    @Override
+    public void close() {
+        final Thread hook = closeThread.getAndSet(null);
+        // When the shutdown hook itself runs close(), the JVM is already shutting down and
+        // removeShutdownHook would throw IllegalStateException before close0() is reached.
+        if (hook != null && hook != Thread.currentThread()) {
+            try {
+                Runtime.getRuntime().removeShutdownHook(hook);
+            } catch (IllegalStateException ignored) {
+                // shutdown in progress: the hook can no longer be removed, keep closing
+            }
+        }
+        // flip the flag atomically so close0() runs at most once per start
+        if (isStarted.compareAndSet(true, false)) {
+            try {
+                this.close0();
+            } catch (Exception e) {
+                LOG.error("{} close error.", this.getClass().getSimpleName(), e);
+            }
+        }
+    }
+
+    /**
+     * Forwards the logs to {@code consume0}; does nothing while the client is not started.
+     *
+     * @param logs logs
+     * @throws Exception error raised by {@code consume0}
+     */
+    @Override
+    public void consume(final List<ShenyuRequestLog> logs) throws Exception {
+        if (!isStarted.get()) {
+            return;
+        }
+        this.consume0(logs);
+    }
+}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractLogPluginDataHandler.java
similarity index 57%
rename from
shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractPluginDataHandler.java
rename to
shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractLogPluginDataHandler.java
index f423d023f..46c42b833 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractLogPluginDataHandler.java
@@ -17,27 +17,35 @@
package org.apache.shenyu.plugin.logging.common.handler;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.SelectorTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
- * AbstractPluginDataHandler.
+ * AbstractLogPluginDataHandler.
*/
-public abstract class AbstractPluginDataHandler<C extends GenericApiConfig>
implements PluginDataHandler {
+public abstract class AbstractLogPluginDataHandler<T extends
GenericGlobalConfig, C extends GenericApiConfig> implements PluginDataHandler {
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractPluginDataHandler.class);
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractLogPluginDataHandler.class);
private static final String EMPTY_JSON = "{}";
@@ -45,12 +53,6 @@ public abstract class AbstractPluginDataHandler<C extends
GenericApiConfig> impl
private static final Map<String, GenericApiConfig> SELECT_API_CONFIG_MAP =
new ConcurrentHashMap<>();
- private final Class<C> clazz;
-
- protected AbstractPluginDataHandler(final Class<C> clazz) {
- this.clazz = clazz;
- }
-
/**
* get selectId uriList map.
*
@@ -69,25 +71,69 @@ public abstract class AbstractPluginDataHandler<C extends
GenericApiConfig> impl
return SELECT_API_CONFIG_MAP;
}
+ /**
+ * Supplies the log collector that handlerPlugin starts after refreshing
+ * the config and closes when the plugin is not enabled.
+ *
+ * @return LogCollector
+ */
+ protected abstract LogCollector logCollector();
+
+ /**
+ * doRefreshConfig: applies a freshly parsed global config; handlerPlugin
+ * invokes it right before starting the log collector.
+ *
+ * @param globalLogConfig globalLogConfig
+ */
+ protected abstract void doRefreshConfig(T globalLogConfig);
+
+ /**
+  * Set once the collector has been closed by handlerPlugin, so that the next enable
+  * refreshes the config and starts the collector again even when the config is unchanged.
+  */
+ private volatile boolean collectorClosed;
+
+ /**
+  * Starts, refreshes or closes the log collector according to the plugin data.
+  *
+  * <p>When the plugin is enabled, its config JSON is parsed into the handler's global config
+  * type; the config is applied and the collector started if no config is stored yet, the
+  * config changed, or the collector was closed earlier. Otherwise the collector is closed.
+  *
+  * @param pluginData plugin data, may be null
+  */
+ @Override
+ public void handlerPlugin(final PluginData pluginData) {
+     ParameterizedType parameterizedType = (ParameterizedType) this.getClass().getGenericSuperclass();
+     Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
+     // the first type argument of the concrete handler is its global config class
+     @SuppressWarnings("unchecked")
+     final Class<T> globalLogConfigClass = (Class<T>) actualTypeArguments[0];
+     LOG.info("handler {} Plugin data: {}", pluginNamed(), GsonUtils.getGson().toJson(pluginData));
+     if (Objects.nonNull(pluginData) && Boolean.TRUE.equals(pluginData.getEnabled())) {
+         T globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), globalLogConfigClass);
+         T exist = Singleton.INST.get(globalLogConfigClass);
+         if (Objects.isNull(globalLogConfig)) {
+             return;
+         }
+         // also re-init after a disable: the stored config still equals the new one,
+         // but the collector was closed and must be started again
+         if (collectorClosed || Objects.isNull(exist) || !globalLogConfig.equals(exist)) {
+             this.doRefreshConfig(globalLogConfig);
+             logCollector().start();
+             collectorClosed = false;
+         }
+         Singleton.INST.single(globalLogConfigClass, globalLogConfig);
+     } else {
+         collectorClosed = true;
+         try {
+             logCollector().close();
+         } catch (Exception e) {
+             LOG.error("{} close log collector error", this.getClass().getSimpleName(), e);
+         }
+     }
+ }
+
@Override
public void handlerSelector(final SelectorData selectorData) {
+ ParameterizedType parameterizedType = (ParameterizedType)
this.getClass().getGenericSuperclass();
+ Type[] actualTypeArguments =
parameterizedType.getActualTypeArguments();
+ final Class<C> genericApiConfigClass = (Class<C>)
actualTypeArguments[1];
LOG.info("handler {} selector data:{}", pluginNamed(),
GsonUtils.getGson().toJson(selectorData));
String handleJson = selectorData.getHandle();
if (StringUtils.isEmpty(handleJson) ||
EMPTY_JSON.equals(handleJson.trim())) {
return;
}
if (selectorData.getType() != SelectorTypeEnum.CUSTOM_FLOW.getCode()
- || CollectionUtils.isEmpty(selectorData.getConditionList())) {
+ || CollectionUtils.isEmpty(selectorData.getConditionList())) {
return;
}
- GenericApiConfig logApiConfig =
GsonUtils.getInstance().fromJson(handleJson, clazz);
+ GenericApiConfig logApiConfig =
GsonUtils.getInstance().fromJson(handleJson, genericApiConfigClass);
if (StringUtils.isBlank(logApiConfig.getTopic()) ||
StringUtils.isBlank(logApiConfig.getSampleRate())) {
return;
}
List<String> uriList = new ArrayList<>();
for (ConditionData conditionData : selectorData.getConditionList()) {
if ("uri".equals(conditionData.getParamType()) &&
StringUtils.isNotBlank(conditionData.getParamValue())
- && ("match".equals(conditionData.getOperator()) ||
"=".equals(conditionData.getOperator()))) {
+ && ("match".equals(conditionData.getOperator()) ||
"=".equals(conditionData.getOperator()))) {
uriList.add(conditionData.getParamValue().trim());
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
index 31e10cd6a..babb1e74d 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
@@ -37,6 +37,7 @@ public class CountSampler implements Sampler {
/**
* Fills a bitset with decisions according to the supplied probability.
+ * @param probability probability
*/
public CountSampler(final float probability) {
counter = new AtomicInteger();
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
index 821bdea00..e496441af 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.logging.elasticsearch;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
import
org.apache.shenyu.plugin.logging.elasticsearch.collector.ElasticSearchLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
/**
* Integrated elasticsearch collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingElasticSearchPlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingElasticSearchServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingElasticSearchServerResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, ElasticSearchLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingElasticSearchServerHttpRequest)
- .response(loggingElasticSearchServerResponse).build();
- loggingElasticSearchServerResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingElasticSearchServerResponse::logError);
+ protected LogCollector logCollector() {
+ return ElasticSearchLogCollector.getInstance();
}
/**
- * get plugin order.
+ * pluginEnum.
*
- * @return plugin order
+ * @return plugin
*/
@Override
- public int getOrder() {
- return PluginEnum.LOGGING_ELASTIC_SEARCH.getCode();
- }
-
- /**
- * get plugin name.
- *
- * @return plugin name
- */
- @Override
- public String named() {
- return PluginEnum.LOGGING_ELASTIC_SEARCH.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_ELASTIC_SEARCH;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
index da97481e4..85f83ed2f 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
@@ -30,7 +30,7 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import
org.apache.shenyu.plugin.logging.elasticsearch.config.ElasticSearchLogCollectConfig;
@@ -43,12 +43,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* queue-based logging collector.
*/
-public class ElasticSearchLogCollectClient implements
LogConsumeClient<ElasticSearchLogCollectConfig.ElasticSearchLogConfig> {
+public class ElasticSearchLogCollectClient extends
AbstractLogConsumeClient<ElasticSearchLogCollectConfig.ElasticSearchLogConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticSearchLogCollectClient.class);
@@ -58,15 +57,13 @@ public class ElasticSearchLogCollectClient implements
LogConsumeClient<ElasticSe
private ElasticsearchClient client;
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
/**
* init elasticsearch client.
*
* @param config elasticsearch client config
*/
@Override
- public void initClient(final
ElasticSearchLogCollectConfig.ElasticSearchLogConfig config) {
+ public void initClient0(final
ElasticSearchLogCollectConfig.ElasticSearchLogConfig config) {
RestClientBuilder builder = RestClient
.builder(new HttpHost(config.getHost(),
Integer.parseInt(config.getPort())));
@@ -92,13 +89,11 @@ public class ElasticSearchLogCollectClient implements
LogConsumeClient<ElasticSe
createIndex(GenericLoggingConstant.INDEX);
LOG.info("create index success");
}
- isStarted.set(true);
- Runtime.getRuntime().addShutdownHook(new Thread(this::close));
}
@Override
- public void consume(final List<ShenyuRequestLog> logs) {
- if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ public void consume0(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs)) {
return;
}
List<BulkOperation> bulkOperations = new ArrayList<>();
@@ -150,8 +145,8 @@ public class ElasticSearchLogCollectClient implements
LogConsumeClient<ElasticSe
* close client.
*/
@Override
- public void close() {
- if (Objects.nonNull(restClient) && isStarted.get()) {
+ public void close0() {
+ if (Objects.nonNull(restClient)) {
try {
transport.close();
} catch (IOException e) {
@@ -162,7 +157,6 @@ public class ElasticSearchLogCollectClient implements
LogConsumeClient<ElasticSe
} catch (IOException e) {
LOG.error("restClient close has IOException : ", e);
}
- isStarted.set(false);
}
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
index c680538ac..8cdb03624 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
@@ -19,6 +19,7 @@ package org.apache.shenyu.plugin.logging.elasticsearch.config;
import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -58,11 +59,11 @@ public class ElasticSearchLogCollectConfig {
private String host;
private String port;
-
+
private String username;
-
+
private String password;
-
+
private Boolean authCache;
private String compressAlg;
@@ -171,6 +172,31 @@ public class ElasticSearchLogCollectConfig {
public void setAuthCache(final Boolean authCache) {
this.authCache = authCache;
}
+
+ /**
+  * Compares all connection, credential and collection settings of two configs.
+  * Each setting is compared with the same setting of the other config, so a change
+  * to any of them (including username, password and authCache) makes the configs differ.
+  *
+  * @param o the object to compare with
+  * @return true when {@code o} is a config of the same class with identical settings
+  */
+ @Override
+ public boolean equals(final Object o) {
+     if (this == o) {
+         return true;
+     }
+
+     if (o == null || getClass() != o.getClass()) {
+         return false;
+     }
+
+     ElasticSearchLogConfig that = (ElasticSearchLogConfig) o;
+     return Objects.equals(getHost(), that.getHost())
+             && Objects.equals(getCompressAlg(), that.getCompressAlg())
+             && Objects.equals(getPort(), that.getPort())
+             && Objects.equals(username, that.username)
+             && Objects.equals(password, that.password)
+             && Objects.equals(authCache, that.authCache)
+             && Objects.equals(getSampleRate(), that.getSampleRate())
+             && Objects.equals(getBufferQueueSize(), that.getBufferQueueSize())
+             && Objects.equals(getMaxResponseBody(), that.getMaxResponseBody())
+             && Objects.equals(getMaxRequestBody(), that.getMaxRequestBody());
+ }
+
+ /**
+  * Hashes a subset of the fields compared by equals (host, compressAlg, port),
+  * so equal configs always produce equal hash codes.
+  *
+  * @return hash code
+  */
+ @Override
+ public int hashCode() {
+     return Objects.hash(host, compressAlg, port);
+ }
}
/**
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
index 6a2c88ad5..82e3d8fe5 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
@@ -17,95 +17,38 @@
package org.apache.shenyu.plugin.logging.elasticsearch.handler;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.common.dto.ConditionData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.enums.SelectorTypeEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import
org.apache.shenyu.plugin.logging.elasticsearch.client.ElasticSearchLogCollectClient;
import
org.apache.shenyu.plugin.logging.elasticsearch.collector.ElasticSearchLogCollector;
import
org.apache.shenyu.plugin.logging.elasticsearch.config.ElasticSearchLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* The type logging elasticsearch plugin data handler.
*/
-public class LoggingElasticSearchPluginDataHandler implements
PluginDataHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingElasticSearchPluginDataHandler.class);
+public class LoggingElasticSearchPluginDataHandler extends
AbstractLogPluginDataHandler<ElasticSearchLogCollectConfig.ElasticSearchLogConfig,
GenericApiConfig> {
private static final ElasticSearchLogCollectClient
ELASTICSEARCH_LOG_COLLECT_CLIENT = new ElasticSearchLogCollectClient();
- private static final String EMPTY_JSON = "{}";
-
- private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP =
new ConcurrentHashMap<>();
-
- private static final Map<String,
ElasticSearchLogCollectConfig.LogApiConfig> SELECT_API_CONFIG_MAP = new
ConcurrentHashMap<>();
-
/**
- * start or close elasticsearch client.
+ * logCollector.
*/
@Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("handler loggingElasticSearch Plugin data:{}",
GsonUtils.getGson().toJson(pluginData));
- if (pluginData.getEnabled()) {
- ElasticSearchLogCollectConfig.ElasticSearchLogConfig
globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-
ElasticSearchLogCollectConfig.ElasticSearchLogConfig.class);
-
-
ElasticSearchLogCollectConfig.INSTANCE.setElasticSearchLogConfig(globalLogConfig);
- ELASTICSEARCH_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
- ElasticSearchLogCollector.getInstance().start();
- } else {
- try {
- ElasticSearchLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
- }
- }
-
- @Override
- public void handlerSelector(final SelectorData selectorData) {
- LOG.info("handler loggingElasticSearch selector data:{}",
GsonUtils.getGson().toJson(selectorData));
- String handleJson = selectorData.getHandle();
- if (StringUtils.isEmpty(handleJson) ||
EMPTY_JSON.equals(handleJson.trim())) {
- return;
- }
- if (selectorData.getType() != SelectorTypeEnum.CUSTOM_FLOW.getCode()
- || CollectionUtils.isEmpty(selectorData.getConditionList())) {
- return;
- }
- ElasticSearchLogCollectConfig.LogApiConfig logApiConfig =
GsonUtils.getInstance().fromJson(handleJson,
- ElasticSearchLogCollectConfig.LogApiConfig.class);
- if (StringUtils.isBlank(logApiConfig.getIndex()) ||
StringUtils.isBlank(logApiConfig.getSampleRate())) {
- return;
- }
- List<String> uriList = new ArrayList<>();
- for (ConditionData conditionData : selectorData.getConditionList()) {
- if ("uri".equals(conditionData.getParamType()) &&
StringUtils.isNotBlank(conditionData.getParamValue())
- && ("match".equals(conditionData.getOperator()) ||
"=".equals(conditionData.getOperator()))) {
- uriList.add(conditionData.getParamValue().trim());
- }
- }
- SELECT_ID_URI_LIST_MAP.put(selectorData.getId(), uriList);
- SELECT_API_CONFIG_MAP.put(selectorData.getId(), logApiConfig);
+ protected LogCollector logCollector() {
+ return ElasticSearchLogCollector.getInstance();
}
+ /**
+ * doRefreshConfig.
+ *
+ * @param globalLogConfig globalLogConfig
+ */
@Override
- public void removeSelector(final SelectorData selectorData) {
- LOG.info("handler remove loggingElasticSearch selector data:{}",
GsonUtils.getGson().toJson(selectorData));
- SELECT_ID_URI_LIST_MAP.remove(selectorData.getId());
- SELECT_API_CONFIG_MAP.remove(selectorData.getId());
+ protected void doRefreshConfig(final
ElasticSearchLogCollectConfig.ElasticSearchLogConfig globalLogConfig) {
+
ElasticSearchLogCollectConfig.INSTANCE.setElasticSearchLogConfig(globalLogConfig);
+ ELASTICSEARCH_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
@@ -120,20 +63,4 @@ public class LoggingElasticSearchPluginDataHandler
implements PluginDataHandler
public static ElasticSearchLogCollectClient
getElasticSearchLogCollectClient() {
return ELASTICSEARCH_LOG_COLLECT_CLIENT;
}
-
- /**
- * get selectId uriList map.
- * @return selectId uriList map
- */
- public static Map<String, List<String>> getSelectIdUriListMap() {
- return SELECT_ID_URI_LIST_MAP;
- }
-
- /**
- * get select api config map.
- * @return select api config map
- */
- public static Map<String, ElasticSearchLogCollectConfig.LogApiConfig>
getSelectApiConfigMap() {
- return SELECT_API_CONFIG_MAP;
- }
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
index 788a0a72f..cfb4325aa 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingElasticSearchPluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingElasticSearchPlugin.doLogExecute(exchange,
chain, selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingElasticSearchPlugin.doExecute(exchange,
chain, selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
index 36831a3c5..54c6c43da 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
@@ -48,7 +48,7 @@ public final class LoggingElasticSearchPluginDataHandlerTest {
this.loggingElasticSearchPluginDataHandler = new
LoggingElasticSearchPluginDataHandler();
selectorData.setId("1");
selectorData.setType(1);
- selectorData.setHandle("{\"index\":\"test\", \"sampleRate\":\"1\"}");
+ selectorData.setHandle("{\"index\":\"test\", \"sampleRate\":\"1\",
\"topic\":\"1\"}");
conditionData.setParamName("id");
conditionData.setParamType("uri");
conditionData.setParamValue("11");
@@ -73,9 +73,9 @@ public final class LoggingElasticSearchPluginDataHandlerTest {
@Test
public void testHandlerSelector() throws NoSuchFieldException,
IllegalAccessException {
loggingElasticSearchPluginDataHandler.handlerSelector(selectorData);
- Field field1 =
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+ Field field1 =
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
field1.setAccessible(true);
- Field field2 =
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+ Field field2 =
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_API_CONFIG_MAP");
field2.setAccessible(true);
Assertions.assertEquals(field1.get("1").toString(), "{1=[11]}");
Assertions.assertNotEquals(field2.get("1").toString(), "{}");
@@ -84,16 +84,16 @@ public final class
LoggingElasticSearchPluginDataHandlerTest {
@Test
public void testRemoveSelector() throws NoSuchFieldException,
IllegalAccessException {
loggingElasticSearchPluginDataHandler.handlerSelector(selectorData);
- Field field1 =
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+ Field field1 =
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
field1.setAccessible(true);
- Field field2 =
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+ Field field2 =
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_API_CONFIG_MAP");
field2.setAccessible(true);
Assertions.assertEquals(field1.get("1").toString(), "{1=[11]}");
Assertions.assertNotEquals(field2.get("1").toString(), "{}");
loggingElasticSearchPluginDataHandler.removeSelector(selectorData);
- Field field3 =
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+ Field field3 =
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
field3.setAccessible(true);
- Field field4 =
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+ Field field4 =
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_API_CONFIG_MAP");
field4.setAccessible(true);
Assertions.assertEquals(field3.get("1").toString(), "{}");
Assertions.assertEquals(field4.get("1").toString(), "{}");
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
index 46fc15cf6..e7dda6876 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.logging.kafka;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
/**
* Integrated kafka collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingKafkaPlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, KafkaLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ protected LogCollector logCollector() {
+ return KafkaLogCollector.getInstance();
}
/**
- * get plugin order.
+ * pluginEnum.
*
- * @return order
+ * @return plugin
*/
@Override
- public int getOrder() {
- return PluginEnum.LOGGING_KAFKA.getCode();
- }
-
- /**
- * get plugin name.
- *
- * @return plugin name
- */
- @Override
- public String named() {
- return PluginEnum.LOGGING_KAFKA.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_KAFKA;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
index 75c311fb7..782b94002 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
@@ -17,13 +17,6 @@
package org.apache.shenyu.plugin.logging.kafka.client;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.collections4.CollectionUtils;
@@ -38,25 +31,26 @@ import
org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.shenyu.common.utils.JsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
/**
* queue-based logging collector.
*/
-public class KafkaLogCollectClient implements
LogConsumeClient<KafkaLogCollectConfig.KafkaLogConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaLogCollectClient.class);
+public class KafkaLogCollectClient extends
AbstractLogConsumeClient<KafkaLogCollectConfig.KafkaLogConfig> {
private static Map<String, String> apiTopicMap = new HashMap<>();
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
private KafkaProducer<String, String> producer;
private String topic;
@@ -67,16 +61,13 @@ public class KafkaLogCollectClient implements
LogConsumeClient<KafkaLogCollectCo
* @param config kafka props
*/
@Override
- public void initClient(final KafkaLogCollectConfig.KafkaLogConfig config) {
+ public void initClient0(final KafkaLogCollectConfig.KafkaLogConfig config)
{
if (Objects.isNull(config)
|| StringUtils.isBlank(config.getNamesrvAddr())
|| StringUtils.isBlank(config.getTopic())) {
LOG.error("kafka props is empty. failed init kafka producer");
return;
}
- if (isStarted.get()) {
- close();
- }
String topic = "shenyu-access-logging";
String nameserverAddress = config.getNamesrvAddr();
if (StringUtils.isBlank(topic) ||
StringUtils.isBlank(nameserverAddress)) {
@@ -94,7 +85,6 @@ public class KafkaLogCollectClient implements
LogConsumeClient<KafkaLogCollectCo
try {
producer.send(record);
LOG.info("init kafkaLogCollectClient success");
- isStarted.set(true);
} catch (ProducerFencedException | OutOfOrderSequenceException |
AuthorizationException e) {
// We can't recover from these exceptions, so our only option is
to close the producer and exit.
LOG.error("Init kafkaLogCollectClient error, We can't recover from
these exceptions, so our only option is to close the producer and exit", e);
@@ -112,8 +102,8 @@ public class KafkaLogCollectClient implements
LogConsumeClient<KafkaLogCollectCo
* @param logs list of log
*/
@Override
- public void consume(final List<ShenyuRequestLog> logs) {
- if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ public void consume0(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs)) {
return;
}
logs.forEach(log -> {
@@ -156,10 +146,9 @@ public class KafkaLogCollectClient implements
LogConsumeClient<KafkaLogCollectCo
* close producer.
*/
@Override
- public void close() {
- if (Objects.nonNull(producer) && isStarted.get()) {
+ public void close0() {
+ if (Objects.nonNull(producer)) {
producer.close();
- isStarted.set(false);
}
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
index 9015872c6..3dce5bc27 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
@@ -17,10 +17,12 @@
package org.apache.shenyu.plugin.logging.kafka.config;
-import java.util.Optional;
import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import java.util.Objects;
+import java.util.Optional;
+
/**
* log collect config, include kafka config.
* Topic and nameserver must be included, and others are optional.
@@ -134,6 +136,32 @@ public class KafkaLogCollectConfig {
public void setProducerGroup(final String producerGroup) {
this.producerGroup = producerGroup;
}
+
+ /**
+ * Compares every kafka log config field by value, so that an unchanged
+ * config is recognised as equal to a freshly parsed copy of itself.
+ *
+ * @param o the object to compare with
+ * @return true when {@code o} is a KafkaLogConfig with equal field values
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ KafkaLogConfig that = (KafkaLogConfig) o;
+ // each limit is compared with its own counterpart: request with request,
+ // response with response
+ return Objects.equals(getTopic(), that.getTopic())
+ && Objects.equals(getCompressAlg(), that.getCompressAlg())
+ && Objects.equals(getNamesrvAddr(), that.getNamesrvAddr())
+ && Objects.equals(getProducerGroup(),
that.getProducerGroup())
+ && Objects.equals(getSampleRate(), that.getSampleRate())
+ && Objects.equals(getBufferQueueSize(),
that.getBufferQueueSize())
+ && Objects.equals(getMaxRequestBody(),
that.getMaxRequestBody())
+ && Objects.equals(getMaxResponseBody(),
that.getMaxResponseBody());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, compressAlg, namesrvAddr,
producerGroup);
+ }
}
/**
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
index ba746a296..0abd6c2b8 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
@@ -17,63 +17,50 @@
package org.apache.shenyu.plugin.logging.kafka.handler;
-import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The type logging kafka plugin data handler.
*/
-public class LoggingKafkaPluginDataHandler extends
AbstractPluginDataHandler<KafkaLogCollectConfig.LogApiConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingKafkaPluginDataHandler.class);
+public class LoggingKafkaPluginDataHandler extends
AbstractLogPluginDataHandler<KafkaLogCollectConfig.KafkaLogConfig,
KafkaLogCollectConfig.LogApiConfig> {
private static final KafkaLogCollectClient KAFKA_LOG_COLLECT_CLIENT = new
KafkaLogCollectClient();
- public LoggingKafkaPluginDataHandler() {
- super(KafkaLogCollectConfig.LogApiConfig.class);
- }
-
/**
- * get kafka log collect client.
- *
- * @return kafka log collect client.
+ * logCollector.
*/
- public static KafkaLogCollectClient getKafkaLogCollectClient() {
- return KAFKA_LOG_COLLECT_CLIENT;
+ @Override
+ protected LogCollector logCollector() {
+ return KafkaLogCollector.getInstance();
}
/**
- * start or close kafka client.
+ * doRefreshConfig.
+ *
+ * @param globalLogConfig globalLogConfig
*/
@Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("handler loggingKafka Plugin data:{}",
GsonUtils.getGson().toJson(pluginData));
- if (pluginData.getEnabled()) {
- KafkaLogCollectConfig.KafkaLogConfig globalLogConfig =
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
- KafkaLogCollectConfig.KafkaLogConfig.class);
-
- KafkaLogCollectConfig.INSTANCE.setKafkaLogConfig(globalLogConfig);
- // start kafka producer
- KAFKA_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
- KafkaLogCollector.getInstance().start();
- } else {
- try {
- KafkaLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
- }
+ protected void doRefreshConfig(final KafkaLogCollectConfig.KafkaLogConfig
globalLogConfig) {
+ KafkaLogCollectConfig.INSTANCE.setKafkaLogConfig(globalLogConfig);
+ KAFKA_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
public String pluginNamed() {
return PluginEnum.LOGGING_KAFKA.getName();
}
+
+ /**
+ * get kafka log collect client.
+ *
+ * @return kafka log collect client.
+ */
+ public static KafkaLogCollectClient getKafkaLogCollectClient() {
+ return KAFKA_LOG_COLLECT_CLIENT;
+ }
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
index 8c708202e..64a63ff31 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
@@ -89,7 +89,7 @@ public final class LoggingKafkaPluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingKafkaPlugin.doLogExecute(exchange, chain,
selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingKafkaPlugin.doExecute(exchange, chain,
selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
index 5c958b039..684e0ab02 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
@@ -17,19 +17,20 @@
package org.apache.shenyu.plugin.logging.kafka.handler;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.shenyu.common.dto.ConditionData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.SelectorData;
-import
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* The Test Case For LoggingKafkaPluginDataHandler.
*/
@@ -71,18 +72,18 @@ public class LoggingKafkaPluginDataHandlerTest {
@Test
public void testHandlerSelector() {
loggingKafkaPluginDataHandler.handlerSelector(selectorData);
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
-
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
+
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
}
@Test
public void testRemoveSelector() {
loggingKafkaPluginDataHandler.handlerSelector(selectorData);
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
-
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
+
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
loggingKafkaPluginDataHandler.removeSelector(selectorData);
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
"{}");
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
}
@Test
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
index ce02b3b3a..9f9f566cc 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.logging.pulsar;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
/**
* Integrated pulsar collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingPulsarPlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, PulsarLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ protected LogCollector logCollector() {
+ return PulsarLogCollector.getInstance();
}
/**
- * get plugin order.
+ * pluginEnum.
*
- * @return order
+ * @return plugin
*/
@Override
- public int getOrder() {
- return PluginEnum.LOGGING_PULSAR.getCode();
- }
-
- /**
- * get plugin name.
- *
- * @return plugin name
- */
- @Override
- public String named() {
- return PluginEnum.LOGGING_PULSAR.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_PULSAR;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
index b5d676cd1..f8906e557 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.shenyu.common.utils.JsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
@@ -35,45 +35,34 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* queue-based logging collector.
*/
-public class PulsarLogCollectClient implements
LogConsumeClient<PulsarLogCollectConfig.PulsarLogConfig> {
+public class PulsarLogCollectClient extends
AbstractLogConsumeClient<PulsarLogCollectConfig.PulsarLogConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(PulsarLogCollectClient.class);
private PulsarClient client;
private Producer<byte[]> producer;
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
/**
* init producer.
*
* @param config pulsar props
*/
@Override
- public void initClient(final PulsarLogCollectConfig.PulsarLogConfig
config) {
- if (Objects.isNull(config)) {
- LOG.error("Pulsar config is empty. Fail to init Pulsar producer.");
- return;
- }
+ public void initClient0(final PulsarLogCollectConfig.PulsarLogConfig
config) {
String topic = config.getTopic();
String serviceUrl = config.getServiceUrl();
if (StringUtils.isBlank(topic) || StringUtils.isBlank(serviceUrl)) {
LOG.error("init PulsarLogCollectClient error, please check topic
or serviceUrl.");
return;
}
- if (isStarted.get()) {
- close();
- }
try {
client = PulsarClient.builder().serviceUrl(serviceUrl).build();
producer = client.newProducer().topic(topic).create();
LOG.info("init PulsarLogCollectClient success.");
- isStarted.set(true);
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
} catch (PulsarClientException e) {
@@ -83,8 +72,8 @@ public class PulsarLogCollectClient implements
LogConsumeClient<PulsarLogCollect
}
@Override
- public void consume(final List<ShenyuRequestLog> logs) {
- if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ public void consume0(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs)) {
return;
}
logs.forEach(log -> {
@@ -110,12 +99,11 @@ public class PulsarLogCollectClient implements
LogConsumeClient<PulsarLogCollect
}
@Override
- public void close() {
- if (Objects.nonNull(producer) && isStarted.get()) {
+ public void close0() {
+ if (Objects.nonNull(producer)) {
try {
producer.close();
client.close();
- isStarted.set(false);
} catch (PulsarClientException e) {
LOG.error("fail to close PulsarLogCollectClient, e", e);
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
index 2cb489979..65dcfb51d 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
@@ -17,8 +17,10 @@
package org.apache.shenyu.plugin.logging.pulsar.config;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import java.util.Objects;
import java.util.Optional;
public class PulsarLogCollectConfig {
@@ -104,9 +106,34 @@ public class PulsarLogCollectConfig {
public void setServiceUrl(final String serviceUrl) {
this.serviceUrl = serviceUrl;
}
+
+ /**
+ * Compares every pulsar log config field by value, so that an unchanged
+ * config is recognised as equal to a freshly parsed copy of itself.
+ *
+ * @param o the object to compare with
+ * @return true when {@code o} is a PulsarLogConfig with equal field values
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PulsarLogConfig that = (PulsarLogConfig) o;
+ // each limit is compared with its own counterpart: request with request,
+ // response with response
+ return Objects.equals(getTopic(), that.getTopic())
+ && Objects.equals(getCompressAlg(), that.getCompressAlg())
+ && Objects.equals(getServiceUrl(), that.getServiceUrl())
+ && Objects.equals(getSampleRate(), that.getSampleRate())
+ && Objects.equals(getBufferQueueSize(),
that.getBufferQueueSize())
+ && Objects.equals(getMaxRequestBody(),
that.getMaxRequestBody())
+ && Objects.equals(getMaxResponseBody(),
that.getMaxResponseBody());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, compressAlg, serviceUrl);
+ }
}
- public static class LogApiConfig {
+ public static class LogApiConfig extends GenericApiConfig {
/**
* 0 means never sample, 1 means always sample. Minimum probability is
0.01, or 1% of logging
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
index 3ebaa3b4f..75b42141d 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
@@ -17,62 +17,38 @@
package org.apache.shenyu.plugin.logging.pulsar.handler;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* The type logging pulsar plugin data handler.
*/
-public class LoggingPulsarPluginDataHandler implements PluginDataHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingPulsarPluginDataHandler.class);
+public class LoggingPulsarPluginDataHandler extends
AbstractLogPluginDataHandler<PulsarLogCollectConfig.PulsarLogConfig,
PulsarLogCollectConfig.LogApiConfig> {
private static final PulsarLogCollectClient PULSAR_LOG_COLLECT_CLIENT =
new PulsarLogCollectClient();
- private static final String EMPTY_JSON = "{}";
-
- private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP =
new ConcurrentHashMap<>();
-
- @Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("handler loggingPulsar Plugin data:{}",
GsonUtils.getGson().toJson(pluginData));
- if (pluginData.getEnabled()) {
- PulsarLogCollectConfig.PulsarLogConfig globalLogConfig =
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
- PulsarLogCollectConfig.PulsarLogConfig.class);
-
PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(globalLogConfig);
- // start pulsar producer
- PULSAR_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
- PulsarLogCollector.getInstance().start();
- } else {
- try {
- PulsarLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
- }
- }
-
+ /**
+ * logCollector.
+ */
@Override
- public void handlerSelector(final SelectorData selectorData) {
- PluginDataHandler.super.handlerSelector(selectorData);
+ protected LogCollector logCollector() {
+ return PulsarLogCollector.getInstance();
}
+ /**
+ * doRefreshConfig.
+ *
+ * @param globalLogConfig globalLogConfig
+ */
@Override
- public void removeSelector(final SelectorData selectorData) {
- PluginDataHandler.super.removeSelector(selectorData);
+ protected void doRefreshConfig(final
PulsarLogCollectConfig.PulsarLogConfig globalLogConfig) {
+ PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(globalLogConfig);
+ PULSAR_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
index 52ae59f98..3ee060937 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
@@ -87,7 +87,7 @@ public class LoggingPulsarPluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingPulsarPlugin.doLogExecute(exchange, chain,
selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingPulsarPlugin.doExecute(exchange, chain,
selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
index c9ea49432..2f4837a0a 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
@@ -61,7 +61,7 @@ public class PulsarLogCollectClientTest {
} catch (Exception e) {
msg = "false";
}
- Assertions.assertEquals(msg, "");
+ Assertions.assertEquals(msg, "false");
pulsarLogCollectClient.close();
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
index 90c59621a..4f0caaccb 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.logging.rocketmq;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
import
org.apache.shenyu.plugin.logging.rocketmq.collector.RocketMQLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
/**
* Integrated rocketmq collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingRocketMQPlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, RocketMQLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ protected LogCollector logCollector() {
+ return RocketMQLogCollector.getInstance();
}
/**
- * get plugin order.
+ * Identifies this plugin as {@code PluginEnum.LOGGING_ROCKETMQ}.
*
- * @return order
+ * @return {@code PluginEnum.LOGGING_ROCKETMQ}
*/
@Override
- public int getOrder() {
- return PluginEnum.LOGGING_ROCKETMQ.getCode();
- }
-
- /**
- * get plugin name.
- *
- * @return plugin name
- */
- @Override
- public String named() {
- return PluginEnum.LOGGING_ROCKETMQ.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_ROCKETMQ;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
index 7ed5885ae..f7e2b9cb9 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.shenyu.common.utils.JsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
@@ -41,12 +41,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* queue-based logging collector.
*/
-public class RocketMQLogCollectClient implements
LogConsumeClient<RocketMQLogCollectConfig.RocketMQLogConfig> {
+public class RocketMQLogCollectClient extends
AbstractLogConsumeClient<RocketMQLogCollectConfig.RocketMQLogConfig> {
private static final Logger LOG =
LoggerFactory.getLogger(RocketMQLogCollectClient.class);
@@ -58,22 +57,13 @@ public class RocketMQLogCollectClient implements
LogConsumeClient<RocketMQLogCol
private String topic;
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
/**
* init producer.
*
* @param config rocketmq props
*/
@Override
- public void initClient(final RocketMQLogCollectConfig.RocketMQLogConfig
config) {
- if (Objects.isNull(config)) {
- LOG.error("RocketMQ props is empty. failed init RocketMQ
producer");
- return;
- }
- if (isStarted.get()) {
- close();
- }
+ public void initClient0(final RocketMQLogCollectConfig.RocketMQLogConfig
config) {
String topic = config.getTopic();
String nameserverAddress = config.getNamesrvAddr();
String producerGroup = config.getProducerGroup();
@@ -90,7 +80,6 @@ public class RocketMQLogCollectClient implements
LogConsumeClient<RocketMQLogCol
try {
producer.start();
LOG.info("init RocketMQLogCollectClient success");
- isStarted.set(true);
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
} catch (Exception e) {
LOG.error("init RocketMQLogCollectClient error", e);
@@ -115,8 +104,8 @@ public class RocketMQLogCollectClient implements
LogConsumeClient<RocketMQLogCol
* @param logs list of log
*/
@Override
- public void consume(final List<ShenyuRequestLog> logs) {
- if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ public void consume0(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs)) {
return;
}
logs.forEach(log -> {
@@ -158,10 +147,9 @@ public class RocketMQLogCollectClient implements
LogConsumeClient<RocketMQLogCol
* close producer.
*/
@Override
- public void close() {
- if (Objects.nonNull(producer) && isStarted.get()) {
+ public void close0() {
+ if (Objects.nonNull(producer)) {
producer.shutdown();
- isStarted.set(false);
}
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
index 9772778a4..b241443eb 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
@@ -17,10 +17,12 @@
package org.apache.shenyu.plugin.logging.rocketmq.config;
-import java.util.Optional;
import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import java.util.Objects;
+import java.util.Optional;
+
/**
* log collect config, include rocketmq config.
* Topic and nameserver must be included, and others are optional.
@@ -174,6 +176,34 @@ public class RocketMQLogCollectConfig {
public void setSecretKey(final String secretKey) {
this.secretKey = secretKey;
}
+
+ /**
+ * Compares this config with another {@code RocketMQLogConfig} property
+ * by property. Each property is compared with the same property of the
+ * other object, so two configs holding identical values are equal.
+ *
+ * @param o the object to compare with
+ * @return {@code true} if {@code o} has the same class and every
+ *         compared property is equal
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RocketMQLogConfig that = (RocketMQLogConfig) o;
+ return Objects.equals(getTopic(), that.getTopic())
+ && Objects.equals(getCompressAlg(), that.getCompressAlg())
+ && Objects.equals(getNamesrvAddr(), that.getNamesrvAddr())
+ && Objects.equals(getProducerGroup(),
that.getProducerGroup())
+ && Objects.equals(getAccessKey(), that.getAccessKey())
+ && Objects.equals(getSecretKey(), that.getSecretKey())
+ && Objects.equals(getSampleRate(), that.getSampleRate())
+ && Objects.equals(getBufferQueueSize(),
that.getBufferQueueSize())
+ // request limit vs request limit, response vs response
+ && Objects.equals(getMaxRequestBody(),
that.getMaxRequestBody())
+ && Objects.equals(getMaxResponseBody(),
that.getMaxResponseBody());
+ }
+
+ /**
+ * Hashes six of the properties that {@link #equals(Object)} compares.
+ * NOTE(review): assumes each getter used in equals returns the
+ * like-named field, so equal configs produce equal hashes.
+ *
+ * @return hash of topic, compressAlg, secretKey, accessKey,
+ *         namesrvAddr and producerGroup
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, compressAlg, secretKey, accessKey,
namesrvAddr, producerGroup);
+ }
}
/**
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
index dc097feb4..310edb080 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
@@ -17,49 +17,38 @@
package org.apache.shenyu.plugin.logging.rocketmq.handler;
-import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import
org.apache.shenyu.plugin.logging.rocketmq.client.RocketMQLogCollectClient;
import
org.apache.shenyu.plugin.logging.rocketmq.collector.RocketMQLogCollector;
import
org.apache.shenyu.plugin.logging.rocketmq.config.RocketMQLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The type logging rocketmq plugin data handler.
*/
-public class LoggingRocketMQPluginDataHandler extends
AbstractPluginDataHandler<RocketMQLogCollectConfig.LogApiConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingRocketMQPluginDataHandler.class);
+public class LoggingRocketMQPluginDataHandler extends
AbstractLogPluginDataHandler<RocketMQLogCollectConfig.RocketMQLogConfig,
GenericApiConfig> {
private static final RocketMQLogCollectClient ROCKET_MQ_LOG_COLLECT_CLIENT
= new RocketMQLogCollectClient();
- public LoggingRocketMQPluginDataHandler() {
- super(RocketMQLogCollectConfig.LogApiConfig.class);
+ /**
+ * Supplies the log collector for this handler.
+ *
+ * @return the value of {@code RocketMQLogCollector.getInstance()}
+ */
+ @Override
+ protected LogCollector logCollector() {
+ return RocketMQLogCollector.getInstance();
}
/**
- * start or close rocketMQ client.
+ * Applies a new global log config: records it on
+ * {@code RocketMQLogCollectConfig.INSTANCE}, then passes it to
+ * {@code ROCKET_MQ_LOG_COLLECT_CLIENT.initClient}.
+ *
+ * @param globalLogConfig the RocketMQ log config to record and hand to the client
*/
@Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("handler loggingRocketMQ Plugin data:{}",
GsonUtils.getGson().toJson(pluginData));
- if (pluginData.getEnabled()) {
- RocketMQLogCollectConfig.RocketMQLogConfig globalLogConfig =
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
- RocketMQLogCollectConfig.RocketMQLogConfig.class);
-
RocketMQLogCollectConfig.INSTANCE.setRocketMQLogConfig(globalLogConfig);
- // start rocketmq producer
- ROCKET_MQ_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
- RocketMQLogCollector.getInstance().start();
- } else {
- try {
- RocketMQLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
- }
+ protected void doRefreshConfig(final
RocketMQLogCollectConfig.RocketMQLogConfig globalLogConfig) {
+
RocketMQLogCollectConfig.INSTANCE.setRocketMQLogConfig(globalLogConfig);
+ ROCKET_MQ_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
index a9dd90a9e..396e1491d 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingRocketMQPluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingRocketMQPlugin.doLogExecute(exchange,
chain, selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingRocketMQPlugin.doExecute(exchange, chain,
selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
index 3cdd79f7a..da588c9b5 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
@@ -17,19 +17,20 @@
package org.apache.shenyu.plugin.logging.rocketmq.handler;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.shenyu.common.dto.ConditionData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.SelectorData;
-import
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import
org.apache.shenyu.plugin.logging.rocketmq.client.RocketMQLogCollectClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* The Test Case For LoggingRocketMQPluginDataHandler.
*/
@@ -73,18 +74,18 @@ public class LoggingRocketMQPluginDataHandlerTest {
@Test
public void testHandlerSelector() {
loggingRocketMQPluginDataHandler.handlerSelector(selectorData);
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
-
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
+
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
}
@Test
public void testRemoveSelector() {
loggingRocketMQPluginDataHandler.handlerSelector(selectorData);
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
-
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
"{1=[11]}");
+
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
loggingRocketMQPluginDataHandler.removeSelector(selectorData);
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
"{}");
-
Assertions.assertEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
"{}");
+
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
"{}");
}
@Test
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
index f18ddb4d1..649159c6b 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
@@ -17,18 +17,10 @@
package org.apache.shenyu.plugin.tencent.cls;
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
-import
org.apache.shenyu.plugin.tencent.cls.collector.TencentClsSlsLogCollector;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import
org.apache.shenyu.plugin.tencent.cls.collector.TencentClsSlsLogCollector;
/**
* LoggingTencentClsPlugin send log to Tencent cls service.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
public class LoggingTencentClsPlugin extends AbstractLoggingPlugin {
@Override
- public Mono<Void> doLogExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain,
- final SelectorData selector, final RuleData
rule,
- final ServerHttpRequest request, final
ShenyuRequestLog requestInfo) {
- LoggingServerHttpRequest loggingServerHttpRequest = new
LoggingServerHttpRequest(request, requestInfo);
- LoggingServerHttpResponse loggingServerHttpResponse = new
LoggingServerHttpResponse(exchange.getResponse(),
- requestInfo, TencentClsSlsLogCollector.getInstance());
- ServerWebExchange webExchange =
exchange.mutate().request(loggingServerHttpRequest)
- .response(loggingServerHttpResponse).build();
- loggingServerHttpResponse.setExchange(webExchange);
- return
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
- }
-
- /**
- * get plugin order.
- *
- * @return order
- */
- @Override
- public int getOrder() {
- return PluginEnum.LOGGING_TENCENT_CLS.getCode();
+ protected LogCollector logCollector() {
+ return TencentClsSlsLogCollector.getInstance();
}
/**
- * get plugin name.
+ * Identifies this plugin as {@code PluginEnum.LOGGING_TENCENT_CLS}.
*
- * @return plugin name
+ * @return {@code PluginEnum.LOGGING_TENCENT_CLS}
*/
@Override
- public String named() {
- return PluginEnum.LOGGING_TENCENT_CLS.getName();
+ public PluginEnum pluginEnum() {
+ return PluginEnum.LOGGING_TENCENT_CLS;
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
index 7086b4934..d94dc943e 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
@@ -33,7 +33,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.tencent.cls.config.TencentLogCollectConfig;
@@ -48,21 +48,16 @@ import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Tencent cls log Collect client.
*/
-public class TencentClsLogCollectClient implements
LogConsumeClient<TencentLogCollectConfig.TencentClsLogConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(TencentClsLogCollectClient.class);
+public class TencentClsLogCollectClient extends
AbstractLogConsumeClient<TencentLogCollectConfig.TencentClsLogConfig> {
private AsyncProducerClient client;
private String topic;
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
private ThreadPoolExecutor threadExecutor;
/**
@@ -71,14 +66,7 @@ public class TencentClsLogCollectClient implements
LogConsumeClient<TencentLogCo
* @param shenyuConfig shenyu log config
*/
@Override
- public void initClient(final TencentLogCollectConfig.TencentClsLogConfig
shenyuConfig) {
- if (Objects.isNull(shenyuConfig)) {
- LOG.error("Tencent cls props is empty. failed init Tencent cls
producer");
- return;
- }
- if (isStarted.get()) {
- close();
- }
+ public void initClient0(final TencentLogCollectConfig.TencentClsLogConfig
shenyuConfig) {
String secretId = shenyuConfig.getSecretId();
String secretKey = shenyuConfig.getSecretKey();
String endpoint = shenyuConfig.getEndpoint();
@@ -116,7 +104,6 @@ public class TencentClsLogCollectClient implements
LogConsumeClient<TencentLogCo
threadExecutor = createThreadPoolExecutor(config.getSendThreadCount());
try {
- isStarted.set(true);
client = new AsyncProducerClient(config);
} catch (Exception e) {
LOG.warn("TencentClsLogCollectClient initClient error message:{}",
e.getMessage());
@@ -129,17 +116,16 @@ public class TencentClsLogCollectClient implements
LogConsumeClient<TencentLogCo
* @param logs list of log
*/
@Override
- public void consume(final List<ShenyuRequestLog> logs) {
- if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ public void consume0(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs)) {
return;
}
logs.forEach(this::sendLog);
}
@Override
- public void close() {
- if (Objects.nonNull(client) && isStarted.get()) {
- isStarted.set(false);
+ public void close0() {
+ if (Objects.nonNull(client)) {
try {
client.close();
} catch (InterruptedException | ProducerException e) {
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
index 0e932c68d..58a3cf122 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
@@ -17,10 +17,10 @@
package org.apache.shenyu.plugin.tencent.cls.collector;
-import
org.apache.shenyu.plugin.tencent.cls.handler.LoggingTencentClsPluginDataHandler;
import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
import org.apache.shenyu.plugin.logging.common.collector.AbstractLogCollector;
import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import
org.apache.shenyu.plugin.tencent.cls.handler.LoggingTencentClsPluginDataHandler;
/**
* Tencent cls log collector,depend a LogConsumeClient for consume logs.
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
index 1692ca842..f0f2ebe49 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
@@ -17,53 +17,38 @@
package org.apache.shenyu.plugin.tencent.cls.handler;
-import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
import org.apache.shenyu.plugin.tencent.cls.client.TencentClsLogCollectClient;
import
org.apache.shenyu.plugin.tencent.cls.collector.TencentClsSlsLogCollector;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
import org.apache.shenyu.plugin.tencent.cls.config.TencentLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
/**
* LoggingTencentClsPluginDataHandler Tencent cls plugin data handler.
*/
-public class LoggingTencentClsPluginDataHandler implements PluginDataHandler {
-
- private static final Logger LOG =
LoggerFactory.getLogger(LoggingTencentClsPluginDataHandler.class);
+public class LoggingTencentClsPluginDataHandler extends
AbstractLogPluginDataHandler<TencentLogCollectConfig.TencentClsLogConfig,
GenericApiConfig> {
private static final TencentClsLogCollectClient
TENCENT_CLS_LOG_COLLECT_CLIENT = new TencentClsLogCollectClient();
+ /**
+ * Supplies the log collector for this handler.
+ *
+ * @return the value of {@code TencentClsSlsLogCollector.getInstance()}
+ */
+ @Override
+ protected LogCollector logCollector() {
+ return TencentClsSlsLogCollector.getInstance();
+ }
+
+ /**
+ * Applies a new global log config: records it on
+ * {@code TencentLogCollectConfig.INSTANCE}, then passes it to
+ * {@code TENCENT_CLS_LOG_COLLECT_CLIENT.initClient}.
+ *
+ * @param globalLogConfig the Tencent CLS log config to record and hand to the client
+ */
@Override
- public void handlerPlugin(final PluginData pluginData) {
- LOG.info("Tencent cls plugin data: {}",
GsonUtils.getGson().toJson(pluginData));
- if (Objects.nonNull(pluginData) &&
Boolean.TRUE.equals(pluginData.getEnabled())) {
- TencentLogCollectConfig.TencentClsLogConfig globalLogConfig =
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
- TencentLogCollectConfig.TencentClsLogConfig.class);
- TencentLogCollectConfig.TencentClsLogConfig exist =
Singleton.INST.get(TencentLogCollectConfig.TencentClsLogConfig.class);
- if (Objects.isNull(globalLogConfig)) {
- return;
- }
- if (Objects.isNull(exist) || !globalLogConfig.equals(exist)) {
- // no data, init client
-
TencentLogCollectConfig.INSTANCE.setTencentClsLogConfig(globalLogConfig);
- // init tencent cls client
- TENCENT_CLS_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
- TencentClsSlsLogCollector.getInstance().start();
-
Singleton.INST.single(TencentLogCollectConfig.TencentClsLogConfig.class,
globalLogConfig);
- }
- } else {
- try {
- TencentClsSlsLogCollector.getInstance().close();
- } catch (Exception e) {
- LOG.error("close log collector error", e);
- }
- }
+ protected void doRefreshConfig(final
TencentLogCollectConfig.TencentClsLogConfig globalLogConfig) {
+
TencentLogCollectConfig.INSTANCE.setTencentClsLogConfig(globalLogConfig);
+ TENCENT_CLS_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
}
@Override
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
index 3a1c5c91e..c06c2e225 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingTencentClsPluginTest {
@Test
public void testDoExecute() {
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
- Mono<Void> result = loggingTencentClsPlugin.doLogExecute(exchange,
chain, selectorData, ruleData, request, requestLog);
+ Mono<Void> result = loggingTencentClsPlugin.doExecute(exchange, chain,
selectorData, ruleData);
StepVerifier.create(result).expectSubscription().verifyComplete();
}