[ 
https://issues.apache.org/jira/browse/KAFKA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13109245#comment-13109245
 ] 

Jay Kreps commented on KAFKA-3:
-------------------------------

One way to do this would be to rely on type inference from the method 
arguments. For example, the API could be:

val firstStreamConfig = new StreamConfig[String](topic = "my_topic",
  parallelism = 5, decoder = new StringDecoder())
val secondStreamConfig = new StreamConfig[Integer](topic = "your_topic",
  parallelism = 5, decoder = new IntDecoder())
val streamSet: StreamSet = consumer.createMessageStreams(firstStreamConfig,
  secondStreamConfig)
val streamOne: List[KafkaMessageStream[String]] =
  streamSet.get(firstStreamConfig)

Here the message type of each stream is inferred from the type parameter of 
the config object passed in. This trick is used a lot in Java APIs to carry 
type parameters through a call. The other advantage is that it gives us a 
general per-topic config object. Currently we don't have a very clean way to 
do per-topic config.
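
To make the type flow concrete, here is a minimal sketch of how a typed config could double as a lookup key. Everything in it (Decoder, MessageStream, StreamSet, Consumer, and the constructor shapes) is a hypothetical stand-in written for illustration, not Kafka's actual classes, and it uses Int where the example above uses Integer. It assumes configs are compared by reference identity.

```scala
// Hypothetical stand-ins for illustration; none of these are Kafka's real classes.
trait Decoder[T] { def decode(bytes: Array[Byte]): T }

class StringDecoder extends Decoder[String] {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

class IntDecoder extends Decoder[Int] {
  // Reads a 4-byte big-endian integer.
  def decode(bytes: Array[Byte]): Int = java.nio.ByteBuffer.wrap(bytes).getInt
}

// No equals/hashCode override, so each config instance is its own map key.
final class StreamConfig[T](val topic: String, val parallelism: Int, val decoder: Decoder[T])

final class MessageStream[T](val topic: String, decoder: Decoder[T]) {
  def decode(raw: Array[Byte]): T = decoder.decode(raw)
}

final class StreamSet(streams: Map[StreamConfig[_], List[MessageStream[_]]]) {
  // T comes from the config argument. The cast is the one unchecked step; it is
  // sound only because each entry was built from the config it is stored under.
  def get[T](config: StreamConfig[T]): List[MessageStream[T]] =
    streams(config).asInstanceOf[List[MessageStream[T]]]
}

object Consumer {
  // All topics are registered in a single call.
  def createMessageStreams(configs: StreamConfig[_]*): StreamSet = {
    def build[T](c: StreamConfig[T]): List[MessageStream[_]] =
      List.fill(c.parallelism)(new MessageStream[T](c.topic, c.decoder))
    val entries: Seq[(StreamConfig[_], List[MessageStream[_]])] =
      configs.map(c => (c, build(c)))
    new StreamSet(entries.toMap)
  }
}

object Example {
  val first = new StreamConfig[String]("my_topic", 5, new StringDecoder)
  val second = new StreamConfig[Int]("your_topic", 5, new IntDecoder)
  val streamSet = Consumer.createMessageStreams(first, second)
  // No type argument needed at the call site: T = String is taken from `first`.
  val streamOne: List[MessageStream[String]] = streamSet.get(first)
}
```

The caller never names the stream type on get; the price is a single cast hidden inside StreamSet.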

Another option is to ask whether streams could be created one at a time 
without causing a "rebalance storm" for the clients. In that case you could 
do:
val consumer = new ZookeeperConsumerConnector[MyRichType](consumerConfig)
val streamsA = consumer.createMessageStream("topic_a", decoder, parallelism)
val streamsB = consumer.createMessageStream("topic_b", decoder, parallelism)
To my eyes this latter approach is much more friendly, especially in the 
common case where you just have one topic to consume from. I think the reason 
we didn't do this was to avoid having a sequence of these calls cause a ton of 
rebalancing activity. I wonder if there is a direct fix for that.
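
As a rough illustration of the trade-off, the sketch below shows a one-stream-at-a-time API together with the cost it implies. It is a variation on the snippet above: the type parameter sits on the method and is inferred from the decoder, rather than on the connector. SketchConnector, its rebalance counter, and the other types are hypothetical, and the one-rebalance-per-call behaviour is an assumption made to show the concern, not a description of the real connector.

```scala
// Hypothetical stand-ins for illustration; not Kafka's real classes.
trait Decoder[T] { def decode(bytes: Array[Byte]): T }

object Utf8Decoder extends Decoder[String] {
  def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

final class MessageStream[T](val topic: String, decoder: Decoder[T]) {
  def decode(raw: Array[Byte]): T = decoder.decode(raw)
}

final class SketchConnector {
  // Counts simulated rebalances; a real connector would coordinate with
  // the other consumers in the group at this point.
  private var rebalances = 0
  def rebalanceCount: Int = rebalances

  // T is inferred from the decoder argument, so callers never spell it out.
  def createMessageStream[T](topic: String,
                             decoder: Decoder[T],
                             parallelism: Int): List[MessageStream[T]] = {
    rebalances += 1 // assumption: every registration triggers one rebalance
    List.fill(parallelism)(new MessageStream[T](topic, decoder))
  }
}

object OneAtATimeExample {
  val consumer = new SketchConnector
  val streamsA = consumer.createMessageStream("topic_a", Utf8Decoder, 2)
  val streamsB = consumer.createMessageStream("topic_b", Utf8Decoder, 3)
}
```

Under that assumption, two calls mean two rebalances, which is the activity a direct fix would need to avoid.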

> Consumer needs a pluggable decoder
> ----------------------------------
>
>                 Key: KAFKA-3
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3
>             Project: Kafka
>          Issue Type: Improvement
>         Attachments: KAFKA-3_v1.patch
>
>
> Kafka producer allows a user to plug in an encoder (from type T to Message). 
> We need to do the same thing on the consumer side, by allowing a user to plug 
> in a decoder (from Message to type T).

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
