[
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hongshun Wang updated KAFKA-17025:
----------------------------------
Description:
When I use KafkaProducer and an OOM occurs, KafkaProducer only logs the error
and does nothing else:
{code:java}
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError:
Direct buffer memory
    .....
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
    at org.apache.kafka.clients.producer.internals.Sender.run
    at java.lang.Thread.run
{code}
I tried to find out what happens:
1. It seems that OutOfMemoryError, being an Error, is not caught when
org.apache.kafka.clients.producer.internals.Sender#run tries to catch an
Exception:
{code:java}
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
{code}
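The effect described in step 1 can be reproduced outside Kafka. The following is only a minimal sketch: the class name ErrorEscapesCatchDemo and the simulated OutOfMemoryError are assumptions for illustration, not Kafka code. It shows that a loop which catches only Exception is terminated by an Error, which then reaches the thread's uncaught exception handler.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it only mimics the try/catch shape of Sender#run.
final class ErrorEscapesCatchDemo {

    /** Runs a loop that catches only Exception and returns what the uncaught-exception handler saw. */
    static Throwable runLoopThatThrowsError() {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();

        Thread ioThread = new Thread(() -> {
            while (true) {
                try {
                    // Stand-in for runOnce(); the error is simulated, no real memory pressure.
                    throw new OutOfMemoryError("Direct buffer memory (simulated)");
                } catch (Exception e) {
                    // Never reached: OutOfMemoryError extends Error, not Exception.
                    System.out.println("caught: " + e);
                }
            }
        }, "demo-io-thread");

        // Record the Throwable that escaped the loop.
        ioThread.setUncaughtExceptionHandler((t, e) -> seenByHandler.set(e));
        ioThread.start();
        try {
            ioThread.join(); // returns because the Error ended the thread
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return seenByHandler.get();
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + runLoopThatThrowsError());
    }
}
```

In this sketch the loop exits on the first iteration, so the thread is gone after the handler runs. This matches the situation in the report, where the I/O thread stops and only a log line remains.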
2. KafkaThread then catches the uncaught exception and just logs it:
{code:java}
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
{code}
To be honest, I don't understand why KafkaThread does nothing but log when an
uncaught exception occurs. Why not expose a method for setting the
UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that users can
decide what to do with an uncaught exception, whether to rethrow it or just
ignore it?
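To illustrate the kind of API being asked for, here is a rough sketch of a thread class that accepts a caller-supplied handler. Everything in it is hypothetical: HandlerAwareThread, logOnly() and runFailingTask() are names invented for this example, and neither KafkaThread nor KafkaProducer is known to offer such a constructor.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class for illustration; this is not Kafka's KafkaThread.
final class HandlerAwareThread extends Thread {

    HandlerAwareThread(String name, Runnable runnable, boolean daemon,
                       Thread.UncaughtExceptionHandler handler) {
        super(runnable, name);
        setDaemon(daemon);
        // The caller decides what happens on an uncaught Throwable.
        setUncaughtExceptionHandler(Objects.requireNonNull(handler, "handler"));
    }

    /** Log-only handler, mirroring the behavior described in the report. */
    static Thread.UncaughtExceptionHandler logOnly() {
        return (t, e) -> System.err.println(
                "Uncaught exception in thread '" + t.getName() + "': " + e);
    }

    /** Starts a thread whose task throws an Error and returns what the custom handler received. */
    static Throwable runFailingTask() {
        AtomicReference<Throwable> received = new AtomicReference<>();
        Thread.UncaughtExceptionHandler custom = (t, e) -> {
            logOnly().uncaughtException(t, e); // keep the log line
            received.set(e);                   // then react, e.g. mark the application unhealthy
        };
        Thread thread = new HandlerAwareThread("demo-network-thread",
                () -> { throw new OutOfMemoryError("simulated"); }, true, custom);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("custom handler received: " + runFailingTask());
    }
}
```

With a constructor like this, the log-only behavior could remain the default while users who want to fail fast or trigger a restart pass their own handler.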
> KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
> ------------------------------------------------------------------------------
>
> Key: KAFKA-17025
> URL: https://issues.apache.org/jira/browse/KAFKA-17025
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 3.6.2
> Reporter: Hongshun Wang
> Priority: Major
> Fix For: 3.6.3
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)