[ https://issues.apache.org/jira/browse/KAFKA-12914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-12914: ------------------------------------ Fix Version/s: 2.8.1 > StreamSourceNode.toString() throws with StreamsBuilder.stream(Pattern) ctor > --------------------------------------------------------------------------- > > Key: KAFKA-12914 > URL: https://issues.apache.org/jira/browse/KAFKA-12914 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Will Bartlett > Assignee: Matthias J. Sax > Priority: Minor > Fix For: 3.0.0, 2.8.1 > > > Hi, > I came across what looks like a bug. > h2. Repro > {code:java} > import org.apache.kafka.common.serialization.Serdes > import org.apache.kafka.streams.KafkaStreams > import org.apache.kafka.streams.StreamsBuilder > import org.apache.kafka.streams.kstream.Consumed > import java.util.* > import java.util.regex.Pattern > fun main() { > val builder = StreamsBuilder() > builder.stream(Pattern.compile("foo"), Consumed.with(Serdes.Long(), > Serdes.Long())) > val streams = KafkaStreams(builder.build(), Properties()) > streams.start() > } > {code} > {code:bash} > SLF4J: Failed toString() invocation on an object of type > [java.util.LinkedHashSet] > Reported exception: > java.lang.NullPointerException > at > java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030) > at > java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132) > at > java.base/java.util.Collections.unmodifiableSet(Collections.java:1122) > at > org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55) > at > org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65) > at java.base/java.lang.String.valueOf(String.java:3352) > at java.base/java.lang.StringBuilder.append(StringBuilder.java:166) > at > java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249) > at > org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211) > at > org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at > ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414) > at ch.qos.logback.classic.Logger.debug(Logger.java:490) > at > org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305) > at > org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624) > at > org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613) > at ApplicationKt.main(Application.kt:11) > at ApplicationKt.main(Application.kt) > SLF4J: Failed toString() invocation on an object of type > [org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode] > Reported exception: > java.lang.NullPointerException > at > java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030) > at > java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132) > at > java.base/java.util.Collections.unmodifiableSet(Collections.java:1122) > at > org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55) > at > org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65) > at > org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277) > at > org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249) > at > org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211) > at > org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161) > at > ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293) > at > ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206) > at > ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223) > at > ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102) > at > ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84) > at > ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51) > at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270) > at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257) > at > ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421) > at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414) > at ch.qos.logback.classic.Logger.debug(Logger.java:490) > at > org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305) > at > org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624) > at > org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613) > at ApplicationKt.main(Application.kt:11) > at ApplicationKt.main(Application.kt) > {code} > Kotlin but you get the idea. > h2. Brief debug > StreamSourceNode.toString() calls topicNames(): > [https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java#L64-L70] > Which will be null if constructed with a Pattern rather than a String: > [https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java#L49] > Which causes UnmodifiableCollection to throw. > -- This message was sent by Atlassian Jira (v8.3.4#803005)