Tim,

I'm going to ask you the same question :-)

By "per stream commit", do you mean a per partition commit like this API -

public OffsetMetadata commit(Map<TopicPartition, Long> offsets);

This API allows the consumer to commit offsets for just the selected
partitions.
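
For concreteness, here's a rough sketch of how a caller might invoke it.
Everything in it is hypothetical: Consumer, TopicPartition, and
OffsetMetadata are still proposals, and the topic name, offsets, and
wrapper method are made up for illustration.

import java.util.HashMap;
import java.util.Map;

// Hypothetical caller of the proposed per-partition commit API.
void commitSelectedPartitions(Consumer consumer) {
    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("events", 0), 12345L); // partition 0 only
    offsets.put(new TopicPartition("events", 3), 67890L); // partition 3 only

    // Only these two partitions are committed; every other assigned
    // partition keeps its previously committed offset.
    OffsetMetadata metadata = consumer.commit(offsets);
}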

Thanks,
Neha


On Thu, May 15, 2014 at 8:42 AM, Timothy Chen <tnac...@gmail.com> wrote:

> Also going to add that I know a per-stream commit is a strong requirement
> for folks I know using Kafka, and I've seen custom code written just to do
> that.
>
> Tim
>
> > On May 9, 2014, at 1:19 PM, Eric Sammer <esam...@scalingdata.com> wrote:
> >
> > All:
> >
> > I've been going over the new consumer APIs and it seems like we're
> > squishing a lot of different concerns together into a single class. The
> > scope of the new Consumer is kind of all over the place. Managing the
> > lifecycle - and especially the thread safety - seems challenging.
> > Specifically, Consumer seems to serve the following purposes:
> > * Acts as a holder of subscription info (e.g. subscribe()).
> > * Acts as a stream (e.g. poll(), seek()).
> >
> > I definitely think we want these to be separate. It's pretty common to
> > have a consumer process that connects to the broker, creates N consumer
> > threads, each of which works on a single stream (which could be composed
> > of some number of partitions). In this scenario, you *really* want to
> > explicitly control durability (e.g. commit()s) on a per-stream basis. You
> > also have different lifecycle semantics and thread-safety concerns at the
> > stream level versus the global level. Is there a reason the API doesn't
> > look more like:
> >
> > // Thread safe, owns the multiplexed connection
> > Consumer:
> >  def subscribe(topic: String, streams: Int): Set[Stream]
> >  def close() // Release everything
> >
> > // Not at all thread safe, no synchronization.
> > Stream:
> >  def commit() // Really important this be here and not on Consumer.
> >  def seek(...)
> >  def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
> >  def close() // Release these partitions
> >  ...
> >
> > I think this also significantly reduces the complexity of the Consumer
> > API and lets each thread in a consumer process handle stream lifecycle
> > appropriately. Since the connection is multiplexed and things could get
> > rebalanced, just toss an exception if the streams become invalid, forcing
> > a resubscribe. That way we don't have crazy state logic.
> >
> > I'm sure I'm missing something, but I wanted to toss this out there for
> > folks to poke at.
> > (p.s. I *really* want per-stream commit baked into the API.)
> > --
> > E. Sammer
> > CTO - ScalingData
>
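
To make the per-stream threading argument above concrete, here is a minimal
sketch of a driver for the proposed split, with one thread per Stream.
Everything here is hypothetical: Consumer, Stream, and MessageThingie follow
Eric's pseudocode, and the class name, topic, stream count, handle() helper,
and the IllegalStateException stand-in for "toss an exception if the streams
become invalid" are all made up for illustration.

import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical driver for the sketched API: one thread per Stream, each
// polling and committing its own offsets independently of the others.
public class PerStreamRunner {

    public static void run(Consumer consumer) {
        Set<Stream> streams = consumer.subscribe("events", 4); // 4 streams
        for (Stream stream : streams) {
            new Thread(() -> consume(stream)).start();
        }
    }

    private static void consume(Stream stream) {
        try {
            while (true) {
                for (MessageThingie message : stream.poll(100, TimeUnit.MILLISECONDS)) {
                    handle(message);
                }
                stream.commit(); // durability is controlled per stream
            }
        } catch (IllegalStateException e) {
            // A rebalance invalidated this stream: per the suggestion above,
            // poll()/commit() throws and the owner must resubscribe.
        } finally {
            stream.close(); // release only this stream's partitions
        }
    }

    private static void handle(MessageThingie message) {
        // application-specific processing (hypothetical)
    }
}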
