I just started testing consumers and can reproduce this. Did you check JIRA
or the forum? Maybe, since this is on the consumer side, there isn't much you
can do but wait, right? Especially for commitSync.

On Thu, Jul 7, 2016 at 1:21 PM, Fumo, Vincent <vincent_f...@comcast.com>
wrote:

> KafkaConsumer v0.9::
>
> I have a consumer set up with session.timeout.ms set to 30s. I make a
> call like
>
> consumer.poll(10000)
>
> but if the Kafka broker is down, that call will hang indefinitely.
>
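> For reference, the setup looks roughly like this (broker address, group id,
> topic, and deserializers are just placeholders; the relevant parts are
> session.timeout.ms and the poll timeout)::
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092"); // broker that is down in this scenario
>     props.put("group.id", "test-group");
>     props.put("session.timeout.ms", "30000");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>     consumer.subscribe(Collections.singletonList("my-topic"));
>
>     // expected: returns (possibly empty) within ~10s; observed: hangs until the broker is back
>     ConsumerRecords<String, String> records = consumer.poll(10000);
>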
> Digging into the code, it seems that the timeout isn't respected:
>
> KafkaConsumer calls out to pollOnce() as seen below::
>
>     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
>         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
>         coordinator.ensureCoordinatorKnown();
>
>         // ensure we have partitions assigned if we expect to
>         if (subscriptions.partitionsAutoAssigned())
>             coordinator.ensurePartitionAssignment();
>
>         // fetch positions if we have partitions we're subscribed to that we
>         // don't know the offset for
>         if (!subscriptions.hasAllFetchPositions())
>             updateFetchPositions(this.subscriptions.missingFetchPositions());
>
>         // init any new fetches (won't resend pending fetches)
>         Cluster cluster = this.metadata.fetch();
>         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
>
>         // if data is available already, e.g. from a previous network client poll() call to commit,
>         // then just return it immediately
>         if (!records.isEmpty()) {
>             return records;
>         }
>
>         fetcher.initFetches(cluster);
>         client.poll(timeout);
>         return fetcher.fetchedRecords();
>     }
>
> and we see that it gets stuck in the call to
> coordinator.ensureCoordinatorKnown().
>
> AbstractCoordinator ::
>
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
>
> In this case the future fails (since the broker is down), and a call to
> client.awaitMetadataUpdate() is made, which in the case of
> ConsumerNetworkClient will block forever:
>
>     public void awaitMetadataUpdate() {
>         int version = this.metadata.requestUpdate();
>         do {
>             poll(Long.MAX_VALUE);
>         } while (this.metadata.version() == version);
>     }
>
>
> I feel that this is a bug. When you set a timeout on a call to a blocking
> method, that timeout should be respected and an exception should be thrown.
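
For poll() specifically, a possible stopgap on the application side (a rough,
untested sketch, not a fix for the underlying issue) is to enforce your own
deadline with KafkaConsumer.wakeup(), which is documented to abort a blocking
consumer operation by making the blocked thread throw WakeupException:

    // needs java.util.concurrent.* and org.apache.kafka.common.errors.WakeupException
    ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> abort =
            watchdog.schedule(consumer::wakeup, 15, TimeUnit.SECONDS); // our own deadline
    try {
        ConsumerRecords<String, String> records = consumer.poll(10000);
        // process records ...
    } catch (WakeupException e) {
        // poll() was still blocked when the deadline hit, e.g. broker unreachable
    } finally {
        abort.cancel(false);
        watchdog.shutdown();
    }

Whether that is acceptable depends on the application, but it at least keeps an
unreachable broker from wedging the polling thread forever.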




-- 
Radha Krishna, Proddaturi
253-234-5657
