@Asaf

Do I need to raise a new bug for this?

@Rajini

Please suggest a configuration with which, in your view, retries should work. The code is already in the mail chain; I am adding it here again:

    public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
        try (BufferedReader bf = getBufferedReader(filePath, encoding);
             KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
            String line;
            while ((line = bf.readLine()) != null) {
                producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    }
                });
            }
            producer.flush();
        } catch (IOException e) {
            Throwables.propagate(e);
        }
    }

    private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServer);
        properties.put("key.serializer", StringSerializer.class.getCanonicalName());
        properties.put("value.serializer", StringSerializer.class.getCanonicalName());
        properties.put("acks", "-1");
        properties.put("retries", 50000);
        properties.put("request.timeout.ms", 1);
        return new KafkaProducer<>(properties);
    }

    private BufferedReader getBufferedReader(String filePath, String encoding)
            throws UnsupportedEncodingException, FileNotFoundException {
        return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
                Optional.ofNullable(encoding).orElse("UTF-8")));
    }

Regards,
Vatsal

-----Original Message-----
From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
Sent: 06 December 2016 17:27
To: users@kafka.apache.org
Subject: Re: Detecting when all the retries are expired for a message

I believe batches in RecordAccumulator are expired after request.timeout.ms, so they wouldn't get retried in this case. I think the config options are quite confusing, making it hard to figure out the behavior without looking into the code.
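To make the point about request.timeout.ms concrete, here is a rough sketch of a configuration in which the request timeout leaves room for retries. It is an illustration only, not a recommended setup: the class and method names are made up for this note, 30000 and 100 are the documented producer defaults for request.timeout.ms and retry.backoff.ms, and the time estimate assumes that every attempt is actually sent and then times out, which may not hold when batches expire in the accumulator first.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the Kafka client library. */
final class ProducerRetryConfigSketch {

    /** Builds producer settings whose request timeout is long enough for retries to happen. */
    static Properties retryFriendlyConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "-1");                // same value as in the thread
        props.put("retries", 10);               // value from the first message in the thread
        props.put("retry.backoff.ms", 100);     // pause before each retry (documented default)
        props.put("request.timeout.ms", 30000); // documented default, instead of 1 ms
        return props;
    }

    /**
     * Back-of-the-envelope estimate of how long one record could take to fail,
     * assuming the first attempt and every retry are sent and each one times out.
     */
    static long roughWorstCaseMillis(int retries, long requestTimeoutMs, long retryBackoffMs) {
        long attempts = retries + 1L; // the first send plus each retry
        return attempts * requestTimeoutMs + retries * retryBackoffMs;
    }
}
```

With the settings from the thread (retries=50000, request.timeout.ms=1) and the default backoff of 100 ms, the same estimate comes to 50001 * 1 + 50000 * 100 = 5050001 ms, so an almost instant failure is consistent with the batch expiring before any retry was attempted.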

On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Vatsal:
>
> I don't think they merged the fix for this bug (retries doesn't work)
> in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal <mev...@sky.optymyze.com>
> wrote:
>
> > Hello,
> >
> > Bumping up this thread in case anyone of you have any say on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -----Original Message-----
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users <users@kafka.apache.org>
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > I executed the same producer code for a single record file with
> > following config:
> >
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 50000);
> >     properties.put("request.timeout.ms", 1);
> >
> > I have kept request.timeout.ms=1 to make sure that message delivery
> > will fail with TimeoutException. Since the retries are 50000 then
> > the program should take at-least 50000 ms (50 seconds) to complete for
> > single record.
> >
> > However the program is completing almost instantly with only one
> > callback with TimeoutException. I suspect that producer is not going
> > for any retries. Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > Am I missing any configuration or
> > -----Original Message-----
> > From: Ismael Juma [mailto:isma...@gmail.com]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users <users@kafka.apache.org>
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > The callback is called after the retries have been exhausted.
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com> wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However as per the
> > > documentation of Callback (link:
> > > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may
> > > be covered by increasing #.retries". So even if I get
> > > TimeoutException in callback, wouldn't it try to send message
> > > again until all the retries are done? Would it be safe to assume
> > > that message delivery is failed permanently just by encountering
> > > TimeoutException in callback?
> > >
> > > Here is a snippet from above mentioned documentation:
> > >
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > >
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > >     InvalidTopicException
> > >     OffsetMetadataTooLargeException
> > >     RecordBatchTooLargeException
> > >     RecordTooLargeException
> > >     UnknownServerException
> > >
> > > Retriable exceptions (transient, may be covered by increasing #.retries):
> > >     CorruptRecordException
> > >     InvalidMetadataException
> > >     NotEnoughReplicasAfterAppendException
> > >     NotEnoughReplicasException
> > >     OffsetOutOfRangeException
> > >     TimeoutException
> > >     UnknownTopicOrPartitionException"
> > >
> > > @asaf :My kafka - API version is 0.10.0.1. So I think I should not
> > > face the issue that you are mentioning. I mentioned documentation
> > > link of 0.9 by mistake.
> > >
> > > Regards,
> > > Vatsal
> > >
> > > -----Original Message-----
> > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users <users@kafka.apache.org>
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2 which has not been release yet. Without the fix it doesn't
> > > really retry.
> > > I forked the kafka repo, applied the fix, built it and placed it
> > > in our own Nexus Maven repository until 0.9.0.2 will be released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > > <mev...@sky.optymyze.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am reading a file and dumping each record on Kafka.
> > > > > Here is my producer code:
> > > > >
> > > > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > > > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > > > >          KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
> > > > >         String line;
> > > > >         while ((line = bf.readLine()) != null) {
> > > > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > > > >                 if (e != null) {
> > > > >                     e.printStackTrace();
> > > > >                 }
> > > > >             });
> > > > >         }
> > > > >         producer.flush();
> > > > >     } catch (IOException e) {
> > > > >         Throwables.propagate(e);
> > > > >     }
> > > > > }
> > > > >
> > > > > private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
> > > > >     Properties properties = new Properties();
> > > > >     properties.put("bootstrap.servers", bootstrapServer);
> > > > >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > > > >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > > > >     properties.put("acks", "-1");
> > > > >     properties.put("retries", 10);
> > > > >     return new KafkaProducer<>(properties);
> > > > > }
> > > > >
> > > > > private BufferedReader getBufferedReader(String filePath, String encoding)
> > > > >         throws UnsupportedEncodingException, FileNotFoundException {
> > > > >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> > > > >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > > > > }
> > > > >
> > > > > As per the official documentation of Callback
> > > > > <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > > > > TimeoutException is a retriable exception. As I have set
> > > > > retries to 10, the producer will try to resend the message if
> > > > > delivering some message fails with TimeoutException. I am
> > > > > looking for some reliable way to detect when delivery of a
> > > > > message has failed permanently after all retries.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Vatsal

--
Regards,

Rajini
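
On the original question of detecting permanent failure: as stated earlier in the thread, the callback is called after the retries have been exhausted, so a non-null exception there can be treated as final for that record. Below is a minimal sketch of a tracker that remembers such records. DeliveryFailureTracker and its methods are hypothetical names, not Kafka classes, and the sketch uses only the JDK so that it can be tried without a broker; how it is wired into the producer is shown in the comment and is an assumption about the calling code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of Kafka): remembers records whose send callback reported an error. */
final class DeliveryFailureTracker {

    /** A record that was not delivered, with the exception passed to the callback. */
    static final class Failure {
        final String record;
        final Exception cause;

        Failure(String record, Exception cause) {
            this.record = record;
            this.cause = cause;
        }
    }

    // Callbacks run on the producer's I/O thread, so thread-safe containers are used.
    private final List<Failure> failures = new CopyOnWriteArrayList<>();
    private final AtomicLong delivered = new AtomicLong();

    /** Intended to be called from the send callback; exception is null on success. */
    void onCompletion(String record, Exception exception) {
        if (exception == null) {
            delivered.incrementAndGet();
        } else {
            failures.add(new Failure(record, exception));
        }
    }

    long deliveredCount() {
        return delivered.get();
    }

    /** Returns a snapshot of the failures recorded so far. */
    List<Failure> failures() {
        return new ArrayList<>(failures);
    }
}

// Possible wiring inside the read loop of produce(...), assuming the code from the thread:
//
//     final String record = line; // the lambda needs an effectively final variable
//     producer.send(new ProducerRecord<>(topicName, record),
//             (metadata, e) -> tracker.onCompletion(record, e));
//
// Once producer.flush() has returned, tracker.failures() should list the records
// that were given up on, which could then be logged or written to a retry file.
```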