Hi all,

This is a follow-up to the email I sent out a few days ago on the consumer API extension for KAFKA-249. After the code review, it looks like a more significant API change may be more suitable, so here is an overview.
The new method in the consumer connector that supports wildcarding (in the v2 patch) returns a list of KafkaMessageAndTopicStream[T] objects. There were a couple of comments on this:

- It is (somewhat oddly) different from the existing API (createMessageStreams, which returns a map containing KafkaMessageStream[T] objects).
- We already have a MessageAndOffset class, and at some point we may want to give consumers access to logical partition/offset information.

So this would be an opportunity to fix the consumer API to accommodate a more general consumer stream and iterator API that provides access to MessageAndMetadata elements. Each of these contains the message plus its metadata (such as topic, offset, partition, etc.).

I have incorporated this in a new patch (which I will upload soon, after I address all the other review comments). I wanted to share the API changes here because this is a more significant change that would require users of the consumer and iterator to update their code.

--------------------------------------------------------------------------------

Proposal for the new ConsumerConnector API:

  /**
   * Create a list of MessageStreams for each topic.
   *
   * @param topicCountMap a map of (topic, #streams) pairs
   * @param decoder Decoder to decode each Message to type T
   * @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
   *         The number of items in the list is #streams. Each stream supports
   *         an iterator over message/metadata pairs.
   */
  def createMessageStreams[T](topicCountMap: Map[String,Int],
                              decoder: Decoder[T] = new DefaultDecoder)
    : Map[String,List[KafkaMessageAndMetadataStream[T]]]

  /**
   * Create a list of message streams for all topics that match a given filter.
   *
   * @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
   * @param numStreams Number of streams to return
   * @param decoder Decoder to decode each Message to type T
   * @return a list of KafkaMessageAndMetadataStream, each of which provides an
   *         iterator over message/metadata pairs over allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaMessageAndMetadataStream[T]]

--------------------------------------------------------------------------------

The iterator of a KafkaMessageAndMetadataStream[T] is a ConsumerIterator[T], which iterates over MessageAndMetadata[T] objects:

  case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

Although the MessageAndMetadata class is simple, it still needs to be evolved carefully. Adding fields is easy, but removing fields would break older clients at compile time. I think it would be better to avoid schemas and/or explicit versioning, since either would make writing the client-side code more difficult.

--------------------------------------------------------------------------------

This means the current pattern of:

  for (message <- stream) {
    // process(message)
  }

will change to:

  for (msgAndMetadata <- stream) {
    // processMessage(msgAndMetadata.message)
    // can also access msgAndMetadata.offset, topic, etc. if appropriate
  }

--------------------------------------------------------------------------------

I would love to get any thoughts on this. This API change would require code changes for consumers, so I wanted to send it around for comments/objections before proceeding further.

Thanks,
Joel
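The migration from iterating raw messages to iterating MessageAndMetadata can be tried out without a broker. The following is a minimal, self-contained Scala sketch. MessageAndMetadata copies the case class from the proposal. InMemoryStream (a stand-in for KafkaMessageAndMetadataStream[T]) and ConsumerSketch.payloads are hypothetical names introduced only for illustration. They are not classes or methods from the patch.

```scala
// Copied from the proposal: the message plus its metadata, with defaults.
case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)

// Hypothetical stand-in for KafkaMessageAndMetadataStream[T]: iterating it
// yields MessageAndMetadata[T] elements, so a for-comprehension works on it.
class InMemoryStream[T](items: Seq[MessageAndMetadata[T]])
    extends Iterable[MessageAndMetadata[T]] {
  def iterator: Iterator[MessageAndMetadata[T]] = items.iterator
}

object ConsumerSketch {
  // Hypothetical client helper: a consumer that only cares about payloads
  // reads .message and ignores the metadata fields.
  def payloads[T](stream: Iterable[MessageAndMetadata[T]]): List[T] =
    (for (msgAndMetadata <- stream) yield msgAndMetadata.message).toList

  def main(args: Array[String]): Unit = {
    val stream = new InMemoryStream(Seq(
      MessageAndMetadata("hello", "topicA", 0L),
      MessageAndMetadata("world", "topicA", 1L)
    ))
    // Same shape as createMessageStreams: topic -> list of streams.
    val streamsByTopic: Map[String, List[InMemoryStream[String]]] =
      Map("topicA" -> List(stream))

    for (s <- streamsByTopic("topicA"); msgAndMetadata <- s) {
      // Metadata is available alongside the decoded message when needed.
      println(s"${msgAndMetadata.topic}@${msgAndMetadata.offset}: ${msgAndMetadata.message}")
    }
  }
}
```

The default values on topic and offset mean that code constructing MessageAndMetadata(msg) keeps compiling. A consumer that needs only the payload can read .message and nothing else.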
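The proposal only says that the filterSpec argument of createMessageStreamsByFilter is "either a Whitelist or Blacklist TopicFilterSpec object". The following is a minimal sketch of how such a spec might decide which topics are allowed. It assumes, purely for illustration, that each spec wraps a regular expression and exposes a hypothetical isTopicAllowed method. Neither assumption is confirmed by the proposal.

```scala
// Hypothetical sketch: the regex-based matching and the isTopicAllowed name
// are assumptions, not the API from the patch.
sealed trait TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean
}

// Allows only topics that fully match the regular expression.
final case class Whitelist(regex: String) extends TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean = topic.matches(regex)
}

// Allows every topic except those that fully match the regular expression.
final case class Blacklist(regex: String) extends TopicFilterSpec {
  def isTopicAllowed(topic: String): Boolean = !topic.matches(regex)
}

object FilterSketch {
  // Returns the subset of known topics that a filter-based consumer would read.
  def allowedTopics(spec: TopicFilterSpec, topics: Seq[String]): Seq[String] =
    topics.filter(spec.isTopicAllowed)

  def main(args: Array[String]): Unit = {
    val topics = Seq("logs.web", "logs.db", "metrics")
    // Whitelist keeps the two logs.* topics; Blacklist keeps only "metrics".
    println(allowedTopics(Whitelist("logs\\..*"), topics))
    println(allowedTopics(Blacklist("logs\\..*"), topics))
  }
}
```

A sealed trait keeps the two cases exhaustive, so code that matches on a TopicFilterSpec gets a compiler warning if a case is missed.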