Prashant,

One of the trade-offs between sync and async producers is performance vs.
ordering. To guarantee the scenario you described, you have to use a sync
producer, since the async producer is not designed for such guarantees.

Guozhang

On Thu, Jul 10, 2014 at 11:03 PM, Prashant Prakash <prash.i...@gmail.com> wrote:

> Hi Guozhang,
>
> Monitoring through a JMX MBean is an indirect way to detect producer
> failure.
>
> Our requirement is to send messages in a pre-defined sequence. At no point
> do we want any message to arrive out of order at the consumer. In case of
> failure we replay the entire sequence, and we dedupe messages at the
> consumer to remove duplicates.
>
> On failure to send a message, the async producer only logs an error and
> keeps sending the messages in its buffer. This causes messages to arrive
> out of order at the consumer and eventually breaks our application logic.
> Therefore we need a way of direct notification on producer failure. As
> mentioned in my previous mail, a callback can be one way in which we can
> generate an alert and shut down the producer.
>
> Currently we are sending a unique incremental sequence id per partition in
> the message body to detect any message out of order at the consumer end.
>
> Regards
> Prashant
>
> On Fri, Jul 11, 2014 at 4:16 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Prashant,
> >
> > You can use the producer failure sensors to do the monitoring.
> >
> > http://kafka.apache.org/documentation.html#monitoring
> >
> > Guozhang
> >
> > On Thu, Jul 10, 2014 at 6:11 AM, Prashant Prakash <prash.i...@gmail.com>
> > wrote:
> >
> > > Dear Kafka Users,
> > >
> > > We are using Kafka 0.8.0 in our application development. To keep
> > > message delivery reliable we want to detect any failure while sending
> > > a message. To get high throughput we are using the async producer.
> > >
> > > As of the Kafka 0.8.0 async producer implementation, a failure to send
> > > a message is logged but not thrown back to the producer client.
> > >
> > > Snippet from ProducerSendThread -
> > >
> > >   def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
> > >     val size = events.size
> > >     try {
> > >       debug("Handling " + size + " events")
> > >       if(size > 0)
> > >         handler.handle(events)
> > >     }catch {
> > >       case e: Throwable => error("Error in handling batch of " + size + " events", e)
> > >     }
> > >   }
> > >
> > > Is there any better solution to monitor async send failures?
> > >
> > > A few of the solutions we came up with are:
> > > 1. Create a message buffer in our application logic. Instantiate a
> > > configurable pool of sync producers which read and send messages from
> > > the buffer. (The sync producer throws exceptions back to the caller
> > > class.)
> > > 2. Extend DefaultEventHandler and pass an exception handler callback.
> > > On encountering an exception, the callback method handleException()
> > > will be called, notifying application code of any failure.
> > >
> > > Please share views and solutions which you have used to track async
> > > send failures.
> > >
> > > Thank you very much.
> > >
> > > Best regards
> > > Prashant
> >
> > --
> > -- Guozhang
>

--
-- Guozhang
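
For reference, here is a minimal sketch of two ideas discussed in this thread: the callback approach (option 2 in Prashant's first mail) and the per-partition sequence-id check at the consumer. It deliberately has no Kafka dependency. BatchHandler, NotifyingHandler, SequenceChecker and the result objects are hypothetical names made up for illustration, not part of the Kafka API, and the checker assumes ids start at 1 and increase by 1 within each partition. Treat it as a starting point under those assumptions rather than a tested integration with DefaultEventHandler.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an event handler; NOT the Kafka EventHandler trait.
trait BatchHandler[T] {
  def handle(events: Seq[T]): Unit
}

// Wraps another handler. The first failure is passed to onFailure, and every
// later batch is reported unsent, so nothing can overtake the failed batch.
final class NotifyingHandler[T](
    underlying: BatchHandler[T],
    onFailure: (Seq[T], Throwable) => Unit
) extends BatchHandler[T] {
  @volatile private var firstError: Option[Throwable] = None

  def hasFailed: Boolean = firstError.isDefined

  override def handle(events: Seq[T]): Unit = firstError match {
    case Some(cause) =>
      // Already failed earlier: report the batch, do not send it.
      onFailure(events, cause)
    case None =>
      try underlying.handle(events)
      catch {
        case e: Exception =>
          firstError = Some(e)
          onFailure(events, e)
      }
  }
}

// Possible outcomes of the consumer-side sequence check.
sealed trait SeqResult
case object InOrder extends SeqResult
case object Duplicate extends SeqResult
case object Gap extends SeqResult

// Tracks the last accepted sequence id per partition.
// Assumption: ids start at 1 and increase by 1 within each partition.
final class SequenceChecker {
  private val lastSeen = mutable.Map.empty[Int, Long]

  def check(partition: Int, seqId: Long): SeqResult = {
    val last = lastSeen.getOrElse(partition, 0L)
    if (seqId <= last) Duplicate // replayed message, safe to drop
    else if (seqId == last + 1) {
      lastSeen(partition) = seqId
      InOrder
    } else Gap // something was skipped or reordered
  }
}
```

The application would pass a callback that raises an alert and shuts the producer down, then replay the sequence from the first failed batch. On the consumer side, Duplicate results can be dropped after a replay, while a Gap signals that the ordering assumption was violated.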