Madhav Kelkar created KAFKA-9641:
------------------------------------
Summary: Kafka Client doesn't handle timeout exceptions for partition fetch requests
Key: KAFKA-9641
URL: https://issues.apache.org/jira/browse/KAFKA-9641
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 1.1.1, 0.11.0.2, 2.5.0
Reporter: Madhav Kelkar
Occasionally, we see the Kafka client throw an IllegalStateException, which results in our process dying. Here is the exception -
{code:java}
java.lang.IllegalStateException: Unexpected error code 7 while fetching data
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
{code}
Error code 7 is Errors.REQUEST_TIMED_OUT.
I looked at the client code, and it doesn't handle Errors.REQUEST_TIMED_OUT, so the error falls through and an IllegalStateException is thrown instead. This is the code from the 0.11.0.2 client -
{code:java}
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
    TopicPartition tp = completedFetch.partition;
    FetchResponse.PartitionData partition = completedFetch.partitionData;
    long fetchOffset = completedFetch.fetchedOffset;
    PartitionRecords partitionRecords = null;
    Errors error = partition.error;

    try {
        if (!subscriptions.isFetchable(tp)) {
            // this can happen when a rebalance happened or a partition consumption paused
            // while fetch is still in-flight
            log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
        } else if (error == Errors.NONE) {
            // we are interested in this fetch only if the beginning offset matches the
            // current consumed position
            Long position = subscriptions.position(tp);
            if (position == null || position != fetchOffset) {
                log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                        "the expected offset {}", tp, fetchOffset, position);
                return null;
            }

            log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                    partition.records.sizeInBytes(), tp, position);
            Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
            partitionRecords = new PartitionRecords(tp, completedFetch, batches);

            if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                if (completedFetch.responseVersion < 3) {
                    // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
                            recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
                            " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
                            "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
                            ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
                            recordTooLargePartitions);
                } else {
                    // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
                    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
                            fetchOffset + ". Received a non-empty fetch response from the server, but no " +
                            "complete records were found.");
                }
            }

            if (partition.highWatermark >= 0) {
                log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                subscriptions.updateHighWatermark(tp, partition.highWatermark);
            }

            if (partition.lastStableOffset >= 0) {
                log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
            }
        } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
            this.metadata.requestUpdate();
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
                    "may not exist or the user may not have Describe access to it", tp);
            this.metadata.requestUpdate();
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
            if (fetchOffset != subscriptions.position(tp)) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                        "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
            } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                subscriptions.needOffsetReset(tp);
            } else {
                throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
            }
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            log.warn("Not authorized to read from topic {}.", tp.topic());
            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        } else if (error == Errors.UNKNOWN) {
            log.warn("Unknown error fetching data for topic-partition {}", tp);
        } else {
            throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
        }
    } finally {
        if (partitionRecords == null)
            completedFetch.metricAggregator.record(tp, 0, 0);

        if (error != Errors.NONE)
            // we move the partition to the end if there was an error. This way, it's more likely that partitions for
            // the same topic can remain together (allowing for more efficient serialization).
            subscriptions.movePartitionToEnd(tp);
    }

    return partitionRecords;
}
{code}
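For reference, the mapping from code 7 to REQUEST_TIMED_OUT can be double-checked with a tiny standalone snippet against kafka-clients (ErrorCodeCheck is just a throwaway class name I made up for this check):
{code:java}
import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeCheck {
    public static void main(String[] args) {
        // should print REQUEST_TIMED_OUT, i.e. the error the branch above never checks for
        System.out.println(Errors.forCode((short) 7));
        // should print 7
        System.out.println(Errors.REQUEST_TIMED_OUT.code());
    }
}
{code}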
I looked at other versions, and it looks like REQUEST_TIMED_OUT isn't handled there either.
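In the meantime, the only way we have found to keep the application up is to treat this as a consumer-level failure and rebuild the consumer when it happens. A rough sketch of that kind of poll loop is below; this assumes a 2.x client with the poll(Duration) overload, and ResilientPollLoop, buildConsumer, RESTART_BACKOFF_MS, the topic name and the connection settings are all placeholders. The proper fix would still be to handle REQUEST_TIMED_OUT inside the Fetcher itself.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ResilientPollLoop {

    // placeholder backoff before rebuilding the consumer; tune for your workload
    private static final long RESTART_BACKOFF_MS = 1000L;

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            try (KafkaConsumer<String, String> consumer = buildConsumer()) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
                while (true) {
                    // poll(Duration) is the 2.x overload; older clients use poll(long)
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // application-specific processing goes here
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                }
            } catch (IllegalStateException e) {
                // "Unexpected error code 7 while fetching data" surfaces here; log it,
                // back off, and rebuild the consumer instead of letting the process die
                System.err.println("Fetch failed with an unhandled error, restarting consumer: " + e);
                Thread.sleep(RESTART_BACKOFF_MS);
            }
        }
    }

    private static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "resilient-group");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
{code}
Rebuilding the consumer is heavy-handed, but since the IllegalStateException doesn't say which partition timed out, there isn't much more a caller can do from the outside.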