Madhav Kelkar created KAFKA-9641:
------------------------------------

             Summary: Kafka Client doesn't handle timeout exceptions for partition fetch requests
                 Key: KAFKA-9641
                 URL: https://issues.apache.org/jira/browse/KAFKA-9641
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 1.1.1, 0.11.0.2, 2.5.0
            Reporter: Madhav Kelkar


Occasionally, we see the Kafka client throw an IllegalStateException, which causes our process to die. Here is the exception:

{code:java}
java.lang.IllegalStateException: Unexpected error code 7 while fetching data
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
{code}

Error code 7 is Errors.REQUEST_TIMED_OUT.
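
For reference, the numeric code in the exception message can be mapped back to the enum via the client's protocol Errors class. A minimal sketch (the class name ErrorCodeCheck is just for illustration):
{code:java}
import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeCheck {
    public static void main(String[] args) {
        // Map the raw protocol error code from the fetch response back to the Errors enum.
        Errors error = Errors.forCode((short) 7);
        System.out.println(error); // expected to print REQUEST_TIMED_OUT
    }
}
{code}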

I looked at the client code, and it does not handle Errors.REQUEST_TIMED_OUT, so the error falls through to the final else branch and an IllegalStateException is thrown instead. This is the code from Fetcher.parseCompletedFetch in Kafka client 0.11.0.2:

{code:java}
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
    TopicPartition tp = completedFetch.partition;
    FetchResponse.PartitionData partition = completedFetch.partitionData;
    long fetchOffset = completedFetch.fetchedOffset;
    PartitionRecords partitionRecords = null;
    Errors error = partition.error;

    try {
        if (!subscriptions.isFetchable(tp)) {
            // this can happen when a rebalance happened or a partition consumption paused
            // while fetch is still in-flight
            log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
        } else if (error == Errors.NONE) {
            // we are interested in this fetch only if the beginning offset matches the
            // current consumed position
            Long position = subscriptions.position(tp);
            if (position == null || position != fetchOffset) {
                log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                        "the expected offset {}", tp, fetchOffset, position);
                return null;
            }

            log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                    partition.records.sizeInBytes(), tp, position);
            Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
            partitionRecords = new PartitionRecords(tp, completedFetch, batches);

            if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                if (completedFetch.responseVersion < 3) {
                    // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
                            recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
                            " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
                            "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
                            ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
                            recordTooLargePartitions);
                } else {
                    // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
                    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
                        fetchOffset + ". Received a non-empty fetch response from the server, but no " +
                        "complete records were found.");
                }
            }

            if (partition.highWatermark >= 0) {
                log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                subscriptions.updateHighWatermark(tp, partition.highWatermark);
            }

            if (partition.lastStableOffset >= 0) {
                log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
            }
        } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
            this.metadata.requestUpdate();
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
                    "may not exist or the user may not have Describe access to it", tp);
            this.metadata.requestUpdate();
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
            if (fetchOffset != subscriptions.position(tp)) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                        "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
            } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                subscriptions.needOffsetReset(tp);
            } else {
                throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
            }
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            log.warn("Not authorized to read from topic {}.", tp.topic());
            throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        } else if (error == Errors.UNKNOWN) {
            log.warn("Unknown error fetching data for topic-partition {}", tp);
        } else {
            throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
        }
    } finally {
        if (partitionRecords == null)
            completedFetch.metricAggregator.record(tp, 0, 0);

        if (error != Errors.NONE)
            // we move the partition to the end if there was an error. This way, it's more likely that partitions for
            // the same topic can remain together (allowing for more efficient serialization).
            subscriptions.movePartitionToEnd(tp);
    }

    return partitionRecords;
}
{code}
I looked at other versions, and it looks like REQUEST_TIMED_OUT is not handled there either.
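
For illustration only, here is a hedged sketch (not actual Fetcher code; the class and method names below are hypothetical) of how the error dispatch could treat REQUEST_TIMED_OUT as a retriable condition, i.e. log it and retry on the next poll, instead of falling through to the final else branch and throwing IllegalStateException:
{code:java}
import org.apache.kafka.common.protocol.Errors;

// Hypothetical sketch, not the actual Fetcher.parseCompletedFetch code.
public class FetchErrorDispatchSketch {

    static void handleFetchError(Errors error, String topicPartition) {
        if (error == Errors.REQUEST_TIMED_OUT) {
            // Proposed behavior: log the broker-side timeout and let the next poll retry the fetch.
            System.out.printf("Fetch for %s timed out on the broker (%s); will retry%n",
                    topicPartition, error.exceptionName());
        } else {
            // Current behavior for any unhandled code, which is what kills our process.
            throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
        }
    }

    public static void main(String[] args) {
        handleFetchError(Errors.REQUEST_TIMED_OUT, "my-topic-0"); // no longer fatal with this branch
    }
}
{code}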

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
