[
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Greg Harris updated KAFKA-17025:
--------------------------------
Fix Version/s: (was: 3.6.3)
> KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
> ------------------------------------------------------------------------------
>
> Key: KAFKA-17025
> URL: https://issues.apache.org/jira/browse/KAFKA-17025
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 3.6.2
> Reporter: Hongshun Wang
> Priority: Major
>
> When I use KafkaProducer and an OOM occurs, the KafkaProducer only logs it
> and does nothing else:
>
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread
> 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError:
> Direct buffer memory .....
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
> at org.apache.kafka.clients.producer.internals.Sender.run
> at java.lang.Thread.run
> {code}
>
>
> I tried to find out what happens:
> 1. It seems that OutOfMemoryError, being an Error, is not caught because
> org.apache.kafka.clients.producer.internals.Sender#run only catches
> Exception:
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
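
The behaviour described in step 1 can be reproduced outside Kafka. The following is only a minimal sketch: the class name ErrorEscapesCatchDemo, the thread name and the simulated error message are made up for illustration and are not Kafka code. It throws an OutOfMemoryError inside a try/catch (Exception) block on a worker thread and records what reaches that thread's uncaught-exception handler.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: not part of Kafka. Shows that catch (Exception) does not
// intercept an Error, so the Error ends up in the uncaught-exception handler.
final class ErrorEscapesCatchDemo {

    /** Returns whatever reached the worker's uncaught-exception handler, or null. */
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                // Simulated failure; no real memory exhaustion is needed here.
                throw new OutOfMemoryError("simulated: Direct buffer memory");
            } catch (Exception e) {
                // Not reached: OutOfMemoryError extends Error, not Exception.
                System.out.println("caught: " + e);
            }
        }, "demo-io-thread");

        // Record the Throwable that terminates the thread.
        worker.setUncaughtExceptionHandler((t, e) -> seen.set(e));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("reached handler: " + runAndCapture());
    }
}
```

In this sketch the catch block never runs, the worker thread terminates, and the handler is the only place where the error is observed.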
>
> 2. Then KafkaThread catches the uncaught exception and just logs it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }
> {code}
>
> To be honest, I don't understand why KafkaThread does nothing but log when
> an uncaught exception occurs. Why not expose a method in KafkaThread or
> KafkaProducer to set the UncaughtExceptionHandler, so that users can decide
> what to do with an uncaught exception, whether to rethrow it or just
> ignore it?
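
For discussion, here is one possible shape of the request, as a rough sketch only. HandlerAwareThread and its constructor are hypothetical and are not the existing KafkaThread API; System.err stands in for the logger. The idea is that the caller may pass its own Thread.UncaughtExceptionHandler, and the log-only behaviour remains the default when none is given.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, NOT org.apache.kafka.common.utils.KafkaThread.
final class HandlerAwareThread extends Thread {

    HandlerAwareThread(String name, Runnable runnable, boolean daemon,
                       Thread.UncaughtExceptionHandler handler) {
        super(runnable, name);
        setDaemon(daemon);
        // Use the caller's handler if present; otherwise only report the error
        // (System.err is a stand-in for a real logger in this sketch).
        setUncaughtExceptionHandler(handler != null
                ? handler
                : (t, e) -> System.err.println(
                        "Uncaught exception in thread '" + name + "': " + e));
    }

    /** Starts a thread that fails with an Error and returns what the handler saw. */
    static Throwable demo() {
        AtomicReference<Throwable> fatal = new AtomicReference<>();
        Thread t = new HandlerAwareThread(
                "demo-network-thread",
                () -> { throw new OutOfMemoryError("simulated"); },
                true,
                (thread, error) -> fatal.set(error)); // application decides what to do
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return fatal.get();
    }

    public static void main(String[] args) {
        System.out.println("application handler received: " + demo());
    }
}
```

With something like this, an application could decide to close the producer, raise an alert or exit the process when the I/O thread dies, instead of finding out only from the log.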
--
This message was sent by Atlassian Jira
(v8.20.10#820010)