Cool-Coding opened a new issue #7316:
URL: https://github.com/apache/skywalking/issues/7316


   Please answer these questions before submitting your issue.
   
   - Why do you submit this issue?
   - [ ] Question or discussion
   - [x] Bug
   - [ ] Requirement
   - [ ] Feature or performance improvement
   
   ___
   ### Bug
   - Which version of SkyWalking, OS, and JRE?
    SkyWalking 8.6.0, macOS, JDK 1.8.0_191
   
   - What happened?
   The OAP creates the wrong topics when `namespace` is set to the string value "yanggy" in the kafka-fetcher section.
   
   Steps to reproduce:
   
   1. Set namespace to a string value
   ```yaml
   kafka-fetcher:
     selector: ${SW_KAFKA_FETCHER:default}
     default:
       bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
       namespace: ${SW_NAMESPACE:"yanggy"}
       partitions: ${SW_KAFKA_FETCHER_PARTITIONS:2}
       replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
       enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
       enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:false}
       enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:false}
       isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
       consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
       kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
       kafkaHandlerThreadPoolQueueSize: ${SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}
   ```
   2. Start the OAP server
   3. List the topics using kafka-manager
     
![](https://github.com/Cool-Coding/photos/blob/master/skywalking/skywalking-1.png?raw=true)
   
      We can see that the OAP creates seven redundant topics without the "yanggy" prefix and four topics with the "yanggy" prefix. The four prefixed topics are also wrong, because their partition count and replication factor should both be 2.
   
   4. Start a demo application with the agent
     - Configure the agent to send data through Kafka
     ```properties
     # Kafka producer configuration
     plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
     # if you want to set namespace. please make sure the OAP server has set it in Kafka fetcher module
     plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:yanggy}
     ```
    - Start the demo
     The agent reports an error because the "yanggy-skywalking-meters" topic doesn't exist:
    ```java
    ERROR 2021-07-16 16:23:35:343 SkywalkingAgent-11-org.apache.skywalking.apm.dependencies.kafkaProducerInitThread-0 KafkaProducerManager : Get KAFKA topic:yanggy-skywalking-meters error.
    java.util.concurrent.ExecutionException: org.apache.skywalking.apm.dependencies.org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
        at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
        at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
        at org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManager.lambda$run$1(KafkaProducerManager.java:109)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManager.run(KafkaProducerManager.java:120)
        at org.apache.skywalking.apm.util.RunnableWithExceptionProtection.run(RunnableWithExceptionProtection.java:33)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
   ___
   ### Requirement or improvement
   #### First let's see how oap creates topics.
   ```java
   // the KafkaFetcherHandlerRegister class  77-115 lines
   AdminClient adminClient = AdminClient.create(properties);
   Set<String> missedTopics = adminClient.describeTopics(Lists.newArrayList(
           config.getTopicNameOfManagements(),
           config.getTopicNameOfMetrics(),
           config.getTopicNameOfProfiling(),
           config.getTopicNameOfTracingSegments(),
           config.getTopicNameOfMeters(),
           config.getTopicNameOfLogs(),
           config.getTopicNameOfJsonLogs()
       ))
       .values()
       .entrySet()
       .stream()
       .map(entry -> {
           try {
               entry.getValue().get();
               return null;
           } catch (InterruptedException | ExecutionException ignore) {
           }
           return entry.getKey();
       })
       .filter(Objects::nonNull)
       .collect(Collectors.toSet());
   
   if (!missedTopics.isEmpty()) {
       log.info("Topics" + missedTopics.toString() + " not exist.");
       List<NewTopic> newTopicList = missedTopics.stream()
           .map(topic -> new NewTopic(
               topic,
               config.getPartitions(),
               (short) config.getReplicationFactor()
           )).collect(Collectors.toList());
   
       try {
           adminClient.createTopics(newTopicList).all().get();
       } catch (Exception e) {
           throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
       }
   }
   ```
   
   The OAP looks up and creates these topics without applying the namespace!
   
   #### Second let's see why there are four topics having "yanggy" prefix
   
   ```
     // the KafkaFetcherHandlerRegister class
      public void start() {
           handlerMap = builder.build();
           if (isSharding) {
               consumer.assign(topicPartitions);
           } else {
               consumer.subscribe(handlerMap.keySet());
           }
           consumer.seekToEnd(consumer.assignment());
           executor.submit(this);
       }
   ```
    When the start method runs, the consumer subscribes to the handler topics; if a topic doesn't exist, it is created with one partition and one replica.
   
   #### Third let's see why agent reports an error
    The reason is obvious: the "yanggy-skywalking-meters" topic doesn't exist.
     Why doesn't the OAP create even the wrongly sized "yanggy-skywalking-meters" topic? Because the default value of "enableMeterSystem" in application.yml is false.
     So I suggest changing the default value of "enableMeterSystem" to "true", so that the OAP and the agent stay consistent about which topics are created and consumed.
   
   #### Fourth let's see how oap and agent get topic name when they produce or consume.
      - oap
    ```java
   // the AbstractKafkaHandler class
     public String getTopic() {
           StringBuilder sb = new StringBuilder();
           if (StringUtils.isNotBlank(config.getMm2SourceAlias())) {
               sb.append(config.getMm2SourceAlias()).append(config.getMm2SourceSeparator());
           }
   
           if (StringUtil.isNotBlank(config.getNamespace())) {
               sb.append(config.getNamespace()).append("-");
           }
           sb.append(getPlainTopic());
           return sb.toString();
       }
   ```
   
     - agent
   
   ```java
   //  the KafkaProducerManager
     String formatTopicNameThenRegister(String topic) {
           String topicName = StringUtil.isBlank(KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE) ? topic
                   : KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + topic;
           topics.add(topicName);
           return topicName;
       }
   ```
   
   We can see that the logic of the OAP and the agent isn't consistent: the OAP topic name depends on "mm2SourceAlias" and "mm2SourceSeparator" in addition to the namespace and the original topic name. I can't find any usage of "mm2SourceAlias" or "mm2SourceSeparator".
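To make the difference concrete, here is a minimal stand-alone comparison of the two naming rules quoted above. The class `TopicNameComparison` and its method names are hypothetical, and the alias "dc1" and separator "." are made-up values for illustration; only the concatenation logic follows the quoted snippets.

```java
final class TopicNameComparison {

    /** Follows the quoted AbstractKafkaHandler#getTopic: optional alias prefix, then namespace. */
    static String oapTopic(String mm2Alias, String mm2Separator, String namespace, String plainTopic) {
        StringBuilder sb = new StringBuilder();
        if (isNotBlank(mm2Alias)) {
            sb.append(mm2Alias).append(mm2Separator);
        }
        if (isNotBlank(namespace)) {
            sb.append(namespace).append("-");
        }
        return sb.append(plainTopic).toString();
    }

    /** Follows the quoted KafkaProducerManager#formatTopicNameThenRegister: namespace only. */
    static String agentTopic(String namespace, String plainTopic) {
        return isNotBlank(namespace) ? namespace + "-" + plainTopic : plainTopic;
    }

    private static boolean isNotBlank(String s) {
        return s != null && !s.trim().isEmpty();
    }

    public static void main(String[] args) {
        String plain = "skywalking-meters";
        // Without an alias both sides produce the same name.
        System.out.println(oapTopic("", ".", "yanggy", plain));    // yanggy-skywalking-meters
        System.out.println(agentTopic("yanggy", plain));           // yanggy-skywalking-meters
        // With a (made-up) alias the OAP name gains a prefix the agent never adds.
        System.out.println(oapTopic("dc1", ".", "yanggy", plain)); // dc1.yanggy-skywalking-meters
    }
}
```

So the two sides only agree while the alias is blank; as soon as it is set, the OAP consumes from a topic name the agent does not produce to.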
   
   ___
   ### Possible fix
   If this is confirmed as a bug, I will make the changes below. Please help me review them.
   
   #### Move the logic of creating topic in oap from prepare phase to start phase.
   
   ```java
   public void start() throws ModuleStartException {
       handlerMap = builder.build();
   
       createTopicIfNeeded(handlerMap.keySet(), properties);
   
       if (isSharding) {
           consumer.assign(topicPartitions);
       } else {
           consumer.subscribe(handlerMap.keySet());
       }
       consumer.seekToEnd(consumer.assignment());
       executor.submit(this);
   }
   
   private void createTopicIfNeeded(Collection<String> topics, Properties properties) throws ModuleStartException {
       AdminClient adminClient = AdminClient.create(properties);
       Set<String> missedTopics = adminClient.describeTopics(topics)
           .values()
           .entrySet()
           .stream()
           .map(entry -> {
               try {
                   entry.getValue().get();
                   return null;
               } catch (InterruptedException | ExecutionException ignore) {
               }
               return entry.getKey();
           })
           .filter(Objects::nonNull)
           .collect(Collectors.toSet());
   
       if (!missedTopics.isEmpty()) {
           log.info("Topics" + missedTopics.toString() + " not exist.");
           List<NewTopic> newTopicList = missedTopics.stream()
               .map(topic -> new NewTopic(
                   topic,
                   config.getPartitions(),
                   (short) config.getReplicationFactor()
               )).collect(Collectors.toList());
   
           try {
               adminClient.createTopics(newTopicList).all().get();
           } catch (Exception e) {
               throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
           }
       }
   }
   ```
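The missing-topic check inside `createTopicIfNeeded` can also be tried out without a broker. The sketch below is not the OAP code: the class `MissingTopicsSketch` is hypothetical, and it uses plain `java.util.concurrent.Future` values as stand-ins for the per-topic results of `describeTopics`. It also differs from the snippet above in one respect that I think is worth considering: an interrupt is not silently treated as "topic missing".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class MissingTopicsSketch {

    /** Returns the names whose lookup future completed with an error. */
    static Set<String> missingTopics(Map<String, Future<?>> lookups) {
        Set<String> missing = new TreeSet<>();
        for (Map.Entry<String, Future<?>> entry : lookups.entrySet()) {
            try {
                entry.getValue().get();
            } catch (ExecutionException e) {
                // Same simplification as the original: any failed lookup counts as "missing".
                missing.add(entry.getKey());
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while checking " + entry.getKey(), e);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Simulated lookup results: one topic exists, one lookup fails.
        CompletableFuture<String> absent = new CompletableFuture<>();
        absent.completeExceptionally(new IllegalStateException("unknown topic"));

        Map<String, Future<?>> lookups = new LinkedHashMap<>();
        lookups.put("yanggy-skywalking-segments", CompletableFuture.completedFuture("ok"));
        lookups.put("yanggy-skywalking-meters", absent);

        System.out.println(missingTopics(lookups)); // [yanggy-skywalking-meters]
    }
}
```

In the real method it might be more precise to inspect the cause of the `ExecutionException` before deciding that a topic is missing, but that is left out here to keep the sketch free of Kafka dependencies.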
   #### Modify the default value of enableMeterSystem in application.yml
   
   Change the default value of "enableMeterSystem" in application.yml to "true". Only with this change will the agent stop reporting that the meter topic doesn't exist when a Kafka namespace is used.
   
   ```yaml
   kafka-fetcher:
     selector: ${SW_KAFKA_FETCHER:default}
     default:
       ......
       enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:true}
       .....
   ```
   
   #### Modify the logic of getting the Kafka topic name in oap
   
   ```java
    // the AbstractKafkaHandler class
     public String getTopic() {
           StringBuilder sb = new StringBuilder();
          
           if (StringUtil.isNotBlank(config.getNamespace())) {
               sb.append(config.getNamespace()).append("-");
           }
           sb.append(getPlainTopic());
           return sb.toString();
       }
   ```
   
   #### Delete the mm2SourceAlias and mm2SourceSeparator fields in KafkaFetcherConfig class

