This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 053c45959 Update the nameaddress and topic configuration in
logging-kafka (#3850)
053c45959 is described below
commit 053c459596cf4fd59a9113ca033f6f277a5bc875
Author: qifanyyy <[email protected]>
AuthorDate: Mon Aug 29 03:29:58 2022 -0400
Update the nameaddress and topic configuration in logging-kafka (#3850)
* Update docker compose and change the port from 8082 to 9092
* Update docker compose for the integration test
* Update the nameserver address and topic configuration in logging-kafka
* Reformat logging-kafka
---
.../common/constant/GenericLoggingConstant.java | 2 +-
.../logging/kafka/client/KafkaLogCollectClient.java | 21 ++++++++++-----------
.../handler/LoggingKafkaPluginDataHandler.java | 16 ++++++----------
3 files changed, 17 insertions(+), 22 deletions(-)
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
index 83f3d86d2..1204a6250 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
@@ -70,7 +70,7 @@ public class GenericLoggingConstant {
/**
* aliyun sls topic.
*/
- public static final String TOPIC = "Topic";
+ public static final String TOPIC = "topic";
/**
* send thread config.
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
index f724a2674..880923ad0 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
@@ -17,6 +17,13 @@
package org.apache.shenyu.plugin.logging.kafka.client;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.collections4.CollectionUtils;
@@ -28,9 +35,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.shenyu.common.utils.JsonUtils;
import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
-import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
@@ -38,14 +45,6 @@ import
org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* queue-based logging collector.
*/
@@ -74,7 +73,7 @@ public class KafkaLogCollectClient implements
LogConsumeClient {
if (isStarted.get()) {
close();
}
- String topic = props.getProperty(GenericLoggingConstant.TOPIC);
+ String topic = "shenyu-access-logging";
String nameserverAddress = props.getProperty("bootstrap.servers");
if (StringUtils.isBlank(topic) ||
StringUtils.isBlank(nameserverAddress)) {
LOG.error("init kafkaLogCollectClient error, please check topic or
nameserverAddress");
@@ -82,7 +81,7 @@ public class KafkaLogCollectClient implements
LogConsumeClient {
}
this.topic = topic;
producer = new KafkaProducer<>(props);
- ProducerRecord<String, String> record = new
ProducerRecord<>(this.topic, "shenyu-access-logging");
+ ProducerRecord<String, String> record = new
ProducerRecord<>("shenyu-access-logging", StringSerializer.class.getName(),
StringSerializer.class.getName());
try {
producer.send(record);
LOG.info("init kafkaLogCollectClient success");
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
index 829316a3d..73187e545 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
@@ -17,6 +17,11 @@
package org.apache.shenyu.plugin.logging.kafka.handler;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -28,19 +33,12 @@ import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.common.enums.SelectorTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
-import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
-import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* The type logging kafka plugin data handler.
*/
@@ -99,8 +97,6 @@ public class LoggingKafkaPluginDataHandler implements
PluginDataHandler {
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put("bootstrap.servers",
globalLogConfig.getNamesrvAddr());
- properties.put(GenericLoggingConstant.TOPIC,
globalLogConfig.getTopic());
- properties.put(GenericLoggingConstant.NAMESERVER_ADDRESS,
globalLogConfig.getTopic());
KAFKA_LOG_COLLECT_CLIENT.initProducer(properties);
KafkaLogCollector.getInstance().start();
} else {