This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 128b055ac [type:refactor] refactor shenyu-plugin-logging. (#3910)
128b055ac is described below

commit 128b055ac9ad8412dd7f5487f2213d1c34f459f6
Author: yunlongn <[email protected]>
AuthorDate: Sat Sep 3 00:35:05 2022 +0800

    [type:refactor] refactor shenyu-plugin-logging. (#3910)
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
    
    * [type:refactor] refactor shenyu-plugin-logging.
---
 .../plugin/aliyun/sls/LoggingAliyunSlsPlugin.java  |  40 ++------
 .../sls/client/AliyunSlsLogCollectClient.java      |  39 ++------
 .../aliyun/sls/config/AliyunLogCollectConfig.java  |  30 ++++++
 .../handler/LoggingAliyunSlsPluginDataHandler.java |  61 ++++++------
 .../aliyun/sls/LoggingAliyunSlsPluginTest.java     |   2 +-
 .../clickhouse/LoggingClickHousePlugin.java        |  40 ++------
 .../client/ClickHouseLogCollectClient.java         |  37 ++------
 .../config/ClickHouseLogCollectConfig.java         |   3 +-
 .../LoggingClickHousePluginDataHandler.java        |  52 +++++------
 .../clickhouse/LoggingClickHousePluginTest.java    |   2 +-
 .../logging/common/AbstractLoggingPlugin.java      |  58 +++++++++---
 .../common/client/AbstractLogConsumeClient.java    | 101 ++++++++++++++++++++
 ...dler.java => AbstractLogPluginDataHandler.java} |  78 ++++++++++++----
 .../logging/common/sampler/CountSampler.java       |   1 +
 .../elasticsearch/LoggingElasticSearchPlugin.java  |  40 ++------
 .../client/ElasticSearchLogCollectClient.java      |  20 ++--
 .../config/ElasticSearchLogCollectConfig.java      |  32 ++++++-
 .../LoggingElasticSearchPluginDataHandler.java     | 103 +++------------------
 .../LoggingElasticSearchPluginTest.java            |   2 +-
 .../LoggingElasticSearchPluginDataHandlerTest.java |  14 +--
 .../plugin/logging/kafka/LoggingKafkaPlugin.java   |  40 ++------
 .../kafka/client/KafkaLogCollectClient.java        |  39 +++-----
 .../kafka/config/KafkaLogCollectConfig.java        |  30 +++++-
 .../handler/LoggingKafkaPluginDataHandler.java     |  57 +++++-------
 .../logging/kafka/LoggingKafkaPluginTest.java      |   2 +-
 .../handler/LoggingKafkaPluginDataHandlerTest.java |  23 ++---
 .../plugin/logging/pulsar/LoggingPulsarPlugin.java |  40 ++------
 .../pulsar/client/PulsarLogCollectClient.java      |  26 ++----
 .../pulsar/config/PulsarLogCollectConfig.java      |  29 +++++-
 .../handler/LoggingPulsarPluginDataHandler.java    |  56 ++++-------
 .../logging/pulsar/LoggingPulsarPluginTest.java    |   2 +-
 .../pulsar/client/PulsarLogCollectClientTest.java  |   2 +-
 .../logging/rocketmq/LoggingRocketMQPlugin.java    |  40 ++------
 .../rocketmq/client/RocketMQLogCollectClient.java  |  26 ++----
 .../rocketmq/config/RocketMQLogCollectConfig.java  |  32 ++++++-
 .../handler/LoggingRocketMQPluginDataHandler.java  |  43 ++++-----
 .../rocketmq/LoggingRocketMQPluginTest.java        |   2 +-
 .../LoggingRocketMQPluginDataHandlerTest.java      |  23 ++---
 .../tencent/cls/LoggingTencentClsPlugin.java       |  42 ++-------
 .../cls/client/TencentClsLogCollectClient.java     |  28 ++----
 .../cls/collector/TencentClsSlsLogCollector.java   |   2 +-
 .../LoggingTencentClsPluginDataHandler.java        |  55 ++++-------
 .../tencent/cls/LoggingTencentClsPluginTest.java   |   2 +-
 43 files changed, 648 insertions(+), 748 deletions(-)

diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
index 2737b0dc0..bd9ceaeb2 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.aliyun.sls;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.plugin.aliyun.sls.collector.AliyunSlsLogCollector;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 
 /**
  * LoggingAliYunSlsPlugin send log to aliyun sls service.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingAliyunSlsPlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, AliyunSlsLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    protected LogCollector logCollector() {
+        return AliyunSlsLogCollector.getInstance();
     }
 
     /**
-     * get plugin order.
+     * pluginEnum.
      *
-     * @return order
+     * @return plugin
      */
     @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_ALIYUN_SLS.getCode();
-    }
-
-    /**
-     * get plugin name.
-     *
-     * @return plugin name
-     */
-    @Override
-    public String named() {
-        return PluginEnum.LOGGING_ALIYUN_SLS.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_ALIYUN_SLS;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
index 949e26b52..ad95eedb7 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/client/AliyunSlsLogCollectClient.java
@@ -24,7 +24,6 @@ import 
com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
 import com.aliyun.openservices.aliyun.log.producer.Result;
 import 
com.aliyun.openservices.aliyun.log.producer.errors.LogSizeTooLargeException;
 import 
com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
-import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
 import 
com.aliyun.openservices.aliyun.log.producer.errors.ResultFailedException;
 import com.aliyun.openservices.log.Client;
 import com.aliyun.openservices.log.common.LogItem;
@@ -38,7 +37,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.plugin.aliyun.sls.config.AliyunLogCollectConfig;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.slf4j.Logger;
@@ -51,14 +50,11 @@ import java.util.Objects;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Aliyun sls log Collect client.
  */
-public class AliyunSlsLogCollectClient implements 
LogConsumeClient<AliyunLogCollectConfig.AliyunSlsLogConfig> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AliyunSlsLogCollectClient.class);
+public class AliyunSlsLogCollectClient extends 
AbstractLogConsumeClient<AliyunLogCollectConfig.AliyunSlsLogConfig> {
 
     private Client client;
 
@@ -70,8 +66,6 @@ public class AliyunSlsLogCollectClient implements 
LogConsumeClient<AliyunLogColl
 
     private Producer producer;
 
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
     private ThreadPoolExecutor threadExecutor;
 
     /**
@@ -80,17 +74,7 @@ public class AliyunSlsLogCollectClient implements 
LogConsumeClient<AliyunLogColl
      * @param config config
      */
     @Override
-    public void initClient(final AliyunLogCollectConfig.AliyunSlsLogConfig 
config) {
-        if (Objects.isNull(config) 
-                || StringUtils.isBlank(config.getHost())
-                || StringUtils.isBlank(config.getAccessId())
-                || StringUtils.isBlank(config.getAccessKey())) {
-            LOG.error("aliyun sls props is empty. failed init aliyun sls 
producer");
-            return;
-        }
-        if (isStarted.get()) {
-            close();
-        }
+    public void initClient0(final AliyunLogCollectConfig.AliyunSlsLogConfig 
config) {
         String accessId = config.getAccessId();
         String accessKey = config.getAccessKey();
         String host = config.getHost();
@@ -111,8 +95,6 @@ public class AliyunSlsLogCollectClient implements 
LogConsumeClient<AliyunLogColl
         LogStore store = new LogStore(logStore, ttlInDay, shardCount);
         threadExecutor = createThreadPoolExecutor(config);
         try {
-            isStarted.set(true);
-            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
             client.CreateLogStore(projectName, store);
         } catch (LogException e) {
             LOG.warn("error code:{}, error message:{}", e.GetErrorCode(), 
e.GetErrorMessage());
@@ -125,23 +107,18 @@ public class AliyunSlsLogCollectClient implements 
LogConsumeClient<AliyunLogColl
      * @param logs list of log
      */
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) {
-        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+    public void consume0(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs)) {
             return;
         }
         logs.forEach(this::sendLog);
     }
 
     @Override
-    public void close() {
-        if (Objects.nonNull(client) && isStarted.get()) {
-            isStarted.set(false);
+    public void close0() throws Exception {
+        if (Objects.nonNull(client)) {
             client.shutdown();
-            try {
-                producer.close();
-            } catch (InterruptedException | ProducerException e) {
-                LOG.error("Close producer error.");
-            }
+            producer.close();
         }
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
index faa1729f7..6ffdd37d7 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/config/AliyunLogCollectConfig.java
@@ -19,6 +19,7 @@ package org.apache.shenyu.plugin.aliyun.sls.config;
 
 import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -284,5 +285,34 @@ public class AliyunLogCollectConfig {
         public void setIoThreadCount(final Integer ioThreadCount) {
             this.ioThreadCount = ioThreadCount;
         }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return Boolean.TRUE;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return Boolean.FALSE;
+            }
+
+            AliyunSlsLogConfig that = (AliyunSlsLogConfig) o;
+            return Objects.equals(getAccessId(), that.getAccessId())
+                    && Objects.equals(getAccessKey(), that.getAccessKey())
+                    && Objects.equals(getHost(), that.getHost())
+                    && Objects.equals(getIoThreadCount(), 
that.getIoThreadCount())
+                    && Objects.equals(getLogStoreName(), 
that.getLogStoreName())
+                    && Objects.equals(getProjectName(), that.getProjectName())
+                    && Objects.equals(getSendThreadCount(), 
that.getSendThreadCount())
+                    && Objects.equals(getShardCount(), that.getShardCount())
+                    && Objects.equals(getTopic(), that.getTopic())
+                    && Objects.equals(getTtlInDay(), that.getTtlInDay());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(accessId, accessKey, host, ioThreadCount, 
logStoreName,
+                    projectName, sendThreadCount, shardCount, topic, ttlInDay);
+        }
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
index 6054b5d68..cdc8339e7 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/main/java/org/apache/shenyu/plugin/aliyun/sls/handler/LoggingAliyunSlsPluginDataHandler.java
@@ -17,55 +17,48 @@
 
 package org.apache.shenyu.plugin.aliyun.sls.handler;
 
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.convert.plugin.MotanRegisterConfig;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.Singleton;
 import org.apache.shenyu.plugin.aliyun.sls.client.AliyunSlsLogCollectClient;
 import org.apache.shenyu.plugin.aliyun.sls.collector.AliyunSlsLogCollector;
 import org.apache.shenyu.plugin.aliyun.sls.config.AliyunLogCollectConfig;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 
 import java.util.Objects;
 
 /**
  * LoggingAliYunSlsPluginDataHandler aliyun sls plugin data handler.
  */
-public class LoggingAliyunSlsPluginDataHandler implements PluginDataHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingAliyunSlsPluginDataHandler.class);
+public class LoggingAliyunSlsPluginDataHandler extends 
AbstractLogPluginDataHandler<AliyunLogCollectConfig.AliyunSlsLogConfig, 
GenericApiConfig> {
 
     private static final AliyunSlsLogCollectClient 
ALIYUN_SLS_LOG_COLLECT_CLIENT = new AliyunSlsLogCollectClient();
 
+    /**
+     * logCollector.
+     */
+    @Override
+    protected LogCollector logCollector() {
+        return AliyunSlsLogCollector.getInstance();
+    }
+
+    /**
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
+     */
     @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("AliYun sls plugin data: {}", 
GsonUtils.getGson().toJson(pluginData));
-        if (Objects.nonNull(pluginData) && 
Boolean.TRUE.equals(pluginData.getEnabled())) {
-            AliyunLogCollectConfig.AliyunSlsLogConfig globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-                    AliyunLogCollectConfig.AliyunSlsLogConfig.class);
-            AliyunLogCollectConfig.AliyunSlsLogConfig exist = 
Singleton.INST.get(AliyunLogCollectConfig.AliyunSlsLogConfig.class);
-            if (Objects.isNull(globalLogConfig)) {
-                return;
-            }
-            if (Objects.isNull(exist) || !globalLogConfig.equals(exist)) {
-                // no data, init client
-                
AliyunLogCollectConfig.INSTANCE.setAliyunSlsLogConfig(globalLogConfig);
-                
-                // init aliyun sls client
-                ALIYUN_SLS_LOG_COLLECT_CLIENT.initClient(exist);
-                AliyunSlsLogCollector.getInstance().start();
-            }
-            Singleton.INST.single(MotanRegisterConfig.class, globalLogConfig);
-        } else {
-            try {
-                AliyunSlsLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
+    protected void doRefreshConfig(final 
AliyunLogCollectConfig.AliyunSlsLogConfig globalLogConfig) {
+        AliyunLogCollectConfig.INSTANCE.setAliyunSlsLogConfig(globalLogConfig);
+        if (Objects.isNull(globalLogConfig)
+                || StringUtils.isBlank(globalLogConfig.getHost())
+                || StringUtils.isBlank(globalLogConfig.getAccessId())
+                || StringUtils.isBlank(globalLogConfig.getAccessKey())) {
+            LOG.error("aliyun sls props is empty. failed init aliyun sls 
producer");
+            return;
         }
+        ALIYUN_SLS_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
index 9cfec412e..f275d0284 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-aliyun-sls/src/test/java/org/apache/shenyu/plugin/aliyun/sls/LoggingAliyunSlsPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingAliyunSlsPluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingAliYunSlsPlugin.doLogExecute(exchange, 
chain, selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingAliYunSlsPlugin.doExecute(exchange, chain, 
selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
index e02e3ad1d..11d3f10d0 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.clickhouse;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import 
org.apache.shenyu.plugin.logging.clickhouse.collector.ClickHouseLogCollector;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 
 /**
  * LoggingClickHousePlugin.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingClickHousePlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, ClickHouseLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    protected LogCollector logCollector() {
+        return ClickHouseLogCollector.getInstance();
     }
 
     /**
-     * get plugin order.
+     * pluginEnum.
      *
-     * @return order
+     * @return plugin
      */
     @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_CLICK_HOUSE.getCode();
-    }
-
-    /**
-     * get plugin name.
-     *
-     * @return plugin name
-     */
-    @Override
-    public String named() {
-        return PluginEnum.LOGGING_CLICK_HOUSE.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_CLICK_HOUSE;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
index 3935a63ac..363195408 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/client/ClickHouseLogCollectClient.java
@@ -28,34 +28,28 @@ import com.clickhouse.client.data.ClickHouseIntegerValue;
 import com.clickhouse.client.data.ClickHouseLongValue;
 import com.clickhouse.client.data.ClickHouseOffsetDateTimeValue;
 import com.clickhouse.client.data.ClickHouseStringValue;
-import java.util.List;
-import java.util.Objects;
-import java.util.TimeZone;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.shenyu.common.utils.DateUtils;
 import 
org.apache.shenyu.plugin.logging.clickhouse.config.ClickHouseLogCollectConfig;
 import 
org.apache.shenyu.plugin.logging.clickhouse.constant.ClickHouseLoggingConstant;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.TimeZone;
 
 /**
  * queue-based logging collector.
  */
-public class ClickHouseLogCollectClient implements 
LogConsumeClient<ClickHouseLogCollectConfig.ClickHouseLogConfig> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseLogCollectClient.class);
-
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+public class ClickHouseLogCollectClient extends 
AbstractLogConsumeClient<ClickHouseLogCollectConfig.ClickHouseLogConfig> {
 
     private ClickHouseClient client;
 
     private ClickHouseNode endpoint;
 
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) throws Exception {
+    public void consume0(final List<ShenyuRequestLog> logs) throws Exception {
         if (CollectionUtils.isNotEmpty(logs)) {
             Object[][] datas = new Object[logs.size()][];
             for (int i = 0; i < logs.size(); i++) {
@@ -109,10 +103,9 @@ public class ClickHouseLogCollectClient implements 
LogConsumeClient<ClickHouseLo
     }
 
     @Override
-    public void close() {
-        if (Objects.nonNull(client) && isStarted.get()) {
+    public void close0() {
+        if (Objects.nonNull(client)) {
             client.close();
-            isStarted.set(false);
         }
     }
 
@@ -122,14 +115,7 @@ public class ClickHouseLogCollectClient implements 
LogConsumeClient<ClickHouseLo
      * @param config properties.
      */
     @Override
-    public void initClient(final 
ClickHouseLogCollectConfig.ClickHouseLogConfig config) {
-        if (Objects.isNull(config)) {
-            LOG.error("clickhouse properties is empty. failed init clickhouse 
client");
-            return;
-        }
-        if (isStarted.get()) {
-            close();
-        }
+    public void initClient0(final 
ClickHouseLogCollectConfig.ClickHouseLogConfig config) {
         final String username = config.getUsername();
         final String password = config.getPassword();
         endpoint = ClickHouseNode.builder()
@@ -146,8 +132,5 @@ public class ClickHouseLogCollectClient implements 
LogConsumeClient<ClickHouseLo
         } catch (Exception e) {
             LOG.error("inti ClickHouseLogClient error" + e);
         }
-        isStarted.set(true);
-        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
-
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
index 4bdd11249..671bb364b 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/config/ClickHouseLogCollectConfig.java
@@ -17,9 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.clickhouse.config;
 
-import java.util.Optional;
 import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 
+import java.util.Optional;
+
 /**
  * ClickHouseLogCollectConfig.
  */
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
index c94f6c769..2b5ddab57 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/main/java/org/apache/shenyu/plugin/logging/clickhouse/handler/LoggingClickHousePluginDataHandler.java
@@ -17,50 +17,39 @@
 
 package org.apache.shenyu.plugin.logging.clickhouse.handler;
 
-import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
 import 
org.apache.shenyu.plugin.logging.clickhouse.client.ClickHouseLogCollectClient;
 import 
org.apache.shenyu.plugin.logging.clickhouse.collector.ClickHouseLogCollector;
 import 
org.apache.shenyu.plugin.logging.clickhouse.config.ClickHouseLogCollectConfig;
 import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 
 /**
  * The type logging pulsar plugin data handler.
  */
-public class LoggingClickHousePluginDataHandler implements PluginDataHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingClickHousePluginDataHandler.class);
+public class LoggingClickHousePluginDataHandler extends 
AbstractLogPluginDataHandler<ClickHouseLogCollectConfig.ClickHouseLogConfig, 
GenericApiConfig> {
 
     private static final ClickHouseLogCollectClient 
CLICK_HOUSE_LOG_COLLECT_CLIENT = new ClickHouseLogCollectClient();
 
     /**
-     * getClickHouseLogCollectClient.
-     *
-     * @return LogConsumeClient
+     * logCollector.
      */
-    public static LogConsumeClient getClickHouseLogCollectClient() {
-        return CLICK_HOUSE_LOG_COLLECT_CLIENT;
+    @Override
+    protected LogCollector logCollector() {
+        return ClickHouseLogCollector.getInstance();
     }
 
+    /**
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
+     */
     @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("handler loggingClickHouse Plugin data:{}", 
GsonUtils.getGson().toJson(pluginData));
-        if (pluginData.getEnabled()) {
-            ClickHouseLogCollectConfig.ClickHouseLogConfig globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(), 
ClickHouseLogCollectConfig.ClickHouseLogConfig.class);
-            
ClickHouseLogCollectConfig.INSTANCE.setClickHouseLogConfig(globalLogConfig);
-            CLICK_HOUSE_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
-            ClickHouseLogCollector.getInstance().start();
-        } else {
-            try {
-                ClickHouseLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
-        }
+    protected void doRefreshConfig(final 
ClickHouseLogCollectConfig.ClickHouseLogConfig globalLogConfig) {
+        
ClickHouseLogCollectConfig.INSTANCE.setClickHouseLogConfig(globalLogConfig);
+        CLICK_HOUSE_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
@@ -68,4 +57,13 @@ public class LoggingClickHousePluginDataHandler implements 
PluginDataHandler {
         return PluginEnum.LOGGING_CLICK_HOUSE.getName();
     }
 
+    /**
+     * getClickHouseLogCollectClient.
+     *
+     * @return LogConsumeClient
+     */
+    public static LogConsumeClient getClickHouseLogCollectClient() {
+        return CLICK_HOUSE_LOG_COLLECT_CLIENT;
+    }
+
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
index f3e80bc9f..849346b66 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-clickhouse/src/test/java/org/apache/shenyu/plugin/logging/clickhouse/LoggingClickHousePluginTest.java
@@ -90,7 +90,7 @@ public class LoggingClickHousePluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingClickHousePlugin.doLogExecute(exchange, 
chain, selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingClickHousePlugin.doExecute(exchange, chain, 
selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
index 5752f33dd..d74ca13ba 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/AbstractLoggingPlugin.java
@@ -19,9 +19,13 @@ package org.apache.shenyu.plugin.logging.common;
 
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
 import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
 import org.apache.shenyu.plugin.logging.common.utils.LogCollectUtils;
@@ -36,24 +40,23 @@ import static 
org.apache.shenyu.plugin.logging.common.constant.GenericLoggingCon
  * abstract logging plugin.
  */
 public abstract class AbstractLoggingPlugin extends AbstractShenyuPlugin {
-    
+
+    /**
+     * LogCollector.
+     *
+     * @return LogCollector
+     */
+    protected abstract LogCollector logCollector();
+
     /**
-     * log collector execute.
+     * pluginEnum.
      *
-     * @param exchange web exchange
-     * @param chain shenyu plugin chain
-     * @param selector selector data
-     * @param rule rule data
-     * @param request server http request
-     * @param requestInfo request information
-     * @return mono
+     * @return PluginEnum
      */
-    protected abstract Mono<Void> doLogExecute(ServerWebExchange exchange, 
ShenyuPluginChain chain,
-                                               SelectorData selector, RuleData 
rule,
-                                               ServerHttpRequest request, 
ShenyuRequestLog requestInfo);
+    protected abstract PluginEnum pluginEnum();
 
     @Override
-    protected Mono<Void> doExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
+    public Mono<Void> doExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
                                    final SelectorData selector, final RuleData 
rule) {
         ServerHttpRequest request = exchange.getRequest();
         // control sampling
@@ -69,6 +72,33 @@ public abstract class AbstractLoggingPlugin extends 
AbstractShenyuPlugin {
         requestInfo.setUserAgent(request.getHeaders().getFirst(USER_AGENT));
         requestInfo.setHost(request.getHeaders().getFirst(HOST));
         requestInfo.setPath(request.getURI().getPath());
-        return this.doLogExecute(exchange, chain, selector, rule, request, 
requestInfo);
+        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
+        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
+                requestInfo, this.logCollector());
+        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
+                .response(loggingServerHttpResponse).build();
+        loggingServerHttpResponse.setExchange(webExchange);
+        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    }
+
+
+    /**
+     * get plugin order.
+     *
+     * @return order
+     */
+    @Override
+    public int getOrder() {
+        return pluginEnum().getCode();
+    }
+
+    /**
+     * get plugin name.
+     *
+     * @return plugin name
+     */
+    @Override
+    public String named() {
+        return pluginEnum().getName();
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/client/AbstractLogConsumeClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/client/AbstractLogConsumeClient.java
new file mode 100644
index 000000000..da8b2aa31
--- /dev/null
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/client/AbstractLogConsumeClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.common.client;
+
+import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.lang.NonNull;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * AbstractLogConsumeClient.
+ */
+public abstract class AbstractLogConsumeClient<T extends GenericGlobalConfig> 
implements LogConsumeClient<T> {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractLogConsumeClient.class);
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    private final AtomicReference<Thread> closeThread = new 
AtomicReference<>();
+
+    /**
+     * initClient0.
+     *
+     * @param config config
+     */
+    public abstract void initClient0(@NonNull T config);
+
+    /**
+     * consume0.
+     *
+     * @param logs logs
+     * @throws Exception error
+     */
+    public abstract void consume0(List<ShenyuRequestLog> logs) throws 
Exception;
+
+    /**
+     * close0.
+     *
+     * @throws Exception error
+     */
+    public abstract void close0() throws Exception;
+
+    @Override
+    public void initClient(final T config) {
+        if (isStarted.get()) {
+            this.close();
+        }
+        if (ObjectUtils.isEmpty(config)) {
+            LOG.error("{} config is null.", this.getClass().getSimpleName());
+            return;
+        }
+        this.initClient0(config);
+        isStarted.set(true);
+        closeThread.set(new Thread(this::close));
+        Runtime.getRuntime().addShutdownHook(closeThread.get());
+    }
+
+    @Override
+    public void close() {
+        if (!ObjectUtils.isEmpty(closeThread.get())) {
+            Runtime.getRuntime().removeShutdownHook(closeThread.get());
+        }
+        if (isStarted.get()) {
+            isStarted.set(false);
+            try {
+                this.close0();
+            } catch (Exception e) {
+                LOG.error("{} close error.", this.getClass().getSimpleName());
+            }
+        }
+    }
+
+    @Override
+    public void consume(final List<ShenyuRequestLog> logs) throws Exception {
+        if (!isStarted.get()) {
+            return;
+        }
+        this.consume0(logs);
+    }
+}
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractLogPluginDataHandler.java
similarity index 57%
rename from 
shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractPluginDataHandler.java
rename to 
shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractLogPluginDataHandler.java
index f423d023f..46c42b833 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/handler/AbstractLogPluginDataHandler.java
@@ -17,27 +17,35 @@
 
 package org.apache.shenyu.plugin.logging.common.handler;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.SelectorTypeEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.common.utils.Singleton;
 import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
- * AbstractPluginDataHandler.
+ * AbstractLogPluginDataHandler.
  */
-public abstract class AbstractPluginDataHandler<C extends GenericApiConfig> 
implements PluginDataHandler {
+public abstract class AbstractLogPluginDataHandler<T extends 
GenericGlobalConfig, C extends GenericApiConfig> implements PluginDataHandler {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractPluginDataHandler.class);
+    protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractLogPluginDataHandler.class);
 
     private static final String EMPTY_JSON = "{}";
 
@@ -45,12 +53,6 @@ public abstract class AbstractPluginDataHandler<C extends 
GenericApiConfig> impl
 
     private static final Map<String, GenericApiConfig> SELECT_API_CONFIG_MAP = 
new ConcurrentHashMap<>();
 
-    private final Class<C> clazz;
-
-    protected AbstractPluginDataHandler(final Class<C> clazz) {
-        this.clazz = clazz;
-    }
-
     /**
      * get selectId uriList map.
      *
@@ -69,25 +71,69 @@ public abstract class AbstractPluginDataHandler<C extends 
GenericApiConfig> impl
         return SELECT_API_CONFIG_MAP;
     }
 
+    /**
+     * LogCollector.
+     *
+     * @return LogCollector
+     */
+    protected abstract LogCollector logCollector();
+
+    /**
+     * LogCollector.
+     *
+     * @param globalLogConfig globalLogConfig
+     */
+    protected abstract void doRefreshConfig(T globalLogConfig);
+
+    @Override
+    public void handlerPlugin(final PluginData pluginData) {
+        ParameterizedType parameterizedType = (ParameterizedType) 
this.getClass().getGenericSuperclass();
+        Type[] actualTypeArguments = 
parameterizedType.getActualTypeArguments();
+        final Class<T> globalLogConfigClass = (Class<T>) 
actualTypeArguments[0];
+        LOG.info("handler {} Plugin data: {}", pluginNamed(), 
GsonUtils.getGson().toJson(pluginData));
+        if (Objects.nonNull(pluginData) && 
Boolean.TRUE.equals(pluginData.getEnabled())) {
+            T globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(), globalLogConfigClass);
+            T exist = Singleton.INST.get(globalLogConfigClass);
+            if (Objects.isNull(globalLogConfig)) {
+                return;
+            }
+            if (Objects.isNull(exist) || !globalLogConfig.equals(exist)) {
+                // no data, init client
+                this.doRefreshConfig(globalLogConfig);
+                logCollector().start();
+            }
+            Singleton.INST.single(globalLogConfigClass, globalLogConfig);
+        } else {
+            try {
+                logCollector().close();
+            } catch (Exception e) {
+                LOG.error("{} close log collector error", 
this.getClass().getSimpleName(), e);
+            }
+        }
+    }
+
     @Override
     public void handlerSelector(final SelectorData selectorData) {
+        ParameterizedType parameterizedType = (ParameterizedType) 
this.getClass().getGenericSuperclass();
+        Type[] actualTypeArguments = 
parameterizedType.getActualTypeArguments();
+        final Class<C> genericApiConfigClass = (Class<C>) 
actualTypeArguments[1];
         LOG.info("handler {} selector data:{}", pluginNamed(), 
GsonUtils.getGson().toJson(selectorData));
         String handleJson = selectorData.getHandle();
         if (StringUtils.isEmpty(handleJson) || 
EMPTY_JSON.equals(handleJson.trim())) {
             return;
         }
         if (selectorData.getType() != SelectorTypeEnum.CUSTOM_FLOW.getCode()
-            || CollectionUtils.isEmpty(selectorData.getConditionList())) {
+                || CollectionUtils.isEmpty(selectorData.getConditionList())) {
             return;
         }
-        GenericApiConfig logApiConfig = 
GsonUtils.getInstance().fromJson(handleJson, clazz);
+        GenericApiConfig logApiConfig = 
GsonUtils.getInstance().fromJson(handleJson, genericApiConfigClass);
         if (StringUtils.isBlank(logApiConfig.getTopic()) || 
StringUtils.isBlank(logApiConfig.getSampleRate())) {
             return;
         }
         List<String> uriList = new ArrayList<>();
         for (ConditionData conditionData : selectorData.getConditionList()) {
             if ("uri".equals(conditionData.getParamType()) && 
StringUtils.isNotBlank(conditionData.getParamValue())
-                && ("match".equals(conditionData.getOperator()) || 
"=".equals(conditionData.getOperator()))) {
+                    && ("match".equals(conditionData.getOperator()) || 
"=".equals(conditionData.getOperator()))) {
                 uriList.add(conditionData.getParamValue().trim());
             }
         }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
index 31e10cd6a..babb1e74d 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/sampler/CountSampler.java
@@ -37,6 +37,7 @@ public class CountSampler implements Sampler {
 
     /**
      * Fills a bitset with decisions according to the supplied probability.
+     * @param probability probability
      */
     public CountSampler(final float probability) {
         counter = new AtomicInteger();
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
index 821bdea00..e496441af 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.elasticsearch;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 import 
org.apache.shenyu.plugin.logging.elasticsearch.collector.ElasticSearchLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
 
 /**
  * Integrated elasticsearch collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingElasticSearchPlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingElasticSearchServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingElasticSearchServerResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, ElasticSearchLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingElasticSearchServerHttpRequest)
-                .response(loggingElasticSearchServerResponse).build();
-        loggingElasticSearchServerResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingElasticSearchServerResponse::logError);
+    protected LogCollector logCollector() {
+        return ElasticSearchLogCollector.getInstance();
     }
 
     /**
-     * get plugin order.
+     * pluginEnum.
      *
-     * @return plugin order
+     * @return plugin
      */
     @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_ELASTIC_SEARCH.getCode();
-    }
-
-    /**
-     * get plugin name.
-     *
-     * @return plugin name
-     */
-    @Override
-    public String named() {
-        return PluginEnum.LOGGING_ELASTIC_SEARCH.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_ELASTIC_SEARCH;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
index da97481e4..85f83ed2f 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/client/ElasticSearchLogCollectClient.java
@@ -30,7 +30,7 @@ import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import 
org.apache.shenyu.plugin.logging.elasticsearch.config.ElasticSearchLogCollectConfig;
@@ -43,12 +43,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * queue-based logging collector.
  */
-public class ElasticSearchLogCollectClient implements 
LogConsumeClient<ElasticSearchLogCollectConfig.ElasticSearchLogConfig> {
+public class ElasticSearchLogCollectClient extends 
AbstractLogConsumeClient<ElasticSearchLogCollectConfig.ElasticSearchLogConfig> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticSearchLogCollectClient.class);
 
@@ -58,15 +57,13 @@ public class ElasticSearchLogCollectClient implements 
LogConsumeClient<ElasticSe
 
     private ElasticsearchClient client;
 
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
     /**
      * init elasticsearch client.
      *
      * @param config elasticsearch client config
      */
     @Override
-    public void initClient(final 
ElasticSearchLogCollectConfig.ElasticSearchLogConfig config) {
+    public void initClient0(final 
ElasticSearchLogCollectConfig.ElasticSearchLogConfig config) {
         RestClientBuilder builder = RestClient
                 .builder(new HttpHost(config.getHost(), 
Integer.parseInt(config.getPort())));
         
@@ -92,13 +89,11 @@ public class ElasticSearchLogCollectClient implements 
LogConsumeClient<ElasticSe
             createIndex(GenericLoggingConstant.INDEX);
             LOG.info("create index success");
         }
-        isStarted.set(true);
-        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
     }
 
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) {
-        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+    public void consume0(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs)) {
             return;
         }
         List<BulkOperation> bulkOperations = new ArrayList<>();
@@ -150,8 +145,8 @@ public class ElasticSearchLogCollectClient implements 
LogConsumeClient<ElasticSe
      * close client.
      */
     @Override
-    public void close() {
-        if (Objects.nonNull(restClient) && isStarted.get()) {
+    public void close0() {
+        if (Objects.nonNull(restClient)) {
             try {
                 transport.close();
             } catch (IOException e) {
@@ -162,7 +157,6 @@ public class ElasticSearchLogCollectClient implements 
LogConsumeClient<ElasticSe
             } catch (IOException e) {
                 LOG.error("restClient close has IOException : ", e);
             }
-            isStarted.set(false);
         }
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
index c680538ac..8cdb03624 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/config/ElasticSearchLogCollectConfig.java
@@ -19,6 +19,7 @@ package org.apache.shenyu.plugin.logging.elasticsearch.config;
 
 import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -58,11 +59,11 @@ public class ElasticSearchLogCollectConfig {
         private String host;
 
         private String port;
-        
+
         private String username;
-        
+
         private String password;
-        
+
         private Boolean authCache;
 
         private String compressAlg;
@@ -171,6 +172,31 @@ public class ElasticSearchLogCollectConfig {
         public void setAuthCache(final Boolean authCache) {
             this.authCache = authCache;
         }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return Boolean.TRUE;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return Boolean.FALSE;
+            }
+
+            ElasticSearchLogConfig that = (ElasticSearchLogConfig) o;
+            return Objects.equals(getHost(), that.getHost())
+                    && Objects.equals(getCompressAlg(), that.getCompressAlg())
+                    && Objects.equals(getPort(), that.getPort())
+                    && Objects.equals(getSampleRate(), that.getSampleRate())
+                    && Objects.equals(getBufferQueueSize(), 
that.getBufferQueueSize())
+                    && Objects.equals(getMaxResponseBody(), 
that.getMaxRequestBody())
+                    && Objects.equals(getMaxRequestBody(), 
that.getMaxResponseBody());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(host, compressAlg, port);
+        }
     }
 
     /**
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
index 6a2c88ad5..82e3d8fe5 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/main/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandler.java
@@ -17,95 +17,38 @@
 
 package org.apache.shenyu.plugin.logging.elasticsearch.handler;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.common.dto.ConditionData;
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.enums.SelectorTypeEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import 
org.apache.shenyu.plugin.logging.elasticsearch.client.ElasticSearchLogCollectClient;
 import 
org.apache.shenyu.plugin.logging.elasticsearch.collector.ElasticSearchLogCollector;
 import 
org.apache.shenyu.plugin.logging.elasticsearch.config.ElasticSearchLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The type logging elasticsearch plugin data handler.
  */
-public class LoggingElasticSearchPluginDataHandler implements 
PluginDataHandler {
-    
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingElasticSearchPluginDataHandler.class);
+public class LoggingElasticSearchPluginDataHandler extends 
AbstractLogPluginDataHandler<ElasticSearchLogCollectConfig.ElasticSearchLogConfig,
 GenericApiConfig> {
 
     private static final ElasticSearchLogCollectClient 
ELASTICSEARCH_LOG_COLLECT_CLIENT = new ElasticSearchLogCollectClient();
 
-    private static final String EMPTY_JSON = "{}";
-
-    private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP = 
new ConcurrentHashMap<>();
-
-    private static final Map<String, 
ElasticSearchLogCollectConfig.LogApiConfig> SELECT_API_CONFIG_MAP = new 
ConcurrentHashMap<>();
-
     /**
-     * start or close elasticsearch client.
+     * logCollector.
      */
     @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("handler loggingElasticSearch Plugin data:{}", 
GsonUtils.getGson().toJson(pluginData));
-        if (pluginData.getEnabled()) {
-            ElasticSearchLogCollectConfig.ElasticSearchLogConfig 
globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-                    
ElasticSearchLogCollectConfig.ElasticSearchLogConfig.class);
-
-            
ElasticSearchLogCollectConfig.INSTANCE.setElasticSearchLogConfig(globalLogConfig);
-            ELASTICSEARCH_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
-            ElasticSearchLogCollector.getInstance().start();
-        } else {
-            try {
-                ElasticSearchLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
-        }
-    }
-
-    @Override
-    public void handlerSelector(final SelectorData selectorData) {
-        LOG.info("handler loggingElasticSearch selector data:{}", 
GsonUtils.getGson().toJson(selectorData));
-        String handleJson = selectorData.getHandle();
-        if (StringUtils.isEmpty(handleJson) || 
EMPTY_JSON.equals(handleJson.trim())) {
-            return;
-        }
-        if (selectorData.getType() != SelectorTypeEnum.CUSTOM_FLOW.getCode()
-                || CollectionUtils.isEmpty(selectorData.getConditionList())) {
-            return;
-        }
-        ElasticSearchLogCollectConfig.LogApiConfig logApiConfig = 
GsonUtils.getInstance().fromJson(handleJson,
-                ElasticSearchLogCollectConfig.LogApiConfig.class);
-        if (StringUtils.isBlank(logApiConfig.getIndex()) || 
StringUtils.isBlank(logApiConfig.getSampleRate())) {
-            return;
-        }
-        List<String> uriList = new ArrayList<>();
-        for (ConditionData conditionData : selectorData.getConditionList()) {
-            if ("uri".equals(conditionData.getParamType()) && 
StringUtils.isNotBlank(conditionData.getParamValue())
-                    && ("match".equals(conditionData.getOperator()) || 
"=".equals(conditionData.getOperator()))) {
-                uriList.add(conditionData.getParamValue().trim());
-            }
-        }
-        SELECT_ID_URI_LIST_MAP.put(selectorData.getId(), uriList);
-        SELECT_API_CONFIG_MAP.put(selectorData.getId(), logApiConfig);
+    protected LogCollector logCollector() {
+        return ElasticSearchLogCollector.getInstance();
     }
 
+    /**
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
+     */
     @Override
-    public void removeSelector(final SelectorData selectorData) {
-        LOG.info("handler remove loggingElasticSearch selector data:{}", 
GsonUtils.getGson().toJson(selectorData));
-        SELECT_ID_URI_LIST_MAP.remove(selectorData.getId());
-        SELECT_API_CONFIG_MAP.remove(selectorData.getId());
+    protected void doRefreshConfig(final 
ElasticSearchLogCollectConfig.ElasticSearchLogConfig globalLogConfig) {
+        
ElasticSearchLogCollectConfig.INSTANCE.setElasticSearchLogConfig(globalLogConfig);
+        ELASTICSEARCH_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
@@ -120,20 +63,4 @@ public class LoggingElasticSearchPluginDataHandler 
implements PluginDataHandler
     public static ElasticSearchLogCollectClient 
getElasticSearchLogCollectClient() {
         return ELASTICSEARCH_LOG_COLLECT_CLIENT;
     }
-
-    /**
-     * get selectId uriList map.
-     * @return selectId uriList map
-     */
-    public static Map<String, List<String>> getSelectIdUriListMap() {
-        return SELECT_ID_URI_LIST_MAP;
-    }
-
-    /**
-     * get select api config map.
-     * @return select api config map
-     */
-    public static Map<String, ElasticSearchLogCollectConfig.LogApiConfig> 
getSelectApiConfigMap() {
-        return SELECT_API_CONFIG_MAP;
-    }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
index 788a0a72f..cfb4325aa 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/LoggingElasticSearchPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingElasticSearchPluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingElasticSearchPlugin.doLogExecute(exchange, 
chain, selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingElasticSearchPlugin.doExecute(exchange, 
chain, selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
index 36831a3c5..54c6c43da 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-elasticsearch/src/test/java/org/apache/shenyu/plugin/logging/elasticsearch/handler/LoggingElasticSearchPluginDataHandlerTest.java
@@ -48,7 +48,7 @@ public final class LoggingElasticSearchPluginDataHandlerTest {
         this.loggingElasticSearchPluginDataHandler = new 
LoggingElasticSearchPluginDataHandler();
         selectorData.setId("1");
         selectorData.setType(1);
-        selectorData.setHandle("{\"index\":\"test\", \"sampleRate\":\"1\"}");
+        selectorData.setHandle("{\"index\":\"test\", \"sampleRate\":\"1\", 
\"topic\":\"1\"}");
         conditionData.setParamName("id");
         conditionData.setParamType("uri");
         conditionData.setParamValue("11");
@@ -73,9 +73,9 @@ public final class LoggingElasticSearchPluginDataHandlerTest {
     @Test
     public void testHandlerSelector() throws NoSuchFieldException, 
IllegalAccessException {
         loggingElasticSearchPluginDataHandler.handlerSelector(selectorData);
-        Field field1 = 
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+        Field field1 = 
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
         field1.setAccessible(true);
-        Field field2 = 
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+        Field field2 = 
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_API_CONFIG_MAP");
         field2.setAccessible(true);
         Assertions.assertEquals(field1.get("1").toString(), "{1=[11]}");
         Assertions.assertNotEquals(field2.get("1").toString(), "{}");
@@ -84,16 +84,16 @@ public final class 
LoggingElasticSearchPluginDataHandlerTest {
     @Test
     public void testRemoveSelector() throws NoSuchFieldException, 
IllegalAccessException {
         loggingElasticSearchPluginDataHandler.handlerSelector(selectorData);
-        Field field1 = 
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+        Field field1 = 
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
         field1.setAccessible(true);
-        Field field2 = 
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+        Field field2 = 
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_API_CONFIG_MAP");
         field2.setAccessible(true);
         Assertions.assertEquals(field1.get("1").toString(), "{1=[11]}");
         Assertions.assertNotEquals(field2.get("1").toString(), "{}");
         loggingElasticSearchPluginDataHandler.removeSelector(selectorData);
-        Field field3 = 
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+        Field field3 = 
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
         field3.setAccessible(true);
-        Field field4 = 
loggingElasticSearchPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+        Field field4 = 
loggingElasticSearchPluginDataHandler.getClass().getSuperclass().getDeclaredField("SELECT_API_CONFIG_MAP");
         field4.setAccessible(true);
         Assertions.assertEquals(field3.get("1").toString(), "{}");
         Assertions.assertEquals(field4.get("1").toString(), "{}");
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
index 46fc15cf6..e7dda6876 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.kafka;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
 
 /**
  * Integrated kafka collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingKafkaPlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, KafkaLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    protected LogCollector logCollector() {
+        return KafkaLogCollector.getInstance();
     }
 
     /**
-     * get plugin order.
+     * pluginEnum.
      *
-     * @return order
+     * @return plugin
      */
     @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_KAFKA.getCode();
-    }
-
-    /**
-     * get plugin name.
-     *
-     * @return plugin name
-     */
-    @Override
-    public String named() {
-        return PluginEnum.LOGGING_KAFKA.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_KAFKA;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
index 75c311fb7..782b94002 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
@@ -17,13 +17,6 @@
 
 package org.apache.shenyu.plugin.logging.kafka.client;
 
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.collections4.CollectionUtils;
@@ -38,25 +31,26 @@ import 
org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.shenyu.common.utils.JsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
 import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
 
 /**
  * queue-based logging collector.
  */
-public class KafkaLogCollectClient implements 
LogConsumeClient<KafkaLogCollectConfig.KafkaLogConfig> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaLogCollectClient.class);
+public class KafkaLogCollectClient extends 
AbstractLogConsumeClient<KafkaLogCollectConfig.KafkaLogConfig> {
 
     private static Map<String, String> apiTopicMap = new HashMap<>();
 
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
     private KafkaProducer<String, String> producer;
 
     private String topic;
@@ -67,16 +61,13 @@ public class KafkaLogCollectClient implements 
LogConsumeClient<KafkaLogCollectCo
      * @param config kafka props
      */
     @Override
-    public void initClient(final KafkaLogCollectConfig.KafkaLogConfig config) {
+    public void initClient0(final KafkaLogCollectConfig.KafkaLogConfig config) 
{
         if (Objects.isNull(config)
                 || StringUtils.isBlank(config.getNamesrvAddr())
                 || StringUtils.isBlank(config.getTopic())) {
             LOG.error("kafka props is empty. failed init kafka producer");
             return;
         }
-        if (isStarted.get()) {
-            close();
-        }
         String topic = "shenyu-access-logging";
         String nameserverAddress = config.getNamesrvAddr();
         if (StringUtils.isBlank(topic) || 
StringUtils.isBlank(nameserverAddress)) {
@@ -94,7 +85,6 @@ public class KafkaLogCollectClient implements 
LogConsumeClient<KafkaLogCollectCo
         try {
             producer.send(record);
             LOG.info("init kafkaLogCollectClient success");
-            isStarted.set(true);
         } catch (ProducerFencedException | OutOfOrderSequenceException | 
AuthorizationException e) {
             // We can't recover from these exceptions, so our only option is 
to close the producer and exit.
             LOG.error("Init kafkaLogCollectClient error, We can't recover from 
these exceptions, so our only option is to close the producer and exit", e);
@@ -112,8 +102,8 @@ public class KafkaLogCollectClient implements 
LogConsumeClient<KafkaLogCollectCo
      * @param logs list of log
      */
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) {
-        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+    public void consume0(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs)) {
             return;
         }
         logs.forEach(log -> {
@@ -156,10 +146,9 @@ public class KafkaLogCollectClient implements 
LogConsumeClient<KafkaLogCollectCo
      * close producer.
      */
     @Override
-    public void close() {
-        if (Objects.nonNull(producer) && isStarted.get()) {
+    public void close0() {
+        if (Objects.nonNull(producer)) {
             producer.close();
-            isStarted.set(false);
         }
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
index 9015872c6..3dce5bc27 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/KafkaLogCollectConfig.java
@@ -17,10 +17,12 @@
 
 package org.apache.shenyu.plugin.logging.kafka.config;
 
-import java.util.Optional;
 import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
 import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 
+import java.util.Objects;
+import java.util.Optional;
+
 /**
  * log collect config, include kafka config.
  * Topic and nameserver must be included, and others are optional.
@@ -134,6 +136,32 @@ public class KafkaLogCollectConfig {
         public void setProducerGroup(final String producerGroup) {
             this.producerGroup = producerGroup;
         }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return Boolean.TRUE;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return Boolean.FALSE;
+            }
+
+            KafkaLogConfig that = (KafkaLogConfig) o;
+            return Objects.equals(getTopic(), that.getTopic())
+                    && Objects.equals(getCompressAlg(), that.getCompressAlg())
+                    && Objects.equals(getNamesrvAddr(), that.getNamesrvAddr())
+                    && Objects.equals(getProducerGroup(), 
that.getProducerGroup())
+                    && Objects.equals(getSampleRate(), that.getSampleRate())
+                    && Objects.equals(getBufferQueueSize(), 
that.getBufferQueueSize())
+                    && Objects.equals(getMaxResponseBody(), 
that.getMaxRequestBody())
+                    && Objects.equals(getMaxRequestBody(), 
that.getMaxResponseBody());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topic, compressAlg, namesrvAddr, 
producerGroup);
+        }
     }
 
     /**
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
index ba746a296..0abd6c2b8 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
@@ -17,63 +17,50 @@
 
 package org.apache.shenyu.plugin.logging.kafka.handler;
 
-import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import 
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
 import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
 import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The type logging kafka plugin data handler.
  */
-public class LoggingKafkaPluginDataHandler extends 
AbstractPluginDataHandler<KafkaLogCollectConfig.LogApiConfig> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingKafkaPluginDataHandler.class);
+public class LoggingKafkaPluginDataHandler extends 
AbstractLogPluginDataHandler<KafkaLogCollectConfig.KafkaLogConfig, 
KafkaLogCollectConfig.LogApiConfig> {
 
     private static final KafkaLogCollectClient KAFKA_LOG_COLLECT_CLIENT = new 
KafkaLogCollectClient();
 
-    public LoggingKafkaPluginDataHandler() {
-        super(KafkaLogCollectConfig.LogApiConfig.class);
-    }
-
     /**
-     * get kafka log collect client.
-     *
-     * @return kafka log collect client.
+     * logCollector.
      */
-    public static KafkaLogCollectClient getKafkaLogCollectClient() {
-        return KAFKA_LOG_COLLECT_CLIENT;
+    @Override
+    protected LogCollector logCollector() {
+        return KafkaLogCollector.getInstance();
     }
 
     /**
-     * start or close kafka client.
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
      */
     @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("handler loggingKafka Plugin data:{}", 
GsonUtils.getGson().toJson(pluginData));
-        if (pluginData.getEnabled()) {
-            KafkaLogCollectConfig.KafkaLogConfig globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-                KafkaLogCollectConfig.KafkaLogConfig.class);
-
-            KafkaLogCollectConfig.INSTANCE.setKafkaLogConfig(globalLogConfig);
-            // start kafka producer
-            KAFKA_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
-            KafkaLogCollector.getInstance().start();
-        } else {
-            try {
-                KafkaLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
-        }
+    protected void doRefreshConfig(final KafkaLogCollectConfig.KafkaLogConfig 
globalLogConfig) {
+        KafkaLogCollectConfig.INSTANCE.setKafkaLogConfig(globalLogConfig);
+        KAFKA_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
     public String pluginNamed() {
         return PluginEnum.LOGGING_KAFKA.getName();
     }
+
+    /**
+     * get kafka log collect client.
+     *
+     * @return kafka log collect client.
+     */
+    public static KafkaLogCollectClient getKafkaLogCollectClient() {
+        return KAFKA_LOG_COLLECT_CLIENT;
+    }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
index 8c708202e..64a63ff31 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
@@ -89,7 +89,7 @@ public final class LoggingKafkaPluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingKafkaPlugin.doLogExecute(exchange, chain, 
selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingKafkaPlugin.doExecute(exchange, chain, 
selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
index 5c958b039..684e0ab02 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
@@ -17,19 +17,20 @@
 
 package org.apache.shenyu.plugin.logging.kafka.handler;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.shenyu.common.dto.ConditionData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.SelectorData;
-import 
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * The Test Case For LoggingKafkaPluginDataHandler.
  */
@@ -71,18 +72,18 @@ public class LoggingKafkaPluginDataHandlerTest {
     @Test
     public void testHandlerSelector() {
         loggingKafkaPluginDataHandler.handlerSelector(selectorData);
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
-        
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
+        
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
     }
 
     @Test
     public void testRemoveSelector() {
         loggingKafkaPluginDataHandler.handlerSelector(selectorData);
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
-        
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
+        
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
         loggingKafkaPluginDataHandler.removeSelector(selectorData);
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
 "{}");
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
     }
 
     @Test
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
index ce02b3b3a..9f9f566cc 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.pulsar;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
 
 /**
  * Integrated pulsar collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingPulsarPlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, PulsarLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    protected LogCollector logCollector() {
+        return PulsarLogCollector.getInstance();
     }
 
     /**
-     * get plugin order.
+     * pluginEnum.
      *
-     * @return order
+     * @return plugin
      */
     @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_PULSAR.getCode();
-    }
-
-    /**
-     * get plugin name.
-     *
-     * @return plugin name
-     */
-    @Override
-    public String named() {
-        return PluginEnum.LOGGING_PULSAR.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_PULSAR;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
index b5d676cd1..f8906e557 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.shenyu.common.utils.JsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
@@ -35,45 +35,34 @@ import org.slf4j.LoggerFactory;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * queue-based logging collector.
  */
-public class PulsarLogCollectClient implements 
LogConsumeClient<PulsarLogCollectConfig.PulsarLogConfig> {
+public class PulsarLogCollectClient extends 
AbstractLogConsumeClient<PulsarLogCollectConfig.PulsarLogConfig> {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarLogCollectClient.class);
 
     private PulsarClient client;
 
     private Producer<byte[]> producer;
 
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
     /**
      * init producer.
      * 
      * @param config pulsar props
      */
     @Override
-    public void initClient(final PulsarLogCollectConfig.PulsarLogConfig 
config) {
-        if (Objects.isNull(config)) {
-            LOG.error("Pulsar config is empty. Fail to init Pulsar producer.");
-            return;
-        }
+    public void initClient0(final PulsarLogCollectConfig.PulsarLogConfig 
config) {
         String topic = config.getTopic();
         String serviceUrl = config.getServiceUrl();
         if (StringUtils.isBlank(topic) || StringUtils.isBlank(serviceUrl)) {
             LOG.error("init PulsarLogCollectClient error, please check topic 
or serviceUrl.");
             return;
         }
-        if (isStarted.get()) {
-            close();
-        }
         try {
             client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             producer = client.newProducer().topic(topic).create();
             LOG.info("init PulsarLogCollectClient success.");
-            isStarted.set(true);
             Runtime.getRuntime().addShutdownHook(new Thread(this::close));
 
         } catch (PulsarClientException e) {
@@ -83,8 +72,8 @@ public class PulsarLogCollectClient implements 
LogConsumeClient<PulsarLogCollect
     }
 
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) {
-        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+    public void consume0(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs)) {
             return;
         }
         logs.forEach(log -> {
@@ -110,12 +99,11 @@ public class PulsarLogCollectClient implements 
LogConsumeClient<PulsarLogCollect
     }
 
     @Override
-    public void close() {
-        if (Objects.nonNull(producer) && isStarted.get()) {
+    public void close0() {
+        if (Objects.nonNull(producer)) {
             try {
                 producer.close();
                 client.close();
-                isStarted.set(false);
             } catch (PulsarClientException e) {
                 LOG.error("fail to close PulsarLogCollectClient, e", e);
             }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
index 2cb489979..65dcfb51d 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
@@ -17,8 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.pulsar.config;
 
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
 import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 
+import java.util.Objects;
 import java.util.Optional;
 
 public class PulsarLogCollectConfig {
@@ -104,9 +106,34 @@ public class PulsarLogCollectConfig {
         public void setServiceUrl(final String serviceUrl) {
             this.serviceUrl = serviceUrl;
         }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return Boolean.TRUE;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return Boolean.FALSE;
+            }
+
+            PulsarLogConfig that = (PulsarLogConfig) o;
+            return Objects.equals(getTopic(), that.getTopic())
+                    && Objects.equals(getCompressAlg(), that.getCompressAlg())
+                    && Objects.equals(getServiceUrl(), that.getServiceUrl())
+                    && Objects.equals(getSampleRate(), that.getSampleRate())
+                    && Objects.equals(getBufferQueueSize(), 
that.getBufferQueueSize())
+                    && Objects.equals(getMaxResponseBody(), 
that.getMaxRequestBody())
+                    && Objects.equals(getMaxRequestBody(), 
that.getMaxResponseBody());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topic, compressAlg, serviceUrl);
+        }
     }
 
-    public static class LogApiConfig {
+    public static class LogApiConfig extends GenericApiConfig {
 
         /**
          * 0 means never sample, 1 means always sample. Minimum probability is 
0.01, or 1% of logging
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
index 3ebaa3b4f..75b42141d 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
@@ -17,62 +17,38 @@
 
 package org.apache.shenyu.plugin.logging.pulsar.handler;
 
-import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
 import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
 import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
  * The type logging pulsar plugin data handler.
  */
-public class LoggingPulsarPluginDataHandler implements PluginDataHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingPulsarPluginDataHandler.class);
+public class LoggingPulsarPluginDataHandler extends 
AbstractLogPluginDataHandler<PulsarLogCollectConfig.PulsarLogConfig, 
PulsarLogCollectConfig.LogApiConfig> {
 
     private static final PulsarLogCollectClient PULSAR_LOG_COLLECT_CLIENT = 
new PulsarLogCollectClient();
 
-    private static final String EMPTY_JSON = "{}";
-
-    private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP = 
new ConcurrentHashMap<>();
-
-    @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("handler loggingPulsar Plugin data:{}", 
GsonUtils.getGson().toJson(pluginData));
-        if (pluginData.getEnabled()) {
-            PulsarLogCollectConfig.PulsarLogConfig globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-                    PulsarLogCollectConfig.PulsarLogConfig.class);
-            
PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(globalLogConfig);
-            // start pulsar producer
-            PULSAR_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
-            PulsarLogCollector.getInstance().start();
-        } else {
-            try {
-                PulsarLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
-        }
-    }
-
+    /**
+     * logCollector.
+     */
     @Override
-    public void handlerSelector(final SelectorData selectorData) {
-        PluginDataHandler.super.handlerSelector(selectorData);
+    protected LogCollector logCollector() {
+        return PulsarLogCollector.getInstance();
     }
 
+    /**
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
+     */
     @Override
-    public void removeSelector(final SelectorData selectorData) {
-        PluginDataHandler.super.removeSelector(selectorData);
+    protected void doRefreshConfig(final 
PulsarLogCollectConfig.PulsarLogConfig globalLogConfig) {
+        PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(globalLogConfig);
+        PULSAR_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
index 52ae59f98..3ee060937 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
@@ -87,7 +87,7 @@ public class LoggingPulsarPluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingPulsarPlugin.doLogExecute(exchange, chain, 
selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingPulsarPlugin.doExecute(exchange, chain, 
selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
index c9ea49432..2f4837a0a 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
@@ -61,7 +61,7 @@ public class PulsarLogCollectClientTest {
         } catch (Exception e) {
             msg = "false";
         }
-        Assertions.assertEquals(msg, "");
+        Assertions.assertEquals(msg, "false");
         pulsarLogCollectClient.close();
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
index 90c59621a..4f0caaccb 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.logging.rocketmq;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
 import 
org.apache.shenyu.plugin.logging.rocketmq.collector.RocketMQLogCollector;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
 
 /**
  * Integrated rocketmq collect log.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingRocketMQPlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, RocketMQLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    protected LogCollector logCollector() {
+        return RocketMQLogCollector.getInstance();
     }
 
     /**
-     * get plugin order.
+     * pluginEnum.
      *
-     * @return order
+     * @return plugin
      */
     @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_ROCKETMQ.getCode();
-    }
-
-    /**
-     * get plugin name.
-     *
-     * @return plugin name
-     */
-    @Override
-    public String named() {
-        return PluginEnum.LOGGING_ROCKETMQ.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_ROCKETMQ;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
index 7ed5885ae..f7e2b9cb9 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/client/RocketMQLogCollectClient.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.shenyu.common.utils.JsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
@@ -41,12 +41,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * queue-based logging collector.
  */
-public class RocketMQLogCollectClient implements 
LogConsumeClient<RocketMQLogCollectConfig.RocketMQLogConfig> {
+public class RocketMQLogCollectClient extends 
AbstractLogConsumeClient<RocketMQLogCollectConfig.RocketMQLogConfig> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQLogCollectClient.class);
 
@@ -58,22 +57,13 @@ public class RocketMQLogCollectClient implements 
LogConsumeClient<RocketMQLogCol
 
     private String topic;
 
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
     /**
      * init producer.
      *
      * @param config rocketmq props
      */
     @Override
-    public void initClient(final RocketMQLogCollectConfig.RocketMQLogConfig 
config) {
-        if (Objects.isNull(config)) {
-            LOG.error("RocketMQ props is empty. failed init RocketMQ 
producer");
-            return;
-        }
-        if (isStarted.get()) {
-            close();
-        }
+    public void initClient0(final RocketMQLogCollectConfig.RocketMQLogConfig 
config) {
         String topic = config.getTopic();
         String nameserverAddress = config.getNamesrvAddr();
         String producerGroup = config.getProducerGroup();
@@ -90,7 +80,6 @@ public class RocketMQLogCollectClient implements 
LogConsumeClient<RocketMQLogCol
         try {
             producer.start();
             LOG.info("init RocketMQLogCollectClient success");
-            isStarted.set(true);
             Runtime.getRuntime().addShutdownHook(new Thread(this::close));
         } catch (Exception e) {
             LOG.error("init RocketMQLogCollectClient error", e);
@@ -115,8 +104,8 @@ public class RocketMQLogCollectClient implements 
LogConsumeClient<RocketMQLogCol
      * @param logs list of log
      */
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) {
-        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+    public void consume0(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs)) {
             return;
         }
         logs.forEach(log -> {
@@ -158,10 +147,9 @@ public class RocketMQLogCollectClient implements 
LogConsumeClient<RocketMQLogCol
      * close producer.
      */
     @Override
-    public void close() {
-        if (Objects.nonNull(producer) && isStarted.get()) {
+    public void close0() {
+        if (Objects.nonNull(producer)) {
             producer.shutdown();
-            isStarted.set(false);
         }
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
index 9772778a4..b241443eb 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/config/RocketMQLogCollectConfig.java
@@ -17,10 +17,12 @@
 
 package org.apache.shenyu.plugin.logging.rocketmq.config;
 
-import java.util.Optional;
 import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
 import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
 
+import java.util.Objects;
+import java.util.Optional;
+
 /**
  * log collect config, include rocketmq config.
  * Topic and nameserver must be included, and others are optional.
@@ -174,6 +176,34 @@ public class RocketMQLogCollectConfig {
         public void setSecretKey(final String secretKey) {
             this.secretKey = secretKey;
         }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return Boolean.TRUE;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return Boolean.FALSE;
+            }
+
+            RocketMQLogConfig that = (RocketMQLogConfig) o;
+            return Objects.equals(getTopic(), that.getTopic())
+                    && Objects.equals(getCompressAlg(), that.getCompressAlg())
+                    && Objects.equals(getNamesrvAddr(), that.getNamesrvAddr())
+                    && Objects.equals(getProducerGroup(), 
that.getProducerGroup())
+                    && Objects.equals(getAccessKey(), that.getAccessKey())
+                    && Objects.equals(getSecretKey(), that.getSecretKey())
+                    && Objects.equals(getSampleRate(), that.getSampleRate())
+                    && Objects.equals(getBufferQueueSize(), 
that.getBufferQueueSize())
+                    && Objects.equals(getMaxResponseBody(), 
that.getMaxRequestBody())
+                    && Objects.equals(getMaxRequestBody(), 
that.getMaxResponseBody());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topic, compressAlg, secretKey, accessKey, 
namesrvAddr, producerGroup);
+        }
     }
 
     /**
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
index dc097feb4..310edb080 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/main/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandler.java
@@ -17,49 +17,38 @@
 
 package org.apache.shenyu.plugin.logging.rocketmq.handler;
 
-import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import 
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import 
org.apache.shenyu.plugin.logging.rocketmq.client.RocketMQLogCollectClient;
 import 
org.apache.shenyu.plugin.logging.rocketmq.collector.RocketMQLogCollector;
 import 
org.apache.shenyu.plugin.logging.rocketmq.config.RocketMQLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The type logging rocketmq plugin data handler.
  */
-public class LoggingRocketMQPluginDataHandler extends 
AbstractPluginDataHandler<RocketMQLogCollectConfig.LogApiConfig> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingRocketMQPluginDataHandler.class);
+public class LoggingRocketMQPluginDataHandler extends 
AbstractLogPluginDataHandler<RocketMQLogCollectConfig.RocketMQLogConfig, 
GenericApiConfig> {
 
     private static final RocketMQLogCollectClient ROCKET_MQ_LOG_COLLECT_CLIENT 
= new RocketMQLogCollectClient();
 
-    public LoggingRocketMQPluginDataHandler() {
-        super(RocketMQLogCollectConfig.LogApiConfig.class);
+    /**
+     * logCollector.
+     */
+    @Override
+    protected LogCollector logCollector() {
+        return RocketMQLogCollector.getInstance();
     }
 
     /**
-     * start or close rocketMQ client.
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
      */
     @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("handler loggingRocketMQ Plugin data:{}", 
GsonUtils.getGson().toJson(pluginData));
-        if (pluginData.getEnabled()) {
-            RocketMQLogCollectConfig.RocketMQLogConfig globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-                RocketMQLogCollectConfig.RocketMQLogConfig.class);
-            
RocketMQLogCollectConfig.INSTANCE.setRocketMQLogConfig(globalLogConfig);
-            // start rocketmq producer
-            ROCKET_MQ_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
-            RocketMQLogCollector.getInstance().start();
-        } else {
-            try {
-                RocketMQLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
-        }
+    protected void doRefreshConfig(final 
RocketMQLogCollectConfig.RocketMQLogConfig globalLogConfig) {
+        
RocketMQLogCollectConfig.INSTANCE.setRocketMQLogConfig(globalLogConfig);
+        ROCKET_MQ_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
index a9dd90a9e..396e1491d 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/LoggingRocketMQPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingRocketMQPluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingRocketMQPlugin.doLogExecute(exchange, 
chain, selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingRocketMQPlugin.doExecute(exchange, chain, 
selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
index 3cdd79f7a..da588c9b5 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-rocketmq/src/test/java/org/apache/shenyu/plugin/logging/rocketmq/handler/LoggingRocketMQPluginDataHandlerTest.java
@@ -17,19 +17,20 @@
 
 package org.apache.shenyu.plugin.logging.rocketmq.handler;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.shenyu.common.dto.ConditionData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.SelectorData;
-import 
org.apache.shenyu.plugin.logging.common.handler.AbstractPluginDataHandler;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import 
org.apache.shenyu.plugin.logging.rocketmq.client.RocketMQLogCollectClient;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * The Test Case For LoggingRocketMQPluginDataHandler.
  */
@@ -73,18 +74,18 @@ public class LoggingRocketMQPluginDataHandlerTest {
     @Test
     public void testHandlerSelector() {
         loggingRocketMQPluginDataHandler.handlerSelector(selectorData);
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
-        
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
+        
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
     }
 
     @Test
     public void testRemoveSelector() {
         loggingRocketMQPluginDataHandler.handlerSelector(selectorData);
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
-        
Assertions.assertNotEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
 "{1=[11]}");
+        
Assertions.assertNotEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
         loggingRocketMQPluginDataHandler.removeSelector(selectorData);
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectIdUriListMap().toString(),
 "{}");
-        
Assertions.assertEquals(AbstractPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectIdUriListMap().toString(),
 "{}");
+        
Assertions.assertEquals(AbstractLogPluginDataHandler.getSelectApiConfigMap().toString(),
 "{}");
     }
 
     @Test
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
index f18ddb4d1..649159c6b 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPlugin.java
@@ -17,18 +17,10 @@
 
 package org.apache.shenyu.plugin.tencent.cls;
 
-import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import 
org.apache.shenyu.plugin.tencent.cls.collector.TencentClsSlsLogCollector;
-import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
-import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
-import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
-import org.springframework.http.server.reactive.ServerHttpRequest;
-import org.springframework.web.server.ServerWebExchange;
-import reactor.core.publisher.Mono;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import 
org.apache.shenyu.plugin.tencent.cls.collector.TencentClsSlsLogCollector;
 
 /**
  * LoggingTencentClsPlugin send log to Tencent cls service.
@@ -36,35 +28,17 @@ import reactor.core.publisher.Mono;
 public class LoggingTencentClsPlugin extends AbstractLoggingPlugin {
 
     @Override
-    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain,
-                                   final SelectorData selector, final RuleData 
rule,
-                                   final ServerHttpRequest request, final 
ShenyuRequestLog requestInfo) {
-        LoggingServerHttpRequest loggingServerHttpRequest = new 
LoggingServerHttpRequest(request, requestInfo);
-        LoggingServerHttpResponse loggingServerHttpResponse = new 
LoggingServerHttpResponse(exchange.getResponse(),
-                requestInfo, TencentClsSlsLogCollector.getInstance());
-        ServerWebExchange webExchange = 
exchange.mutate().request(loggingServerHttpRequest)
-                .response(loggingServerHttpResponse).build();
-        loggingServerHttpResponse.setExchange(webExchange);
-        return 
chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
-    }
-
-    /**
-     * get plugin order.
-     *
-     * @return order
-     */
-    @Override
-    public int getOrder() {
-        return PluginEnum.LOGGING_TENCENT_CLS.getCode();
+    protected LogCollector logCollector() {
+        return TencentClsSlsLogCollector.getInstance();
     }
 
     /**
-     * get plugin name.
+     * pluginEnum.
      *
-     * @return plugin name
+     * @return plugin
      */
     @Override
-    public String named() {
-        return PluginEnum.LOGGING_TENCENT_CLS.getName();
+    public PluginEnum pluginEnum() {
+        return PluginEnum.LOGGING_TENCENT_CLS;
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
index 7086b4934..d94dc943e 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/client/TencentClsLogCollectClient.java
@@ -33,7 +33,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
 import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.apache.shenyu.plugin.tencent.cls.config.TencentLogCollectConfig;
@@ -48,21 +48,16 @@ import java.util.Optional;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Tencent cls log Collect client.
  */
-public class TencentClsLogCollectClient implements 
LogConsumeClient<TencentLogCollectConfig.TencentClsLogConfig> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TencentClsLogCollectClient.class);
+public class TencentClsLogCollectClient extends 
AbstractLogConsumeClient<TencentLogCollectConfig.TencentClsLogConfig> {
 
     private AsyncProducerClient client;
 
     private String topic;
 
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
     private ThreadPoolExecutor threadExecutor;
 
     /**
@@ -71,14 +66,7 @@ public class TencentClsLogCollectClient implements 
LogConsumeClient<TencentLogCo
      * @param shenyuConfig shenyu log config
      */
     @Override
-    public void initClient(final TencentLogCollectConfig.TencentClsLogConfig 
shenyuConfig) {
-        if (Objects.isNull(shenyuConfig)) {
-            LOG.error("Tencent cls props is empty. failed init Tencent cls 
producer");
-            return;
-        }
-        if (isStarted.get()) {
-            close();
-        }
+    public void initClient0(final TencentLogCollectConfig.TencentClsLogConfig 
shenyuConfig) {
         String secretId = shenyuConfig.getSecretId();
         String secretKey = shenyuConfig.getSecretKey();
         String endpoint = shenyuConfig.getEndpoint();
@@ -116,7 +104,6 @@ public class TencentClsLogCollectClient implements 
LogConsumeClient<TencentLogCo
         threadExecutor = createThreadPoolExecutor(config.getSendThreadCount());
 
         try {
-            isStarted.set(true);
             client = new AsyncProducerClient(config);
         } catch (Exception e) {
             LOG.warn("TencentClsLogCollectClient initClient error message:{}", 
e.getMessage());
@@ -129,17 +116,16 @@ public class TencentClsLogCollectClient implements 
LogConsumeClient<TencentLogCo
      * @param logs list of log
      */
     @Override
-    public void consume(final List<ShenyuRequestLog> logs) {
-        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+    public void consume0(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs)) {
             return;
         }
         logs.forEach(this::sendLog);
     }
 
     @Override
-    public void close() {
-        if (Objects.nonNull(client) && isStarted.get()) {
-            isStarted.set(false);
+    public void close0() {
+        if (Objects.nonNull(client)) {
             try {
                 client.close();
             } catch (InterruptedException | ProducerException e) {
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
index 0e932c68d..58a3cf122 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/collector/TencentClsSlsLogCollector.java
@@ -17,10 +17,10 @@
 
 package org.apache.shenyu.plugin.tencent.cls.collector;
 
-import 
org.apache.shenyu.plugin.tencent.cls.handler.LoggingTencentClsPluginDataHandler;
 import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.collector.AbstractLogCollector;
 import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import 
org.apache.shenyu.plugin.tencent.cls.handler.LoggingTencentClsPluginDataHandler;
 
 /**
  * Tencent cls log collector,depend a LogConsumeClient for consume logs.
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
index 1692ca842..f0f2ebe49 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/main/java/org/apache/shenyu/plugin/tencent/cls/handler/LoggingTencentClsPluginDataHandler.java
@@ -17,53 +17,38 @@
 
 package org.apache.shenyu.plugin.tencent.cls.handler;
 
-import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.enums.PluginEnum;
-import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.common.config.GenericApiConfig;
+import 
org.apache.shenyu.plugin.logging.common.handler.AbstractLogPluginDataHandler;
 import org.apache.shenyu.plugin.tencent.cls.client.TencentClsLogCollectClient;
 import 
org.apache.shenyu.plugin.tencent.cls.collector.TencentClsSlsLogCollector;
-import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
 import org.apache.shenyu.plugin.tencent.cls.config.TencentLogCollectConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
 
 /**
  * LoggingTencentClsPluginDataHandler Tencent cls plugin data handler.
  */
-public class LoggingTencentClsPluginDataHandler implements PluginDataHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingTencentClsPluginDataHandler.class);
+public class LoggingTencentClsPluginDataHandler extends 
AbstractLogPluginDataHandler<TencentLogCollectConfig.TencentClsLogConfig, 
GenericApiConfig> {
 
     private static final TencentClsLogCollectClient 
TENCENT_CLS_LOG_COLLECT_CLIENT = new TencentClsLogCollectClient();
 
+    /**
+     * logCollector.
+     */
+    @Override
+    protected LogCollector logCollector() {
+        return TencentClsSlsLogCollector.getInstance();
+    }
+
+    /**
+     * doRefreshConfig.
+     *
+     * @param globalLogConfig globalLogConfig
+     */
     @Override
-    public void handlerPlugin(final PluginData pluginData) {
-        LOG.info("Tencent cls plugin data: {}", 
GsonUtils.getGson().toJson(pluginData));
-        if (Objects.nonNull(pluginData) && 
Boolean.TRUE.equals(pluginData.getEnabled())) {
-            TencentLogCollectConfig.TencentClsLogConfig globalLogConfig = 
GsonUtils.getInstance().fromJson(pluginData.getConfig(),
-                    TencentLogCollectConfig.TencentClsLogConfig.class);
-            TencentLogCollectConfig.TencentClsLogConfig exist = 
Singleton.INST.get(TencentLogCollectConfig.TencentClsLogConfig.class);
-            if (Objects.isNull(globalLogConfig)) {
-                return;
-            }
-            if (Objects.isNull(exist) || !globalLogConfig.equals(exist)) {
-                // no data, init client
-                
TencentLogCollectConfig.INSTANCE.setTencentClsLogConfig(globalLogConfig);
-                // init tencent cls client
-                TENCENT_CLS_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
-                TencentClsSlsLogCollector.getInstance().start();
-                
Singleton.INST.single(TencentLogCollectConfig.TencentClsLogConfig.class, 
globalLogConfig);
-            }
-        } else {
-            try {
-                TencentClsSlsLogCollector.getInstance().close();
-            } catch (Exception e) {
-                LOG.error("close log collector error", e);
-            }
-        }
+    protected void doRefreshConfig(final 
TencentLogCollectConfig.TencentClsLogConfig globalLogConfig) {
+        
TencentLogCollectConfig.INSTANCE.setTencentClsLogConfig(globalLogConfig);
+        TENCENT_CLS_LOG_COLLECT_CLIENT.initClient(globalLogConfig);
     }
 
     @Override
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
index 3a1c5c91e..c06c2e225 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-tencent-cls/src/test/java/org/apache/shenyu/plugin/tencent/cls/LoggingTencentClsPluginTest.java
@@ -90,7 +90,7 @@ public final class LoggingTencentClsPluginTest {
     @Test
     public void testDoExecute() {
         
Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
-        Mono<Void> result = loggingTencentClsPlugin.doLogExecute(exchange, 
chain, selectorData, ruleData, request, requestLog);
+        Mono<Void> result = loggingTencentClsPlugin.doExecute(exchange, chain, 
selectorData, ruleData);
         StepVerifier.create(result).expectSubscription().verifyComplete();
     }
 

Reply via email to