[ 
https://issues.apache.org/jira/browse/KAFKA-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16858771#comment-16858771
 ] 

Gaurav commented on KAFKA-8506:
-------------------------------

// Consumer configuration used to reproduce the issue.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 0);

consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("odd", "even"));
long startTime = System.currentTimeMillis();
try {
    while (true) {
        // Poll with a 1000 ms timeout; an empty result ends the loop.
        ConsumerRecords<String, String> recs = consumer.poll(1000);
        if (recs == null || recs.isEmpty()) {
            return "Success";
        }
        for (ConsumerRecord<String, String> rec : recs) {
            System.out.printf("Received %s: %s", rec.key(), rec.value());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Print the elapsed time in milliseconds.
    System.out.println(System.currentTimeMillis() - startTime);
}
return "Failure";
}
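
One possible way to fail fast before entering the poll loop above is a plain TCP pre-flight check against the bootstrap address. This is only a sketch under stated assumptions: the class BrokerPreflight and its methods are hypothetical names, not part of the Kafka client, and a successful TCP connect only shows that something is listening on the port, not that a healthy broker is behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not a Kafka API): checks TCP reachability only.
final class BrokerPreflight {

    private BrokerPreflight() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, connect timeout, and unresolved hosts.
            return false;
        }
    }

    // Throws instead of returning false, so a REST handler can report the failure.
    static void requireReachable(String host, int port, int timeoutMs) {
        if (!isReachable(host, port, timeoutMs)) {
            throw new IllegalStateException(
                    "Cannot connect to " + host + ":" + port + " within " + timeoutMs + " ms");
        }
    }
}
```

Calling BrokerPreflight.requireReachable("localhost", 9092, 5000) before consumer.poll(1000) would raise an exception when nothing is listening on the bootstrap port, instead of letting the consumer retry. It does not cover a broker that goes away after the check has passed.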

> Kafka Consumer broker never stops on connection failure
> -------------------------------------------------------
>
>                 Key: KAFKA-8506
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8506
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Gaurav
>            Priority: Critical
>
> We are not able to stop the Kafka consumer on connection failure: it keeps
> retrying to create a connection to the broker when consumer.poll(1000) is
> called. We poll on demand via REST; in case of a connection failure it
> should stop and throw an exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
