lucasbru commented on code in PR #19697:
URL: https://github.com/apache/kafka/pull/19697#discussion_r2086150911


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -550,12 +551,16 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
             );
             final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
             final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
+
             return new MainConsumerSetup(
-                new AsyncKafkaConsumer<>(
-                    new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(consumerConfigs, keyDeserializer, valueDeserializer)),
-                    keyDeserializer,
-                    valueDeserializer,
-                    streamsRebalanceData
+                maybeWrapConsumer(

Review Comment:
   I think this approach will break here:
   
   
https://github.com/apache/kafka/blame/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1108
   
   With the latest changes above, the interface returned by the wrapper needs to expose the extended subscribe method to Streams.
   
   Could we introduce a small private interface `StreamsConsumer` that contains all the extension methods for Streams, let `AsyncKafkaConsumer` implement it, and have the wrapper return a `StreamsConsumer`?
   
   This could serve as a basis for a public interface later on in KIP-1088.
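   
   A rough sketch of the shape I have in mind, using stand-in types rather than the real Kafka classes. `PlainConsumer`, `FakeAsyncConsumer`, `ConsumerWrapping`, and the `Runnable` listener parameter are all hypothetical placeholders. The only point is that the wrapper's return type carries the extended subscribe, so the caller does not need to know the concrete consumer class:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the plain consumer API; NOT the real Kafka Consumer interface.
interface PlainConsumer {
    void subscribe(Collection<String> topics);
}

// Hypothetical internal interface that collects the Streams-only extension methods.
interface StreamsConsumer extends PlainConsumer {
    // Assumed shape of the extended subscribe; the real listener type will differ.
    void subscribe(Collection<String> topics, Runnable streamsRebalanceListener);
}

// Stand-in for AsyncKafkaConsumer, which would implement StreamsConsumer directly.
final class FakeAsyncConsumer implements StreamsConsumer {
    final List<String> calls = new ArrayList<>();

    @Override
    public void subscribe(Collection<String> topics) {
        calls.add("plain:" + topics);
    }

    @Override
    public void subscribe(Collection<String> topics, Runnable streamsRebalanceListener) {
        calls.add("extended:" + topics);
    }
}

final class ConsumerWrapping {
    // Stand-in for maybeWrapConsumer: the return type is StreamsConsumer, so the
    // caller keeps access to the extended subscribe without a downcast.
    static StreamsConsumer maybeWrapConsumer(StreamsConsumer delegate, boolean wrap) {
        if (!wrap) {
            return delegate;
        }
        // The wrapper forwards both the plain and the extended method.
        return new StreamsConsumer() {
            @Override
            public void subscribe(Collection<String> topics) {
                delegate.subscribe(topics);
            }

            @Override
            public void subscribe(Collection<String> topics, Runnable listener) {
                delegate.subscribe(topics, listener);
            }
        };
    }
}

final class StreamsConsumerSketch {
    public static void main(String[] args) {
        FakeAsyncConsumer delegate = new FakeAsyncConsumer();
        StreamsConsumer consumer = ConsumerWrapping.maybeWrapConsumer(delegate, true);
        // The extended subscribe is reachable through the wrapper's return type.
        consumer.subscribe(List.of("input-topic"), () -> { });
        System.out.println(delegate.calls);
    }
}
```

   Whether the real interface should extend the existing consumer interface or stand alone is an open design choice; the sketch extends it only to keep the example short.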


