Depending on your version, that is correct :) And yes, you can use `ProducerConfig.RETRIES_CONFIG`. In the end, the config only understands String names anyway, and constants like `ProducerConfig.RETRIES_CONFIG` are just syntactic sugar to avoid typos from typing "retries" manually.
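A minimal sketch of what that could look like, using plain String keys to show that the constants are only names. The application id is taken from the thread name in the stack trace; the bootstrap address and the retry/backoff values are made-up placeholders, not recommendations. The `producer.` prefix is what `StreamsConfig.producerPrefix(...)` produces and scopes a setting to the internal producer.

```java
import java.util.Properties;

public class StreamsRetryConfigSketch {

    // Builds a Streams configuration with plain String keys only.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "test-service");
        // Assumption: broker address is a placeholder.
        props.setProperty("bootstrap.servers", "localhost:9092");

        // "retries" is the value of ProducerConfig.RETRIES_CONFIG; the
        // "producer." prefix targets the producer used internally by Streams.
        // Assumption: 10 retries and 500 ms backoff are illustrative values.
        props.setProperty("producer.retries", "10");
        props.setProperty("producer.retry.backoff.ms", "500");

        // With retries enabled, one in-flight request per connection
        // keeps records from being reordered on a retry.
        props.setProperty("producer.max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings; iteration order is not guaranteed.
        buildConfig().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting `Properties` object would then be handed to the `KafkaStreams` constructor together with the topology. With the Kafka libraries on the classpath, the same keys can be written as `StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)` and so on.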
You might be interested in this doc section about configuring internal clients:
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#kafka-consumers-producer-and-admin-client-configuration-parameters

Also note that in the upcoming CP 5.0, configuring internal clients gets improved further:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-276+Add+StreamsConfig+prefix+for+different+consumers

-Matthias

On 5/16/18 12:52 AM, Claudia Wegmann wrote:
> Thx for the pointers!
> I didn't set any of the config parameters you named. So retries defaults to 0.
>
> Another question though:
> There is no 'retries' config for streams, just 'retry.backoff.ms'. Do I set
> ProducerConfig.retries in my streams app?
> Also, I do have to set 'max.in.flight.requests.per.connection' to 1 to still
> guarantee ordering, right?
>
> Best,
> Claudia
>
> -----Original Message-----
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Tuesday, May 15, 2018 22:58
> To: users@kafka.apache.org
> Subject: Re: Exception stopps data processing (Kafka Streams)
>
> Claudia,
>
> A leader change is a retryable error. What is your producer config for
> `retries`? You might want to increase it so that the producer does not
> throw the exception immediately but retries a couple of times -- you might
> also want to adjust `retry.backoff.ms`, which sets the time to wait before
> the producer retries.
>
> -Matthias
>
> On 5/15/18 6:30 AM, Claudia Wegmann wrote:
>> Hey there,
>>
>> I've got a few Kafka Streams services which run smoothly most of the time.
>> Sometimes, however, some of them get an exception "Abort sending since an
>> error caught with a previous record" (see below for a full example). The
>> Stream Service having this exception just stops its work altogether. After
>> restarting it, the service starts to process all the messages that piled up
>> and all is fine again. Is it possible for the Kafka Streams service to
>> recover from such a situation itself?
>>
>> Thx for the input and best regards,
>> Claudia
>>
>>
>> An example stack trace:
>> 15.5.2018 13:13:07Exception in thread "test-service-a19c940e-beee-486d-8ec0-2d06dc869f88-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_10] Abort sending since an error caught with a previous record (key 25:1001:152401f272ae48658197cbfeda008967 value [B@11e6d177 timestamp 1526382782489) to topic test-service-CountBasedSlidingWindowStore-changelog due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition..
>> 15.5.2018 13:13:07 at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>> 15.5.2018 13:13:07 at java.lang.Thread.run(Thread.java:748)
>> 15.5.2018 13:13:07Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.