[ https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reopened KAFKA-5301:
----------------------------------

[~enothereska] I think we should not resolve this JIRA in a hurry. Have you 
made a thorough pass over the consumer code path and confirmed that {{the rest 
are OK}}?

For example, one obvious pitfall I can see is the {{rebalanceException}} 
used in the {{StreamThread}}: we throw the exception in 
{{onPartitionsRevoked}} and {{onPartitionsAssigned}} and at the same time 
remember it in this variable. The exception thrown from the callback is 
swallowed by the {{ConsumerCoordinator}} and only logged as an error, while we 
later rethrow the remembered exception anyway. I can see two issues here:

1) Throwing the exception twice is redundant, since the first throw only 
produces an error log4j entry. If we are going to rethrow the exception after 
the rebalance anyway, we may consider not throwing it inside the callbacks at all.

2) When we throw the exception in the {{..Revoked}} callback, we effectively 
leave the assignor in an unstable state: the suspended tasks / prev tasks etc. 
are not set correctly. However, we will still call {{..Assigned}} later, which 
may be problematic. Should we consider skipping the later callback if an 
exception has already been thrown (sketched below), or should we clean up the 
cached maps while throwing the exception?
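
To make the pattern concrete, here is a minimal, hypothetical sketch (not the actual 
{{StreamThread}} code): a rebalance listener that remembers the first exception raised 
in a callback so the polling loop can rethrow it after {{poll()}} returns, and that 
skips the {{..Assigned}} work when {{..Revoked}} has already failed. The 
{{suspendTasks}} / {{createTasks}} helpers are placeholders for illustration only.

{code:java}
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the pattern discussed above; not the real StreamThread code.
public class RecordingRebalanceListener implements ConsumerRebalanceListener {

    // Plays the role of StreamThread's rebalanceException field.
    private final AtomicReference<RuntimeException> rebalanceException = new AtomicReference<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            suspendTasks(partitions);                  // hypothetical task-suspension step
        } catch (RuntimeException e) {
            rebalanceException.compareAndSet(null, e); // remember the first failure ...
            throw e;                                   // ... and also throw it (the redundancy in point 1)
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Point 2: if onPartitionsRevoked already failed, the cached task state may be
        // inconsistent; one option is to skip the work here instead of proceeding.
        if (rebalanceException.get() != null) {
            return;
        }
        try {
            createTasks(partitions);                   // hypothetical task-creation step
        } catch (RuntimeException e) {
            rebalanceException.compareAndSet(null, e);
            throw e;
        }
    }

    // Called by the polling loop after consumer.poll() returns, mirroring how the
    // remembered exception is rethrown once the rebalance has completed.
    public void maybeRethrow() {
        RuntimeException e = rebalanceException.getAndSet(null);
        if (e != null) {
            throw e;
        }
    }

    private void suspendTasks(Collection<TopicPartition> partitions) { /* illustrative only */ }

    private void createTasks(Collection<TopicPartition> partitions) { /* illustrative only */ }
}
{code}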

> Improve exception handling on consumer path
> -------------------------------------------
>
>                 Key: KAFKA-5301
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5301
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Eno Thereska
>            Assignee: Eno Thereska
>             Fix For: 0.11.1.0
>
>
> Used in StreamThread.java, mostly to .poll() but also to restore data.
> Used in StreamTask.java, mostly to .pause(), .resume()
> All exceptions here are currently caught all the way up to the main running 
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that 
> happen before streams has even had a chance to look at the data: 
> https://issues.apache.org/jira/browse/KAFKA-5157  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
