[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-11-10 Thread Stig Rohde Døssing (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654191#comment-15654191
 ] 

Stig Rohde Døssing commented on KAFKA-1894:
---

A suggested fix for this issue is here: 
https://issues.apache.org/jira/browse/KAFKA-4387

> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
>   while(!isThingComplete())
> client.poll();
> {code}
> This occurs both in KafkaConsumer but also in NetworkClient.completeAll. 
> These retry loops are actually mostly the behavior we want but there are 
> several cases where they may cause problems:
>  - In the case of a hard failure we may hang for a long time or indefinitely 
> before realizing the connection is lost.
>  - In the case where the cluster is malfunctioning or down we may retry 
> forever.
> It would probably be better to give a timeout to these. The proposed approach 
> would be to add something like retry.time.ms=6 and only continue retrying 
> for that period of time.





[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-11-09 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651799#comment-15651799
 ] 

Shixiong Zhu commented on KAFKA-1894:
-

Hit the same issue in Spark Structured Streaming Kafka Source. These loops 
should check the current thread's interrupted status.
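
A minimal sketch of that suggestion, assuming the loop shape from the issue 
description (isThingComplete(), client, and pollTimeoutMs are placeholders, not 
actual client code):

{code}
// Sketch only: check the calling thread's interrupted status on every
// iteration instead of spinning unconditionally.
while (!isThingComplete()) {
    if (Thread.currentThread().isInterrupted())
        throw new org.apache.kafka.common.errors.InterruptException(
                new InterruptedException("interrupted while polling"));
    client.poll(pollTimeoutMs);
}
{code}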



[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-11-09 Thread Catalina-Alina Dobrica (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650814#comment-15650814
 ] 

Catalina-Alina Dobrica commented on KAFKA-1894:
---

This issue also prevents the consumer's thread from being interrupted. This is 
particularly relevant when the consumer is integrated into an external system, 
such as an Apache Camel ecosystem. Trying to force the shutdown of the 
ExecutorService that manages the thread, or to terminate the thread itself, has 
no effect: the thread stays in the infinite loop. This eventually leads to an 
OOME if enough such threads are started.
I found this issue when providing an incorrect SSL protocol to the consumer in 
version 0.10.1.0, but it can occur in any circumstance where the channel is not 
established, such as when Kafka is not running. The thread loops forever 
checking whether the connection has been established, which in some cases will 
never happen.



[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2016-06-14 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330848#comment-15330848
 ] 

Edoardo Comar commented on KAFKA-1894:
--

Another case that leads to the loop 
{code}
public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();
    do {
        poll(Long.MAX_VALUE);
    } while (this.metadata.version() == version);
}
{code}
to be stuck forever is when authentication is turned on (with SASL PLAIN) and 
the user specifies wrong credentials.
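
A hypothetical sketch (not actual client code) of how this particular loop 
could be bounded with a deadline, so a bad-credentials connection cannot hang 
it forever; maxBlockMs is an illustrative parameter:

{code}
public void awaitMetadataUpdate(long maxBlockMs) {
    int version = this.metadata.requestUpdate();
    long deadline = System.currentTimeMillis() + maxBlockMs;
    do {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0)
            throw new org.apache.kafka.common.errors.TimeoutException(
                    "Metadata update did not complete within " + maxBlockMs + " ms");
        poll(remaining);
    } while (this.metadata.version() == version);
}
{code}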





[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-12-09 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15049441#comment-15049441
 ] 

Jason Gustafson commented on KAFKA-1894:


There's been a ton of movement on the new consumer since this issue was first 
posted, so here's an update of the current blocking calls:

1. poll(timeout) blocks indefinitely for a) finding the coordinator, b) joining 
the group, and c) fetching/resetting offsets. The last of these may require an 
OffsetFetch to get the last committed position or a ListOffset to reset the 
position to the earliest or latest offset. Obviously we depend on the 
coordinator being available to join the group, but we also depend on partition 
leaders being available if we need to call ListOffset.
2. commitSync() blocks indefinitely until the commit succeeds. This may involve 
finding a new coordinator if the old one has failed.
3. position() blocks to set the position (if it needs to be set). This is 
similar to case c) in poll() above.
4. committed() blocks to fetch the last committed position if the consumer has 
no cached commit.
5. partitionsFor()/listTopics() blocks to send a TopicMetadataRequest to any of 
the brokers (if the request cannot be served from the cache).
6. close() blocks if auto-commit is enabled in a call to commitSync().
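
Until these calls accept timeouts, roughly the only way to bound them from the 
outside is wakeup() from another thread. A hedged sketch (the helper name, the 
30-second limit, and the choice of commitSync() are illustrative):

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedCommit {
    // Schedule consumer.wakeup() on a watchdog thread, then make the blocking
    // call; if the watchdog fires first, the call aborts with WakeupException.
    public static void commitWithTimeout(KafkaConsumer<?, ?> consumer) {
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> timeout =
                watchdog.schedule(consumer::wakeup, 30, TimeUnit.SECONDS);
        try {
            consumer.commitSync();      // case 2 above: may block indefinitely
        } catch (WakeupException e) {
            // the watchdog fired first: treat as a timeout
        } finally {
            timeout.cancel(false);
            watchdog.shutdown();
        }
    }
}
{code}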

In all of these cases, we're fairly careful to propagate unrecoverable errors 
to the user. For example, commitSync() will not retry a commit if it receives 
an ILLEGAL_GENERATION since there is no way the commit can succeed after that 
error. However, there are still some situations where the blocking can be 
prolonged. In the most extreme case, if the consumer cannot connect to any of 
the brokers it knows about, it will retry indefinitely until it can. Other than 
that, the main cases that come to mind are blocking in ListOffsets when the 
partition leader is not available, and blocking in coordinator discovery when 
the coordinator cannot be found (e.g. if there is no leader for the 
corresponding partition of __consumer_offsets).

Going forward, it would be ideal to have poll() enforce the timeout parameter 
in any situation. This is complicated mainly by the fact that we may have to 
leave an active rebalance in progress, which will surely require additional 
state tracking. There are some subtle implications as well. For example, if we 
return to the user with a JoinGroup on the wire, it could actually return in a 
separate blocking call and have its handler callback invoked. We'd have to be 
careful that this doesn't cause any surprises for the user (e.g. partitions 
getting revoked while a call to position() is active). We also have limited 
options when it comes to handling the rebalance callback which could itself 
call another blocking method such as commitSync(). Since we have only one 
thread to work with, there doesn't seem like much we can do in this case.

The other blocking calls are more straightforward: we can just raise a 
TimeoutException after a configurable amount of time has passed. The producer 
has a setting "max.block.ms" which we could borrow for this purpose (guess we 
would need a KIP for this now). But just as in poll(), we'll have to be 
careful about any state we're leaving behind when the exceptions are thrown (in 
particular requests left on the wire).
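
For illustration, a sketch of what that bound might look like in the retry 
loops from the description ("max.block.ms" is borrowed from the producer; 
isThingComplete() and client are the placeholders from the original snippet):

{code}
// Sketch only, not existing consumer code: retry until done or until the
// configured maximum blocking time has elapsed, then raise TimeoutException.
long deadline = System.currentTimeMillis() + maxBlockMs;
while (!isThingComplete()) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining <= 0)
        throw new org.apache.kafka.common.errors.TimeoutException(
                "Operation did not complete within " + maxBlockMs + " ms");
    client.poll(remaining);
}
{code}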

An open question for the consumer is what its behavior should be if a partition 
leader cannot be found. Once the initial offset has been found, we generally 
handle leader failures gracefully by requesting metadata updates in the 
background and continuing to fetch from the other partitions. But if the leader 
failure occurs before we've fetched the initial offset, we will not send any 
fetches until we've found the new leader. This case is probably rare in 
practice, but it would seem more desirable (and more consistent) to let 
fetching continue on other partitions. This will require decoupling the offset 
state of individual partitions, which may be tricky.


[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-12-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15036256#comment-15036256
 ] 

Guozhang Wang commented on KAFKA-1894:
--

[~BigAndy] You can indeed interrupt the poll() call by calling wakeup() from 
another thread (search for the "multi-threaded processing" paragraph):

http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
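
Roughly the pattern from that javadoc (topic name and generic types are 
illustrative):

{code}
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerLoop implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public ConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                // process records...
            }
        } catch (WakeupException e) {
            // wakeup() aborts a blocked poll(); rethrow unless shutting down
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Called from another thread: breaks the poll() loop above.
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
{code}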



[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-12-02 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035665#comment-15035665
 ] 

The Data Lorax commented on KAFKA-1894:
---

I'm running into this issue and struggling to find a way around it - if the 
Kafka cluster is unavailable the KafkaConsumer.poll() call can block 
indefinitely - and does not even enter an interruptible state, which means 
there is no way of recovering, short of Thread.stop().

Would be good to move this into a more imminent release or at least have the 
thread enter an interruptible state within the loop.



[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-10-07 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14947277#comment-14947277
 ] 

Onur Karaman commented on KAFKA-1894:
-

Just for reference, this is similar to the following ticket: 
https://issues.apache.org/jira/browse/KAFKA-2391



[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-10-07 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14947298#comment-14947298
 ] 

Jason Gustafson commented on KAFKA-1894:


[~onurkaraman] Maybe we should clarify the scope of this issue. The main thing 
I was 
hoping to address was KafkaConsumer.poll() blocking longer than the passed 
timeout. Were you also looking at this problem in KAFKA-2391?



[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-10-05 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944043#comment-14944043
 ] 

Ismael Juma commented on KAFKA-1894:


`NetworkClient.completeAll` has thankfully been removed, so that's no longer an 
issue. This is on Jason's radar already, but recording it here so that we don't 
forget:

{code}
public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();
    do {
        poll(Long.MAX_VALUE);
    } while (this.metadata.version() == version);
}
{code}
