lucasbru commented on code in PR #19697: URL: https://github.com/apache/kafka/pull/19697#discussion_r2086150911
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -550,12 +551,16 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
             );
             final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
             final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
+
             return new MainConsumerSetup(
-                new AsyncKafkaConsumer<>(
-                    new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(consumerConfigs, keyDeserializer, valueDeserializer)),
-                    keyDeserializer,
-                    valueDeserializer,
-                    streamsRebalanceData
+                maybeWrapConsumer(

Review Comment:
I think this approach will break here:
https://github.com/apache/kafka/blame/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1108

With the latest changes above, we need to make the extended subscribe method available to streams in the interface returned by the wrapper.

Could we introduce a little private interface `StreamsConsumer` that contains all extension methods for streams and let `AsyncKafkaConsumer` implement it, and have the wrapper return a `StreamsConsumer`? This could serve as a basis for a public interface later on in KIP-1088.
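
To illustrate the shape of the suggestion, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `Consumer`, `RebalanceListener`, `AsyncConsumerStandIn`, and `DelegatingStreamsConsumer` are hypothetical stand-ins, and the signatures of the extended `subscribe` and of `maybeWrapConsumer` are assumptions made for illustration, not the ones in the PR. Whether `StreamsConsumer` should extend the plain consumer interface is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.UnaryOperator;

public class StreamsConsumerSketch {

    /** Stand-in for the public consumer interface (hypothetical, not Kafka's). */
    interface Consumer {
        void subscribe(Collection<String> topics);
    }

    /** Stand-in for the listener taken by the Streams-specific subscribe (hypothetical). */
    interface RebalanceListener {
        void onTasksAssigned(Collection<String> topics);
    }

    /** The suggested internal interface: the plain consumer plus the Streams extension methods. */
    interface StreamsConsumer extends Consumer {
        void subscribe(Collection<String> topics, RebalanceListener listener);
    }

    /** Plays the role of AsyncKafkaConsumer: it implements the extension directly. */
    static final class AsyncConsumerStandIn implements StreamsConsumer {
        final List<String> subscribed = new ArrayList<>();

        @Override
        public void subscribe(Collection<String> topics) {
            subscribed.clear();
            subscribed.addAll(topics);
        }

        @Override
        public void subscribe(Collection<String> topics, RebalanceListener listener) {
            subscribe(topics);
            // Simulate an immediate assignment so the sketch has observable behavior.
            listener.onTasksAssigned(topics);
        }
    }

    /** A wrapper that forwards both subscribe variants, so the extension is not lost. */
    static final class DelegatingStreamsConsumer implements StreamsConsumer {
        private final StreamsConsumer delegate;

        DelegatingStreamsConsumer(StreamsConsumer delegate) {
            this.delegate = delegate;
        }

        @Override
        public void subscribe(Collection<String> topics) {
            delegate.subscribe(topics);
        }

        @Override
        public void subscribe(Collection<String> topics, RebalanceListener listener) {
            delegate.subscribe(topics, listener);
        }
    }

    /** Hypothetical wrapper hook: returns the consumer unchanged when no wrapper is configured. */
    static StreamsConsumer maybeWrapConsumer(StreamsConsumer consumer,
                                             UnaryOperator<StreamsConsumer> wrapper) {
        return wrapper == null ? consumer : wrapper.apply(consumer);
    }

    public static void main(String[] args) {
        AsyncConsumerStandIn inner = new AsyncConsumerStandIn();
        StreamsConsumer consumer = maybeWrapConsumer(inner, DelegatingStreamsConsumer::new);
        // The caller only sees StreamsConsumer, yet the extended subscribe is still reachable.
        consumer.subscribe(List.of("input-topic"),
            topics -> System.out.println("assigned: " + topics));
    }
}
```

Because `maybeWrapConsumer` returns `StreamsConsumer` rather than the plain consumer type, the call site in `StreamThread` would not need a cast to reach the extended `subscribe`, whether or not a wrapper is installed.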