@Asaf


Do I need to raise a new bug for this?



@Rajini



Please suggest a configuration with which retries should work, according to 
you. The code is already there in the mail chain; I am adding it here again:



import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.google.common.base.Throwables; // Guava

public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
    try (BufferedReader bf = getBufferedReader(filePath, encoding);
            KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
        String line;
        while ((line = bf.readLine()) != null) {
            // Send each line asynchronously; the callback receives any failure.
            producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
        }
        producer.flush();
    } catch (IOException e) {
        Throwables.propagate(e);
    }
}

private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "-1"); // -1 == "all": wait for all in-sync replicas
    properties.put("retries", 50000);
    // Deliberately tiny, so that every send fails with a TimeoutException.
    properties.put("request.timeout.ms", 1);
    return new KafkaProducer<>(properties);
}

private BufferedReader getBufferedReader(String filePath, String encoding)
        throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(
            new FileInputStream(filePath),
            Optional.ofNullable(encoding).orElse("UTF-8")));
}



Regards,

Vatsal



-----Original Message-----
From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
Sent: 06 December 2016 17:27
To: users@kafka.apache.org
Subject: Re: Detecting when all the retries are expired for a message



I believe batches in RecordAccumulator are expired after request.timeout.ms, so 
they wouldn't get retried in this case. I think the config options are quite 
confusing, making it hard to figure out the behavior without looking into the 
code.
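
A minimal sketch of a configuration that avoids that expiry path (the values
are illustrative assumptions, reusing the property names from the code
upthread; not a verified fix):

    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "all");
    // Keep request.timeout.ms realistic (the default is 30000 ms) so batches
    // are not expired in the RecordAccumulator before a retry can happen.
    properties.put("request.timeout.ms", 30000);
    properties.put("retries", 5);
    // retry.backoff.ms (default 100 ms) is the pause between retry attempts.
    properties.put("retry.backoff.ms", 100);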



On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika <asaf.mes...@gmail.com> wrote:



> Vatsal:

>

> I don't think they merged the fix for this bug (retries don't work) from
> 0.9.x into 0.10.0.1: https://github.com/apache/kafka/pull/1547

>

>

> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal

> <mev...@sky.optymyze.com>

> wrote:

>

> > Hello,

> >

> > Bumping up this thread in case any of you has input on this issue.

> >

> > Regards,

> > Vatsal

> >

> > -----Original Message-----

> > From: Mevada, Vatsal

> > Sent: 02 December 2016 16:16

> > To: Kafka Users <users@kafka.apache.org>

> > Subject: RE: Detecting when all the retries are expired for a

> > message

> >

> > I executed the same producer code for a single-record file with the
> > following config:

> >

> >         properties.put("bootstrap.servers", bootstrapServer);

> >         properties.put("key.serializer",

> > StringSerializer.class.getCanonicalName());

> >         properties.put("value.serializer",

> > StringSerializer.class.getCanonicalName());

> >         properties.put("acks", "-1");

> >         properties.put("retries", 50000);

> >         properties.put("request.timeout.ms", 1);

> >

> > I have kept request.timeout.ms=1 to make sure that message delivery will
> > fail with a TimeoutException. Since retries is 50000, the program should
> > take at least 50000 ms (50 seconds) to complete for a single record (and
> > with the default retry.backoff.ms of 100 ms, far longer). However, the
> > program completes almost instantly, with only one callback carrying a
> > TimeoutException. I suspect that the producer is not attempting any
> > retries. Or am I missing something in my code?

> >

> > My Kafka version is 0.10.0.1.
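
> > A quick sanity check (a sketch reusing producer and topicName from the
> > code above) is to time a single send and see how long the callback takes:
> >
> >     long start = System.currentTimeMillis();
> >     producer.send(new ProducerRecord<>(topicName, "test"), (metadata, e) -> {
> >         long elapsed = System.currentTimeMillis() - start;
> >         // If retries really happen, this should be on the order of
> >         // retries * (request.timeout.ms + retry.backoff.ms); a callback
> >         // firing almost immediately suggests the batch was expired
> >         // without any retry.
> >         System.out.println("Callback after " + elapsed + " ms: " + e);
> >     });
> >     producer.flush();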

> >

> > Regards,

> > Vatsal


> > -----Original Message-----

> > From: Ismael Juma [mailto:isma...@gmail.com]

> > Sent: 02 December 2016 13:30

> > To: Kafka Users <users@kafka.apache.org>

> > Subject: RE: Detecting when all the retries are expired for a

> > message

> >

> > The callback is called after the retries have been exhausted.

> >

> > Ismael
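
> > (A sketch of what that implies for the callback in the code upthread;
> > "record" here stands for any ProducerRecord:)
> >
> >     producer.send(record, (metadata, e) -> {
> >         if (e == null) {
> >             // The broker acknowledged the record.
> >         } else {
> >             // Retries, if any, are already exhausted at this point, so the
> >             // record can be logged or routed to a dead-letter store.
> >             e.printStackTrace();
> >         }
> >     });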

> >

> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal" 
> > <mev...@sky.optymyze.com<mailto:mev...@sky.optymyze.com>> wrote:

> >

> > > @Ismael:

> > >

> > > I can handle TimeoutException in the callback. However, as per the
> > > documentation of Callback (link:
> > > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception, and the docs say it "may be
> > > covered by increasing #.retries". So even if I get a TimeoutException
> > > in the callback, wouldn't the producer try to send the message again
> > > until all the retries are done? Would it be safe to assume that message
> > > delivery has failed permanently just by encountering a TimeoutException
> > > in the callback?

> > >

> > > Here is a snippet from above mentioned documentation:

> > > "exception - The exception thrown during processing of this record.

> > > Null if no error occurred. Possible thrown exceptions include:

> > > Non-Retriable exceptions (fatal, the message will never be sent):

> > > InvalidTopicException OffsetMetadataTooLargeException

> > > RecordBatchTooLargeException RecordTooLargeException

> > > UnknownServerException Retriable exceptions (transient, may be

> > > covered by increasing #.retries): CorruptRecordException

> > > InvalidMetadataException NotEnoughReplicasAfterAppendException

> > > NotEnoughReplicasException OffsetOutOfRangeException

> > > TimeoutException UnknownTopicOrPartitionException"
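
> > > As a sketch of how the two groups can at least be told apart in the
> > > callback: the retriable ones share the supertype
> > > org.apache.kafka.common.errors.RetriableException, so
> > >
> > >     if (e instanceof RetriableException) {
> > >         // Transient per the docs, but by the time the callback runs the
> > >         // producer has already given up retrying, so this is final too.
> > >     } else if (e != null) {
> > >         // Fatal: the record will never be sent.
> > >     }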

> > >

> > > @Asaf: My Kafka API version is 0.10.0.1, so I think I should not face
> > > the issue that you are mentioning. I mentioned the documentation link
> > > for 0.9 by mistake.

> > >

> > > Regards,

> > > Vatsal

> > > -----Original Message-----

> > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]

> > > Sent: 02 December 2016 00:32

> > > To: Kafka Users <users@kafka.apache.org>

> > > Subject: Re: Detecting when all the retries are expired for a

> > > message

> > >

> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > > really retry. I forked the Kafka repo, applied the fix, built it, and
> > > placed it in our own Nexus Maven repository until 0.9.0.2 is released.

> > >

> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

> > >

> > > Feel free to use it.

> > >

> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote:

> > >

> > > > The callback should give you what you are asking for. Has it not

> > > > worked as you expect when you tried it?

> > > >

> > > > Ismael

> > > >

> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal

> > > > <mev...@sky.optymyze.com>

> > > > wrote:

> > > >

> > > > > Hi,

> > > > >

> > > > >

> > > > >

> > > > > I am reading a file and dumping each record on Kafka. Here is

> > > > > my producer

> > > > > code:

> > > > >

> > > > >

> > > > >

> > > > > public void produce(String topicName, String filePath,
> > > > >                 String bootstrapServers, String encoding) {
> > > > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > > > >             KafkaProducer<Object, String> producer =
> > > > >                     initKafkaProducer(bootstrapServers)) {
> > > > >         String line;
> > > > >         while ((line = bf.readLine()) != null) {
> > > > >             producer.send(new ProducerRecord<>(topicName, line),
> > > > >                     (metadata, e) -> {
> > > > >                 if (e != null) {
> > > > >                     e.printStackTrace();
> > > > >                 }
> > > > >             });
> > > > >         }
> > > > >         producer.flush();
> > > > >     } catch (IOException e) {
> > > > >         Throwables.propagate(e);
> > > > >     }
> > > > > }
> > > > >
> > > > > private static KafkaProducer<Object, String> initKafkaProducer(
> > > > >                 String bootstrapServer) {
> > > > >     Properties properties = new Properties();
> > > > >     properties.put("bootstrap.servers", bootstrapServer);
> > > > >     properties.put("key.serializer",
> > > > >             StringSerializer.class.getCanonicalName());
> > > > >     properties.put("value.serializer",
> > > > >             StringSerializer.class.getCanonicalName());
> > > > >     properties.put("acks", "-1");
> > > > >     properties.put("retries", 10);
> > > > >     return new KafkaProducer<>(properties);
> > > > > }
> > > > >
> > > > > private BufferedReader getBufferedReader(String filePath,
> > > > >                 String encoding)
> > > > >         throws UnsupportedEncodingException, FileNotFoundException {
> > > > >     return new BufferedReader(new InputStreamReader(
> > > > >             new FileInputStream(filePath),
> > > > >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > > > > }

> > > > >

> > > > >

> > > > >

> > > > > As per the official documentation of Callback
> > > > > <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > > > > TimeoutException is a retriable exception. As I have kept retries at
> > > > > 10, the producer will try to resend the message if delivery fails
> > > > > with a TimeoutException. I am looking for some reliable way to
> > > > > detect when delivery of a message has failed permanently, after all
> > > > > retries.

> > > > >

> > > > >

> > > > >

> > > > > Regards,

> > > > >

> > > > > Vatsal

> > > > >

> > > >

> > >

> >

>







--

Regards,



Rajini
