Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Ismael Juma
Note that Sumant has been working on a KIP proposal to make the producer
timeout behaviour more intuitive:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

Ismael
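
KIP-91 eventually shipped (in Kafka 2.1.0) as a delivery.timeout.ms setting: a
single upper bound on the time between send() returning and the callback
firing, retries included. A minimal sketch of the resulting configuration,
assuming a 2.1.0+ client; the values here are illustrative, not
recommendations:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeliveryTimeoutSketch {
    public static KafkaProducer<String, String> create(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.serializer", StringSerializer.class.getCanonicalName());
        p.put("value.serializer", StringSerializer.class.getCanonicalName());
        p.put("acks", "all");
        // Retry freely; the overall bound below decides when to give up.
        p.put("retries", Integer.MAX_VALUE);
        // Per-request bound; must be <= delivery.timeout.ms.
        p.put("request.timeout.ms", 30000);
        // Total time from send() until the callback fires, retries included.
        p.put("delivery.timeout.ms", 120000);
        return new KafkaProducer<>(p);
    }
}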

On Wed, Dec 7, 2016 at 9:42 AM, Rajini Sivaram  wrote:

> If you just want to test retries, you could restart Kafka while the
> producer is running and you should see the producer retry while Kafka is
> down/leader is being elected after Kafka restarts. If you specifically want
> a TimeoutException to trigger all retries, I am not sure how you can. I
> would suggest that you raise a JIRA since the current behaviour is not very
> intuitive.
>
>
> On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal wrote:
>
> > @Asaf
> >
> >
> >
> > Do I need to raise new bug for this?
> >
> >
> >
> > @Rajini
> >
> >
> >
> > Please suggest a configuration with which retries should work as you
> > expect. The code is already there in the mail chain. I am adding it
> > here again:
> >
> >
> >
> > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> >         String line;
> >         while ((line = bf.readLine()) != null) {
> >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> >                 if (e != null) {
> >                     e.printStackTrace();
> >                 }
> >             });
> >         }
> >         producer.flush();
> >     } catch (IOException e) {
> >         Throwables.propagate(e);
> >     }
> > }
> >
> > private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
> >     Properties properties = new Properties();
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 5);
> >     properties.put("request.timeout.ms", 1);
> >     return new KafkaProducer<>(properties);
> > }
> >
> > private BufferedReader getBufferedReader(String filePath, String encoding)
> >         throws UnsupportedEncodingException, FileNotFoundException {
> >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > }
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
> >
> >
> > -Original Message-
> > From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
> > Sent: 06 December 2016 17:27
> > To: users@kafka.apache.org
> > Subject: Re: Detecting when all the retries are expired for a message
> >
> >
> >
> > I believe batches in RecordAccumulator are expired after
> > request.timeout.ms, so they wouldn't get retried in this case. I think
> > the config options are quite confusing, making it hard to figure out the
> > behavior without looking into the code.
> >
> >
> >
> > On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika wrote:
> >
> > > Vatsal:
> > >
> > > I don't think they merged the fix for this bug (retries doesn't work)
> > > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
> > >
> > > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal wrote:
> > >
> > > > Hello,
> > > >
> > > > Bumping up this thread in case any of you has input on this issue.
> > > >
> > > > Regards,
> > > > Vatsal
> > > >
> > > > -Original Message-
> > > > From: Mevada, Vatsal
> > > > Sent: 02 December 2016 16:16
> > > > To: Kafka Users
> > > > Subject: RE: Detecting when all the retries are expired for a message
> > > >
> > > > I executed the same producer code for a single record file with the
> > > > following config:
> > > >
> > > > properties.put("bootstrap.servers", bootstrapServer);
> > > > 

Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Ismael Juma
Hi Asaf,

That PR is for the backport to 0.9.0.x, the original change was merged to
trunk and is in 0.10.x.x.

Ismael

On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  wrote:

> Vatsal:
>
> I don't think they merged the fix for this bug (retries doesn't work) in
> 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal 
> wrote:
>
> > Hello,
> >
> > Bumping up this thread in case any of you has input on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -Original Message-
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > I executed the same producer code for a single record file with the
> > following config:
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> > properties.put("key.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("value.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("acks", "-1");
> > properties.put("retries", 5);
> > properties.put("request.timeout.ms", 1);
> >
> > I have kept request.timeout.ms=1 to make sure that message delivery will
> > fail with a TimeoutException. Since retries is 5, the program should take
> > at least 5 x request.timeout.ms (plus any retry backoff) to complete for
> > a single record. However the program is completing almost instantly with
> > only one callback with TimeoutException. I suspect that the producer is
> > not attempting any retries. Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > -Original Message-
> > From: Ismael Juma [mailto:isma...@gmail.com]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > The callback is called after the retries have been exhausted.
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However, as per the
> > > documentation of Callback (link:
> > > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may be
> > > covered by increasing #.retries". So even if I get TimeoutException in
> > > the callback, wouldn't it try to send the message again until all the
> > > retries are done? Would it be safe to assume that message delivery has
> > > failed permanently just by encountering TimeoutException in the callback?
> > >
> > > Here is a snippet from above mentioned documentation:
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > > InvalidTopicException OffsetMetadataTooLargeException
> > > RecordBatchTooLargeException RecordTooLargeException
> > > UnknownServerException Retriable exceptions (transient, may be covered
> > > by increasing #.retries): CorruptRecordException
> > > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > > UnknownTopicOrPartitionException"
> > >
> > > @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> > > face the issue that you are mentioning. I mentioned the documentation
> > > link for 0.9 by mistake.
> > >
> > > Regards,
> > > Vatsal
> > > -Original Message-
> > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users 
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > > really retry.
> > > I forked the kafka repo, applied the fix, built it and placed it in
> > > our own Nexus Maven repository until 0.9.0.2 will be released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > > 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > I am reading a file and dumping each record on Kafka. Here is my
> > > > > producer code:
> > > > >
> > > > > public void produce(String topicName, String 

Re: Detecting when all the retries are expired for a message

2016-12-07 Thread Rajini Sivaram
If you just want to test retries, you could restart Kafka while the
producer is running and you should see the producer retry while Kafka is
down/leader is being elected after Kafka restarts. If you specifically want
a TimeoutException to trigger all retries, I am not sure how you can. I
would suggest that you raise a JIRA since the current behaviour is not very
intuitive.
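
A minimal sketch of a send loop whose callback makes that test easy to
observe while the broker is bounced; the topic name, loop bounds and backoff
value are illustrative assumptions, not part of the original code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RetryRestartTest {
    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", StringSerializer.class.getCanonicalName());
        p.put("value.serializer", StringSerializer.class.getCanonicalName());
        p.put("acks", "-1");
        p.put("retries", 5);
        // Slow the retries down so they show up clearly in the client logs.
        p.put("retry.backoff.ms", 1000);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            // Keep producing for about a minute; restart the broker meanwhile.
            for (int i = 0; i < 600; i++) {
                final int n = i;
                producer.send(new ProducerRecord<>("retry-test", "msg-" + n), (metadata, e) -> {
                    if (e != null) {
                        System.err.println("msg-" + n + " failed permanently: " + e);
                    } else {
                        System.out.println("msg-" + n + " acked at offset " + metadata.offset());
                    }
                });
                Thread.sleep(100);
            }
            producer.flush();
        }
    }
}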


On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal 
wrote:

> @Asaf
>
>
>
> Do I need to raise new bug for this?
>
>
>
> @Rajini
>
>
>
> Please suggest a configuration with which retries should work as you
> expect. The code is already there in the mail chain. I am adding it here
> again:
>
>
>
> public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
>     try (BufferedReader bf = getBufferedReader(filePath, encoding);
>             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
>         String line;
>         while ((line = bf.readLine()) != null) {
>             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
>                 if (e != null) {
>                     e.printStackTrace();
>                 }
>             });
>         }
>         producer.flush();
>     } catch (IOException e) {
>         Throwables.propagate(e);
>     }
> }
>
> private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
>     Properties properties = new Properties();
>     properties.put("bootstrap.servers", bootstrapServer);
>     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("acks", "-1");
>     properties.put("retries", 5);
>     properties.put("request.timeout.ms", 1);
>     return new KafkaProducer<>(properties);
> }
>
> private BufferedReader getBufferedReader(String filePath, String encoding)
>         throws UnsupportedEncodingException, FileNotFoundException {
>     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
>             Optional.ofNullable(encoding).orElse("UTF-8")));
> }
>
>
>
> Regards,
>
> Vatsal
>
>
>
> -Original Message-
> From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
> Sent: 06 December 2016 17:27
> To: users@kafka.apache.org
> Subject: Re: Detecting when all the retries are expired for a message
>
>
>
> I believe batches in RecordAccumulator are expired after
> request.timeout.ms, so they wouldn't get retried in this case. I think
> the config options are quite confusing, making it hard to figure out the
> behavior without looking into the code.
>
>
>
> On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika wrote:
>
>
>
> > Vatsal:
> >
> > I don't think they merged the fix for this bug (retries doesn't work)
> > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
> >
> > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal wrote:
> >
> > > Hello,
> > >
> > > Bumping up this thread in case any of you has input on this issue.
> > >
> > > Regards,
> > > Vatsal
> > >
> > > -Original Message-
> > > From: Mevada, Vatsal
> > > Sent: 02 December 2016 16:16
> > > To: Kafka Users
> > > Subject: RE: Detecting when all the retries are expired for a message
> > >
> > > I executed the same producer code for a single record file with the
> > > following config:
> > >
> > > properties.put("bootstrap.servers", bootstrapServer);
> > > properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > > properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > > properties.put("acks", "-1");
> > > properties.put("retries", 5);
> > > properties.put("request.timeout.ms", 1);
> > >
> > > I have kept request.timeout.ms=1 to make sure that message delivery
> > > will fail with a TimeoutException. Since retries is 5, the program
> > > should take at least 5 x request.timeout.ms (plus any retry backoff)
> > > to complete for a single record. However the program is completing
> > > almost instantly with only one callback with TimeoutException. I 

Re: Detecting when all the retries are expired for a message

2016-12-06 Thread Rajini Sivaram
I believe batches in RecordAccumulator are expired after request.timeout.ms,
so they wouldn't get retried in this case. I think the config options are
quite confusing, making it hard to figure out the behavior without looking
into the code.
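
A short sketch contrasting the two regimes this implies for the 0.10.0.1
producer discussed in this thread; the values are illustrative:

import java.util.Properties;

public class TimeoutRegimes {
    private static Properties base() {
        Properties p = new Properties();
        p.put("acks", "-1");
        p.put("retries", 5);
        return p;
    }

    // Batch expires in the RecordAccumulator after request.timeout.ms,
    // before it is ever sent: one TimeoutException callback, no retries.
    static Properties expiresBeforeSend() {
        Properties p = base();
        p.put("request.timeout.ms", 1);
        return p;
    }

    // A realistic timeout lets the request actually reach the broker; a
    // failed (or unanswered) in-flight request is what the retry counter
    // covers.
    static Properties allowsRetries() {
        Properties p = base();
        p.put("request.timeout.ms", 30000);
        return p;
    }
}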

On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika  wrote:

> Vatsal:
>
> I don't think they merged the fix for this bug (retries doesn't work) in
> 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal 
> wrote:
>
> > Hello,
> >
> > Bumping up this thread in case any of you has input on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -Original Message-
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > I executed the same producer code for a single record file with the
> > following config:
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> > properties.put("key.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("value.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("acks", "-1");
> > properties.put("retries", 5);
> > properties.put("request.timeout.ms", 1);
> >
> > I have kept request.timeout.ms=1 to make sure that message delivery will
> > fail with a TimeoutException. Since retries is 5, the program should take
> > at least 5 x request.timeout.ms (plus any retry backoff) to complete for
> > a single record. However the program is completing almost instantly with
> > only one callback with TimeoutException. I suspect that the producer is
> > not attempting any retries. Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > -Original Message-
> > From: Ismael Juma [mailto:isma...@gmail.com]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users 
> > Subject: RE: Detecting when all the retries are expired for a message
> >
> > The callback is called after the retries have been exhausted.
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However, as per the
> > > documentation of Callback (link:
> > > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may be
> > > covered by increasing #.retries". So even if I get TimeoutException in
> > > the callback, wouldn't it try to send the message again until all the
> > > retries are done? Would it be safe to assume that message delivery has
> > > failed permanently just by encountering TimeoutException in the callback?
> > >
> > > Here is a snippet from above mentioned documentation:
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > > InvalidTopicException OffsetMetadataTooLargeException
> > > RecordBatchTooLargeException RecordTooLargeException
> > > UnknownServerException Retriable exceptions (transient, may be covered
> > > by increasing #.retries): CorruptRecordException
> > > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > > UnknownTopicOrPartitionException"
> > >
> > > @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> > > face the issue that you are mentioning. I mentioned the documentation
> > > link for 0.9 by mistake.
> > >
> > > Regards,
> > > Vatsal
> > > -Original Message-
> > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users 
> > > Subject: Re: Detecting when all the retries are expired for a message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > > really retry.
> > > I forked the kafka repo, applied the fix, built it and placed it in
> > > our own Nexus Maven repository until 0.9.0.2 will be released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > > 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > I am reading a file and dumping each record on Kafka. Here is my
> > 

Re: Detecting when all the retries are expired for a message

2016-12-06 Thread Asaf Mesika
Vatsal:

I don't think they merged the fix for this bug (retries doesn't work) in
0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547


On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal 
wrote:

> Hello,
>
> Bumping up this thread in case any of you has input on this issue.
>
> Regards,
> Vatsal
>
> -Original Message-
> From: Mevada, Vatsal
> Sent: 02 December 2016 16:16
> To: Kafka Users 
> Subject: RE: Detecting when all the retries are expired for a message
>
> I executed the same producer code for a single record file with the
> following config:
>
> properties.put("bootstrap.servers", bootstrapServer);
> properties.put("key.serializer",
> StringSerializer.class.getCanonicalName());
> properties.put("value.serializer",
> StringSerializer.class.getCanonicalName());
> properties.put("acks", "-1");
> properties.put("retries", 5);
> properties.put("request.timeout.ms", 1);
>
> I have kept request.timeout.ms=1 to make sure that message delivery will
> fail with a TimeoutException. Since retries is 5, the program should take
> at least 5 x request.timeout.ms (plus any retry backoff) to complete for
> a single record. However the program is completing almost instantly with
> only one callback with TimeoutException. I suspect that the producer is
> not attempting any retries. Or am I missing something in my code?
>
> My Kafka version is 0.10.0.1.
>
> Regards,
> Vatsal
> -Original Message-
> From: Ismael Juma [mailto:isma...@gmail.com]
> Sent: 02 December 2016 13:30
> To: Kafka Users 
> Subject: RE: Detecting when all the retries are expired for a message
>
> The callback is called after the retries have been exhausted.
>
> Ismael
>
> On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:
>
> > @Ismael:
> >
> > I can handle TimeoutException in the callback. However, as per the
> > documentation of Callback (link:
> > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > TimeoutException is a retriable exception and it says that it "may be
> > covered by increasing #.retries". So even if I get TimeoutException in
> > the callback, wouldn't it try to send the message again until all the
> > retries are done? Would it be safe to assume that message delivery has
> > failed permanently just by encountering TimeoutException in the callback?
> >
> > Here is a snippet from above mentioned documentation:
> > "exception - The exception thrown during processing of this record.
> > Null if no error occurred. Possible thrown exceptions include:
> > Non-Retriable exceptions (fatal, the message will never be sent):
> > InvalidTopicException OffsetMetadataTooLargeException
> > RecordBatchTooLargeException RecordTooLargeException
> > UnknownServerException Retriable exceptions (transient, may be covered
> > by increasing #.retries): CorruptRecordException
> > InvalidMetadataException NotEnoughReplicasAfterAppendException
> > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> > UnknownTopicOrPartitionException"
> >
> > @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> > face the issue that you are mentioning. I mentioned the documentation
> > link for 0.9 by mistake.
> >
> > Regards,
> > Vatsal
> > -Original Message-
> > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> > Sent: 02 December 2016 00:32
> > To: Kafka Users 
> > Subject: Re: Detecting when all the retries are expired for a message
> >
> > There's a critical bug in that section that has only been fixed in
> > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > really retry.
> > I forked the kafka repo, applied the fix, built it and placed it in
> > our own Nexus Maven repository until 0.9.0.2 will be released.
> >
> > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> >
> > Feel free to use it.
> >
> > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
> >
> > > The callback should give you what you are asking for. Has it not
> > > worked as you expect when you tried it?
> > >
> > > Ismael
> > >
> > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > I am reading a file and dumping each record on Kafka. Here is my
> > > > producer code:
> > > >
> > > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > > >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> > > >         String line;
> > > >         while ((line = bf.readLine()) 

RE: Detecting when all the retries are expired for a message

2016-12-06 Thread Mevada, Vatsal
Hello,

Bumping up this thread in case any of you has input on this issue.

Regards,
Vatsal

-Original Message-
From: Mevada, Vatsal 
Sent: 02 December 2016 16:16
To: Kafka Users 
Subject: RE: Detecting when all the retries are expired for a message

I executed the same producer code for a single record file with the
following config:

properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", 
StringSerializer.class.getCanonicalName());
properties.put("value.serializer", 
StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 5);
properties.put("request.timeout.ms", 1);

I have kept request.timeout.ms=1 to make sure that message delivery will fail
with a TimeoutException. Since retries is 5, the program should take at least
5 x request.timeout.ms (plus any retry backoff) to complete for a single
record. However the program is completing almost instantly with only one
callback with TimeoutException. I suspect that the producer is not attempting
any retries. Or am I missing something in my code?

My Kafka version is 0.10.0.1.

Regards,
Vatsal
-Original Message-
From: Ismael Juma [mailto:isma...@gmail.com]
Sent: 02 December 2016 13:30
To: Kafka Users 
Subject: RE: Detecting when all the retries are expired for a message

The callback is called after the retries have been exhausted.

Ismael

On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:

> @Ismael:
>
> I can handle TimeoutException in the callback. However, as per the
> documentation of Callback (link:
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> TimeoutException is a retriable exception and it says that it "may be
> covered by increasing #.retries". So even if I get TimeoutException in
> the callback, wouldn't it try to send the message again until all the
> retries are done? Would it be safe to assume that message delivery has
> failed permanently just by encountering TimeoutException in the callback?
>
> Here is a snippet from above mentioned documentation:
> "exception - The exception thrown during processing of this record. 
> Null if no error occurred. Possible thrown exceptions include: 
> Non-Retriable exceptions (fatal, the message will never be sent): 
> InvalidTopicException OffsetMetadataTooLargeException 
> RecordBatchTooLargeException RecordTooLargeException 
> UnknownServerException Retriable exceptions (transient, may be covered 
> by increasing #.retries): CorruptRecordException 
> InvalidMetadataException NotEnoughReplicasAfterAppendException
> NotEnoughReplicasException OffsetOutOfRangeException TimeoutException 
> UnknownTopicOrPartitionException"
>
> @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> face the issue that you are mentioning. I mentioned the documentation
> link for 0.9 by mistake.
>
> Regards,
> Vatsal
> -Original Message-
> From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> Sent: 02 December 2016 00:32
> To: Kafka Users 
> Subject: Re: Detecting when all the retries are expired for a message
>
> There's a critical bug in that section that has only been fixed in
> 0.9.0.2, which has not been released yet. Without the fix it doesn't
> really retry.
> I forked the kafka repo, applied the fix, built it and placed it in 
> our own Nexus Maven repository until 0.9.0.2 will be released.
>
> https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
>
> Feel free to use it.
>
> On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
>
> > The callback should give you what you are asking for. Has it not 
> > worked as you expect when you tried it?
> >
> > Ismael
> >
> > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> > 
> > wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > I am reading a file and dumping each record on Kafka. Here is my
> > > producer code:
> > >
> > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> > >         String line;
> > >         while ((line = bf.readLine()) != null) {
> > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > >                 if (e != null) {
> > >                     e.printStackTrace();
> > >                 }
> > > 

RE: Detecting when all the retries are expired for a message

2016-12-02 Thread Mevada, Vatsal
I executed the same producer code for a single record file with the
following config:

properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", 
StringSerializer.class.getCanonicalName());
properties.put("value.serializer", 
StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 5);
properties.put("request.timeout.ms", 1);

I have kept request.timeout.ms=1 to make sure that message delivery will fail
with a TimeoutException. Since retries is 5, the program should take at least
5 x request.timeout.ms (plus any retry backoff) to complete for a single
record. However the program is completing almost instantly with only one
callback with TimeoutException. I suspect that the producer is not attempting
any retries. Or am I missing something in my code?

My Kafka version is 0.10.0.1.

Regards,
Vatsal
-Original Message-
From: Ismael Juma [mailto:isma...@gmail.com] 
Sent: 02 December 2016 13:30
To: Kafka Users 
Subject: RE: Detecting when all the retries are expired for a message

The callback is called after the retries have been exhausted.

Ismael

On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:

> @Ismael:
>
> I can handle TimeoutException in the callback. However, as per the
> documentation of Callback (link:
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> TimeoutException is a retriable exception and it says that it "may be
> covered by increasing #.retries". So even if I get TimeoutException in
> the callback, wouldn't it try to send the message again until all the
> retries are done? Would it be safe to assume that message delivery has
> failed permanently just by encountering TimeoutException in the callback?
>
> Here is a snippet from above mentioned documentation:
> "exception - The exception thrown during processing of this record. 
> Null if no error occurred. Possible thrown exceptions include: 
> Non-Retriable exceptions (fatal, the message will never be sent): 
> InvalidTopicException OffsetMetadataTooLargeException 
> RecordBatchTooLargeException RecordTooLargeException 
> UnknownServerException Retriable exceptions (transient, may be covered 
> by increasing #.retries): CorruptRecordException 
> InvalidMetadataException NotEnoughReplicasAfterAppendException
> NotEnoughReplicasException OffsetOutOfRangeException TimeoutException 
> UnknownTopicOrPartitionException"
>
> @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> face the issue that you are mentioning. I mentioned the documentation
> link for 0.9 by mistake.
>
> Regards,
> Vatsal
> -Original Message-
> From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> Sent: 02 December 2016 00:32
> To: Kafka Users 
> Subject: Re: Detecting when all the retries are expired for a message
>
> There's a critical bug in that section that has only been fixed in
> 0.9.0.2, which has not been released yet. Without the fix it doesn't
> really retry.
> I forked the kafka repo, applied the fix, built it and placed it in 
> our own Nexus Maven repository until 0.9.0.2 will be released.
>
> https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
>
> Feel free to use it.
>
> On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
>
> > The callback should give you what you are asking for. Has it not 
> > worked as you expect when you tried it?
> >
> > Ismael
> >
> > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> > 
> > wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > I am reading a file and dumping each record on Kafka. Here is my
> > > producer code:
> > >
> > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> > >         String line;
> > >         while ((line = bf.readLine()) != null) {
> > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > >                 if (e != null) {
> > >                     e.printStackTrace();
> > >                 }
> > >             });
> > >         }
> > >         producer.flush();
> > >     } catch (IOException e) {
> > >         Throwables.propagate(e);
> > >     }
> > 

RE: Detecting when all the retries are expired for a message

2016-12-02 Thread Ismael Juma
The callback is called after the retries have been exhausted.

Ismael
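
A minimal sketch of a callback written against that guarantee; the
dead-letter handling is an illustrative assumption:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PermanentFailureCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            return; // delivered
        }
        // The producer only invokes the callback with an exception once it
        // has already spent all configured retries, so this is final.
        handlePermanentFailure(e);
    }

    private void handlePermanentFailure(Exception e) {
        // Illustrative: log it, park the record in a dead-letter store, alert, ...
        System.err.println("Giving up on record: " + e);
    }
}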

On 2 Dec 2016 3:34 am, "Mevada, Vatsal"  wrote:

> @Ismael:
>
> I can handle TimeoutException in the callback. However, as per the
> documentation of Callback (link:
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> TimeoutException is a retriable exception and it says that it "may be
> covered by increasing #.retries". So even if I get TimeoutException in
> the callback, wouldn't it try to send the message again until all the
> retries are done? Would it be safe to assume that message delivery has
> failed permanently just by encountering TimeoutException in the callback?
>
> Here is a snippet from above mentioned documentation:
> "exception - The exception thrown during processing of this record. Null
> if no error occurred. Possible thrown exceptions include: Non-Retriable
> exceptions (fatal, the message will never be sent): InvalidTopicException
> OffsetMetadataTooLargeException RecordBatchTooLargeException
> RecordTooLargeException UnknownServerException Retriable exceptions
> (transient, may be covered by increasing #.retries): CorruptRecordException
> InvalidMetadataException NotEnoughReplicasAfterAppendException
> NotEnoughReplicasException OffsetOutOfRangeException TimeoutException
> UnknownTopicOrPartitionException"
>
> @Asaf: My Kafka API version is 0.10.0.1, so I think I should not face
> the issue that you are mentioning. I mentioned the documentation link
> for 0.9 by mistake.
>
> Regards,
> Vatsal
> -Original Message-
> From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> Sent: 02 December 2016 00:32
> To: Kafka Users 
> Subject: Re: Detecting when all the retries are expired for a message
>
> There's a critical bug in that section that has only been fixed in 0.9.0.2,
> which has not been released yet. Without the fix it doesn't really retry.
> I forked the kafka repo, applied the fix, built it and placed it in our
> own Nexus Maven repository until 0.9.0.2 will be released.
>
> https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
>
> Feel free to use it.
>
> On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:
>
> > The callback should give you what you are asking for. Has it not
> > worked as you expect when you tried it?
> >
> > Ismael
> >
> > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > 
> > wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > I am reading a file and dumping each record on Kafka. Here is my
> > > producer code:
> > >
> > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> > >         String line;
> > >         while ((line = bf.readLine()) != null) {
> > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > >                 if (e != null) {
> > >                     e.printStackTrace();
> > >                 }
> > >             });
> > >         }
> > >         producer.flush();
> > >     } catch (IOException e) {
> > >         Throwables.propagate(e);
> > >     }
> > > }
> > >
> > > private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
> > >     Properties properties = new Properties();
> > >     properties.put("bootstrap.servers", bootstrapServer);
> > >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > >     properties.put("acks", "-1");
> > >     properties.put("retries", 10);
> > >     return new KafkaProducer<>(properties);
> > > }
> > >
> > > private BufferedReader getBufferedReader(String filePath, String encoding)
> > >         throws UnsupportedEncodingException, FileNotFoundException {
> > >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> > >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > > }
> > >
> > > As per the official documentation of Callback 

RE: Detecting when all the retries are expired for a message

2016-12-01 Thread Mevada, Vatsal
@Ismael:

I can handle TimeoutException in the callback. However, as per the
documentation of Callback (link:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
TimeoutException is a retriable exception and it says that it "may be covered
by increasing #.retries". So even if I get TimeoutException in the callback,
wouldn't it try to send the message again until all the retries are done?
Would it be safe to assume that message delivery has failed permanently just
by encountering TimeoutException in the callback?

Here is a snippet from above mentioned documentation: 
"exception - The exception thrown during processing of this record. Null if no 
error occurred. Possible thrown exceptions include: Non-Retriable exceptions 
(fatal, the message will never be sent): InvalidTopicException 
OffsetMetadataTooLargeException RecordBatchTooLargeException 
RecordTooLargeException UnknownServerException Retriable exceptions (transient, 
may be covered by increasing #.retries): CorruptRecordException 
InvalidMetadataException NotEnoughReplicasAfterAppendException 
NotEnoughReplicasException OffsetOutOfRangeException TimeoutException 
UnknownTopicOrPartitionException"

@Asaf: My Kafka API version is 0.10.0.1, so I think I should not face the
issue that you are mentioning. I mentioned the documentation link for 0.9 by
mistake.

Regards,
Vatsal
-Original Message-
From: Asaf Mesika [mailto:asaf.mes...@gmail.com] 
Sent: 02 December 2016 00:32
To: Kafka Users 
Subject: Re: Detecting when all the retries are expired for a message

There's a critical bug in that section that has only been fixed in 0.9.0.2,
which has not been released yet. Without the fix it doesn't really retry.
I forked the kafka repo, applied the fix, built it and placed it in our own 
Nexus Maven repository until 0.9.0.2 will be released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:

> The callback should give you what you are asking for. Has it not 
> worked as you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> 
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my
> > producer code:
> >
> > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> >         String line;
> >         while ((line = bf.readLine()) != null) {
> >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> >                 if (e != null) {
> >                     e.printStackTrace();
> >                 }
> >             });
> >         }
> >         producer.flush();
> >     } catch (IOException e) {
> >         Throwables.propagate(e);
> >     }
> > }
> >
> > private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
> >     Properties properties = new Properties();
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 10);
> >     return new KafkaProducer<>(properties);
> > }
> >
> > private BufferedReader getBufferedReader(String filePath, String encoding)
> >         throws UnsupportedEncodingException, FileNotFoundException {
> >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > }
> >
> >
> >
> > As per the official documentation of Callback
> > (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > TimeoutException is a retriable exception. As I have kept retries 10,
> > the producer will try to resend the message if delivering some message
> > fails with TimeoutException. I am looking for some reliable way to
> > detect when delivery of a message has failed permanently, after all
> > retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>


Re: Detecting when all the retries are expired for a message

2016-12-01 Thread Asaf Mesika
There's a critical bug in that section that has only been fixed in 0.9.0.2,
which has not been released yet. Without the fix it doesn't really retry.
I forked the kafka repo, applied the fix, built it and placed it in our own
Nexus Maven repository until 0.9.0.2 will be released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:

> The callback should give you what you are asking for. Has it not worked as
> you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my producer
> > code:
> >
> >
> >
> > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> >         String line;
> >         while ((line = bf.readLine()) != null) {
> >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> >                 if (e != null) {
> >                     e.printStackTrace();
> >                 }
> >             });
> >         }
> >         producer.flush();
> >     } catch (IOException e) {
> >         Throwables.propagate(e);
> >     }
> > }
> >
> > private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
> >     Properties properties = new Properties();
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 10);
> >     return new KafkaProducer<>(properties);
> > }
> >
> > private BufferedReader getBufferedReader(String filePath, String encoding)
> >         throws UnsupportedEncodingException, FileNotFoundException {
> >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > }
> >
> >
> >
> > As per the official documentation of Callback
> > (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > TimeoutException is a retriable exception. As I have kept retries 10, the
> > producer will try to resend the message if delivering some message fails
> > with TimeoutException. I am looking for some reliable way to detect when
> > delivery of a message has failed permanently, after all retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>


Re: Detecting when all the retries are expired for a message

2016-12-01 Thread Ismael Juma
The callback should give you what you are asking for. Has it not worked as
you expect when you tried it?

Ismael

On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
wrote:

> Hi,
>
>
>
> I am reading a file and dumping each record on Kafka. Here is my producer
> code:
>
>
>
> public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
>     try (BufferedReader bf = getBufferedReader(filePath, encoding);
>             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
>         String line;
>         while ((line = bf.readLine()) != null) {
>             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
>                 if (e != null) {
>                     e.printStackTrace();
>                 }
>             });
>         }
>         producer.flush();
>     } catch (IOException e) {
>         Throwables.propagate(e);
>     }
> }
>
> private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
>     Properties properties = new Properties();
>     properties.put("bootstrap.servers", bootstrapServer);
>     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("acks", "-1");
>     properties.put("retries", 10);
>     return new KafkaProducer<>(properties);
> }
>
> private BufferedReader getBufferedReader(String filePath, String encoding)
>         throws UnsupportedEncodingException, FileNotFoundException {
>     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
>             Optional.ofNullable(encoding).orElse("UTF-8")));
> }
>
>
>
> As per the official documentation of Callback
> (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html),
> TimeoutException is a retriable exception. As I have kept retries 10, the
> producer will try to resend the message if delivering some message fails
> with TimeoutException. I am looking for some reliable way to detect when
> delivery of a message has failed permanently, after all retries.
>
>
>
> Regards,
>
> Vatsal
>


Detecting when all the retries are expired for a message

2016-12-01 Thread Mevada, Vatsal
Hi,



I am reading a file and dumping each record on Kafka. Here is my producer code:



public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
    try (BufferedReader bf = getBufferedReader(filePath, encoding);
            KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
        String line;
        while ((line = bf.readLine()) != null) {
            producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
        }
        producer.flush();
    } catch (IOException e) {
        Throwables.propagate(e);
    }
}

private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "-1");
    properties.put("retries", 10);
    return new KafkaProducer<>(properties);
}

private BufferedReader getBufferedReader(String filePath, String encoding)
        throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
            Optional.ofNullable(encoding).orElse("UTF-8")));
}



As per the official documentation of Callback
(https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html),
TimeoutException is a retriable exception. As I have kept retries 10, the
producer will try to resend the message if delivering some message fails with
TimeoutException. I am looking for some reliable way to detect when delivery
of a message has failed permanently, after all retries.
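
If per-record throughput is not critical, a blocking send makes the
permanent-failure moment explicit: send() returns a Future, and get() throws
an ExecutionException wrapping the final error once the producer has given
up. A minimal sketch; the reporting is an illustrative assumption:

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BlockingSend {
    static void sendOrReport(KafkaProducer<String, String> producer, String topic, String line) {
        try {
            // get() blocks until the record is acked, or until the producer
            // has given up for good (i.e. after all configured retries).
            RecordMetadata md = producer.send(new ProducerRecord<>(topic, line)).get();
            System.out.println("delivered at offset " + md.offset());
        } catch (ExecutionException e) {
            System.err.println("permanently failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}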



Regards,

Vatsal