Re: AW: Exception stops data processing (Kafka Streams)

2018-05-16 Thread Matthias J. Sax
Depending on your version, that is correct :)

And yes, you can use `ProducerConfig.RETRIES_CONFIG`. In the end, the
config only understands String names anyway, and constants like
`ProducerConfig.RETRIES_CONFIG` are just syntactic sugar to avoid typos
when typing "retries" manually.
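For illustration, a minimal sketch using plain JDK `Properties` (no Kafka dependency needed here, since the literal string "retries" is exactly the value that the `ProducerConfig.RETRIES_CONFIG` constant resolves to):

```java
import java.util.Properties;

public class RetriesConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Equivalent to props.put(ProducerConfig.RETRIES_CONFIG, "10"):
        // the constant is just the string "retries" under the hood.
        props.put("retries", "10");

        System.out.println(props.getProperty("retries"));
    }
}
```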

You might be interested in this doc section about configuring internal
clients:
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#kafka-consumers-producer-and-admin-client-configuration-parameters

Also note that in the upcoming CP 5.0, configuring internal clients gets
improved further:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-276+Add+StreamsConfig+prefix+for+different+consumers
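As a sketch of that prefixing scheme (the `producer.` prefix, which `StreamsConfig.producerPrefix(...)` builds for you, already targets the internal producer; the `main.consumer.` key below follows the KIP-276 naming and is shown only for illustration). Plain JDK again, so the block runs standalone:

```java
import java.util.Properties;

public class PrefixedConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // "producer."-prefixed keys apply only to the internal producer;
        // in real Streams code you would typically write
        // props.put(StreamsConfig.producerPrefix("retries"), "10");
        props.put("producer.retries", "10");

        // KIP-276-style prefix targeting the main consumer (illustrative):
        props.put("main.consumer.max.poll.records", "500");

        System.out.println(props.getProperty("producer.retries"));
        System.out.println(props.getProperty("main.consumer.max.poll.records"));
    }
}
```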


-Matthias






AW: Exception stops data processing (Kafka Streams)

2018-05-16 Thread Claudia Wegmann
Thx for the pointers!
I didn't set any of the config parameters you named, so retries defaults to 0.

Another question though:
There is no 'retries' config for streams, just 'retry.backoff.ms'. Do I set 
ProducerConfig.retries in my streams app?
Also, I have to set 'max.in.flight.requests.per.connection' to 1 to still
guarantee ordering, right?

Best,
Claudia




Re: Exception stops data processing (Kafka Streams)

2018-05-15 Thread Matthias J. Sax
Claudia,

A leader change is a retryable error. What is your producer config for
`retries`? You might want to increase it so that the producer does not
throw the exception immediately but instead retries a couple of times --
you might also want to adjust `retry.backoff.ms`, which sets the time to
wait between retries.
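Concretely, a minimal sketch of the two settings (the values 10 and 1000 are made-up examples, not recommendations; in a Streams app these entries go into the same `Properties` object that is passed to `KafkaStreams`):

```java
import java.util.Properties;

public class ProducerRetrySketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Retry transient failures (like a leader change) instead of
        // throwing immediately; "retries" is ProducerConfig.RETRIES_CONFIG.
        props.put("retries", "10");

        // Time in ms to wait before the producer retries a failed send;
        // "retry.backoff.ms" is ProducerConfig.RETRY_BACKOFF_MS_CONFIG.
        props.put("retry.backoff.ms", "1000");

        System.out.println(props.getProperty("retries"));
        System.out.println(props.getProperty("retry.backoff.ms"));
    }
}
```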

-Matthias






Exception stops data processing (Kafka Streams)

2018-05-15 Thread Claudia Wegmann
Hey there,

I've got a few Kafka Streams services which run smoothly most of the time. 
Sometimes, however, some of them get an exception "Abort sending since an error 
caught with a previous record" (see below for a full example). The Streams
service hitting this exception just stops working altogether. After restarting
it, the service processes all the messages that piled up and everything is
fine again. Is it possible for the Kafka Streams service to recover from such
a situation by itself?

Thx for the input and best regards,
Claudia


An example stack trace:
15.5.2018 13:13:07Exception in thread 
"test-service-a19c940e-beee-486d-8ec0-2d06dc869f88-StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: task [0_10] Abort sending 
since an error caught with a previous record (key 
25:1001:152401f272ae48658197cbfeda008967 value [B@11e6d177 timestamp 
1526382782489) to topic test-service-CountBasedSlidingWindowStore-changelog due 
to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition..
15.5.2018 13:13:07 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
15.5.2018 13:13:07 at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
15.5.2018 13:13:07 at java.lang.Thread.run(Thread.java:748)
15.5.2018 13:13:07Caused by: 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.