Cool-Coding opened a new issue #7316:
URL: https://github.com/apache/skywalking/issues/7316
Please answer these questions before submitting your issue.
- Why do you submit this issue?
- [ ] Question or discussion
- [x] Bug
- [ ] Requirement
- [ ] Feature or performance improvement
___
### Bug
- Which version of SkyWalking, OS, and JRE?
SkyWalking 8.6.0, macOS, JDK 1.8.0_191
- What happened?
The OAP server creates the wrong topics when I set `namespace` in the `kafka-fetcher` module to the string value "yanggy".
Below are the steps I followed:
1. Give `namespace` a string value:
```yaml
kafka-fetcher:
  selector: ${SW_KAFKA_FETCHER:default}
  default:
    bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
    namespace: ${SW_NAMESPACE:"yanggy"}
    partitions: ${SW_KAFKA_FETCHER_PARTITIONS:2}
    replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
    enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
    enableNativeProtoLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_PROTO_LOG:false}
    enableNativeJsonLog: ${SW_KAFKA_FETCHER_ENABLE_NATIVE_JSON_LOG:false}
    isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
    consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
    kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
    kafkaHandlerThreadPoolQueueSize: ${SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}
```
2. Start the OAP server.
3. List the topics with kafka-manager.

The OAP server creates seven redundant topics without the "yanggy" prefix and four topics with the "yanggy" prefix. The prefixed topics are wrong because their partition count and replication factor do not match the configuration; both should be 2.
4. Start a demo application with the agent.
- Configure the agent to send data through Kafka:
```properties
# Kafka producer configuration
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# if you want to set namespace. please make sure the OAP server has set it in Kafka fetcher module
plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:yanggy}
```
- Start the demo.
I got the following error because the "yanggy-skywalking-meters" topic does not exist:
```text
ERROR 2021-07-16 16:23:35:343 SkywalkingAgent-11-org.apache.skywalking.apm.dependencies.kafkaProducerInitThread-0 KafkaProducerManager : Get KAFKA topic:yanggy-skywalking-meters error.
java.util.concurrent.ExecutionException: org.apache.skywalking.apm.dependencies.org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
    at org.apache.skywalking.apm.dependencies.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
    at org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManager.lambda$run$1(KafkaProducerManager.java:109)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManager.run(KafkaProducerManager.java:120)
    at org.apache.skywalking.apm.util.RunnableWithExceptionProtection.run(RunnableWithExceptionProtection.java:33)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
___
### Requirement or improvement
#### First, let's see how OAP creates topics
```java
// KafkaFetcherHandlerRegister, lines 77-115
AdminClient adminClient = AdminClient.create(properties);
Set<String> missedTopics = adminClient.describeTopics(Lists.newArrayList(
        config.getTopicNameOfManagements(),
        config.getTopicNameOfMetrics(),
        config.getTopicNameOfProfiling(),
        config.getTopicNameOfTracingSegments(),
        config.getTopicNameOfMeters(),
        config.getTopicNameOfLogs(),
        config.getTopicNameOfJsonLogs()
    ))
    .values()
    .entrySet()
    .stream()
    .map(entry -> {
        try {
            entry.getValue().get();
            return null;
        } catch (InterruptedException | ExecutionException ignore) {
        }
        return entry.getKey();
    })
    .filter(Objects::nonNull)
    .collect(Collectors.toSet());

if (!missedTopics.isEmpty()) {
    log.info("Topics" + missedTopics.toString() + " not exist.");
    List<NewTopic> newTopicList = missedTopics.stream()
        .map(topic -> new NewTopic(
            topic,
            config.getPartitions(),
            (short) config.getReplicationFactor()
        )).collect(Collectors.toList());

    try {
        adminClient.createTopics(newTopicList).all().get();
    } catch (Exception e) {
        throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
    }
}
```
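OAP checks for and creates these topics by their plain names only. The namespace is never applied, so this is where the seven unprefixed topics come from.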
#### Second, let's see why four topics have the "yanggy" prefix
```java
// KafkaFetcherHandlerRegister
public void start() {
    handlerMap = builder.build();
    if (isSharding) {
        consumer.assign(topicPartitions);
    } else {
        consumer.subscribe(handlerMap.keySet());
    }
    consumer.seekToEnd(consumer.assignment());
    executor.submit(this);
}
```
When `start()` runs, the consumer subscribes to the topics in `handlerMap`, and those names do include the namespace. These topics do not exist yet, so Kafka auto-creates them with the broker defaults: one partition and one replica.
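The following JDK-only model makes the two creation paths concrete by showing what ends up on the broker. It is a sketch under stated assumptions, not SkyWalking code:

- The seven plain topic names stand in for the `config.getTopicNameOf*()` defaults.
- The four enabled handlers are assumed to be the ones the reported config leaves on (the meter system and both log handlers are disabled).
- The broker is assumed to auto-create missing topics with `num.partitions=1` and `default.replication.factor=1`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicCreationSimulation {
    // Assumed default plain topic names; the real values come from KafkaFetcherConfig.
    static final List<String> ALL_PLAIN = Arrays.asList(
            "skywalking-managements", "skywalking-metrics", "skywalking-profilings",
            "skywalking-segments", "skywalking-meters", "skywalking-logs", "skywalking-logs-json");
    // Handlers assumed enabled with the reported config (meter system and log handlers off).
    static final List<String> ENABLED_PLAIN = ALL_PLAIN.subList(0, 4);

    // Returns topic name -> "partitions x replicas" after both phases have run.
    static Map<String, String> simulate(String namespace, int partitions, int replicas) {
        Map<String, String> broker = new LinkedHashMap<>();
        // Prepare phase: missing plain names are created with the configured layout.
        for (String plain : ALL_PLAIN) {
            broker.putIfAbsent(plain, partitions + "x" + replicas);
        }
        // Start phase: subscribe() uses namespaced names; the broker auto-creates
        // the missing ones with its (assumed) defaults of 1 partition, 1 replica.
        for (String plain : ENABLED_PLAIN) {
            broker.putIfAbsent(namespace + "-" + plain, "1x1");
        }
        return broker;
    }

    public static void main(String[] args) {
        // Prints 11 topics: 7 plain at 2x2 and 4 "yanggy-" topics at 1x1.
        simulate("yanggy", 2, 2).forEach((topic, layout) -> System.out.println(topic + " -> " + layout));
    }
}
```

The model runs with namespace "yanggy" and a configured layout of 2 partitions and 2 replicas. It reproduces the counts reported above: seven redundant plain topics and four prefixed topics with the wrong layout.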
#### Third, let's see why the agent reports an error
The reason is simple: the "yanggy-skywalking-meters" topic does not exist.
Why doesn't OAP create even a wrongly configured "yanggy-skywalking-meters" topic? The default value of `enableMeterSystem` in application.yml is `false`. As a result, no meter handler subscribes to that topic, so nothing triggers its auto-creation.
I therefore suggest changing the default of `enableMeterSystem` to `true`. That keeps the topics OAP creates and consumes consistent with the topics the agent produces to.
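The mismatch is a set difference. It compares the topics the agent expects with the namespaced topics that exist only because some OAP handler subscribed to them.

This is a hypothetical JDK-only sketch with two assumptions. First, the agent's topic list is chosen for illustration. Second, OAP is assumed to always register the four base handlers, with the meter handler controlled by `enableMeterSystem`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MeterTopicCheck {
    // Namespaced topics an OAP consumer subscribes to (assumed: four base handlers
    // always on, the meter handler only when enableMeterSystem is true).
    static List<String> oapNamespacedTopics(String namespace, boolean enableMeterSystem) {
        List<String> plain = new ArrayList<>(Arrays.asList(
                "skywalking-managements", "skywalking-metrics",
                "skywalking-profilings", "skywalking-segments"));
        if (enableMeterSystem) {
            plain.add("skywalking-meters");
        }
        List<String> namespaced = new ArrayList<>();
        for (String topic : plain) {
            namespaced.add(namespace + "-" + topic);
        }
        return namespaced;
    }

    // Topics the agent expects but that no OAP handler brings into existence.
    static List<String> missingForAgent(List<String> agentTopics, List<String> oapTopics) {
        List<String> missing = new ArrayList<>(agentTopics);
        missing.removeAll(oapTopics);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical set of topics the agent checks on startup.
        List<String> agentTopics = Arrays.asList(
                "yanggy-skywalking-managements", "yanggy-skywalking-metrics",
                "yanggy-skywalking-profilings", "yanggy-skywalking-segments",
                "yanggy-skywalking-meters");
        System.out.println(missingForAgent(agentTopics, oapNamespacedTopics("yanggy", false))); // [yanggy-skywalking-meters]
        System.out.println(missingForAgent(agentTopics, oapNamespacedTopics("yanggy", true)));  // []
    }
}
```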
#### Fourth, let's see how OAP and the agent build topic names when they produce or consume
- OAP
```java
// AbstractKafkaHandler
public String getTopic() {
    StringBuilder sb = new StringBuilder();
    if (StringUtil.isNotBlank(config.getMm2SourceAlias())) {
        sb.append(config.getMm2SourceAlias()).append(config.getMm2SourceSeparator());
    }
    if (StringUtil.isNotBlank(config.getNamespace())) {
        sb.append(config.getNamespace()).append("-");
    }
    sb.append(getPlainTopic());
    return sb.toString();
}
```
- agent
```java
// KafkaProducerManager
String formatTopicNameThenRegister(String topic) {
    String topicName = StringUtil.isBlank(KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE)
        ? topic
        : KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + topic;
    topics.add(topicName);
    return topicName;
}
```
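The two naming rules quoted above can be compared directly by rewriting them as plain static methods. In this sketch the blank check only approximates SkyWalking's `StringUtil`. The alias `"source"` and separator `"."` are made-up values.

```java
public class TopicNaming {
    // Approximation of a "not blank" check: non-null and not only whitespace.
    static boolean isNotBlank(String s) {
        return s != null && !s.trim().isEmpty();
    }

    // Follows AbstractKafkaHandler#getTopic as quoted in this issue.
    static String oapTopic(String mm2SourceAlias, String mm2SourceSeparator,
                           String namespace, String plainTopic) {
        StringBuilder sb = new StringBuilder();
        if (isNotBlank(mm2SourceAlias)) {
            sb.append(mm2SourceAlias).append(mm2SourceSeparator);
        }
        if (isNotBlank(namespace)) {
            sb.append(namespace).append("-");
        }
        sb.append(plainTopic);
        return sb.toString();
    }

    // Follows KafkaProducerManager#formatTopicNameThenRegister, without registering.
    static String agentTopic(String namespace, String topic) {
        return isNotBlank(namespace) ? namespace + "-" + topic : topic;
    }

    public static void main(String[] args) {
        // Without an alias, both sides agree.
        System.out.println(oapTopic(null, null, "yanggy", "skywalking-meters")); // yanggy-skywalking-meters
        System.out.println(agentTopic("yanggy", "skywalking-meters"));         // yanggy-skywalking-meters
        // With a hypothetical alias, only the OAP name changes.
        System.out.println(oapTopic("source", ".", "yanggy", "skywalking-meters")); // source.yanggy-skywalking-meters
    }
}
```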
The OAP and agent logic is not consistent. Besides the namespace and the plain topic name, OAP topic names also depend on `mm2SourceAlias` and `mm2SourceSeparator`, while the agent's names do not. I can't find where `mm2SourceAlias` and `mm2SourceSeparator` are used.
___
### Possible fix
If this is confirmed as a bug, I will make the changes below. Please help me review them.
#### Move the topic-creation logic in OAP from the prepare phase to the start phase
```java
public void start() throws ModuleStartException {
    handlerMap = builder.build();
    // Create the namespaced handler topics with the configured layout before
    // subscribing, so the broker does not auto-create them with its defaults.
    createTopicIfNeeded(handlerMap.keySet(), properties);
    if (isSharding) {
        consumer.assign(topicPartitions);
    } else {
        consumer.subscribe(handlerMap.keySet());
    }
    consumer.seekToEnd(consumer.assignment());
    executor.submit(this);
}

private void createTopicIfNeeded(Collection<String> topics, Properties properties) throws ModuleStartException {
    // try-with-resources closes the AdminClient once the check is done
    try (AdminClient adminClient = AdminClient.create(properties)) {
        Set<String> missedTopics = adminClient.describeTopics(topics)
            .values()
            .entrySet()
            .stream()
            .map(entry -> {
                try {
                    entry.getValue().get();
                    return null; // the topic exists
                } catch (InterruptedException | ExecutionException ignore) {
                }
                return entry.getKey(); // treated as missing
            })
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());

        if (!missedTopics.isEmpty()) {
            log.info("Topics " + missedTopics + " do not exist.");
            List<NewTopic> newTopicList = missedTopics.stream()
                .map(topic -> new NewTopic(
                    topic,
                    config.getPartitions(),
                    (short) config.getReplicationFactor()
                ))
                .collect(Collectors.toList());
            try {
                adminClient.createTopics(newTopicList).all().get();
            } catch (Exception e) {
                throw new ModuleStartException("Failed to create Kafka topics " + missedTopics + ".", e);
            }
        }
    }
}
```
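A JDK-only model can check the intended effect of the reordering. If the handler topics are created before `subscribe()`, nothing is left for the broker to auto-create. In this sketch the broker is a plain map from topic name to a `PxR` layout string (partitions x replicas). The handler topic names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProposedStartOrder {
    // Models the proposed start(): create missing handler topics first, then subscribe.
    static Map<String, String> startWithFix(Map<String, String> broker, List<String> handlerTopics,
                                            int partitions, int replicas) {
        // createTopicIfNeeded(): only names not yet on the broker are created.
        for (String topic : handlerTopics) {
            broker.putIfAbsent(topic, partitions + "x" + replicas);
        }
        // subscribe(): broker-default auto-creation (1x1) would only hit missing
        // topics, and at this point none are missing.
        for (String topic : handlerTopics) {
            broker.putIfAbsent(topic, "1x1");
        }
        return broker;
    }

    public static void main(String[] args) {
        List<String> handlerTopics = Arrays.asList(
                "yanggy-skywalking-managements", "yanggy-skywalking-metrics",
                "yanggy-skywalking-profilings", "yanggy-skywalking-segments");
        Map<String, String> broker = startWithFix(new LinkedHashMap<>(), handlerTopics, 2, 2);
        // Every handler topic ends up at 2x2 and no plain topic is created.
        broker.forEach((topic, layout) -> System.out.println(topic + " -> " + layout));
    }
}
```

The proposed `createTopicIfNeeded` creates only missing topics, and so does this sketch. Topics that were auto-created with one partition in an earlier run therefore keep that layout; this code does not change them.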
#### Modify the default value of enableMeterSystem in application.yml
Set the default of `enableMeterSystem` in application.yml to `true`. Only with this change will the agent stop reporting a missing meter topic when a Kafka namespace is used.
```yaml
kafka-fetcher:
  selector: ${SW_KAFKA_FETCHER:default}
  default:
    ......
    enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:true}
    .....
```
#### Modify the logic of getting the Kafka topic name in OAP
```java
// AbstractKafkaHandler
public String getTopic() {
    StringBuilder sb = new StringBuilder();
    if (StringUtil.isNotBlank(config.getNamespace())) {
        sb.append(config.getNamespace()).append("-");
    }
    sb.append(getPlainTopic());
    return sb.toString();
}
```
#### Delete the `mm2SourceAlias` and `mm2SourceSeparator` fields in the `KafkaFetcherConfig` class
--
This is an automated message from the Apache Git Service.