Depending on your version, that is correct :)

And yes, you can use `ProducerConfig.RETRIES_CONFIG`. In the end, the
config only understands String names anyway, and constants like
`ProducerConfig.RETRIES_CONFIG` are just syntactic sugar to avoid typos
when putting "retries" in manually.
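
To illustrate, here is a minimal sketch that uses only the plain String
names. The class name, application id, bootstrap server, and all numeric
values are placeholders I made up for illustration, not recommendations.
You would pass the resulting `Properties` to your `KafkaStreams` instance
as usual.

```java
import java.util.Properties;

public class StreamsRetryConfigSketch {

    // Builds a Streams configuration that lets the internal producer retry
    // transient errors (such as a leader change) instead of failing at once.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "test-service");       // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder

        // "retries" is the String that ProducerConfig.RETRIES_CONFIG holds.
        props.put("retries", 10);                          // illustrative value
        // Time to wait before the producer retries a failed request.
        props.put("retry.backoff.ms", 500);                // illustrative value
        // One in-flight request per connection, so a retried batch cannot
        // overtake a later one and reorder records.
        props.put("max.in.flight.requests.per.connection", 1);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs (iteration order is unspecified).
        buildProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Using the constants instead of the literals gives exactly the same keys;
the literals are used here only so the sketch compiles without the Kafka
jars on the classpath.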

You might be interested in this doc section about configuring internal
clients:
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#kafka-consumers-producer-and-admin-client-configuration-parameters

Also note that in the upcoming CP 5.0 release, configuring internal clients
gets improved further:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-276+Add+StreamsConfig+prefix+for+different+consumers
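
As a rough sketch of what that KIP enables: the prefixes `main.consumer.`,
`restore.consumer.`, and `global.consumer.` let you configure each internal
consumer separately, in addition to the existing `consumer.` prefix. The
class name and the values below are placeholders for illustration only, and
this assumes a version that already includes the KIP.

```java
import java.util.Properties;

public class ConsumerPrefixSketch {

    // Shows a general consumer setting plus a restore-consumer override.
    static Properties buildProps() {
        Properties props = new Properties();
        // Applies to all internal consumers.
        props.put("consumer.max.poll.records", 500);           // illustrative value
        // Applies to the restore consumer only (prefix added by KIP-276).
        props.put("restore.consumer.max.poll.records", 1000);  // illustrative value
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs (iteration order is unspecified).
        buildProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```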


-Matthias

On 5/16/18 12:52 AM, Claudia Wegmann wrote:
> Thx for the pointers!
> I didn't set any of the config parameters you named. So retries defaults to 0.
> 
> Another question though:
> There is no 'retries' config for streams, just 'retry.backoff.ms'. Do I set 
> ProducerConfig.retries in my streams app?
> Also I do have to set 'max.in.flight.requests.per.connection' to 1 to still 
> guarantee ordering, right?
> 
> Best,
> Claudia
> 
> -----Original Message-----
> From: Matthias J. Sax <matth...@confluent.io> 
> Sent: Tuesday, May 15, 2018 22:58
> To: users@kafka.apache.org
> Subject: Re: Exception stopps data processing (Kafka Streams)
> 
> Claudia,
> 
> A leader change is a retryable error. What is your producer config for 
> `retries`? You might want to increase it so that the producer does not 
> throw the exception immediately but retries a couple of times. You might 
> also want to adjust `retry.backoff.ms`, which sets the time to wait before 
> the producer retries.
> 
> -Matthias
> 
> On 5/15/18 6:30 AM, Claudia Wegmann wrote:
>> Hey there,
>>
>> I've got a few Kafka Streams services which run smoothly most of the time. 
>> Sometimes, however, some of them get an exception "Abort sending since an 
>> error caught with a previous record" (see below for a full example). The 
>> Stream Service having this exception just stops its work altogether. After 
>> restarting it, the service starts to process all the messages that piled up 
>> and all is fine again. Is it possible for the Kafka Streams service to 
>> recover from such a situation itself?
>>
>> Thx for the input and best regards,
>> Claudia
>>
>>
>> An example stack trace:
>> 15.5.2018 13:13:07 Exception in thread 
>> "test-service-a19c940e-beee-486d-8ec0-2d06dc869f88-StreamThread-1" 
>> org.apache.kafka.streams.errors.StreamsException: task [0_10] Abort sending 
>> since an error caught with a previous record (key 
>> 25:1001:152401f272ae48658197cbfeda008967 value [B@11e6d177 timestamp 
>> 1526382782489) to topic test-service-CountBasedSlidingWindowStore-changelog 
>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
>> server is not the leader for that topic-partition..
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>> 15.5.2018 13:13:07     at 
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>> 15.5.2018 13:13:07     at java.lang.Thread.run(Thread.java:748)
>> 15.5.2018 13:13:07 Caused by: 
>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
>> is not the leader for that topic-partition.
>>
>>
> 
