[ https://issues.apache.org/jira/browse/KAFKA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13109245#comment-13109245 ]
Jay Kreps commented on KAFKA-3:
-------------------------------

One way to do this would be to make use of type inference off the method arguments. For example, you could make the API be

    val firstStreamConfig = new StreamConfig[String](topic = "my_topic", parallelism = 5, decoder = new StringDecoder())
    val secondStreamConfig = new StreamConfig[Integer](topic = "your_topic", parallelism = 5, decoder = new IntDecoder())
    val streamSet: StreamSet = consumer.createMessageStreams(firstStreamConfig, secondStreamConfig)
    val streamOne: List[KafkaMessageStream[String]] = streamSet.get(firstStreamConfig)

Here the type of the iterator is actually inferred from the parametrization on the config object passed in. This trick is used a lot in Java APIs to help pass through type parameters. The other advantage is that it gives us a general per-topic config object. Currently we don't have a very clean way to do per-topic config.

Another way to do this would be to ask whether stream creation can be done one topic at a time without causing a "rebalance storm" for the clients. In that case you could do

    val consumer = new ZookeeperConsumerConnector[MyRichType](consumerConfig)
    val streamsA = consumer.createMessageStream("topic_a", decoder, parallelism)
    val streamsB = consumer.createMessageStream("topic_b", decoder, parallelism)

To my eyes this latter approach is much more friendly, especially in the common case where you just have one topic to consume from. I think the reason we didn't do this was to avoid having a sequence of these calls cause a ton of rebalancing activity. I wonder if there is a direct fix for that.

> Consumer needs a pluggable decoder
> ----------------------------------
>
>                 Key: KAFKA-3
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3
>             Project: Kafka
>          Issue Type: Improvement
>         Attachments: KAFKA-3_v1.patch
>
>
> Kafka producer allows a user to plug in an encoder (from type T to Message).
> We need to do the same thing on the consumer side, by allowing a user to plug
> in a decoder (from Message to type T).
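
As a rough illustration of the first proposal in the comment, here is a minimal, self-contained sketch of how a typed config object could carry the element type through to the returned streams. Everything here is a hypothetical stand-in, not the actual Kafka code: the `Decoder` trait, the `open` helper, the `Consumer` object, the in-memory `fetch` function, and the use of `Int` in place of `Integer` are all assumptions made so the example runs on its own.

```scala
import java.nio.ByteBuffer

// Hypothetical decoder interface: raw bytes in, typed value out.
trait Decoder[T] { def decode(bytes: Array[Byte]): T }

class StringDecoder extends Decoder[String] {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

class IntDecoder extends Decoder[Int] {
  def decode(bytes: Array[Byte]): Int = ByteBuffer.wrap(bytes).getInt
}

// Stand-in for a message stream: decodes raw payloads lazily on iteration.
class KafkaMessageStream[T](raw: Seq[Array[Byte]], decoder: Decoder[T]) {
  def iterator: Iterator[T] = raw.iterator.map(decoder.decode)
}

// Per-topic config. A plain class (not a case class), so each instance is a
// distinct map key by reference identity.
class StreamConfig[T](val topic: String, val parallelism: Int, val decoder: Decoder[T]) {
  // Builds `parallelism` streams; T is in scope here, so no cast is needed.
  def open(fetch: String => Seq[Array[Byte]]): List[KafkaMessageStream[T]] =
    List.fill(parallelism)(new KafkaMessageStream[T](fetch(topic), decoder))
}

// Heterogeneous container keyed by config. The element type is erased in the
// map and recovered from the type parameter of the key passed to `get`.
class StreamSet(streams: Map[StreamConfig[_], List[KafkaMessageStream[_]]]) {
  // The cast is safe only because every entry was produced by its own key's `open`.
  def get[T](config: StreamConfig[T]): List[KafkaMessageStream[T]] =
    streams(config).asInstanceOf[List[KafkaMessageStream[T]]]
}

object Consumer {
  // All streams are created in a single call, mirroring createMessageStreams.
  def createMessageStreams(fetch: String => Seq[Array[Byte]],
                           configs: StreamConfig[_]*): StreamSet = {
    val entries: Seq[(StreamConfig[_], List[KafkaMessageStream[_]])] =
      configs.map(c => (c, c.open(fetch)))
    new StreamSet(entries.toMap)
  }
}

object Demo {
  // In-memory stand-in for fetching raw messages from a topic.
  val fetch: String => Seq[Array[Byte]] = {
    case "my_topic" => Seq("hello".getBytes("UTF-8"))
    case _          => Seq(ByteBuffer.allocate(4).putInt(42).array())
  }

  val firstStreamConfig =
    new StreamConfig[String](topic = "my_topic", parallelism = 5, decoder = new StringDecoder())
  val secondStreamConfig =
    new StreamConfig[Int](topic = "your_topic", parallelism = 5, decoder = new IntDecoder())

  val streamSet = Consumer.createMessageStreams(fetch, firstStreamConfig, secondStreamConfig)

  // The element types follow from the configs passed to `get`.
  val streamOne: List[KafkaMessageStream[String]] = streamSet.get(firstStreamConfig)
  val streamTwo: List[KafkaMessageStream[Int]] = streamSet.get(secondStreamConfig)
}
```

The one unchecked cast is confined to `StreamSet.get`; callers never cast, and asking for a `List[KafkaMessageStream[Int]]` with `firstStreamConfig` would fail to compile. The trade-off is that the caller has to hold on to the config objects to look the streams up again.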