Actually, every time you consume a message, the offset moves to the beginning of the next message.
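To make that concrete, here is a rough sketch of the commit-on-shutdown pattern being discussed. It assumes the 0.7-era high-level consumer API used elsewhere in this thread; the property names, the example topic/group ids, and the exact stream type (KafkaStream vs. KafkaMessageStream differs between 0.7.x releases) should be checked against your version.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class AtLeastOnceExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");  // placeholder ZK address
        props.put("groupid", "example-group");      // placeholder group id
        props.put("autocommit.enable", "false");    // commit explicitly on shutdown

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("example-topic", 1);      // one stream for the topic
        KafkaStream<Message> stream =
                connector.createMessageStreams(topicCountMap).get("example-topic").get(0);

        try {
            for (MessageAndMetadata<Message> mam : stream) {
                persist(mam.message());
                // The in-memory offset now points past this message, but ZK is
                // only updated when commitOffsets() runs; a crash here means the
                // message is re-delivered (and re-persisted) on the next start.
            }
        } finally {
            connector.commitOffsets();  // flush in-memory offsets to ZooKeeper
            connector.shutdown();
        }
    }

    private static void persist(Message message) {
        // datastore write goes here
    }
}

With autocommit disabled, anything iterated since the last commitOffsets() call is re-delivered after a crash, which is exactly the at-least-once behavior described below.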
Thanks,

Jun

On Fri, Sep 14, 2012 at 6:52 PM, Felix GV <fe...@mate1inc.com> wrote:

> Hello,
>
> Sorry for doing thread necromancy on this one, but I have a little
> question hehe... Can you confirm whether my understanding, below, is
> correct please?
>
> 1. Every time I extract a message from a KafkaMessageStream, it sets my
>    consumer offset to the offset of the beginning of the message I just
>    extracted.
> 2. If I shut down my consumer (and call commitOffsets), then the offset
>    kept in memory by the KafkaMessageStream will be committed to ZK.
> 3. This means that if my program persists the message I just extracted
>    into a datastore, and then shuts down the consumer before I extract
>    the next message, then the next time I start my consumer, I will begin
>    from the offset of the beginning of that same message, extract it
>    again, and I will thus end up persisting a duplicate of that message
>    in my datastore.
>
> This is what my testing seems to demonstrate...
>
> I have found a way to account for this behavior in my current use case,
> but I wanted to know if the behavior I describe above is normal and
> intended, or if perhaps I'm doing something weird that could be causing
> unexpected behavior.
>
> The behavior I describe makes sense for "at least once" delivery
> guarantees. I guess the alternative implementation would have been to
> set the consumer offset in ZK to that of the next message, whether the
> current iteration succeeds or not, which would have given "at most once"
> guarantees (as far as that part of the system is concerned, anyway).
>
> So, yeah, I'd like it if someone could confirm or deny my above
> interpretation.
>
> And also, I have another related question: say a consumer were to die
> without being shut down gracefully (and without calling commitOffsets).
> Would the offset stored in ZK then be the one that was put there the
> last time autocommit.interval.ms elapsed, thus causing potentially even
> more duplicate messages the next time the consumer is started? (This is
> assuming the default settings, where autocommit.enable is true.)
>
> Thanks in advance :) ...!
>
> --
> Felix
>
>
> On Thu, Jul 12, 2012 at 10:17 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > Yes, it knows. The consumer offset is only advanced every time a
> > message is iterated over.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Jul 11, 2012 at 10:03 PM, Vaibhav Puranik <vpura...@gmail.com> wrote:
> >
> > > The inner loop keeps running. If I break it in the middle, is the
> > > Kafka broker going to know that the rest of the messages in the
> > > stream were not delivered?
> > >
> > > Regards,
> > > Vaibhav
> > > GumGum
> > >
> > > On Jul 11, 2012 5:05 PM, "Vaibhav Puranik" <vpura...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Is there any way to get a fixed amount of messages using the
> > > > ZooKeeper-based consumer (ConsumerConnector)?
> > > >
> > > > I know that with SimpleConsumer you can pass fetchSize as an
> > > > argument and limit the number of messages coming back.
> > > >
> > > > This sample code creates 4 threads that keep consuming forever.
> > > >
> > > > // consume the messages in the threads
> > > > for (final KafkaStream<Message> stream : streams) {
> > > >     executor.submit(new Runnable() {
> > > >         public void run() {
> > > >             for (MessageAndMetadata msgAndMetadata : stream) {
> > > >                 // process message (msgAndMetadata.message())
> > > >             }
> > > >         }
> > > >     });
> > > > }
> > > >
> > > > Regards,
> > > > Vaibhav
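For completeness, one way to take a bounded number of messages from a stream with the ZooKeeper-based consumer is sketched below. It is only a sketch against the 0.7-era API referenced in this thread, and it assumes consumer.timeout.ms has been set to a positive value so that the iterator eventually throws ConsumerTimeoutException instead of blocking forever; the helper and variable names are made up for illustration.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class BoundedDrain {

    // Reads at most 'max' messages from the stream, then returns the number
    // actually consumed. Requires consumer.timeout.ms to be set; otherwise
    // hasNext() blocks indefinitely when no messages are available.
    static int drain(KafkaStream<Message> stream, int max) {
        int consumed = 0;
        ConsumerIterator<Message> it = stream.iterator();
        try {
            while (consumed < max && it.hasNext()) {
                MessageAndMetadata<Message> mam = it.next();
                process(mam.message());
                consumed++;
            }
        } catch (ConsumerTimeoutException e) {
            // no message arrived within consumer.timeout.ms; stop here
        }
        return consumed;
    }

    private static void process(Message message) {
        // application logic goes here
    }
}

Because the consumer offset only advances for messages that are actually iterated over (as noted above), returning early does not skip the remaining messages; they are simply delivered on the next call or after a restart.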