KafkaConsumer v0.9::

I have a consumer set up with session.timeout.ms set to 30s. I make a call like

consumer.poll(10000)

but if the Kafka broker is down, that call hangs indefinitely.

Digging into the code it seems that the timeout isn't respected:

KafkaConsumer calls out to pollOnce() as seen below::

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
        coordinator.ensureCoordinatorKnown();

        // ensure we have partitions assigned if we expect to
        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment();

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // init any new fetches (won't resend pending fetches)
        Cluster cluster = this.metadata.fetch();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

        // if data is available already, e.g. from a previous network client poll() call to commit,
        // then just return it immediately
        if (!records.isEmpty()) {
            return records;
        }

        fetcher.initFetches(cluster);
        client.poll(timeout);
        return fetcher.fetchedRecords();
    }

and we see that it gets stuck on the call to coordinator.ensureCoordinatorKnown();

AbstractCoordinator ::

    public void ensureCoordinatorKnown() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupMetadataRequest();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            }
        }
    }

In this case the future fails (since the broker is down), and then
client.awaitMetadataUpdate() is called, which in ConsumerNetworkClient
blocks forever because it polls with Long.MAX_VALUE until the metadata
version changes:

    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }
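
For what it's worth, here is a minimal sketch of how a loop like the one above
could honour a caller-supplied timeout. This is not the actual Kafka code:
BoundedAwait, awaitUpdate and the poller argument are hypothetical stand-ins
for the metadata version and ConsumerNetworkClient.poll(long), chosen so the
example runs without a broker.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

// Hypothetical illustration only; none of these names exist in Kafka.
class BoundedAwait {

    /**
     * Polls until the version changes or timeoutMs has elapsed.
     * Returns true if an update was observed, false if the timeout expired.
     */
    static boolean awaitUpdate(AtomicInteger version, LongConsumer poller, long timeoutMs) {
        int startVersion = version.get();
        long start = System.currentTimeMillis();
        long remaining = timeoutMs;
        while (true) {
            // Never ask the poller to block longer than the time we have left.
            poller.accept(remaining);
            if (version.get() != startVersion)
                return true;
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed >= timeoutMs)
                return false;
            remaining = timeoutMs - elapsed;
        }
    }

    // Sleep helper used to simulate a poll that returns without new metadata.
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger version = new AtomicInteger(0);
        // Simulated broker outage: polling never bumps the version.
        boolean updated = awaitUpdate(version, ms -> sleepQuietly(Math.min(ms, 10)), 50);
        System.out.println("updated = " + updated); // prints "updated = false"
    }
}
```

The point is only that the remaining time is recomputed on every iteration
and passed down, instead of Long.MAX_VALUE.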


I feel that this is a bug. When you set a timeout on a call to a blocking
method, that timeout should be respected, and an exception should be thrown
if it expires.
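
In the meantime, one caller-side guard I can think of is to run the blocking
call on a worker thread and bound the wait with Future.get(timeout). A rough
sketch, using only java.util.concurrent; TimedCall and callWithTimeout are
hypothetical names, and the Callable stands in for whatever wraps
consumer.poll(). Note that KafkaConsumer is not thread-safe, so the consumer
would have to be used only from that worker thread, and I have not verified
that an interrupt actually unblocks the stuck call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of the Kafka client.
class TimedCall {

    /**
     * Runs blockingCall on a daemon worker thread and waits at most timeoutMs.
     * Throws TimeoutException if the call has not finished in time.
     */
    static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMs)
            throws TimeoutException, ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-call");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        try {
            Future<T> result = executor.submit(blockingCall);
            try {
                return result.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                result.cancel(true); // best effort: interrupts the worker thread
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

This only bounds how long the caller waits; if the underlying call ignores
interrupts, the worker thread stays blocked in the background.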
