This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new ca31ab0275 [type:feat] add Logging-Kafka Plugin e2e and make 
independent of Logging-rocketmq e2e (#5709)
ca31ab0275 is described below

commit ca31ab0275b799a56d704b145960911f32b75d09
Author: jakiuncle <88994283+jakiun...@users.noreply.github.com>
AuthorDate: Thu Mar 6 09:27:54 2025 +0800

    [type:feat] add Logging-Kafka Plugin e2e and make independent of 
Logging-rocketmq e2e (#5709)
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:fix] fix zookeeper healthy check
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:fix] fix http rocketmq
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * kafka e2e
    
    * kafka e2e
    
    * kafka e2e
    
    * kafka e2e
    
    * spilt logging rocketmq and logging kafka
    
    * checkstyle
    
    * e2e port
    
    * e2e kafka logging
    
    * e2e kafka logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * e2e rocketmq logging
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * [type:feat]add kafka logging e2e test
    
    * add nonull check
    
    * modify kafka namesrvAddr to bootstrapServer
    
    * create kafka topic
    
    * create kafka topic
    
    * create kafka topic
    
    * create kafka topic
    
    * kafka connection
    
    * kafka
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    * kafka connection
    
    ---------
    
    Co-authored-by: aias00 <liuhon...@apache.org>
    Co-authored-by: xiaoyu <xia...@apache.org>
---
 .github/workflows/e2e-k8s.yml                      |   4 +
 db/init/mysql/schema.sql                           |   2 +-
 .../apache/shenyu/admin/ShenyuAdminBootstrap.java  |   4 +-
 .../ShenyuClientRegisterDivideServiceImpl.java     |   3 +
 shenyu-e2e/pom.xml                                 |   1 +
 shenyu-e2e/shenyu-e2e-case/pom.xml                 |   2 +
 .../compose/script/e2e-http-sync-compose.sh        |   4 -
 .../k8s/script/e2e-http-sync.sh                    |   3 +
 .../shenyu-e2e-case/shenyu-e2e-case-http/pom.xml   |   8 --
 .../e2e/testcase/http/DividePluginCases.java       |  86 +----------
 .../shenyu/e2e/testcase/http/DividePluginTest.java |  15 +-
 .../compose/script/e2e-logging-kafka-compose.sh}   |  28 ++--
 .../compose/shenyu-examples-http-compose.yml       |  41 ++++++
 .../compose/shenyu-kafka-compose.yml               |  74 ++++++++++
 .../k8s/script/e2e-http-sync.sh                    |  13 +-
 .../k8s/shenyu-kafka.yml                           | 101 +++++++++++++
 .../pom.xml                                        |   8 +-
 .../testcase/logging/kafka/DividePluginCases.java  | 157 +++++++++++++++++++++
 .../testcase/logging/kafka}/DividePluginTest.java  |  40 ++++--
 .../script/e2e-logging-rocketmq-compose.sh}        |   6 +-
 .../compose/shenyu-examples-http-compose.yml       |  41 ++++++
 .../compose/shenyu-rocketmq-compose.yml            |  51 +++++++
 .../k8s/script/e2e-http-sync.sh                    |  13 +-
 .../k8s/shenyu-kafka.yml                           | 101 +++++++++++++
 .../pom.xml                                        |   4 +-
 .../logging/rocketmq}/DividePluginCases.java       |   2 +-
 .../logging/rocketmq}/DividePluginTest.java        |  23 +--
 .../org/apache/shenyu/e2e/client/WaitDataSync.java |   2 +-
 .../shenyu/e2e/client/gateway/GatewayClient.java   |   4 +
 .../kafka/client/KafkaLogCollectClient.java        |  11 +-
 30 files changed, 674 insertions(+), 178 deletions(-)

diff --git a/.github/workflows/e2e-k8s.yml b/.github/workflows/e2e-k8s.yml
index a9577415aa..62739cb4b3 100644
--- a/.github/workflows/e2e-k8s.yml
+++ b/.github/workflows/e2e-k8s.yml
@@ -218,6 +218,10 @@ jobs:
             script: e2e-cluster-jdbc-compose
           - case: shenyu-e2e-case-cluster
             script: e2e-cluster-zookeeper-compose
+          - case: shenyu-e2e-case-logging-rocketmq
+            script: e2e-logging-rocketmq-compose
+          - case: shenyu-e2e-case-logging-kafka
+            script: e2e-logging-kafka-compose
             
     steps:
       - uses: actions/checkout@v2
diff --git a/db/init/mysql/schema.sql b/db/init/mysql/schema.sql
index 4059214b5d..fafe5e102e 100644
--- a/db/init/mysql/schema.sql
+++ b/db/init/mysql/schema.sql
@@ -923,7 +923,7 @@ INSERT INTO `plugin` VALUES ('6', 'dubbo', 
'{\"register\":\"zookeeper://localhos
 INSERT INTO `plugin` VALUES ('8', 'springCloud', NULL, 'Proxy', 200, 0, 
'2022-05-25 18:02:53', '2022-05-25 18:02:53',null);
 INSERT INTO `plugin` VALUES ('9', 'hystrix', NULL, 'FaultTolerance', 130, 0, 
'2022-05-25 18:02:53', '2022-05-25 18:02:53',null);
 INSERT INTO `plugin` VALUES ('32', 
'loggingElasticSearch','{\"host\":\"localhost\", \"port\": \"9200\"}', 
'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00',null);
-INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"host\":\"localhost\", 
\"port\": \"9092\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 
22:00:00',null);
+INSERT INTO `plugin` VALUES ('33', 
'loggingKafka','{\"topic\":\"shenyu-access-logging\",\"namesrvAddr\":\"http://localhost:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}',
 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00',null);
 INSERT INTO `plugin` VALUES ('34', 'loggingAliyunSls','{\"projectName\": 
\"shenyu\", \"logStoreName\": \"shenyu-logstore\", \"topic\": 
\"shenyu-topic\"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 
21:00:00',null);
 INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', 
'{\"topic":\"shenyu-access-logging\", \"serviceUrl\": 
\"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', 
'2022-06-30 21:00:00',null);
 INSERT INTO `plugin` VALUES ('36', 'loggingTencentCls','{\"endpoint\": 
\"ap-guangzhou.cls.tencentcs.com\", \"topic\": \"shenyu-topic\"}', 'Logging', 
176, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null);
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
index 9bd5026202..796164adcf 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
@@ -22,7 +22,7 @@ import 
org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
 
 /**
- * shenyu admin start.
+ * shenyu admin startShenyuAdminBootstrap.
  */
 @SpringBootApplication(exclude = {LdapAutoConfiguration.class})
 public class ShenyuAdminBootstrap {
@@ -30,7 +30,7 @@ public class ShenyuAdminBootstrap {
     /**
      * Main entrance.
      *
-     * @param args startup arguments
+     * @param args startup arguments.
      */
     public static void main(final String[] args) {
         SpringApplication.run(ShenyuAdminBootstrap.class, args);
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
index 0ec8dcc1d0..33216d68ff 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
@@ -120,6 +120,9 @@ public class ShenyuClientRegisterDivideServiceImpl extends 
AbstractContextPathRe
                 .collect(Collectors.toList());
         final List<DivideUpstream> needToRemove = 
buildDivideUpstreamList(validUriList);
         List<DivideUpstream> existList = 
GsonUtils.getInstance().fromCurrentList(selectorDO.getHandle(), 
DivideUpstream.class);
+        if (CollectionUtils.isEmpty(existList)) {
+            return Constants.SUCCESS;
+        }
         existList.removeAll(needToRemove);
         final String handler = GsonUtils.getInstance().toJson(existList);
         selectorDO.setHandle(handler);
diff --git a/shenyu-e2e/pom.xml b/shenyu-e2e/pom.xml
index 98fcecf3a0..e524324212 100644
--- a/shenyu-e2e/pom.xml
+++ b/shenyu-e2e/pom.xml
@@ -58,6 +58,7 @@
         <guava.version>32.0.0-jre</guava.version>
         <commons-collection.verion>4.4</commons-collection.verion>
         <websocket.version>1.5.1</websocket.version>
+        <kafka-clients.version>3.7.1</kafka-clients.version>
     </properties>
 
     <modules>
diff --git a/shenyu-e2e/shenyu-e2e-case/pom.xml 
b/shenyu-e2e/shenyu-e2e-case/pom.xml
index 901a5c5936..4ca3cc3d31 100644
--- a/shenyu-e2e/shenyu-e2e-case/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/pom.xml
@@ -32,6 +32,8 @@
         <module>shenyu-e2e-case-cluster</module>
         <module>shenyu-e2e-case-storage</module>
         <module>shenyu-e2e-case-http</module>
+        <module>shenyu-e2e-case-logging-kafka</module>
+        <module>shenyu-e2e-case-logging-rocketmq</module>
         <module>shenyu-e2e-case-spring-cloud</module>
         <module>shenyu-e2e-case-apache-dubbo</module>
         <module>shenyu-e2e-case-sofa</module>
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
index a0b102413c..446dae7dfb 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
@@ -37,7 +37,6 @@ for sync in "${SYNC_ARRAY[@]}"; do
   sleep 30s
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31095/actuator/health
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31195/actuator/health
-  docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull
   docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d 
--quiet-pull
   sleep 30s
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31189/actuator/health
@@ -55,9 +54,6 @@ for sync in "${SYNC_ARRAY[@]}"; do
     echo "shenyu-bootstrap log:"
     echo "------------------"
     docker compose -f 
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs 
shenyu-bootstrap
-    echo "shenyu-rocketmq log:"
-    echo "------------------"
-    docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
     echo "shenyu-examples-http log:"
     echo "------------------"
     docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs 
shenyu-examples-http
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
index 25f37f01d0..95d7931c0b 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
@@ -37,6 +37,8 @@ for sync in ${SYNC_ARRAY[@]}; do
 
   kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
 
+  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31877/actuator/health
+
   sleep 30s
   echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml 
shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
   # shellcheck disable=SC2199
@@ -75,6 +77,7 @@ for sync in ${SYNC_ARRAY[@]}; do
   kubectl delete -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
   kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
   kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+
   # shellcheck disable=SC2199
   # shellcheck disable=SC2076
   if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
index 2a3141908e..f326c86009 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
@@ -25,12 +25,4 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>shenyu-e2e-case-http</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>4.9.3</version>
-        </dependency>
-    </dependencies>
 </project>
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
index d42bfc3aab..b6e3f2aab0 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
@@ -18,48 +18,22 @@
 package org.apache.shenyu.e2e.testcase.http;
 
 import com.google.common.collect.Lists;
-import io.restassured.http.Method;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
 import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
 import 
org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
 import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
 import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
-import org.apache.shenyu.e2e.model.MatchMode;
-import org.apache.shenyu.e2e.model.Plugin;
-import org.apache.shenyu.e2e.model.data.Condition;
-import org.junit.jupiter.api.Assertions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
-import static 
org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
-import static 
org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
-import static 
org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
 
 public class DividePluginCases implements ShenYuScenarioProvider {
 
-    private static final String NAMESERVER = "http://localhost:31876";;
-
-    private static final String CONSUMERGROUP = 
"shenyu-plugin-logging-rocketmq";
-
-    private static final String TOPIC = "shenyu-access-logging";
-
-    private static final String TEST = "/http/order/findById?id=123";
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(DividePluginCases.class);
-
     @Override
     public List<ScenarioSpec> get() {
         return Lists.newArrayList(
-                testDivideHello(),
-                testRocketMQHello()
+                testDivideHello()
         );
     }
 
@@ -74,62 +48,4 @@ public class DividePluginCases implements 
ShenYuScenarioProvider {
                         .build())
                 .build();
     }
-
-    private ShenYuScenarioSpec testRocketMQHello() {
-        return ShenYuScenarioSpec.builder()
-                .name("testRocketMQHello")
-                .beforeEachSpec(
-                        ShenYuBeforeEachSpec.builder()
-                                .addSelectorAndRule(
-                                        newSelectorBuilder("selector", 
Plugin.LOGGING_ROCKETMQ)
-                                                .name("1")
-                                                .matchMode(MatchMode.OR)
-                                                
.conditionList(newConditions(Condition.ParamType.URI, 
Condition.Operator.STARTS_WITH, "/http"))
-                                                .build(),
-                                        newRuleBuilder("rule")
-                                                .name("1")
-                                                .matchMode(MatchMode.OR)
-                                                
.conditionList(newConditions(Condition.ParamType.URI, 
Condition.Operator.STARTS_WITH, "/http"))
-                                                .build()
-                                )
-                                .checker(exists(TEST))
-                                .build()
-                )
-                .caseSpec(
-                        ShenYuCaseSpec.builder()
-                                .add(request -> {
-                                    AtomicBoolean isLog = new 
AtomicBoolean(false);
-                                    try {
-                                        Thread.sleep(1000 * 30);
-                                        request.request(Method.GET, 
"/http/order/findById?id=23");
-                                        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer(CONSUMERGROUP);
-                                        consumer.setNamesrvAddr(NAMESERVER);
-                                        consumer.subscribe(TOPIC, "*");
-                                        
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, 
consumeConcurrentlyContext) -> {
-                                            LOG.info("Msg:{}", msgs);
-                                            if 
(CollectionUtils.isNotEmpty(msgs)) {
-                                                msgs.forEach(e -> {
-                                                    if (new 
String(e.getBody()).contains("/http/order/findById?id=23")) {
-                                                        isLog.set(true);
-                                                    }
-                                                });
-                                            }
-                                            return 
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                                        });
-                                        LOG.info("consumer.start ; 
isLog.get():{}", isLog.get());
-                                        consumer.start();
-                                        Thread.sleep(1000 * 30);
-                                        LOG.info("isLog.get():{}", 
isLog.get());
-                                        Assertions.assertTrue(isLog.get());
-                                    } catch (Exception e) {
-                                        LOG.error("error", e);
-                                        Assertions.assertTrue(isLog.get());
-                                    }
-                                })
-                                .build()
-                )
-//                .afterEachSpec(ShenYuAfterEachSpec.builder()
-//                        .deleteWaiting(notExists(TEST)).build())
-                .build();
-    }
 }
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
index 1b18091e7c..b66fb619b3 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
 import org.apache.shenyu.e2e.client.WaitDataSync;
 import org.apache.shenyu.e2e.client.admin.AdminClient;
 import org.apache.shenyu.e2e.client.gateway.GatewayClient;
-import org.apache.shenyu.e2e.constant.Constants;
 import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
 import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
 import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
@@ -35,9 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID;
@@ -98,23 +95,13 @@ public class DividePluginTest {
 //        selectorIds = Lists.newArrayList();
 //    }
 
+
     @BeforeAll
     void setup(final AdminClient adminClient, final GatewayClient 
gatewayClient) throws Exception {
         adminClient.login();
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, 
gatewayClient::getSelectorCache, adminClient);
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, 
gatewayClient::getMetaDataCache, adminClient);
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, 
gatewayClient::getRuleCache, adminClient);
-        LOG.info("start loggingRocketMQ plugin");
-        Map<String, String> reqBody = new HashMap<>();
-        reqBody.put("pluginId", "29");
-        reqBody.put("name", "loggingRocketMQ");
-        reqBody.put("enabled", "true");
-        reqBody.put("role", "Logging");
-        reqBody.put("sort", "170");
-        reqBody.put("namespaceId", 
Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
-        reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", 
\"namesrvAddr\": 
\"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
-        adminClient.changePluginStatus("1801816010882822166", reqBody);
-        WaitDataSync.waitGatewayPluginUse(gatewayClient, 
"org.apache.shenyu.plugin.logging.rocketmq");
     }
 
     @ShenYuScenario(provider = DividePluginCases.class)
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
similarity index 74%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
index a0b102413c..3936dd821a 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
@@ -20,6 +20,9 @@
 SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")")
 bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh
 
+# init ip
+export HOST_IP=$(hostname -I | awk '{print $1}')
+
 # init register center
 CUR_PATH=$(readlink -f "$(dirname "$0")")
 PRGDIR=$(dirname "$CUR_PATH")
@@ -35,16 +38,21 @@ for sync in "${SYNC_ARRAY[@]}"; do
   echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml 
shenyu-bootstrap-${sync}.yml "
   docker compose -f 
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml up -d --quiet-pull
   sleep 30s
-  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31095/actuator/health
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31195/actuator/health
-  docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull
-  docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d 
--quiet-pull
+  docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml up -d --quiet-pull
   sleep 30s
+  # 创建kafka topic
+  echo "create kafka topic shenyu-access-logging"
+  docker exec shenyu-kafka kafka-topics --create --topic shenyu-access-logging 
--partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
+
+#  docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d 
--quiet-pull
+#  sleep 30s
+  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31095/actuator/health
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31189/actuator/health
   sleep 10s
   docker ps -a
   ## run e2e-test
-  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http 
-am test
+  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl 
shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
   # shellcheck disable=SC2181
   if (($?)); then
     echo "${sync}-sync-e2e-test failed"
@@ -55,15 +63,15 @@ for sync in "${SYNC_ARRAY[@]}"; do
     echo "shenyu-bootstrap log:"
     echo "------------------"
     docker compose -f 
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs 
shenyu-bootstrap
-    echo "shenyu-rocketmq log:"
-    echo "------------------"
-    docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
-    echo "shenyu-examples-http log:"
+    echo "shenyu-kafka log:"
     echo "------------------"
-    docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs 
shenyu-examples-http
+    docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml logs
+#    echo "kafka-console-consumer log:"
+    timeout 50s docker exec shenyu-kafka kafka-console-consumer --topic 
shenyu-access-logging --bootstrap-server localhost:9092 --from-beginning
     exit 1
   fi
   docker compose -f 
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down
-  docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
+  docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml down
+#  docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
   echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml 
shenyu-bootstrap-${sync}.yml "
 done
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
new file mode 100644
index 0000000000..ce8c8a7e7b
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+  shenyu-examples-http:
+    image: shenyu-examples-http:latest
+    container_name: shenyu-examples-http
+    environment:
+      - shenyu.register.serverLists=http://shenyu-admin:9095
+    ports:
+      - "31189:8189"
+    healthcheck:
+      test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health 
| grep UP || exit 1" ]
+      interval: 10s
+      timeout: 2s
+      retries: 3
+      start_period: 10s
+    restart: always
+    networks:
+      - shenyu
+
+networks:
+  shenyu:
+    name: shenyu
+    driver: bridge
+    external: true
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
new file mode 100644
index 0000000000..8487aa1bae
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+  shenyu-zk:
+    container_name: shenyu-zk
+    image: zookeeper:latest
+#    network_mode: "host"
+    ports:
+      - "2181:2181"
+    restart: always
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+      - ZOO_PORT=2181
+    networks:
+      - shenyu
+  
+  shenyu-kafka:
+    image: confluentinc/cp-kafka:latest
+    container_name: shenyu-kafka
+    extra_hosts:
+      - "shenyu-kafka:127.0.0.1"
+    depends_on:
+      - shenyu-zk
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: shenyu-zk:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: 
PLAINTEXT://shenyu-kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    networks:
+      - shenyu
+  
+  shenyu-examples-http:
+    image: shenyu-examples-http:latest
+    container_name: shenyu-examples-http
+    environment:
+      - shenyu.register.serverLists=http://shenyu-admin:9095
+    ports:
+      - "31189:8189"
+    healthcheck:
+      test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health 
| grep UP || exit 1" ]
+      interval: 10s
+      timeout: 2s
+      retries: 3
+      start_period: 10s
+    restart: always
+    networks:
+      - shenyu
+
+networks:
+  shenyu:
+    name: shenyu
+    driver: bridge
+    external: true
\ No newline at end of file
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
similarity index 91%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
index 25f37f01d0..2ca65f63dd 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
@@ -33,9 +33,8 @@ SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
 MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
 for sync in ${SYNC_ARRAY[@]}; do
   echo -e "------------------\n"
-  kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml
-
-  kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
+  kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:9092/actuator/health
 
   sleep 30s
   echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml 
shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
@@ -50,15 +49,13 @@ for sync in ${SYNC_ARRAY[@]}; do
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31095/actuator/health
   kubectl apply -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31195/actuator/health
-  kubectl apply -f "${PRGDIR}"/shenyu-examples-http.yml
-  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31189/actuator/health
   sleep 10s
   kubectl get pod -o wide
 
   kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print 
$1}')"
 
   ## run e2e-test
-  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http 
-am test
+  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl 
shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
   # shellcheck disable=SC2181
   if (($?)); then
     echo "${sync}-sync-e2e-test failed"
@@ -73,8 +70,8 @@ for sync in ${SYNC_ARRAY[@]}; do
   kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
   kubectl delete -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
   kubectl delete -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
-  kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
-  kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+  kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
+
   # shellcheck disable=SC2199
   # shellcheck disable=SC2076
   if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
new file mode 100644
index 0000000000..9c5c05fad9
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: zookeeper
+  namespace: default
+  labels:
+    app: zookeeper
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: zookeeper
+  template:
+    metadata:
+      labels:
+        app: zookeeper
+    spec:
+      containers:
+        - name: zookeeper
+          image: zookeeper:3.7
+          ports:
+            - containerPort: 2181
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: zookeeper
+  namespace: default
+  labels:
+    app: zookeeper
+spec:
+  ports:
+    - port: 2181
+      name: client
+  selector:
+    app: zookeeper
+
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: kafka
+  namespace: default
+  labels:
+    app: kafka
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: kafka
+  template:
+    metadata:
+      labels:
+        app: kafka
+    spec:
+      containers:
+        - name: kafka
+          image: bitnami/kafka:3.6.2
+          env:
+            - name: KAFKA_ADVERTISED_LISTENERS
+              value: PLAINTEXT://kafka:9092
+            - name: KAFKA_ZOOKEEPER_CONNECT
+              value: zookeeper:2181
+          ports:
+            - containerPort: 9092
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: kafka
+  namespace: default
+  labels:
+    app: kafka
+spec:
+  type: NodePort
+  ports:
+    - port: 9092
+      name: client
+      protocol: TCP
+      targetPort: 9092
+      nodePort: 9092
+  selector:
+    app: kafka
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
similarity index 86%
copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
index 2a3141908e..90ca689ce6 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
@@ -24,13 +24,13 @@
         <version>0.0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>shenyu-e2e-case-http</artifactId>
+    <artifactId>shenyu-e2e-case-logging-kafka</artifactId>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>4.9.3</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
         </dependency>
     </dependencies>
 </project>
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
new file mode 100644
index 0000000000..010c54867f
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.e2e.testcase.logging.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import io.restassured.http.Method;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
+import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
+import 
org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
+import org.apache.shenyu.e2e.model.MatchMode;
+import org.apache.shenyu.e2e.model.Plugin;
+import org.apache.shenyu.e2e.model.data.Condition;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
+import static 
org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
+import static 
org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
+import static 
org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
+
+public class DividePluginCases implements ShenYuScenarioProvider {
+
+    private static final String TOPIC = "shenyu-access-logging";
+
+    private static final String TEST = "/http/order/findById?id=123";
+    
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DividePluginCases.class);
+
+    @Override
+    public List<ScenarioSpec> get() {
+        return Lists.newArrayList(
+                testDivideHello(),
+                testKafkaHello()
+        );
+    }
+
+    private ShenYuScenarioSpec testDivideHello() {
+        return ShenYuScenarioSpec.builder()
+                .name("http client hello1")
+                .beforeEachSpec(ShenYuBeforeEachSpec.builder()
+                        .checker(exists(TEST))
+                        .build())
+                .caseSpec(ShenYuCaseSpec.builder()
+                        .addExists(TEST)
+                        .build())
+                .build();
+    }
+
+    private ShenYuScenarioSpec testKafkaHello() {
+        return ShenYuScenarioSpec.builder()
+                .name("testKafkaHello")
+                .beforeEachSpec(
+                        ShenYuBeforeEachSpec.builder()
+                                .addSelectorAndRule(
+                                        newSelectorBuilder("selector", 
Plugin.LOGGING_KAFKA)
+                                                .name("2")
+                                                .matchMode(MatchMode.OR)
+                                                
.conditionList(newConditions(Condition.ParamType.URI, 
Condition.Operator.STARTS_WITH, "/http"))
+                                                .build(),
+                                        newRuleBuilder("rule")
+                                                .name("2")
+                                                .matchMode(MatchMode.OR)
+                                                
.conditionList(newConditions(Condition.ParamType.URI, 
Condition.Operator.STARTS_WITH, "/http"))
+                                                .build()
+                                )
+                                .checker(exists(TEST))
+                                .build()
+                )
+                .caseSpec(
+                        ShenYuCaseSpec.builder()
+                                .add(request -> {
+                                    AtomicBoolean messageFound = new 
AtomicBoolean(false);
+                                    try {
+                                        // Send request first
+                                        request.request(Method.GET, 
"/http/order/findById?id=23");
+                                        
+                                        Properties properties = new 
Properties();
+                                        
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+                                        
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "shenyu-consumer-group");
+                                        
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+                                        
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+                                        
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+                                        
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+                                        
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+                                        
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
+                                        
+                                        try (KafkaConsumer<String, String> 
consumer = new KafkaConsumer<>(properties)) {
+                                            
consumer.subscribe(Arrays.asList(TOPIC));
+                                            
+                                            Instant start = Instant.now();
+                                            // Set timeout to 30 seconds
+                                            while (Duration.between(start, 
Instant.now()).getSeconds() < 90) {
+                                                ConsumerRecords<String, 
String> records = consumer.poll(Duration.ofMillis(1000));
+                                                LOG.info("records.count:{}", 
records.count());
+                                                
+                                                for (var record : records) {
+                                                    String message = 
record.value();
+                                                    LOG.info("kafka 
message:{}", message);
+                                                    if 
(message.contains("/http/order/findById")) {
+                                                        messageFound.set(true);
+                                                        consumer.commitSync();
+                                                        break;
+                                                    }
+                                                }
+                                                
+                                                if (messageFound.get()) {
+                                                    break;
+                                                }
+                                            }
+                                            
+                                            if (!messageFound.get()) {
+                                                LOG.error("Timeout waiting for 
kafka message");
+                                                Assertions.fail("Did not 
receive expected message within timeout period");
+                                            }
+                                            
+                                            
Assertions.assertTrue(messageFound.get(), "Expected message was not found in 
Kafka topic");
+                                        }
+                                    } catch (Exception e) {
+                                        LOG.error("Error during kafka message 
consumption", e);
+                                        throw new RuntimeException("Failed to 
consume kafka message", e);
+                                    }
+                                }).build()
+                ).build();
+    }
+}
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
similarity index 80%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
index 1b18091e7c..ea32a50e4c 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.e2e.testcase.http;
+package org.apache.shenyu.e2e.testcase.logging.kafka;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import org.apache.shenyu.e2e.client.WaitDataSync;
 import org.apache.shenyu.e2e.client.admin.AdminClient;
@@ -24,12 +25,14 @@ import org.apache.shenyu.e2e.client.gateway.GatewayClient;
 import org.apache.shenyu.e2e.constant.Constants;
 import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
 import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
+import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec;
 import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
 import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec;
 import org.apache.shenyu.e2e.enums.ServiceTypeEnum;
 import org.apache.shenyu.e2e.model.ResourcesData;
 import org.apache.shenyu.e2e.model.data.BindingData;
 import org.apache.shenyu.e2e.model.response.SelectorDTO;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
@@ -65,7 +68,9 @@ import static 
org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAM
 public class DividePluginTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DividePluginTest.class);
-
+    
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    
     private List<String> selectorIds = Lists.newArrayList();
 
     @BeforeEach
@@ -91,30 +96,35 @@ public class DividePluginTest {
         spec.getWaiting().waitFor(gateway);
     }
 
-//    @AfterEach
-//    void after(final AdminClient client, final GatewayClient gateway, final 
AfterEachSpec spec) {
-//        spec.getDeleter().delete(client, selectorIds);
-//        spec.deleteWaiting().waitFor(gateway);
-//        selectorIds = Lists.newArrayList();
-//    }
+    @AfterEach
+    void after(final AdminClient client, final GatewayClient gateway, final 
AfterEachSpec spec) {
+        spec.getDeleter().delete(client, selectorIds);
+        spec.deleteWaiting().waitFor(gateway);
+        selectorIds = Lists.newArrayList();
+    }
 
     @BeforeAll
     void setup(final AdminClient adminClient, final GatewayClient 
gatewayClient) throws Exception {
         adminClient.login();
+        
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, 
gatewayClient::getSelectorCache, adminClient);
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, 
gatewayClient::getMetaDataCache, adminClient);
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, 
gatewayClient::getRuleCache, adminClient);
-        LOG.info("start loggingRocketMQ plugin");
+        
         Map<String, String> reqBody = new HashMap<>();
-        reqBody.put("pluginId", "29");
-        reqBody.put("name", "loggingRocketMQ");
+        LOG.info("start loggingKafka plugin");
+        reqBody.put("pluginId", "33");
+        reqBody.put("name", "loggingKafka");
         reqBody.put("enabled", "true");
         reqBody.put("role", "Logging");
-        reqBody.put("sort", "170");
+        reqBody.put("sort", "180");
         reqBody.put("namespaceId", 
Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
-        reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", 
\"namesrvAddr\": 
\"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
-        adminClient.changePluginStatus("1801816010882822166", reqBody);
-        WaitDataSync.waitGatewayPluginUse(gatewayClient, 
"org.apache.shenyu.plugin.logging.rocketmq");
+        reqBody.put("config",
+                
"{\"topic\":\"shenyu-access-logging\",\"bootstrapServer\":\"shenyu-kafka:29092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}");
+        adminClient.changePluginStatus("1801816010882822171", reqBody);
+//        TimeUnit.SECONDS.sleep(5);
+//        Map<String, Integer> plugins = gatewayClient.getPlugins();
+        WaitDataSync.waitGatewayPluginUse(gatewayClient, 
"org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin");
     }
 
     @ShenYuScenario(provider = DividePluginCases.class)
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
similarity index 94%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
index a0b102413c..f62d9b33d1 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
@@ -44,7 +44,7 @@ for sync in "${SYNC_ARRAY[@]}"; do
   sleep 10s
   docker ps -a
   ## run e2e-test
-  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http 
-am test
+  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl 
shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq -am test
   # shellcheck disable=SC2181
   if (($?)); then
     echo "${sync}-sync-e2e-test failed"
@@ -58,12 +58,10 @@ for sync in "${SYNC_ARRAY[@]}"; do
     echo "shenyu-rocketmq log:"
     echo "------------------"
     docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
-    echo "shenyu-examples-http log:"
-    echo "------------------"
-    docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs 
shenyu-examples-http
     exit 1
   fi
   docker compose -f 
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down
+  docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml down
   docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
   echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml 
shenyu-bootstrap-${sync}.yml "
 done
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
new file mode 100644
index 0000000000..ce8c8a7e7b
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+  shenyu-examples-http:
+    image: shenyu-examples-http:latest
+    container_name: shenyu-examples-http
+    environment:
+      - shenyu.register.serverLists=http://shenyu-admin:9095
+    ports:
+      - "31189:8189"
+    healthcheck:
+      test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health 
| grep UP || exit 1" ]
+      interval: 10s
+      timeout: 2s
+      retries: 3
+      start_period: 10s
+    restart: always
+    networks:
+      - shenyu
+
+networks:
+  shenyu:
+    name: shenyu
+    driver: bridge
+    external: true
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
new file mode 100644
index 0000000000..5f34a739ca
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+  rocketmq-dialevoneid:
+    image: rocketmqinc/rocketmq:4.4.0
+    container_name: rocketmq-dialevoneid
+    command: [ "/bin/sh", "mqnamesrv" ]
+    ports:
+      - "31876:9876"
+    environment:
+      - TZ=Asia/Shanghai
+    restart: always
+    networks:
+      - shenyu
+
+  rocketmq-broker:
+    image: rocketmqinc/rocketmq:4.4.0
+    container_name: rocketmq-broker
+    command: [ "/bin/sh", "mqbroker" ]
+    ports:
+      - "10909:10909"
+      - "10911:10911"
+      - "10912:10912"
+    environment:
+      - NAMESRV_ADDR=rocketmq-dialevoneid:9876
+      - TZ=Asia/Shanghai
+    restart: always
+    networks:
+      - shenyu
+
+networks:
+  shenyu:
+    name: shenyu
+    driver: bridge
+    external: true
\ No newline at end of file
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
similarity index 91%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
index 25f37f01d0..2ca65f63dd 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
@@ -33,9 +33,8 @@ SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
 MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
 for sync in ${SYNC_ARRAY[@]}; do
   echo -e "------------------\n"
-  kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml
-
-  kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
+  kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:9092/actuator/health
 
   sleep 30s
   echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml 
shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
@@ -50,15 +49,13 @@ for sync in ${SYNC_ARRAY[@]}; do
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31095/actuator/health
   kubectl apply -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
   sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31195/actuator/health
-  kubectl apply -f "${PRGDIR}"/shenyu-examples-http.yml
-  sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh 
http://localhost:31189/actuator/health
   sleep 10s
   kubectl get pod -o wide
 
   kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print 
$1}')"
 
   ## run e2e-test
-  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http 
-am test
+  ./mvnw -B -f ./shenyu-e2e/pom.xml -pl 
shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
   # shellcheck disable=SC2181
   if (($?)); then
     echo "${sync}-sync-e2e-test failed"
@@ -73,8 +70,8 @@ for sync in ${SYNC_ARRAY[@]}; do
   kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
   kubectl delete -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
   kubectl delete -f 
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
-  kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
-  kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+  kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
+
   # shellcheck disable=SC2199
   # shellcheck disable=SC2076
   if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
new file mode 100644
index 0000000000..94a63c58ae
--- /dev/null
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: zookeeper
+  namespace: default
+  labels:
+    app: zookeeper
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: zookeeper
+  template:
+    metadata:
+      labels:
+        app: zookeeper
+    spec:
+      containers:
+        - name: zookeeper
+          image: zookeeper:3.7
+          ports:
+            - containerPort: 2181
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: zookeeper
+  namespace: default
+  labels:
+    app: zookeeper
+spec:
+  ports:
+    - port: 2181
+      name: client
+  selector:
+    app: zookeeper
+
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: kafka
+  namespace: default
+  labels:
+    app: kafka
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: kafka
+  template:
+    metadata:
+      labels:
+        app: kafka
+    spec:
+      containers:
+        - name: kafka
+          image: bitnami/kafka:3.6.2
+          env:
+            - name: KAFKA_ADVERTISED_LISTENERS
+              value: PLAINTEXT://kafka:9092
+            - name: KAFKA_ZOOKEEPER_CONNECT
+              value: zookeeper:2181
+          ports:
+            - containerPort: 31877
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: kafka
+  namespace: default
+  labels:
+    app: kafka
+spec:
+  type: NodePort
+  ports:
+    - port: 31877
+      name: client
+      protocol: TCP
+      targetPort: 31877
+      nodePort: 31877
+  selector:
+    app: kafka
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
similarity index 95%
copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
index 2a3141908e..f9e8eef963 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
@@ -24,8 +24,8 @@
         <version>0.0.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>shenyu-e2e-case-http</artifactId>
-
+    <artifactId>shenyu-e2e-case-logging-rocketmq</artifactId>
+    
     <dependencies>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
similarity index 99%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
index d42bfc3aab..bc24271d34 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.e2e.testcase.http;
+package org.apache.shenyu.e2e.testcase.logging.rocketmq;
 
 import com.google.common.collect.Lists;
 import io.restassured.http.Method;
diff --git 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
similarity index 89%
copy from 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
copy to 
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
index 1b18091e7c..66f6ee5bbe 100644
--- 
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++ 
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.e2e.testcase.http;
+package org.apache.shenyu.e2e.testcase.logging.rocketmq;
 
 import com.google.common.collect.Lists;
 import org.apache.shenyu.e2e.client.WaitDataSync;
@@ -24,12 +24,14 @@ import org.apache.shenyu.e2e.client.gateway.GatewayClient;
 import org.apache.shenyu.e2e.constant.Constants;
 import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
 import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
+import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec;
 import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
 import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec;
 import org.apache.shenyu.e2e.enums.ServiceTypeEnum;
 import org.apache.shenyu.e2e.model.ResourcesData;
 import org.apache.shenyu.e2e.model.data.BindingData;
 import org.apache.shenyu.e2e.model.response.SelectorDTO;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
@@ -91,19 +93,21 @@ public class DividePluginTest {
         spec.getWaiting().waitFor(gateway);
     }
 
-//    @AfterEach
-//    void after(final AdminClient client, final GatewayClient gateway, final 
AfterEachSpec spec) {
-//        spec.getDeleter().delete(client, selectorIds);
-//        spec.deleteWaiting().waitFor(gateway);
-//        selectorIds = Lists.newArrayList();
-//    }
+    @AfterEach
+    void after(final AdminClient client, final GatewayClient gateway, final 
AfterEachSpec spec) {
+        spec.getDeleter().delete(client, selectorIds);
+        spec.deleteWaiting().waitFor(gateway);
+        selectorIds = Lists.newArrayList();
+    }
 
     @BeforeAll
     void setup(final AdminClient adminClient, final GatewayClient 
gatewayClient) throws Exception {
         adminClient.login();
+        
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, 
gatewayClient::getSelectorCache, adminClient);
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, 
gatewayClient::getMetaDataCache, adminClient);
         
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, 
gatewayClient::getRuleCache, adminClient);
+        
         LOG.info("start loggingRocketMQ plugin");
         Map<String, String> reqBody = new HashMap<>();
         reqBody.put("pluginId", "29");
@@ -114,7 +118,10 @@ public class DividePluginTest {
         reqBody.put("namespaceId", 
Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
         reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", 
\"namesrvAddr\": 
\"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
         adminClient.changePluginStatus("1801816010882822166", reqBody);
-        WaitDataSync.waitGatewayPluginUse(gatewayClient, 
"org.apache.shenyu.plugin.logging.rocketmq");
+        Map<String, Integer> plugins = gatewayClient.getPlugins();
+        LOG.info("shenyu e2e plugin list ={}", plugins);
+        WaitDataSync.waitGatewayPluginUse(gatewayClient, 
"org.apache.shenyu.plugin.logging.rocketmq.LoggingRocketMQPlugin");
+        
     }
 
     @ShenYuScenario(provider = DividePluginCases.class)
diff --git 
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
 
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
index ce3efdd91c..518eb461e4 100644
--- 
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
+++ 
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
@@ -83,7 +83,7 @@ public class WaitDataSync {
         Map<String, Integer> pluginMap = gatewayClient.getPlugins();
         int retryNum = 0;
         boolean existPlugin = false;
-        while (!existPlugin && retryNum < 5) {
+        while (!existPlugin && retryNum < 10) {
             for (String plugin : pluginMap.keySet()) {
                 if (plugin.startsWith(pluginClass)) {
                     existPlugin = true;
diff --git 
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
 
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
index 5ffd1a61eb..50afe58dd7 100644
--- 
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
+++ 
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.restassured.response.Response;
 import io.restassured.specification.RequestSpecification;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.shenyu.e2e.annotation.ShenYuGatewayClient;
 import org.apache.shenyu.e2e.client.BaseClient;
 import org.apache.shenyu.e2e.common.RequestLogConsumer;
@@ -178,6 +179,9 @@ public class GatewayClient extends BaseClient {
         List<SelectorCacheData> selectorDataList = new ArrayList<>();
         for (Map.Entry entry : s.entrySet()) {
             List list = (List) entry.getValue();
+            if (CollectionUtils.isEmpty(list)) {
+                continue;
+            }
             String json = MAPPER.writeValueAsString(list.get(0));
             SelectorCacheData selectorData = MAPPER.readValue(json, 
SelectorCacheData.class);
             selectorDataList.add(selectorData);
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
index 6d12c447ff..42c3bf33a4 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.common.utils.JsonUtils;
 import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
 import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
@@ -65,12 +66,13 @@ public class KafkaLogCollectClient extends 
AbstractLogConsumeClient<KafkaLogColl
      */
     @Override
     public void initClient0(@NonNull final 
KafkaLogCollectConfig.KafkaLogConfig config) {
-        if (Objects.isNull(config)
-                || StringUtils.isBlank(config.getBootstrapServer())
-                || StringUtils.isBlank(config.getTopic())) {
+        if (StringUtils.isBlank(config.getBootstrapServer()) || 
StringUtils.isBlank(config.getTopic())) {
             LOG.error("kafka props is empty. failed init kafka producer");
             return;
         }
+
+        LOG.info("initClient0:{}", GsonUtils.getInstance().toJson(config));
+        
         String topic = config.getTopic();
         String nameserverAddress = config.getBootstrapServer();
 
@@ -122,11 +124,14 @@ public class KafkaLogCollectClient extends 
AbstractLogConsumeClient<KafkaLogColl
                     .map(apiConfig -> 
StringUtils.defaultIfBlank(apiConfig.getTopic(), topic)
                     ).orElse(topic);
             try {
+                LOG.info("logTopic:{}, log:{}", logTopic, log);
                 producer.send(toProducerRecord(logTopic, log), (metadata, 
exception) -> {
+                    LOG.info("kafka push logs metadata:{}", 
GsonUtils.getInstance().toJson(metadata));
                     if (Objects.nonNull(exception)) {
                         LOG.error("kafka push logs error", exception);
                     }
                 });
+                producer.flush();
             } catch (Exception e) {
                 LOG.error("kafka push logs error", e);
             }

Reply via email to