Hi

There is also a ticket here, which seems to have more votes and is the parent ticket:
https://issues.apache.org/jira/browse/KAFKA-1894
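
Until this is addressed on the Kafka side, one possible workaround is to check that the broker port is reachable with a plain TCP connect before starting the Camel context. The following is only a sketch under assumptions: BrokerProbe and isReachable are hypothetical names, not part of Camel or Kafka, and the default of localhost:9092 is just an example. An open port does not prove the broker is healthy, it only rules out the "nothing is listening" case.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Camel or Kafka.
final class BrokerProbe {

    private BrokerProbe() {
    }

    /**
     * Returns true if a TCP connection to host:port can be opened
     * within timeoutMillis, false otherwise.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout or unknown host: treat as not reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Example defaults only; pass your own broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;

        if (isReachable(host, port, 2000)) {
            System.out.println("Broker port is open at " + host + ":" + port);
        } else {
            System.out.println("Broker not reachable at " + host + ":" + port);
        }
    }
}
```

You could call BrokerProbe.isReachable(...) with the same host and port as the brokers option, and only call camelContext.start() when it returns true.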

On Mon, Jan 15, 2018 at 2:29 PM, Claus Ibsen wrote:
> Hi
>
> If you keep getting this WARN from Kafka
>
> WARN NetworkClient - [Consumer
> clientId=consumer-1, groupId=kafkaGroup] Connection to node -1 could
> not be established. Broker may not be available.
>
> Then it's because of this ticket where Kafka is looping forever:
> https://issues.apache.org/jira/browse/KAFKA-3834
>
> And as such there is not so much we can do in camel-kafka, as it's
> Apache Kafka itself that is hung/looping forever.
> You can maybe vote for that KAFKA ticket to make the Kafka team aware
> that it's an important ticket.
>
> On Sun, Jan 7, 2018 at 12:09 PM, Yacov Schondorf wrote:
>> Just for the record, I have tried adding "bridgeErrorHandler=true" to the
>> route. This does not help and Camel still tries to connect to the
>> non-existing Kafka. Final route looks like this:
>>
>> from("kafka:{{consumer.topic}}?bridgeErrorHandler=true&brokers={{kafka.host}}:{{kafka.port}}"
>>         + "&maxPollRecords={{consumer.maxPollRecords}}"
>>         + "&consumersCount={{consumer.consumersCount}}"
>>         + "&seekTo={{consumer.seekTo}}"
>>         + "&groupId={{consumer.group}}")
>>
>> I was expecting an exception to be thrown but this does not happen. This is
>> based on
>> https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>> with just an additional "bridgeErrorHandler=true" added to the route.
>>
>> 2017-12-03 14:54 GMT+02:00 Yacov Schondorf:
>>
>>> No solution by Camel for detecting connection errors? I gave a very clear
>>> reproducible scenario...
>>>
>>> 2017-11-28 11:44 GMT+02:00 Yacov Schondorf:
>>>
>>>> But this is exactly my point - there is no stack trace! I want there to
>>>> be a stack trace so that I could catch it using the regular error handler.
>>>> This is the purpose of the call to endpoint1.setBridgeErrorHandler(true);
>>>> However, the call does not work, no trace is printed and the polling
>>>> continues. Here is the complete code based on
>>>> https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>>>> with my addition between the "// change start" and "// change end" blocks:
>>>>
>>>> package org.apache.camel.example.kafka;
>>>>
>>>> import org.apache.camel.CamelContext;
>>>> import org.apache.camel.Endpoint;
>>>> import org.apache.camel.Exchange;
>>>> import org.apache.camel.Processor;
>>>> import org.apache.camel.builder.RouteBuilder;
>>>> import org.apache.camel.component.kafka.KafkaEndpoint;
>>>> import org.apache.camel.component.properties.PropertiesComponent;
>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>> import org.apache.camel.impl.DefaultEndpoint;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import java.util.Collection;
>>>> import java.util.HashMap;
>>>> import java.util.Iterator;
>>>>
>>>> public final class MessageConsumerClient {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerClient.class);
>>>>
>>>>     private MessageConsumerClient() {
>>>>     }
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         LOG.info("About to run Kafka-camel integration...");
>>>>
>>>>         CamelContext camelContext = new DefaultCamelContext();
>>>>
>>>>         // Add route to send messages to Kafka
>>>>
>>>>         camelContext.addRoutes(new RouteBuilder() {
>>>>             public void configure() {
>>>>                 PropertiesComponent pc = getContext().getComponent("properties",
>>>>                         PropertiesComponent.class);
>>>>                 pc.setLocation("classpath:application.properties");
>>>>
>>>>                 log.info("About to start route: Kafka Server -> Log ");
>>>>                 onException(Exception.class).process(new Processor() {
>>>>                     @Override
>>>>                     public void process(Exchange exchange) throws Exception {
>>>>                         System.out.println("Exception occurred!!");
>>>>                     }
>>>>                 });
>>>>                 from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
>>>>                         + "&maxPollRecords={{consumer.maxPollRecords}}"
>>>>                         + "&consumersCount={{consumer.consumersCount}}"
>>>>                         + "&seekTo={{consumer.seekTo}}"
>>>>                         + "&groupId={{consumer.group}}")
>>>>                         .routeId("FromKafka")
>>>>                         .log("${body}");
>>>>             }
>>>>         });
>>>>
>>>>         camelContext.start();
>>>>
>>>>         //
>>>>         // change start
>>>>         //
>>>>         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>>>>         for (Endpoint endpoint : endpoints) {
>>>>             if (endpoint instanceof DefaultEndpoint) {
>>>>                 final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
>>>>                 endpoint1.setBridgeErrorHandler(true);
>>>>                 final HashMap<String, Object> consumerProperties = new HashMap<>();
>>>>                 consumerProperties.put("backoffMultiplier", 10);
>>>>                 consumerProperties.put("backoffErrorThreshold", 5);
>>>>                 endpoint1.setConsumerProperties(consumerProperties);
>>>>             }
>>>>         }
>>>>         //
>>>>         // change end here
>>>>         //
>>>>
>>>>         // let it run for 5 minutes before shutting down
>>>>         Thread.sleep(5 * 60 * 1000);
>>>>
>>>>         camelContext.stop();
>>>>     }
>>>>
>>>> }
>>>>
>>>> 2017-11-25 19:18 GMT+02:00 Claus Ibsen:
>>>>
>>>>> Post the stacktrace so we can see from where the error is thrown.
>>>>>
>>>>> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf wrote:
>>>>> > I am trying to detect when Kafka is not available. I have modified the
>>>>> > example -
>>>>> > https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>>>>> > and added the following code right after camelContext.start()
>>>>> >
>>>>> > final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>>>>> > for (Endpoint endpoint : endpoints) {
>>>>> >     if (endpoint instanceof DefaultEndpoint) {
>>>>> >         final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
>>>>> >         endpoint1.setBridgeErrorHandler(true);
>>>>> >         final HashMap<String, Object> consumerProperties = new HashMap<>();
>>>>> >         consumerProperties.put("backoffMultiplier", 10);
>>>>> >         consumerProperties.put("backoffErrorThreshold", 5);
>>>>> >         endpoint1.setConsumerProperties(consumerProperties);
>>>>> >     }
>>>>> > }
>>>>> >
>>>>> > I ran the main() and hoped to see the consumer stopping the attempts to
>>>>> > connect to Kafka after 5 tries, but this did not work. I keep getting
>>>>> > output messages of “Connection to node -1 could not be established. Broker
>>>>> > may not be available.”
>>>>> >
>>>>> > Is this the right way to go? What am I doing wrong?
>>>>> >
>>>>> > Thanks.

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
