[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532997#comment-16532997 ]

Guozhang Wang commented on KAFKA-6127:
--------------------------------------

That is a good question. I think we should consider whether it is worth adding 
more configs to Streams post KIP-266. Our current situation is:

1. We do have the {{RETRIES_CONFIG}} and {{RETRY_BACKOFF_MS_CONFIG}} configs in 
StreamsConfig, but today they are only used in the global consumer's 
{{globalConsumer.endOffsets(topicPartitions);}} and {{partitionInfos = 
globalConsumer.partitionsFor(sourceTopic);}} calls, because we always try to 
complete the restoration of global stores before starting any stream threads.

2. We do not have anything like a {{MAX_BLOCK_MS}} config. Today we hard-code 
different timeout values for some of the callers, while for other calls we do 
not provide a timeout at all and hence rely on the consumer's request timeout, 
which is {{40 * 1000}} ms by default.
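The retry behaviour described in item 1 can be sketched as a plain retry-with-backoff loop. This is a minimal, self-contained illustration, not the Streams code: {{callWithRetries}} and {{CallTimeoutException}} are hypothetical names standing in for a blocking call such as {{endOffsets()}} and Kafka's own timeout exception, and the retry and backoff values in {{main}} are arbitrary.

```java
import java.util.function.Supplier;

public class RetryingCall {

    /** Hypothetical stand-in for a retriable timeout; not Kafka's own exception class. */
    static class CallTimeoutException extends RuntimeException {
        CallTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Runs a blocking call, retrying up to {@code retries} more times when it times out
     * and sleeping {@code retryBackoffMs} between attempts, in the spirit of
     * RETRIES_CONFIG and RETRY_BACKOFF_MS_CONFIG.
     */
    static <T> T callWithRetries(Supplier<T> call, int retries, long retryBackoffMs) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        CallTimeoutException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return call.get();
            } catch (CallTimeoutException e) {
                last = e;
                if (attempt < retries) {
                    sleepQuietly(retryBackoffMs);
                }
            }
        }
        // Every attempt timed out: surface the last timeout to the caller.
        throw last;
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during retry backoff", ie);
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated endOffsets()-style call that times out twice, then succeeds.
        String result = callWithRetries(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new CallTimeoutException("timed out");
            }
            return "offsets";
        }, 5, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Running {{main}} prints "offsets after 3 attempts".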

The question for 2) is whether it is better to define a single global config and 
use it across all blocking calls to the consumer, or whether we should instead 
pass a specific timeout for each of the other callers rather than just relying 
on the request timeout.
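The two options for 2) can be contrasted in a small sketch: one global bound that every blocking call uses, with optional caller-specific overrides. This is hypothetical: no such setting exists in StreamsConfig today, and the caller names ("committed", "position") and the 60 s / 10 s values are made up for illustration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class BlockingTimeouts {

    // Option A: one global bound (a hypothetical MAX_BLOCK_MS-style setting).
    private final Duration globalMaxBlock;
    // Option B: caller-specific overrides, keyed by a hypothetical caller name.
    private final Map<String, Duration> perCaller = new HashMap<>();

    BlockingTimeouts(Duration globalMaxBlock) {
        this.globalMaxBlock = globalMaxBlock;
    }

    /** Registers a caller-specific timeout that takes precedence over the global one. */
    BlockingTimeouts withOverride(String caller, Duration timeout) {
        perCaller.put(caller, timeout);
        return this;
    }

    /** Timeout to pass to a blocking consumer call made by {@code caller}. */
    Duration timeoutFor(String caller) {
        return perCaller.getOrDefault(caller, globalMaxBlock);
    }

    public static void main(String[] args) {
        BlockingTimeouts timeouts = new BlockingTimeouts(Duration.ofSeconds(60))
                .withOverride("committed", Duration.ofSeconds(10));
        // "committed" uses its override; "position" falls back to the global bound.
        System.out.println(timeouts.timeoutFor("committed").toMillis()); // 10000
        System.out.println(timeouts.timeoutFor("position").toMillis());  // 60000
    }
}
```

With no overrides registered this collapses to option A; registering overrides for every caller is effectively option B.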

The question for 1) is whether we could simply use a very large timeout value 
and then get rid of retries altogether.
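To compare the two approaches for 1), it helps to compute the worst-case blocking time of the retry loop: (retries + 1) attempts, each up to the per-attempt timeout, plus one backoff between consecutive attempts. A single call whose timeout is at least this large has the same worst case without a retry loop. The sketch below is illustrative only; the 40 s per-attempt value is the request timeout mentioned above, while 5 retries and 100 ms backoff are assumed numbers.

```java
import java.time.Duration;

public class RetryVsSingleTimeout {

    /** Worst case: (retries + 1) * perAttempt + retries * backoff. */
    static Duration worstCaseWithRetries(int retries, Duration perAttempt, Duration backoff) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        return perAttempt.multipliedBy(retries + 1L).plus(backoff.multipliedBy(retries));
    }

    public static void main(String[] args) {
        // Assumed values: 5 retries, 40 s per attempt, 100 ms backoff.
        Duration bound = worstCaseWithRetries(5, Duration.ofSeconds(40), Duration.ofMillis(100));
        System.out.println(bound.toMillis()); // 6 * 40000 + 5 * 100 = 240500
    }
}
```

One difference worth keeping in mind: with retries the caller regains control between attempts, whereas a single large timeout may hold the thread for the whole bound in one call.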

> Streams should never block infinitely
> -------------------------------------
>
>                 Key: KAFKA-6127
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6127
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>              Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check that the code handles this 
> case correctly.
> If we block within one operation, the whole {{StreamThread}} blocks, and 
> the instance makes no progress and becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers); we also might drop out of the consumer 
> group.
> Thanks to 
> [KIP-266|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886],
> the Consumer now has variants that take a timeout and so do not block 
> indefinitely, which we can use, but the same is not true of the Producer. We 
> can add such variants to the Producer as well, or set the appropriate config 
> options to cap the maximum blocking time.
> Of course, we'd also need to be sure to catch the appropriate timeout 
> exceptions.
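The failure mode in the quoted issue, and the suggested remedy (bounded calls plus catching the timeout exception), can be sketched as a self-contained loop. {{Committer}} and {{ClientTimeoutException}} are hypothetical stand-ins so the sketch runs without kafka-clients; with the real consumer, the analogous call is the KIP-266 {{commitSync(Duration)}}, which throws Kafka's {{TimeoutException}} when the timeout expires. In a real instance {{close()}} would be called from another thread; here it is triggered inline to keep the example deterministic.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedLoop {

    /** Hypothetical stand-in for the timeout exception a bounded client call throws. */
    static class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(String message) {
            super(message);
        }
    }

    /** Hypothetical client shape mirroring a call such as commitSync(Duration). */
    @FunctionalInterface
    interface Committer {
        void commit(Duration timeout);
    }

    // AtomicBoolean so a close() from another thread is visible to the loop thread.
    private final AtomicBoolean running = new AtomicBoolean(true);
    private int failedCommits = 0;

    void close() {
        running.set(false);
    }

    int failedCommits() {
        return failedCommits;
    }

    /** Runs iterations until closed (or maxIterations is reached); returns how many ran. */
    int runLoop(Committer committer, Duration timeout, int maxIterations) {
        int iterations = 0;
        while (running.get() && iterations < maxIterations) {
            iterations++;
            try {
                // Assumed contract: the call returns or throws within roughly `timeout`.
                committer.commit(timeout);
            } catch (ClientTimeoutException e) {
                // Catch the timeout instead of blocking or dying; retry on the next iteration.
                failedCommits++;
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        BoundedLoop loop = new BoundedLoop();
        int[] calls = {0};
        // Simulated commit that always times out; close() is requested on the third call.
        int ran = loop.runLoop(timeout -> {
            calls[0]++;
            if (calls[0] == 3) {
                loop.close();
            }
            throw new ClientTimeoutException("commit timed out after " + timeout.toMillis() + " ms");
        }, Duration.ofSeconds(10), 100);
        System.out.println(ran + " iterations, " + loop.failedCommits() + " timed out");
    }
}
```

Running {{main}} prints "3 iterations, 3 timed out": the loop keeps control after every timeout, so the close request takes effect instead of the thread hanging inside one call.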



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
