This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push: new ca31ab0275 [type:feat] add Logging-Kafka Plugin e2e and make independent of Logging-rocketmq e2e (#5709) ca31ab0275 is described below commit ca31ab0275b799a56d704b145960911f32b75d09 Author: jakiuncle <88994283+jakiun...@users.noreply.github.com> AuthorDate: Thu Mar 6 09:27:54 2025 +0800 [type:feat] add Logging-Kafka Plugin e2e and make independent of Logging-rocketmq e2e (#5709) * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:fix] fix zookeeper healthy check * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:fix] fix http rocketmq * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * kafka e2e * kafka e2e * kafka e2e * kafka e2e * spilt logging rocketmq and logging kafka * checkstyle * e2e port * e2e kafka logging * e2e kafka logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * e2e rocketmq logging * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * [type:feat]add kafka logging e2e test * add nonull check * modify kafka namesrvAddr to bootstrapServer * create kafka topic * create kafka topic * create kafka topic * create kafka topic * kafka connection * kafka * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection * kafka connection --------- Co-authored-by: aias00 <liuhon...@apache.org> Co-authored-by: xiaoyu <xia...@apache.org> --- .github/workflows/e2e-k8s.yml | 4 + db/init/mysql/schema.sql | 2 +- .../apache/shenyu/admin/ShenyuAdminBootstrap.java | 4 +- .../ShenyuClientRegisterDivideServiceImpl.java | 3 + shenyu-e2e/pom.xml | 1 + shenyu-e2e/shenyu-e2e-case/pom.xml | 2 + .../compose/script/e2e-http-sync-compose.sh | 4 - .../k8s/script/e2e-http-sync.sh | 3 + .../shenyu-e2e-case/shenyu-e2e-case-http/pom.xml | 8 -- .../e2e/testcase/http/DividePluginCases.java | 86 +---------- .../shenyu/e2e/testcase/http/DividePluginTest.java | 15 +- .../compose/script/e2e-logging-kafka-compose.sh} | 28 ++-- .../compose/shenyu-examples-http-compose.yml | 41 ++++++ .../compose/shenyu-kafka-compose.yml | 74 ++++++++++ .../k8s/script/e2e-http-sync.sh | 13 +- .../k8s/shenyu-kafka.yml | 101 +++++++++++++ .../pom.xml | 8 +- .../testcase/logging/kafka/DividePluginCases.java | 157 +++++++++++++++++++++ .../testcase/logging/kafka}/DividePluginTest.java | 40 ++++-- .../script/e2e-logging-rocketmq-compose.sh} | 6 +- .../compose/shenyu-examples-http-compose.yml | 41 ++++++ .../compose/shenyu-rocketmq-compose.yml | 51 +++++++ .../k8s/script/e2e-http-sync.sh | 13 +- .../k8s/shenyu-kafka.yml | 101 +++++++++++++ .../pom.xml | 4 +- .../logging/rocketmq}/DividePluginCases.java | 2 +- .../logging/rocketmq}/DividePluginTest.java | 23 +-- .../org/apache/shenyu/e2e/client/WaitDataSync.java | 2 +- .../shenyu/e2e/client/gateway/GatewayClient.java | 4 + .../kafka/client/KafkaLogCollectClient.java | 11 +- 30 files changed, 674 insertions(+), 178 deletions(-) diff --git a/.github/workflows/e2e-k8s.yml b/.github/workflows/e2e-k8s.yml index a9577415aa..62739cb4b3 100644 --- a/.github/workflows/e2e-k8s.yml +++ b/.github/workflows/e2e-k8s.yml @@ -218,6 +218,10 @@ jobs: script: e2e-cluster-jdbc-compose - case: shenyu-e2e-case-cluster script: e2e-cluster-zookeeper-compose + - case: shenyu-e2e-case-logging-rocketmq + script: e2e-logging-rocketmq-compose + - case: shenyu-e2e-case-logging-kafka + script: e2e-logging-kafka-compose steps: - uses: actions/checkout@v2 diff --git a/db/init/mysql/schema.sql b/db/init/mysql/schema.sql index 4059214b5d..fafe5e102e 100644 --- a/db/init/mysql/schema.sql +++ b/db/init/mysql/schema.sql @@ -923,7 +923,7 @@ INSERT INTO `plugin` VALUES ('6', 'dubbo', '{\"register\":\"zookeeper://localhos INSERT INTO `plugin` VALUES ('8', 'springCloud', NULL, 'Proxy', 200, 0, '2022-05-25 18:02:53', '2022-05-25 18:02:53',null); INSERT INTO `plugin` VALUES ('9', 'hystrix', NULL, 'FaultTolerance', 130, 0, '2022-05-25 18:02:53', '2022-05-25 18:02:53',null); INSERT INTO `plugin` VALUES ('32', 'loggingElasticSearch','{\"host\":\"localhost\", \"port\": \"9200\"}', 'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00',null); -INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"host\":\"localhost\", \"port\": \"9092\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00',null); +INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"topic\":\"shenyu-access-logging\",\"namesrvAddr\":\"http://localhost:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00',null); INSERT INTO `plugin` VALUES ('34', 'loggingAliyunSls','{\"projectName\": \"shenyu\", \"logStoreName\": \"shenyu-logstore\", \"topic\": \"shenyu-topic\"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null); INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', '{\"topic":\"shenyu-access-logging\", \"serviceUrl\": \"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null); INSERT INTO `plugin` VALUES ('36', 'loggingTencentCls','{\"endpoint\": \"ap-guangzhou.cls.tencentcs.com\", \"topic\": \"shenyu-topic\"}', 'Logging', 176, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null); diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java index 9bd5026202..796164adcf 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java @@ -22,7 +22,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration; /** - * shenyu admin start. + * shenyu admin startShenyuAdminBootstrap. */ @SpringBootApplication(exclude = {LdapAutoConfiguration.class}) public class ShenyuAdminBootstrap { @@ -30,7 +30,7 @@ public class ShenyuAdminBootstrap { /** * Main entrance. * - * @param args startup arguments + * @param args startup arguments. */ public static void main(final String[] args) { SpringApplication.run(ShenyuAdminBootstrap.class, args); diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java index 0ec8dcc1d0..33216d68ff 100644 --- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java +++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java @@ -120,6 +120,9 @@ public class ShenyuClientRegisterDivideServiceImpl extends AbstractContextPathRe .collect(Collectors.toList()); final List<DivideUpstream> needToRemove = buildDivideUpstreamList(validUriList); List<DivideUpstream> existList = GsonUtils.getInstance().fromCurrentList(selectorDO.getHandle(), DivideUpstream.class); + if (CollectionUtils.isEmpty(existList)) { + return Constants.SUCCESS; + } existList.removeAll(needToRemove); final String handler = GsonUtils.getInstance().toJson(existList); selectorDO.setHandle(handler); diff --git a/shenyu-e2e/pom.xml b/shenyu-e2e/pom.xml index 98fcecf3a0..e524324212 100644 --- a/shenyu-e2e/pom.xml +++ b/shenyu-e2e/pom.xml @@ -58,6 +58,7 @@ <guava.version>32.0.0-jre</guava.version> <commons-collection.verion>4.4</commons-collection.verion> <websocket.version>1.5.1</websocket.version> + <kafka-clients.version>3.7.1</kafka-clients.version> </properties> <modules> diff --git a/shenyu-e2e/shenyu-e2e-case/pom.xml b/shenyu-e2e/shenyu-e2e-case/pom.xml index 901a5c5936..4ca3cc3d31 100644 --- a/shenyu-e2e/shenyu-e2e-case/pom.xml +++ b/shenyu-e2e/shenyu-e2e-case/pom.xml @@ -32,6 +32,8 @@ <module>shenyu-e2e-case-cluster</module> <module>shenyu-e2e-case-storage</module> <module>shenyu-e2e-case-http</module> + <module>shenyu-e2e-case-logging-kafka</module> + <module>shenyu-e2e-case-logging-rocketmq</module> <module>shenyu-e2e-case-spring-cloud</module> <module>shenyu-e2e-case-apache-dubbo</module> <module>shenyu-e2e-case-sofa</module> diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh index a0b102413c..446dae7dfb 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh @@ -37,7 +37,6 @@ for sync in "${SYNC_ARRAY[@]}"; do sleep 30s sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health - docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d --quiet-pull sleep 30s sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health @@ -55,9 +54,6 @@ for sync in "${SYNC_ARRAY[@]}"; do echo "shenyu-bootstrap log:" echo "------------------" docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-bootstrap - echo "shenyu-rocketmq log:" - echo "------------------" - docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs echo "shenyu-examples-http log:" echo "------------------" docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs shenyu-examples-http diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh index 25f37f01d0..95d7931c0b 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh @@ -37,6 +37,8 @@ for sync in ${SYNC_ARRAY[@]}; do kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml + sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31877/actuator/health + sleep 30s echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml" # shellcheck disable=SC2199 @@ -75,6 +77,7 @@ for sync in ${SYNC_ARRAY[@]}; do kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml + # shellcheck disable=SC2199 # shellcheck disable=SC2076 if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml index 2a3141908e..f326c86009 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml @@ -25,12 +25,4 @@ </parent> <modelVersion>4.0.0</modelVersion> <artifactId>shenyu-e2e-case-http</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - <version>4.9.3</version> - </dependency> - </dependencies> </project> diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java index d42bfc3aab..b6e3f2aab0 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java @@ -18,48 +18,22 @@ package org.apache.shenyu.e2e.testcase.http; import com.google.common.collect.Lists; -import io.restassured.http.Method; -import org.apache.commons.collections.CollectionUtils; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider; import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec; import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec; import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec; import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec; -import org.apache.shenyu.e2e.model.MatchMode; -import org.apache.shenyu.e2e.model.Plugin; -import org.apache.shenyu.e2e.model.data.Condition; -import org.junit.jupiter.api.Assertions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists; -import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions; -import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder; -import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder; public class DividePluginCases implements ShenYuScenarioProvider { - private static final String NAMESERVER = "http://localhost:31876"; - - private static final String CONSUMERGROUP = "shenyu-plugin-logging-rocketmq"; - - private static final String TOPIC = "shenyu-access-logging"; - - private static final String TEST = "/http/order/findById?id=123"; - - private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class); - @Override public List<ScenarioSpec> get() { return Lists.newArrayList( - testDivideHello(), - testRocketMQHello() + testDivideHello() ); } @@ -74,62 +48,4 @@ public class DividePluginCases implements ShenYuScenarioProvider { .build()) .build(); } - - private ShenYuScenarioSpec testRocketMQHello() { - return ShenYuScenarioSpec.builder() - .name("testRocketMQHello") - .beforeEachSpec( - ShenYuBeforeEachSpec.builder() - .addSelectorAndRule( - newSelectorBuilder("selector", Plugin.LOGGING_ROCKETMQ) - .name("1") - .matchMode(MatchMode.OR) - .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http")) - .build(), - newRuleBuilder("rule") - .name("1") - .matchMode(MatchMode.OR) - .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http")) - .build() - ) - .checker(exists(TEST)) - .build() - ) - .caseSpec( - ShenYuCaseSpec.builder() - .add(request -> { - AtomicBoolean isLog = new AtomicBoolean(false); - try { - Thread.sleep(1000 * 30); - request.request(Method.GET, "/http/order/findById?id=23"); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMERGROUP); - consumer.setNamesrvAddr(NAMESERVER); - consumer.subscribe(TOPIC, "*"); - consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> { - LOG.info("Msg:{}", msgs); - if (CollectionUtils.isNotEmpty(msgs)) { - msgs.forEach(e -> { - if (new String(e.getBody()).contains("/http/order/findById?id=23")) { - isLog.set(true); - } - }); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - }); - LOG.info("consumer.start ; isLog.get():{}", isLog.get()); - consumer.start(); - Thread.sleep(1000 * 30); - LOG.info("isLog.get():{}", isLog.get()); - Assertions.assertTrue(isLog.get()); - } catch (Exception e) { - LOG.error("error", e); - Assertions.assertTrue(isLog.get()); - } - }) - .build() - ) -// .afterEachSpec(ShenYuAfterEachSpec.builder() -// .deleteWaiting(notExists(TEST)).build()) - .build(); - } } diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java index 1b18091e7c..b66fb619b3 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import org.apache.shenyu.e2e.client.WaitDataSync; import org.apache.shenyu.e2e.client.admin.AdminClient; import org.apache.shenyu.e2e.client.gateway.GatewayClient; -import org.apache.shenyu.e2e.constant.Constants; import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario; import org.apache.shenyu.e2e.engine.annotation.ShenYuTest; import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec; @@ -35,9 +34,7 @@ import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import static org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID; @@ -98,23 +95,13 @@ public class DividePluginTest { // selectorIds = Lists.newArrayList(); // } + @BeforeAll void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws Exception { adminClient.login(); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, gatewayClient::getSelectorCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient); - LOG.info("start loggingRocketMQ plugin"); - Map<String, String> reqBody = new HashMap<>(); - reqBody.put("pluginId", "29"); - reqBody.put("name", "loggingRocketMQ"); - reqBody.put("enabled", "true"); - reqBody.put("role", "Logging"); - reqBody.put("sort", "170"); - reqBody.put("namespaceId", Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID); - reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}"); - adminClient.changePluginStatus("1801816010882822166", reqBody); - WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq"); } @ShenYuScenario(provider = DividePluginCases.class) diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh similarity index 74% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh index a0b102413c..3936dd821a 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh @@ -20,6 +20,9 @@ SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")") bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh +# init ip +export HOST_IP=$(hostname -I | awk '{print $1}') + # init register center CUR_PATH=$(readlink -f "$(dirname "$0")") PRGDIR=$(dirname "$CUR_PATH") @@ -35,16 +38,21 @@ for sync in "${SYNC_ARRAY[@]}"; do echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml " docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml up -d --quiet-pull sleep 30s - sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health - docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull - docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d --quiet-pull + docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml up -d --quiet-pull sleep 30s + # 创建kafka topic + echo "create kafka topic shenyu-access-logging" + docker exec shenyu-kafka kafka-topics --create --topic shenyu-access-logging --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 + +# docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d --quiet-pull +# sleep 30s + sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health sleep 10s docker ps -a ## run e2e-test - ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http -am test + ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test # shellcheck disable=SC2181 if (($?)); then echo "${sync}-sync-e2e-test failed" @@ -55,15 +63,15 @@ for sync in "${SYNC_ARRAY[@]}"; do echo "shenyu-bootstrap log:" echo "------------------" docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs shenyu-bootstrap - echo "shenyu-rocketmq log:" - echo "------------------" - docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs - echo "shenyu-examples-http log:" + echo "shenyu-kafka log:" echo "------------------" - docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs shenyu-examples-http + docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml logs +# echo "kafka-console-consumer log:" + timeout 50s docker exec shenyu-kafka kafka-console-consumer --topic shenyu-access-logging --bootstrap-server localhost:9092 --from-beginning exit 1 fi docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down - docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down + docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml down +# docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml " done diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml new file mode 100644 index 0000000000..ce8c8a7e7b --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.9' + +services: + shenyu-examples-http: + image: shenyu-examples-http:latest + container_name: shenyu-examples-http + environment: + - shenyu.register.serverLists=http://shenyu-admin:9095 + ports: + - "31189:8189" + healthcheck: + test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health | grep UP || exit 1" ] + interval: 10s + timeout: 2s + retries: 3 + start_period: 10s + restart: always + networks: + - shenyu + +networks: + shenyu: + name: shenyu + driver: bridge + external: true diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml new file mode 100644 index 0000000000..8487aa1bae --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.9' + +services: + shenyu-zk: + container_name: shenyu-zk + image: zookeeper:latest +# network_mode: "host" + ports: + - "2181:2181" + restart: always + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + - ZOO_PORT=2181 + networks: + - shenyu + + shenyu-kafka: + image: confluentinc/cp-kafka:latest + container_name: shenyu-kafka + extra_hosts: + - "shenyu-kafka:127.0.0.1" + depends_on: + - shenyu-zk + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: shenyu-zk:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://shenyu-kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + - shenyu + + shenyu-examples-http: + image: shenyu-examples-http:latest + container_name: shenyu-examples-http + environment: + - shenyu.register.serverLists=http://shenyu-admin:9095 + ports: + - "31189:8189" + healthcheck: + test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health | grep UP || exit 1" ] + interval: 10s + timeout: 2s + retries: 3 + start_period: 10s + restart: always + networks: + - shenyu + +networks: + shenyu: + name: shenyu + driver: bridge + external: true \ No newline at end of file diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh similarity index 91% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh index 25f37f01d0..2ca65f63dd 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh @@ -33,9 +33,8 @@ SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd") MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos") for sync in ${SYNC_ARRAY[@]}; do echo -e "------------------\n" - kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml - - kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml + kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml + sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:9092/actuator/health sleep 30s echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml" @@ -50,15 +49,13 @@ for sync in ${SYNC_ARRAY[@]}; do sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health - kubectl apply -f "${PRGDIR}"/shenyu-examples-http.yml - sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health sleep 10s kubectl get pod -o wide kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print $1}')" ## run e2e-test - ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http -am test + ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test # shellcheck disable=SC2181 if (($?)); then echo "${sync}-sync-e2e-test failed" @@ -73,8 +70,8 @@ for sync in ${SYNC_ARRAY[@]}; do kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml - kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml - kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml + kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml + # shellcheck disable=SC2199 # shellcheck disable=SC2076 if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml new file mode 100644 index 0000000000..9c5c05fad9 --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: zookeeper + namespace: default + labels: + app: zookeeper +spec: + replicas: 1 + selector: + matchLabels: + app: zookeeper + template: + metadata: + labels: + app: zookeeper + spec: + containers: + - name: zookeeper + image: zookeeper:3.7 + ports: + - containerPort: 2181 + +--- +apiVersion: v1 +kind: Service +metadata: + name: zookeeper + namespace: default + labels: + app: zookeeper +spec: + ports: + - port: 2181 + name: client + selector: + app: zookeeper + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka + namespace: default + labels: + app: kafka +spec: + replicas: 1 + selector: + matchLabels: + app: kafka + template: + metadata: + labels: + app: kafka + spec: + containers: + - name: kafka + image: bitnami/kafka:3.6.2 + env: + - name: KAFKA_ADVERTISED_LISTENERS + value: PLAINTEXT://kafka:9092 + - name: KAFKA_ZOOKEEPER_CONNECT + value: zookeeper:2181 + ports: + - containerPort: 9092 + +--- +apiVersion: v1 +kind: Service +metadata: + name: kafka + namespace: default + labels: + app: kafka +spec: + type: NodePort + ports: + - port: 9092 + name: client + protocol: TCP + targetPort: 9092 + nodePort: 9092 + selector: + app: kafka \ No newline at end of file diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml similarity index 86% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml index 2a3141908e..90ca689ce6 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml @@ -24,13 +24,13 @@ <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>shenyu-e2e-case-http</artifactId> + <artifactId>shenyu-e2e-case-logging-kafka</artifactId> <dependencies> <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-client</artifactId> - <version>4.9.3</version> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka-clients.version}</version> </dependency> </dependencies> </project> diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java new file mode 100644 index 0000000000..010c54867f --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.e2e.testcase.logging.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.restassured.http.Method; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider; +import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec; +import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec; +import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec; +import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec; +import org.apache.shenyu.e2e.model.MatchMode; +import org.apache.shenyu.e2e.model.Plugin; +import org.apache.shenyu.e2e.model.data.Condition; +import org.junit.jupiter.api.Assertions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists; +import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions; +import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder; +import static org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder; + +public class DividePluginCases implements ShenYuScenarioProvider { + + private static final String TOPIC = "shenyu-access-logging"; + + private static final String TEST = "/http/order/findById?id=123"; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final Logger LOG = LoggerFactory.getLogger(DividePluginCases.class); + + @Override + public List<ScenarioSpec> get() { + return Lists.newArrayList( + testDivideHello(), + testKafkaHello() + ); + } + + private ShenYuScenarioSpec testDivideHello() { + return ShenYuScenarioSpec.builder() + .name("http client hello1") + .beforeEachSpec(ShenYuBeforeEachSpec.builder() + .checker(exists(TEST)) + .build()) + .caseSpec(ShenYuCaseSpec.builder() + .addExists(TEST) + .build()) + .build(); + } + + private ShenYuScenarioSpec testKafkaHello() { + return ShenYuScenarioSpec.builder() + .name("testKafkaHello") + .beforeEachSpec( + ShenYuBeforeEachSpec.builder() + .addSelectorAndRule( + newSelectorBuilder("selector", Plugin.LOGGING_KAFKA) + .name("2") + .matchMode(MatchMode.OR) + .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http")) + .build(), + newRuleBuilder("rule") + .name("2") + .matchMode(MatchMode.OR) + .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http")) + .build() + ) + .checker(exists(TEST)) + .build() + ) + .caseSpec( + ShenYuCaseSpec.builder() + .add(request -> { + AtomicBoolean messageFound = new AtomicBoolean(false); + try { + // Send request first + request.request(Method.GET, "/http/order/findById?id=23"); + + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "shenyu-consumer-group"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); + properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); + + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { + consumer.subscribe(Arrays.asList(TOPIC)); + + Instant start = Instant.now(); + // Set timeout to 30 seconds + while (Duration.between(start, Instant.now()).getSeconds() < 90) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); + LOG.info("records.count:{}", records.count()); + + for (var record : records) { + String message = record.value(); + LOG.info("kafka message:{}", message); + if (message.contains("/http/order/findById")) { + messageFound.set(true); + consumer.commitSync(); + break; + } + } + + if (messageFound.get()) { + break; + } + } + + if (!messageFound.get()) { + LOG.error("Timeout waiting for kafka message"); + Assertions.fail("Did not receive expected message within timeout period"); + } + + Assertions.assertTrue(messageFound.get(), "Expected message was not found in Kafka topic"); + } + } catch (Exception e) { + LOG.error("Error during kafka message consumption", e); + throw new RuntimeException("Failed to consume kafka message", e); + } + }).build() + ).build(); + } +} diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java similarity index 80% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java index 1b18091e7c..ea32a50e4c 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.shenyu.e2e.testcase.http; +package org.apache.shenyu.e2e.testcase.logging.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import org.apache.shenyu.e2e.client.WaitDataSync; import org.apache.shenyu.e2e.client.admin.AdminClient; @@ -24,12 +25,14 @@ import org.apache.shenyu.e2e.client.gateway.GatewayClient; import org.apache.shenyu.e2e.constant.Constants; import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario; import org.apache.shenyu.e2e.engine.annotation.ShenYuTest; +import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec; import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec; import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec; import org.apache.shenyu.e2e.enums.ServiceTypeEnum; import org.apache.shenyu.e2e.model.ResourcesData; import org.apache.shenyu.e2e.model.data.BindingData; import org.apache.shenyu.e2e.model.response.SelectorDTO; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; @@ -65,7 +68,9 @@ import static org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAM public class DividePluginTest { private static final Logger LOG = LoggerFactory.getLogger(DividePluginTest.class); - + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private List<String> selectorIds = Lists.newArrayList(); @BeforeEach @@ -91,30 +96,35 @@ public class DividePluginTest { spec.getWaiting().waitFor(gateway); } -// @AfterEach -// void after(final AdminClient client, final GatewayClient gateway, final AfterEachSpec spec) { -// spec.getDeleter().delete(client, selectorIds); -// spec.deleteWaiting().waitFor(gateway); -// selectorIds = Lists.newArrayList(); -// } + @AfterEach + void after(final AdminClient client, final GatewayClient gateway, final AfterEachSpec spec) { + spec.getDeleter().delete(client, selectorIds); + spec.deleteWaiting().waitFor(gateway); + selectorIds = Lists.newArrayList(); + } @BeforeAll void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws Exception { adminClient.login(); + WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, gatewayClient::getSelectorCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient); - LOG.info("start loggingRocketMQ plugin"); + Map<String, String> reqBody = new HashMap<>(); - reqBody.put("pluginId", "29"); - reqBody.put("name", "loggingRocketMQ"); + LOG.info("start loggingKafka plugin"); + reqBody.put("pluginId", "33"); + reqBody.put("name", "loggingKafka"); reqBody.put("enabled", "true"); reqBody.put("role", "Logging"); - reqBody.put("sort", "170"); + reqBody.put("sort", "180"); reqBody.put("namespaceId", Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID); - reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}"); - adminClient.changePluginStatus("1801816010882822166", reqBody); - WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq"); + reqBody.put("config", + "{\"topic\":\"shenyu-access-logging\",\"bootstrapServer\":\"shenyu-kafka:29092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}"); + adminClient.changePluginStatus("1801816010882822171", reqBody); +// TimeUnit.SECONDS.sleep(5); +// Map<String, Integer> plugins = gatewayClient.getPlugins(); + WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin"); } @ShenYuScenario(provider = DividePluginCases.class) diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh similarity index 94% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh index a0b102413c..f62d9b33d1 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh @@ -44,7 +44,7 @@ for sync in "${SYNC_ARRAY[@]}"; do sleep 10s docker ps -a ## run e2e-test - ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http -am test + ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq -am test # shellcheck disable=SC2181 if (($?)); then echo "${sync}-sync-e2e-test failed" @@ -58,12 +58,10 @@ for sync in "${SYNC_ARRAY[@]}"; do echo "shenyu-rocketmq log:" echo "------------------" docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs - echo "shenyu-examples-http log:" - echo "------------------" - docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs shenyu-examples-http exit 1 fi docker compose -f "$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down + docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml down docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml " done diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml new file mode 100644 index 0000000000..ce8c8a7e7b --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.9' + +services: + shenyu-examples-http: + image: shenyu-examples-http:latest + container_name: shenyu-examples-http + environment: + - shenyu.register.serverLists=http://shenyu-admin:9095 + ports: + - "31189:8189" + healthcheck: + test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health | grep UP || exit 1" ] + interval: 10s + timeout: 2s + retries: 3 + start_period: 10s + restart: always + networks: + - shenyu + +networks: + shenyu: + name: shenyu + driver: bridge + external: true diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml new file mode 100644 index 0000000000..5f34a739ca --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.9' + +services: + rocketmq-dialevoneid: + image: rocketmqinc/rocketmq:4.4.0 + container_name: rocketmq-dialevoneid + command: [ "/bin/sh", "mqnamesrv" ] + ports: + - "31876:9876" + environment: + - TZ=Asia/Shanghai + restart: always + networks: + - shenyu + + rocketmq-broker: + image: rocketmqinc/rocketmq:4.4.0 + container_name: rocketmq-broker + command: [ "/bin/sh", "mqbroker" ] + ports: + - "10909:10909" + - "10911:10911" + - "10912:10912" + environment: + - NAMESRV_ADDR=rocketmq-dialevoneid:9876 + - TZ=Asia/Shanghai + restart: always + networks: + - shenyu + +networks: + shenyu: + name: shenyu + driver: bridge + external: true \ No newline at end of file diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh similarity index 91% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh index 25f37f01d0..2ca65f63dd 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh @@ -33,9 +33,8 @@ SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd") MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos") for sync in ${SYNC_ARRAY[@]}; do echo -e "------------------\n" - kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml - - kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml + kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml + sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:9092/actuator/health sleep 30s echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml" @@ -50,15 +49,13 @@ for sync in ${SYNC_ARRAY[@]}; do sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31095/actuator/health kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31195/actuator/health - kubectl apply -f "${PRGDIR}"/shenyu-examples-http.yml - sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh http://localhost:31189/actuator/health sleep 10s kubectl get pod -o wide kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print $1}')" ## run e2e-test - ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http -am test + ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test # shellcheck disable=SC2181 if (($?)); then echo "${sync}-sync-e2e-test failed" @@ -73,8 +70,8 @@ for sync in ${SYNC_ARRAY[@]}; do kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml - kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml - kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml + kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml + # shellcheck disable=SC2199 # shellcheck disable=SC2076 if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml new file mode 100644 index 0000000000..94a63c58ae --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: zookeeper + namespace: default + labels: + app: zookeeper +spec: + replicas: 1 + selector: + matchLabels: + app: zookeeper + template: + metadata: + labels: + app: zookeeper + spec: + containers: + - name: zookeeper + image: zookeeper:3.7 + ports: + - containerPort: 2181 + +--- +apiVersion: v1 +kind: Service +metadata: + name: zookeeper + namespace: default + labels: + app: zookeeper +spec: + ports: + - port: 2181 + name: client + selector: + app: zookeeper + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka + namespace: default + labels: + app: kafka +spec: + replicas: 1 + selector: + matchLabels: + app: kafka + template: + metadata: + labels: + app: kafka + spec: + containers: + - name: kafka + image: bitnami/kafka:3.6.2 + env: + - name: KAFKA_ADVERTISED_LISTENERS + value: PLAINTEXT://kafka:9092 + - name: KAFKA_ZOOKEEPER_CONNECT + value: zookeeper:2181 + ports: + - containerPort: 31877 + +--- +apiVersion: v1 +kind: Service +metadata: + name: kafka + namespace: default + labels: + app: kafka +spec: + type: NodePort + ports: + - port: 31877 + name: client + protocol: TCP + targetPort: 31877 + nodePort: 31877 + selector: + app: kafka \ No newline at end of file diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml similarity index 95% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml index 2a3141908e..f9e8eef963 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml @@ -24,8 +24,8 @@ <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>shenyu-e2e-case-http</artifactId> - + <artifactId>shenyu-e2e-case-logging-rocketmq</artifactId> + <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java similarity index 99% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java index d42bfc3aab..bc24271d34 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shenyu.e2e.testcase.http; +package org.apache.shenyu.e2e.testcase.logging.rocketmq; import com.google.common.collect.Lists; import io.restassured.http.Method; diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java similarity index 89% copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java index 1b18091e7c..66f6ee5bbe 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shenyu.e2e.testcase.http; +package org.apache.shenyu.e2e.testcase.logging.rocketmq; import com.google.common.collect.Lists; import org.apache.shenyu.e2e.client.WaitDataSync; @@ -24,12 +24,14 @@ import org.apache.shenyu.e2e.client.gateway.GatewayClient; import org.apache.shenyu.e2e.constant.Constants; import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario; import org.apache.shenyu.e2e.engine.annotation.ShenYuTest; +import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec; import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec; import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec; import org.apache.shenyu.e2e.enums.ServiceTypeEnum; import org.apache.shenyu.e2e.model.ResourcesData; import org.apache.shenyu.e2e.model.data.BindingData; import org.apache.shenyu.e2e.model.response.SelectorDTO; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; @@ -91,19 +93,21 @@ public class DividePluginTest { spec.getWaiting().waitFor(gateway); } -// @AfterEach -// void after(final AdminClient client, final GatewayClient gateway, final AfterEachSpec spec) { -// spec.getDeleter().delete(client, selectorIds); -// spec.deleteWaiting().waitFor(gateway); -// selectorIds = Lists.newArrayList(); -// } + @AfterEach + void after(final AdminClient client, final GatewayClient gateway, final AfterEachSpec spec) { + spec.getDeleter().delete(client, selectorIds); + spec.deleteWaiting().waitFor(gateway); + selectorIds = Lists.newArrayList(); + } @BeforeAll void setup(final AdminClient adminClient, final GatewayClient gatewayClient) throws Exception { adminClient.login(); + WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors, gatewayClient::getSelectorCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient); + LOG.info("start loggingRocketMQ plugin"); Map<String, String> reqBody = new HashMap<>(); reqBody.put("pluginId", "29"); @@ -114,7 +118,10 @@ public class DividePluginTest { reqBody.put("namespaceId", Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID); reqBody.put("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}"); adminClient.changePluginStatus("1801816010882822166", reqBody); - WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq"); + Map<String, Integer> plugins = gatewayClient.getPlugins(); + LOG.info("shenyu e2e plugin list ={}", plugins); + WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq.LoggingRocketMQPlugin"); + } @ShenYuScenario(provider = DividePluginCases.class) diff --git a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java index ce3efdd91c..518eb461e4 100644 --- a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java +++ b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java @@ -83,7 +83,7 @@ public class WaitDataSync { Map<String, Integer> pluginMap = gatewayClient.getPlugins(); int retryNum = 0; boolean existPlugin = false; - while (!existPlugin && retryNum < 5) { + while (!existPlugin && retryNum < 10) { for (String plugin : pluginMap.keySet()) { if (plugin.startsWith(pluginClass)) { existPlugin = true; diff --git a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java index 5ffd1a61eb..50afe58dd7 100644 --- a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java +++ b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.restassured.response.Response; import io.restassured.specification.RequestSpecification; +import org.apache.commons.collections4.CollectionUtils; import org.apache.shenyu.e2e.annotation.ShenYuGatewayClient; import org.apache.shenyu.e2e.client.BaseClient; import org.apache.shenyu.e2e.common.RequestLogConsumer; @@ -178,6 +179,9 @@ public class GatewayClient extends BaseClient { List<SelectorCacheData> selectorDataList = new ArrayList<>(); for (Map.Entry entry : s.entrySet()) { List list = (List) entry.getValue(); + if (CollectionUtils.isEmpty(list)) { + continue; + } String json = MAPPER.writeValueAsString(list.get(0)); SelectorCacheData selectorData = MAPPER.readValue(json, SelectorCacheData.class); selectorDataList.add(selectorData); diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java index 6d12c447ff..42c3bf33a4 100644 --- a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java +++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.shenyu.common.utils.GsonUtils; import org.apache.shenyu.common.utils.JsonUtils; import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient; import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData; @@ -65,12 +66,13 @@ public class KafkaLogCollectClient extends AbstractLogConsumeClient<KafkaLogColl */ @Override public void initClient0(@NonNull final KafkaLogCollectConfig.KafkaLogConfig config) { - if (Objects.isNull(config) - || StringUtils.isBlank(config.getBootstrapServer()) - || StringUtils.isBlank(config.getTopic())) { + if (StringUtils.isBlank(config.getBootstrapServer()) || StringUtils.isBlank(config.getTopic())) { LOG.error("kafka props is empty. failed init kafka producer"); return; } + + LOG.info("initClient0:{}", GsonUtils.getInstance().toJson(config)); + String topic = config.getTopic(); String nameserverAddress = config.getBootstrapServer(); @@ -122,11 +124,14 @@ public class KafkaLogCollectClient extends AbstractLogConsumeClient<KafkaLogColl .map(apiConfig -> StringUtils.defaultIfBlank(apiConfig.getTopic(), topic) ).orElse(topic); try { + LOG.info("logTopic:{}, log:{}", logTopic, log); producer.send(toProducerRecord(logTopic, log), (metadata, exception) -> { + LOG.info("kafka push logs metadata:{}", GsonUtils.getInstance().toJson(metadata)); if (Objects.nonNull(exception)) { LOG.error("kafka push logs error", exception); } }); + producer.flush(); } catch (Exception e) { LOG.error("kafka push logs error", e); }