The `Producer#send()` call itself is actually not covered by the KIP, because handling the timeout directly could result in data loss: Kafka Streams does not keep a copy of the data in the producer's send buffer, so it cannot retry the `send()`. Instead, the input data would need to be re-processed, and that is not done automatically.
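
If you explicitly want to trade that data loss for availability, you can plug in a custom `ProductionExceptionHandler` that returns CONTINUE for the timeout, which drops the failed record instead of stopping the stream thread. Rough, untested sketch (the handler class name is made up; the interface and the `default.production.exception.handler` config are part of the public API, but whether a given exception actually reaches the handler depends on the Streams version):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class DropOnTimeoutHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // CONTINUE drops the record that could not be sent (i.e., accepts the data loss);
        // FAIL (the default behavior) stops the stream thread instead.
        return exception instanceof TimeoutException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }
}

// registered via:
// props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DropOnTimeoutHandler.class);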

-Matthias

On 11/1/21 4:34 PM, Guozhang Wang wrote:
Hello Pushkar,

I'm assuming you run the same Kafka version (2.5.1) on the Streams client
side here: in those older versions, Kafka Streams relies on the embedded
producer clients to handle timeouts, which requires users to configure the
relevant producer timeout values correctly.
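
For reference, on 2.5.x the knobs that matter here are the plain producer configs, which you can pass through Streams with the producer prefix. A rough sketch of how that looks in the Streams configuration (bootstrap servers and values are placeholders, not recommendations; assumes the usual `Properties`, `StreamsConfig`, and `ProducerConfig` imports):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092,broker-3:9092");
// delivery.timeout.ms bounds how long a batch may sit in the producer's buffer
// before it is expired (the "Expiring 12 record(s) ... 120000 ms has passed since
// batch creation" in your log corresponds to its default of 120000 ms);
// max.block.ms bounds how long send() itself may block.
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 120000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 30000);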

In newer versions (2.8+) we have made Kafka Streams more robust against
server-side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to one of those versions and see if those symptoms go
away.
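
On 2.8+ the KIP above also adds a Streams-level retry budget on top of the client timeouts, controlled by `task.timeout.ms`. A minimal sketch, continuing the same Streams properties (the value shown is simply the default of 5 minutes):

// task.timeout.ms (KIP-572): how long Streams keeps retrying a task that hit a
// client TimeoutException before surfacing the error; 0 means fail immediately.
props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300000L);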


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

Hi All,

I am getting the below issue in a Streams application. The Kafka cluster is a
3-broker cluster (v 2.5.1), and I could see that 2 of the 3 brokers restarted
at the same time the exception below occurred in the Streams application, so I
can relate the exception to those broker restarts. However, what is worrying me
is that the Streams application did not process any events after this
exception. So the questions are:
1. How can I make the Streams application resilient to broker issues? E.g. the
producer underneath Streams should have connected to another broker instance
when the first broker went down, but possibly the second broker went down
immediately afterwards, which is why it timed out.
2. In general, how does Streams handle broker issues, and when does it decide
to connect to another broker instance if one instance seems to be in error?


{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -

analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this

timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat

datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat

org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat

org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat

org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat

org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat

org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch creation\n"}


