Eric,

With the new proposal, it seems that what you want can be achieved by simply instantiating N Consumer instances, wrapping each in its own thread, and calling poll() from that thread. Will that work for you?

Thanks,

Jun
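(For reference, a minimal sketch of the pattern Jun describes -- one consumer per thread, each with its own poll() loop. This assumes the Java consumer API roughly as it eventually shipped, which may differ in detail from the 0.9 draft javadoc linked further down; the broker address, group id, topic name, and writeMessageToSomething() are placeholders taken from the pseudo code in the quoted thread, not part of the proposal.)

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// One consumer per thread; KafkaConsumer itself is not thread safe.
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (int t = 0; t < numberOfThreads; t++) {
  executor.submit(() -> {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
    props.put("group.id", "mygroup");                 // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("mytopic"));
    try {
      long i = 0;
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<byte[], byte[]> record : records) {
          writeMessageToSomething(record);            // caller's own durability step
          if (++i % 1000 == 0) {
            consumer.commitSync();  // commits only the partitions this consumer owns
          }
        }
      }
    } finally {
      consumer.close();
    }
  });
}

Because each thread owns its own consumer, commitSync() only touches the partitions assigned to that consumer, so no cross-thread synchronization is needed at commit time.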
On Fri, May 16, 2014 at 4:05 PM, Eric Sammer <esam...@scalingdata.com> wrote:

> Neha:
>
> Here's the basic pseudo code of the process acting as the Kafka consumer:
>
> executor = Executors.newFixedThreadPool(numberOfThreads)
> consumer = // get a handle to the broker.
> mytopicStreams = consumer.getStreams({ "mytopic" => numberOfThreads }).get("mytopic")
>
> for (stream : mytopicStreams) {
>   executor.submit(() => {
>     i = 0;
>     for (message : stream) {
>       writeMessageToSomething(message);
>       i++
>       if (i % 1000 == 0) {
>         commitSomething()
>         // I understand I can get dupes as a result of this.
>         stream.commit()
>       }
>     }
>   })
> }
>
> You get the idea. I want to be able to indicate that thread N (which has
> stream N, which is made up of partitions a..z) is in a reasonable state to
> commit; that the messages it has consumed are "on disk." Specifically, I
> don't want to have to synchronize all threads when I commit, nor do I want
> to enumerate the partitions in each stream to commit them individually.
> From a library perspective, it's pretty obvious that the stream should
> manage the offsets for each underlying partition.
>
>
> On Thu, May 15, 2014 at 3:09 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > >> (p.s. I *really* want per-stream commit baked into the API.)
> >
> > Assuming that you mean being able to control commit() per partition, then
> > yes. This is included. You can see some code examples here
> > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html>
> > to get a better idea of how that can be used.
> >
> > However, I'd also like to understand your use case better. Are you trying
> > to use the consumer for group management (auto rebalancing) and, at the
> > same time, control per-partition commit()? In general, before proposing any
> > changes, I'd like to understand what you are hoping to achieve with the
> > consumer APIs.
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, May 9, 2014 at 1:19 PM, Eric Sammer <esam...@scalingdata.com> wrote:
> >
> > > All:
> > >
> > > I've been going over the new consumer APIs and it seems like we're
> > > squishing a lot of different concerns together into a single class. The
> > > scope of the new Consumer is kind of all over the place. Managing the
> > > lifecycle - and especially the thread safety - seems challenging.
> > > Specifically, Consumer seems to serve the following purposes:
> > > * Acts as a holder of subscription info (e.g. subscribe()).
> > > * Acts as a stream (e.g. poll(), seek()).
> > >
> > > I definitely think we want these to be separate. It's pretty common to have
> > > a consumer process that connects to the broker, creates N consumer threads,
> > > each of which works on a single stream (which could be composed of some
> > > number of partitions). In this scenario, you *really* want to explicitly
> > > control durability (e.g. commit()s) on a per-stream basis. You also have
> > > different lifecycle semantics and thread safety concerns at the stream
> > > level versus the global level. Is there a reason the API doesn't look more
> > > like:
> > >
> > > // Thread safe, owns the multiplexed connection
> > > Consumer:
> > >   def subscribe(topic: String, streams: Int): Set[Stream]
> > >   def close() // Release everything
> > >
> > > // Not at all thread safe, no synchronization.
> > > Stream:
> > >   def commit() // Really important this be here and not on Consumer.
> > >   def seek(...)
> > >   def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
> > >   def close() // Release these partitions
> > >   ...
> > >
> > > I think this also significantly reduces the complexity of the Consumer API
> > > and lets each thread in a consumer process handle stream lifecycle
> > > appropriately. Since the connection is multiplexed and things could get
> > > rebalanced, just toss an exception if the streams become invalid, forcing a
> > > resubscribe. That way we don't have crazy state logic.
> > >
> > > I'm sure I'm missing something, but I wanted to toss this out there for
> > > folks to poke at.
> > >
> > > (p.s. I *really* want per-stream commit baked into the API.)
> > >
> > > --
> > > E. Sammer
> > > CTO - ScalingData
> >
> >
>
> --
> E. Sammer
> CTO - ScalingData
>
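(On the per-stream commit question in the thread above: a hedged sketch of what per-partition offset commits look like against the new consumer, using the commitSync(Map<TopicPartition, OffsetAndMetadata>) overload roughly as it later shipped, which differs in detail from the 0.9 draft javadoc linked above. flushToDisk() is a placeholder for the caller's own "on disk" step, and consumer is a KafkaConsumer<byte[], byte[]> as in the earlier sketch.)

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Commit offsets one partition at a time, after that partition's messages
// are durable, without touching the offsets of any other partition.
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (TopicPartition partition : records.partitions()) {
  long lastOffset = -1L;
  for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {
    flushToDisk(record);                  // placeholder durability step
    lastOffset = record.offset();
  }
  if (lastOffset >= 0) {
    // The committed offset is the next offset to read, hence lastOffset + 1.
    consumer.commitSync(Collections.singletonMap(
        partition, new OffsetAndMetadata(lastOffset + 1)));
  }
}

This enumerates partitions explicitly, which is exactly the bookkeeping Eric wants a stream abstraction to hide; whether it lives in the application or behind a Stream.commit() is the open design question in the thread.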