Chuckame opened a new pull request, #19816: URL: https://github.com/apache/kafka/pull/19816
Currently, building the topology with custom graph nodes fails with a NPE because the GraphNode#buildPriority field is null. We can set this value to an empiric one (like taking the one from the previous node), but the best is to reuse the same method as all the other nodes. --- Problematic code: https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L310 NPE here: https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L327 Because of `buildPriority` being null, only set here currently: https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L269 --- Impact in our codebase: ```diff internal class ReplayableKTable<K, V>( private val replayedTable: KTableImpl<K, *, V>, ) : AbstractStream<K, V>(replayedTable as AbstractStream<K, V>) { fun replayFromInternalTopic(commandTopicPartitions: Int): KTable<K, V> { val replayedTableName = name val replayCommandTopic = "$replayedTableName-replay-command" builder.internalTopologyBuilder().addInternalTopic(replayCommandTopic, InternalTopicProperties(commandTopicPartitions)) val replayTopicSourceNode = <code to make the replay command graph node> val replayNode = ReplayableTableGraphNode( "$replayedTableName-replay", replayTopicSourceNode.nodeName(), replayedTable, replayedTable.valueGetterSupplier(), name, subTopologySourceNodes ) - replayNode.setBuildPriority(replayTopicSourceNode.buildPriority()) - replayTopicSourceNode.addChild(replayNode) - graphNode.addChild(replayNode) + builder.addGraphNode(listOf(replayTopicSourceNode, graphNode), replayNode) return KTableImpl<K, Any, V>( name, keySerde, valueSerde, subTopologySourceNodes + replayTopicSourceNode.nodeName(), replayedTable.queryableStoreName(), replayNode.processor, replayNode, builder ) } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org