[ 
https://issues.apache.org/jira/browse/KAFKA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13114956#comment-13114956
 ] 

Joel Koshy commented on KAFKA-3:
--------------------------------

I tried various experiments for this - the difficulty lies in bringing the Java
API up to par with the Scala API mainly due to limitations in (or my
understanding of) the interoperability of Java-Scala generics and default
arguments.

The current consumer API is:

    createMessageStreams(topicCountMap: Map[String, Int]): Map[String,
        List[KafkaMessageStream]]

With consumer decoders, we want to be able to return a Map[String,
List[KafkaMessageStream[T]]], and if possible have different types bound to
T in the map. This is doable in Scala, but I don't see how we can support an
equally flexible API in Java.

So I think our alternatives are:

1 - Use type inference, and allow disparity between the Scala and Java APIs
2 - Go with a simpler API that requires an explicit type parameter, but is
    consistent across Java/Scala
3 - Other thoughts?

For the first approach, the API could be:

    createMessageStreams[T](topicCountMap: Map[String, StreamConfig[T]]):
        Map[String, List[KafkaMessageStream[T]]]

There are a couple of issues in getting this to work. Although it works well
on the Scala side, Java is more limited. E.g., if the type parameter is
declared covariant (i.e., +T), then in Scala it is possible to mix different
decoder types in a single call to createMessageStreams, but it is not
possible to do this cleanly in Java. Anyway, I think this use case would be
rare. More importantly, the other useful benefit of this approach (a default
type if no decoder is specified) is also not available to the Java API,
since Java doesn't support default arguments. I tried a wrapper factory
object to make StreamConfigs, but ultimately an explicit type and a cast are
required on the Java side.
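
To make the Java-side limitation concrete, here is a minimal self-contained
sketch. Every type in it (Decoder, StreamConfig, MessageStream, the two
decoders, and this createMessageStreams) is a hypothetical stand-in written
for illustration, not the actual Kafka code, and a message is modeled as a
plain byte array. It assumes the config map is declared with a wildcard so
that two decoder types can share one call:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// All types here are hypothetical stand-ins, not the real Kafka classes.
final class FirstApproachSketch {

    // A message payload is modeled as a raw byte array.
    interface Decoder<T> {
        T decode(byte[] payload);
    }

    static final class IntDecoder implements Decoder<Integer> {
        public Integer decode(byte[] payload) {
            return ByteBuffer.wrap(payload).getInt();
        }
    }

    static final class FloatDecoder implements Decoder<Float> {
        public Float decode(byte[] payload) {
            return ByteBuffer.wrap(payload).getFloat();
        }
    }

    // Per-topic configuration: number of streams plus the decoder to use.
    static final class StreamConfig<T> {
        final int numStreams;
        final Decoder<? extends T> decoder;

        StreamConfig(int numStreams, Decoder<? extends T> decoder) {
            this.numStreams = numStreams;
            this.decoder = decoder;
        }
    }

    // Stand-in for KafkaMessageStream[T]; it only decodes a payload.
    static final class MessageStream<T> {
        private final Decoder<? extends T> decoder;

        MessageStream(Decoder<? extends T> decoder) {
            this.decoder = decoder;
        }

        T decode(byte[] payload) {
            return decoder.decode(payload);
        }
    }

    static <T> Map<String, List<MessageStream<T>>> createMessageStreams(
            Map<String, StreamConfig<? extends T>> topicConfigs) {
        Map<String, List<MessageStream<T>>> result = new HashMap<>();
        for (Map.Entry<String, StreamConfig<? extends T>> entry : topicConfigs.entrySet()) {
            StreamConfig<? extends T> config = entry.getValue();
            List<MessageStream<T>> streams = new ArrayList<>();
            for (int i = 0; i < config.numStreams; i++) {
                streams.add(new MessageStream<T>(config.decoder));
            }
            result.put(entry.getKey(), streams);
        }
        return result;
    }

    public static void main(String[] args) {
        // Mixing decoder types forces the element type down to Object.
        Map<String, StreamConfig<? extends Object>> configs = new HashMap<>();
        configs.put("anIntTopic", new StreamConfig<Integer>(1, new IntDecoder()));
        configs.put("aFloatTopic", new StreamConfig<Float>(2, new FloatDecoder()));

        Map<String, List<MessageStream<Object>>> streams =
            FirstApproachSketch.<Object>createMessageStreams(configs);

        byte[] payload = ByteBuffer.allocate(4).putFloat(1.5f).array();
        // The static type is Object, so the caller must cast.
        Float value = (Float) streams.get("aFloatTopic").get(0).decode(payload);
        System.out.println(value);
    }
}
```

In this sketch the only type that fits both decoders is Object, so the caller
ends up naming the type explicitly and casting each decoded value.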

The second approach would be along the following lines:

Consumer connector provides:

    createMessageStreams[T](topicCountMap: Map[String, Int], decoder:
        Decoder[T]): Map[String, List[KafkaMessageStream[T]]]

This is essentially the existing API with the addition of a type parameter
and a decoder argument.

Scala usage:

// c is a ConsumerConnector

val intStreamMap = c.createMessageStreams(intTopicMap, new IntDecoder)
val floatStreamMap = c.createMessageStreams(floatTopicMap, new FloatDecoder)
val aFloatTopicStreams = floatStreamMap.get("aFloatTopic")

Java usage:

java.util.Map<String, List<KafkaMessageStream<Float>>> floatStreamMap =
    c.createMessageStreams(floatTopicMap, new FloatDecoder());

List<KafkaMessageStream<Float>> aFloatTopicStreams =
    floatStreamMap.get("aFloatTopic");
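
For comparison, the second approach can be sketched in self-contained Java as
follows. Again, Decoder, FloatDecoder, MessageStream, and this
createMessageStreams are hypothetical stand-ins rather than the real Kafka
classes, with a message modeled as a byte array. The point is only that T is
inferred from the single decoder argument, so the caller needs no cast:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// All types here are hypothetical stand-ins, not the real Kafka classes.
final class SecondApproachSketch {

    // A message payload is modeled as a raw byte array.
    interface Decoder<T> {
        T decode(byte[] payload);
    }

    static final class FloatDecoder implements Decoder<Float> {
        public Float decode(byte[] payload) {
            return ByteBuffer.wrap(payload).getFloat();
        }
    }

    // Stand-in for KafkaMessageStream[T]; it only decodes a payload.
    static final class MessageStream<T> {
        private final Decoder<T> decoder;

        MessageStream(Decoder<T> decoder) {
            this.decoder = decoder;
        }

        T decode(byte[] payload) {
            return decoder.decode(payload);
        }
    }

    // One decoder per call: every stream in the result has the same type T.
    static <T> Map<String, List<MessageStream<T>>> createMessageStreams(
            Map<String, Integer> topicCountMap, Decoder<T> decoder) {
        Map<String, List<MessageStream<T>>> result = new HashMap<>();
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            List<MessageStream<T>> streams = new ArrayList<>();
            for (int i = 0; i < entry.getValue(); i++) {
                streams.add(new MessageStream<T>(decoder));
            }
            result.put(entry.getKey(), streams);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> floatTopicMap = new HashMap<>();
        floatTopicMap.put("aFloatTopic", 2);

        // T is inferred as Float from the decoder argument.
        Map<String, List<MessageStream<Float>>> floatStreamMap =
            createMessageStreams(floatTopicMap, new FloatDecoder());

        List<MessageStream<Float>> aFloatTopicStreams =
            floatStreamMap.get("aFloatTopic");

        byte[] payload = ByteBuffer.allocate(4).putFloat(2.5f).array();
        Float value = aFloatTopicStreams.get(0).decode(payload);
        System.out.println(aFloatTopicStreams.size() + " streams, decoded " + value);
    }
}
```

The trade-off is that one call covers one decoder type, so topics with
different types need separate calls.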

Thoughts?

                
> Consumer needs a pluggable decoder
> ----------------------------------
>
>                 Key: KAFKA-3
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3
>             Project: Kafka
>          Issue Type: Improvement
>         Attachments: KAFKA-3_v1.patch
>
>
> Kafka producer allows a user to plug in an encoder (from type T to Message). 
> We need to do the same thing on the consumer side, by allowing a user to plug 
> in a decoder (from Message to type T).
