The newest patch for KAFKA-249 adds a new method to the consumer connector (after feedback on the first patch). Since this is an addition to the consumer API, I wanted to send it around for comments/concerns.
The new method allows the consumer connector to subscribe to a whitelist or blacklist of topics.

Scala API:

  /**
   * Create a list of message streams for all topics that match a given filter.
   *
   * @param filterSpec TopicFilterSpec encapsulating a Java-style regex whitelist
   *                   or blacklist.
   * @param numStreams number of KafkaMessageAndTopicStream to create.
   * @return a list of KafkaMessageAndTopicStream that provide iterator over
   *         messages from allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaMessageAndTopicStream[T]]

TopicFilterSpec can be either a Whitelist or a Blacklist. Because a wildcard filter can match several topics, a single stream may now carry messages from more than one topic. This is why the method returns KafkaMessageAndTopicStream, which allows iteration over MessageAndTopic objects.

Example:

  // Take the first (and, with the default numStreams = 1, only) stream.
  val stream = zkConnector.createMessageStreamsByFilter(new Whitelist("whitetopics.*")).head
  for (messageAndTopic <- stream) {
    println("Message from topic %s: %s".format(messageAndTopic.topic, messageAndTopic.message))
  }

The same method (and its default-argument variants) is also exposed in the Java API.

Feedback/concerns/objections?

Thanks,

Joel