This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 053c45959 Update the nameaddress and topic configuration in 
logging-kafka (#3850)
053c45959 is described below

commit 053c459596cf4fd59a9113ca033f6f277a5bc875
Author: qifanyyy <[email protected]>
AuthorDate: Mon Aug 29 03:29:58 2022 -0400

    Update the nameaddress and topic configuration in logging-kafka (#3850)
    
    * update docker compose and port from 8082 to 9092
    
    * Integration test update docker compose
    
    * update the nameaddress and topic configuration in logging-kafka
    
    * reformat logging-kafka
---
 .../common/constant/GenericLoggingConstant.java     |  2 +-
 .../logging/kafka/client/KafkaLogCollectClient.java | 21 ++++++++++-----------
 .../handler/LoggingKafkaPluginDataHandler.java      | 16 ++++++----------
 3 files changed, 17 insertions(+), 22 deletions(-)

diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
index 83f3d86d2..1204a6250 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
@@ -70,7 +70,7 @@ public class GenericLoggingConstant {
     /**
      * aliyun sls topic.
      */
-    public static final String TOPIC = "Topic";
+    public static final String TOPIC = "topic";
     
     /**
      * send thread config.
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
index f724a2674..880923ad0 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
@@ -17,6 +17,13 @@
 
 package org.apache.shenyu.plugin.logging.kafka.client;
 
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.collections4.CollectionUtils;
@@ -28,9 +35,9 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.shenyu.common.utils.JsonUtils;
 import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
-import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
 import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
 import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
 import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
@@ -38,14 +45,6 @@ import 
org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * queue-based logging collector.
  */
@@ -74,7 +73,7 @@ public class KafkaLogCollectClient implements 
LogConsumeClient {
         if (isStarted.get()) {
             close();
         }
-        String topic = props.getProperty(GenericLoggingConstant.TOPIC);
+        String topic = "shenyu-access-logging";
         String nameserverAddress = props.getProperty("bootstrap.servers");
         if (StringUtils.isBlank(topic) || 
StringUtils.isBlank(nameserverAddress)) {
             LOG.error("init kafkaLogCollectClient error, please check topic or 
nameserverAddress");
@@ -82,7 +81,7 @@ public class KafkaLogCollectClient implements 
LogConsumeClient {
         }
         this.topic = topic;
         producer = new KafkaProducer<>(props);
-        ProducerRecord<String, String> record = new 
ProducerRecord<>(this.topic, "shenyu-access-logging");
+        ProducerRecord<String, String> record = new 
ProducerRecord<>("shenyu-access-logging", StringSerializer.class.getName(), 
StringSerializer.class.getName());
         try {
             producer.send(record);
             LOG.info("init kafkaLogCollectClient success");
diff --git 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
index 829316a3d..73187e545 100644
--- 
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
@@ -17,6 +17,11 @@
 
 package org.apache.shenyu.plugin.logging.kafka.handler;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -28,19 +33,12 @@ import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.common.enums.SelectorTypeEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
-import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
 import org.apache.shenyu.plugin.logging.kafka.collector.KafkaLogCollector;
 import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
-import org.apache.shenyu.plugin.logging.kafka.client.KafkaLogCollectClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * The type logging kafka plugin data handler.
  */
@@ -99,8 +97,6 @@ public class LoggingKafkaPluginDataHandler implements 
PluginDataHandler {
             properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
             properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
             properties.put("bootstrap.servers", 
globalLogConfig.getNamesrvAddr());
-            properties.put(GenericLoggingConstant.TOPIC, 
globalLogConfig.getTopic());
-            properties.put(GenericLoggingConstant.NAMESERVER_ADDRESS, 
globalLogConfig.getTopic());
             KAFKA_LOG_COLLECT_CLIENT.initProducer(properties);
             KafkaLogCollector.getInstance().start();
         } else {

Reply via email to