[ https://issues.apache.org/jira/browse/KAFKA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13114956#comment-13114956 ]
Joel Koshy commented on KAFKA-3:
--------------------------------

I tried various experiments for this - the difficulty lies in bringing the Java API up to par with the Scala API, mainly due to limitations in (or my understanding of) the interoperability of Java-Scala generics and default arguments.

The current consumer API is:

    createMessageStreams(topicCountMap: Map[String, Int]): Map[String, List[KafkaMessageStream]]

With consumer decoders, we want to be able to return a Map[String, List[KafkaMessageStream[T]]], and if possible have different types bound to T in the map. This is doable in Scala, but I don't see how we can support an equally flexible API in Java. So I think our alternatives are:

1 - Use type inference, and allow disparity between the Scala and Java APIs
2 - Go with a simpler API that requires an explicit type parameter, but is consistent across Java/Scala
3 - Other thoughts?

For the first approach, the API could be:

    createMessageStreams[T](topicCountMap: Map[String, StreamConfig[T]]): Map[String, List[KafkaMessageStream[T]]]

There are a couple of issues in getting this to work. Although it works well on the Scala side, Java is more limited. E.g., if the type parameter is declared covariant (i.e., +T), with Scala it is possible to mix different decoder types in a single call to createMessageStreams, but it is not possible to do this cleanly in Java. Anyway, I think this use case would be rare. More importantly, the other useful benefit of this approach (a default type if the decoder is not specified) is also not available to the Java API, since Java doesn't understand default arguments. I tried a wrapper factory object to make StreamConfigs, but ultimately an explicit type and casting are required on the Java side.
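To make the Java-side limitation of the first approach concrete, here is a minimal, self-contained sketch. It is not the Kafka code: Decoder, StreamConfig and KafkaMessageStream below are simplified stand-ins (the decoder takes a byte[] rather than a Message), and the widen helper and the topic names are hypothetical. It only illustrates that, because Java generics are invariant, mixing decoder types in one call forces T to a common supertype and pushes an explicit cast onto the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamConfigSketch {
    // Simplified stand-in: the real decoder would take a Kafka Message.
    public interface Decoder<T> { T decode(byte[] payload); }

    // Stand-in for the proposed per-topic config: stream count plus decoder.
    public static final class StreamConfig<T> {
        final int numStreams;
        final Decoder<T> decoder;
        public StreamConfig(int numStreams, Decoder<T> decoder) {
            this.numStreams = numStreams;
            this.decoder = decoder;
        }
    }

    // Stand-in stream that only exposes decoding, enough to show the typing.
    public static final class KafkaMessageStream<T> {
        private final Decoder<T> decoder;
        KafkaMessageStream(Decoder<T> decoder) { this.decoder = decoder; }
        public T decode(byte[] payload) { return decoder.decode(payload); }
    }

    // Java rendering of the first approach's signature.
    public static <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
            Map<String, StreamConfig<T>> topicConfigMap) {
        Map<String, List<KafkaMessageStream<T>>> result = new HashMap<>();
        for (Map.Entry<String, StreamConfig<T>> entry : topicConfigMap.entrySet()) {
            StreamConfig<T> config = entry.getValue();
            List<KafkaMessageStream<T>> streams = new ArrayList<>();
            for (int i = 0; i < config.numStreams; i++) {
                streams.add(new KafkaMessageStream<>(config.decoder));
            }
            result.put(entry.getKey(), streams);
        }
        return result;
    }

    // Hypothetical helper: a Decoder<Integer> is not a Decoder<Object> in Java,
    // so each decoder has to be wrapped before it can share a map.
    static Decoder<Object> widen(Decoder<?> decoder) {
        return payload -> decoder.decode(payload);
    }

    // Mixes an Integer decoder and a Float decoder in a single call.
    public static Map<String, List<KafkaMessageStream<Object>>> mixedStreams() {
        Decoder<Integer> intDecoder = payload -> payload.length;
        Decoder<Float> floatDecoder = payload -> payload.length / 2.0f;

        Map<String, StreamConfig<Object>> configs = new HashMap<>();
        configs.put("anIntTopic", new StreamConfig<>(2, widen(intDecoder)));
        configs.put("aFloatTopic", new StreamConfig<>(1, widen(floatDecoder)));
        return createMessageStreams(configs);
    }

    public static void main(String[] args) {
        Map<String, List<KafkaMessageStream<Object>>> streams = mixedStreams();
        // The element type has been erased to Object, so the caller must cast.
        Float value = (Float) streams.get("aFloatTopic").get(0).decode(new byte[4]);
        System.out.println(value);
    }
}
```

With a single decoder type per call, T is inferred and no cast is needed; the cast only appears once different decoder types share one map, which is the case described above as hard to support cleanly from Java.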

The second approach would be along the following lines. The consumer connector provides:

    createMessageStreams[T](topicCountMap: Map[String, Int], decoder: Decoder[T]): Map[String, List[KafkaMessageStream[T]]]

So pretty much the existing API, with the addition of the parameterized type and the decoder argument.

Scala usage:

    // c is a ConsumerConnector
    val intStreamMap = c.createMessageStreams(intTopicMap, new IntDecoder)
    val floatStreamMap = c.createMessageStreams(floatTopicMap, new FloatDecoder)
    val aFloatTopicStreams = floatStreamMap.get("aFloatTopic")

Java usage:

    java.util.Map<String, List<KafkaMessageStream<Float>>> floatStreamMap =
        c.createMessageStreams(floatTopicMap, new FloatDecoder());
    List<KafkaMessageStream<Float>> aFloatTopicStreams = floatStreamMap.get("aFloatTopic");

Thoughts?

> Consumer needs a pluggable decoder
> ----------------------------------
>
>                 Key: KAFKA-3
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3
>             Project: Kafka
>          Issue Type: Improvement
>         Attachments: KAFKA-3_v1.patch
>
>
> Kafka producer allows a user to plug in an encoder (from type T to Message).
> We need to do the same thing on the consumer side, by allowing a user to plug
> in a decoder (from Message to type T).