>> (p.s. I *really* want per-stream commit baked into the API.)

Assuming that you mean being able to control commit() per partition, then
yes, this is included. You can see some code examples here to get a better
idea of how that can be used:
<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
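To make "control commit() per partition" concrete without depending on a
broker, here is a minimal, self-contained sketch in plain Java. It is an
in-memory model of the semantics only — the class and method names
(PerPartitionCommit, record, commit) are invented for illustration and are
not the Kafka client API; see the javadoc linked above for the real calls.

```java
import java.util.*;

// Invented for illustration: an in-memory model of per-partition commit
// control, i.e. committing the offset of one partition while leaving
// others uncommitted. None of these names come from the Kafka client.
public class PerPartitionCommit {
    private final Map<String, Long> position = new HashMap<>();  // last consumed offset
    private final Map<String, Long> committed = new HashMap<>(); // durably committed offset

    // Record that we consumed a message at this offset for "topic-partition".
    public void record(String topicPartition, long offset) {
        position.put(topicPartition, offset);
    }

    // Commit only the named partition; all other partitions are untouched.
    public void commit(String topicPartition) {
        Long pos = position.get(topicPartition);
        if (pos != null) {
            committed.put(topicPartition, pos + 1); // next offset to consume
        }
    }

    public Long committed(String topicPartition) {
        return committed.get(topicPartition);
    }

    public static void main(String[] args) {
        PerPartitionCommit c = new PerPartitionCommit();
        c.record("logs-0", 41);
        c.record("logs-1", 99);
        c.commit("logs-0");                        // logs-1 stays uncommitted
        System.out.println(c.committed("logs-0")); // 42
        System.out.println(c.committed("logs-1")); // null
    }
}
```

The point of the model is that commit granularity is the partition, not the
whole consumer: committing logs-0 says nothing about logs-1.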
However, I'd also like to understand your use case better. Are you trying
to use the consumer for group management (auto rebalancing) and, at the
same time, control per-partition commit()? In general, before proposing any
changes, I'd like to understand what you are hoping to achieve with the
consumer APIs.

Thanks,
Neha

On Fri, May 9, 2014 at 1:19 PM, Eric Sammer <esam...@scalingdata.com> wrote:
> All:
>
> I've been going over the new consumer APIs and it seems like we're
> squishing a lot of different concerns together into a single class. The
> scope of the new Consumer is kind of all over the place. Managing the
> lifecycle - and especially the thread safety - seems challenging.
> Specifically, Consumer seems to serve the following purposes:
> * Acts as a holder of subscription info (e.g. subscribe()).
> * Acts as a stream (e.g. poll(), seek()).
>
> I definitely think we want these to be separate. It's pretty common to
> have a consumer process that connects to the broker, creates N consumer
> threads, each of which works on a single stream (which could be composed
> of some number of partitions). In this scenario, you *really* want to
> explicitly control durability (e.g. commit()s) on a per-stream basis. You
> also have different lifecycle semantics and thread safety concerns at the
> stream level versus the global level. Is there a reason the API doesn't
> look more like:
>
> // Thread safe, owns the multiplexed connection
> Consumer:
>   def subscribe(topic: String, streams: Int): Set[Stream]
>   def close() // Release everything
>
> // Not at all thread safe, no synchronization.
> Stream:
>   def commit() // Really important this be here and not on Consumer.
>   def seek(...)
>   def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
>   def close() // Release these partitions
>   ...
>
> I think this also significantly reduces the complexity of the Consumer
> API and lets each thread in a consumer process handle stream lifecycle
> appropriately.
> Since the connection is multiplexed and things could get rebalanced, just
> toss an exception if the streams become invalid, forcing a resubscribe.
> That way we don't have crazy state logic.
>
> I'm sure I'm missing something, but I wanted to toss this out there for
> folks to poke at.
>
> (p.s. I *really* want per-stream commit baked into the API.)
> --
> E. Sammer
> CTO - ScalingData
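For concreteness, the Consumer/Stream split sketched in the quoted mail
could be modeled roughly like this in plain Java. Everything here is an
in-memory stand-in invented for illustration (RebalanceException, the
Deque-backed message source, the String message type); it is not any real
Kafka API, only a demonstration of the proposed lifecycle: per-stream
commit(), and streams that throw after a rebalance to force a resubscribe.

```java
import java.util.*;
import java.util.concurrent.TimeUnit;

// Invented for illustration: signals that a rebalance invalidated a stream.
class RebalanceException extends RuntimeException {}

// Not thread safe; owned by a single consumer thread.
class Stream {
    private final String id;
    private final Deque<String> messages;        // in-memory stand-in for broker data
    private final Map<String, Long> committed;   // shared committed-offset store
    private long offset = 0;
    volatile boolean valid = true;

    Stream(String id, Deque<String> messages, Map<String, Long> committed) {
        this.id = id;
        this.messages = messages;
        this.committed = committed;
    }

    List<String> poll(long duration, TimeUnit unit) {
        if (!valid) throw new RebalanceException();
        List<String> out = new ArrayList<>();
        String m;
        while ((m = messages.poll()) != null) {
            out.add(m);
            offset++;
        }
        return out;
    }

    // Durability is controlled here, per stream, not on the Consumer.
    void commit() {
        if (!valid) throw new RebalanceException();
        committed.put(id, offset);
    }
}

// Thread safe in spirit; owns the (here, imaginary) multiplexed connection.
class Consumer {
    final Map<String, Long> committed = new HashMap<>();
    private final List<Stream> streams = new ArrayList<>();

    List<Stream> subscribe(String topic, int n) {
        List<Stream> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Deque<String> data = new ArrayDeque<>(List.of(topic + "/msg" + i));
            Stream s = new Stream(topic + "-" + i, data, committed);
            streams.add(s);
            out.add(s);
        }
        return out;
    }

    // A rebalance invalidates outstanding streams, forcing a resubscribe.
    void rebalance() {
        streams.forEach(s -> s.valid = false);
        streams.clear();
    }
}

public class ConsumerDemo {
    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        List<Stream> streams = consumer.subscribe("logs", 2);
        Stream s0 = streams.get(0);
        System.out.println(s0.poll(100, TimeUnit.MILLISECONDS)); // [logs/msg0]
        s0.commit();                      // commits only this stream's offset
        consumer.rebalance();
        try {
            s0.poll(100, TimeUnit.MILLISECONDS);
        } catch (RebalanceException e) {
            System.out.println("stream invalidated; resubscribe");
        }
    }
}
```

The design point the exception captures: rather than each Stream tracking
partition-reassignment state, an invalidated stream simply fails fast and
the owning thread goes back to subscribe().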