Hi Neha,

Yes, a way that allows each partition to be committed separately.
Couldn't remember if the new consumer allows it, but looks like it does!

Tim

On Fri, May 16, 2014 at 9:37 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> Tim,
>
> I'm going to ask you the same question :-)
>
> By "per stream commit", do you mean a per-partition commit like this API -
>
> public OffsetMetadata commit(Map<TopicPartition, Long> offsets);
>
> This API allows the consumer to commit the specified offsets only for
> selected partitions.
>
> Thanks,
> Neha
>
>
> On Thu, May 15, 2014 at 8:42 AM, Timothy Chen <tnac...@gmail.com> wrote:
>
>> I'll also add that I know a per-stream commit is a strong requirement
>> for folks I know using Kafka, and I've seen custom code written just to do so.
>>
>> Tim
>>
>> > On May 9, 2014, at 1:19 PM, Eric Sammer <esam...@scalingdata.com> wrote:
>> >
>> > All:
>> >
>> > I've been going over the new consumer APIs and it seems like we're
>> > squishing a lot of different concerns together into a single class. The
>> > scope of the new Consumer is kind of all over the place. Managing the
>> > lifecycle - and especially the thread safety - seems challenging.
>> > Specifically, Consumer seems to serve the following purposes:
>> > * Acts as a holder of subscription info (e.g. subscribe()).
>> > * Acts as a stream (e.g. poll(), seek()).
>> >
>> > I definitely think we want these to be separate. It's pretty common to have
>> > a consumer process that connects to the broker, creates N consumer threads,
>> > each of which works on a single stream (which could be composed of some
>> > number of partitions). In this scenario, you *really* want to explicitly
>> > control durability (e.g. commit()s) on a per-stream basis. You also have
>> > different lifecycle semantics and thread safety concerns at the stream
>> > level versus the global level.
>> > Is there a reason the API doesn't look more like:
>> >
>> > // Thread safe, owns the multiplexed connection
>> > Consumer:
>> >   def subscribe(topic: String, streams: Int): Set[Stream]
>> >   def close() // Release everything
>> >
>> > // Not at all thread safe, no synchronization.
>> > Stream:
>> >   def commit() // Really important this be here and not on Consumer.
>> >   def seek(...)
>> >   def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
>> >   def close() // Release these partitions
>> > ...
>> >
>> > I think this also significantly reduces the complexity of the Consumer API
>> > and lets each thread in a consumer process handle stream lifecycle
>> > appropriately. Since the connection is multiplexed and things could get
>> > rebalanced, just toss an exception if the streams become invalid, forcing a
>> > resubscribe. That way we don't have crazy state logic.
>> >
>> > I'm sure I'm missing something, but I wanted to toss this out there for
>> > folks to poke at.
>> > (p.s. I *really* want per-stream commit baked into the API.)
>> > --
>> > E. Sammer
>> > CTO - ScalingData
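For anyone following along, Eric's Consumer/Stream split (and the per-stream commit it enables) can be sketched as a toy, in-memory Java version. To be clear, everything below is illustrative only: the class names, the String message type, and the offset bookkeeping are stand-ins I made up, not the real Kafka client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy sketch of the proposed split: a Consumer that only hands out
// subscriptions, and per-thread Streams that own poll() and commit().
public class ConsumerStreamSketch {

    // Not thread safe; each consumer thread works on exactly one Stream.
    static final class Stream {
        private final List<String> messages;
        private long position = 0;      // next offset to hand out
        private long committed = -1;    // last committed offset (-1 = none)

        Stream(List<String> messages) {
            this.messages = messages;
        }

        // Return everything between the current position and the log end.
        List<String> poll() {
            List<String> out = new ArrayList<>(
                messages.subList((int) position, messages.size()));
            position = messages.size();
            return out;
        }

        // Commit lives on the Stream, not the Consumer: durability is
        // controlled per stream, independently of every other stream.
        void commit() {
            committed = position - 1;
        }

        long committedOffset() {
            return committed;
        }
    }

    // Thread safe in the real design; here it just hands out streams,
    // each preloaded with two fake messages.
    static final class Consumer {
        Set<Stream> subscribe(String topic, int streams) {
            Set<Stream> result = new LinkedHashSet<>();
            for (int i = 0; i < streams; i++) {
                result.add(new Stream(
                    List.of(topic + "-msg-a", topic + "-msg-b")));
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        List<Stream> streams =
            new ArrayList<>(consumer.subscribe("events", 2));

        Stream s0 = streams.get(0);
        Stream s1 = streams.get(1);

        s0.poll();
        s0.commit(); // only stream 0's offsets are made durable

        System.out.println(s0.committedOffset()); // 1
        System.out.println(s1.committedOffset()); // -1 (never committed)
    }
}
```

In a multi-threaded consumer process, each thread would own one Stream and call commit() at its own pace, which is exactly the per-stream durability control the thread is asking for; a rebalance would invalidate the Stream and force a resubscribe rather than requiring shared state in the Consumer.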