Hi

If you keep getting this WARN from Kafka

    WARN  NetworkClient - [Consumer clientId=consumer-1, groupId=kafkaGroup] Connection to node -1 could not be established. Broker may not be available.


Then it is because of this ticket, where the Kafka client loops forever:
    https://issues.apache.org/jira/browse/KAFKA-3834

As such, there is not much we can do in camel-kafka, as it is
Apache Kafka itself that is hung/looping forever.
You can vote for that KAFKA ticket to make the Kafka team aware
that it is an important ticket.
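
Until that is fixed, one possible workaround outside camel-kafka is to
probe the broker with a plain TCP connect before starting the route.
The sketch below is only an illustration: the class name BrokerProbe and
the method isReachable are made up for this example and are not part of
Camel or Kafka. It only tells you that the port accepts connections, not
that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of camel-kafka or the Kafka client.
final class BrokerProbe {

    private BrokerProbe() {
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, unknown host, or timeout: treat as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions; 9092 is the usual Kafka listener port.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        if (isReachable(host, port, 2000)) {
            System.out.println("Broker port is open, the route can be started");
        } else {
            System.out.println("Broker not reachable at " + host + ":" + port);
        }
    }
}
```

You could call such a check before camelContext.start(), or from a
timer, and decide yourself whether to start or stop the route.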






On Sun, Jan 7, 2018 at 12:09 PM, Yacov Schondorf
<[email protected]> wrote:
> Just for the record, I have tried adding "bridgeErrorHandler=true" to the
> route. This does not help, and Camel still tries to connect to the
> non-existent Kafka broker. The final route looks like this:
>
>
> from("kafka:{{consumer.topic}}?bridgeErrorHandler=true&brokers={{kafka.host}}:{{kafka.port}}"
>                         + "&maxPollRecords={{consumer.maxPollRecords}}"
>                         + "&consumersCount={{consumer.consumersCount}}"
>                         + "&seekTo={{consumer.seekTo}}"
>                         + "&groupId={{consumer.group}}")
>
> I was expecting an exception to be thrown but this does not happen. This is
> based on
> https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
> with just an additional "bridgeErrorHandler=true" added to the route.
>
>
> 2017-12-03 14:54 GMT+02:00 Yacov Schondorf <[email protected]>:
>
>> No solution by Camel for detecting connection errors? I gave a very clear
>> reproducible scenario...
>>
>> 2017-11-28 11:44 GMT+02:00 Yacov Schondorf <[email protected]>:
>>
>>> But this is exactly my point - there is no stack trace! I want there to
>>> be a stack trace so that I can catch it using the regular error handler.
>>> This is the purpose of the call to endpoint1.setBridgeErrorHandler(true);
>>> However, the call does not work, no trace is printed and the polling
>>> continues. Here is the complete code, based on
>>> https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>>> with my addition between the "// change start" and "// change end" comments:
>>>
>>> package org.apache.camel.example.kafka;
>>>
>>> import org.apache.camel.CamelContext;
>>> import org.apache.camel.Endpoint;
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.Processor;
>>> import org.apache.camel.builder.RouteBuilder;
>>> import org.apache.camel.component.kafka.KafkaEndpoint;
>>> import org.apache.camel.component.properties.PropertiesComponent;
>>> import org.apache.camel.impl.DefaultCamelContext;
>>> import org.apache.camel.impl.DefaultEndpoint;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.util.Collection;
>>> import java.util.HashMap;
>>> import java.util.Iterator;
>>>
>>> public final class MessageConsumerClient {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(MessageConsumerClient.class);
>>>
>>>     private MessageConsumerClient() {
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         LOG.info("About to run Kafka-camel integration...");
>>>
>>>         CamelContext camelContext = new DefaultCamelContext();
>>>
>>>         // Add route to send messages to Kafka
>>>
>>>         camelContext.addRoutes(new RouteBuilder() {
>>>             public void configure() {
>>>                 PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
>>>                 pc.setLocation("classpath:application.properties");
>>>
>>>                 log.info("About to start route: Kafka Server -> Log ");
>>>                 onException(Exception.class).process(new Processor() {
>>>                     @Override
>>>                     public void process(Exchange exchange) throws Exception {
>>>                         System.out.println("Exception occurred!!");
>>>                     }
>>>                 });
>>>                 from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
>>>                         + "&maxPollRecords={{consumer.maxPollRecords}}"
>>>                         + "&consumersCount={{consumer.consumersCount}}"
>>>                         + "&seekTo={{consumer.seekTo}}"
>>>                         + "&groupId={{consumer.group}}")
>>>                         .routeId("FromKafka")
>>>                     .log("${body}");
>>>             }
>>>         });
>>>
>>>         camelContext.start();
>>>
>>>         //
>>>         // change start
>>>         //
>>>         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>>>         for (Endpoint endpoint : endpoints) {
>>>             if (endpoint instanceof DefaultEndpoint) {
>>>                 final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
>>>                 endpoint1.setBridgeErrorHandler(true);
>>>                 final HashMap<String, Object> consumerProperties = new HashMap<>();
>>>                 consumerProperties.put("backoffMultiplier", 10);
>>>                 consumerProperties.put("backoffErrorThreshold", 5);
>>>                 endpoint1.setConsumerProperties(consumerProperties);
>>>             }
>>>         }
>>>         //
>>>         // change end here
>>>         //
>>>
>>>         // let it run for 5 minutes before shutting down
>>>         Thread.sleep(5 * 60 * 1000);
>>>
>>>         camelContext.stop();
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>>
>>>
>>> 2017-11-25 19:18 GMT+02:00 Claus Ibsen <[email protected]>:
>>>
>>>> Post the stack trace so we can see where the error is thrown from.
>>>>
>>>>
>>>> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf
>>>> <[email protected]> wrote:
>>>> > I am trying to detect when Kafka is not available. I have modified the
>>>> > example
>>>> >
>>>> > https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
>>>> >
>>>> > and added the following code right after camelContext.start():
>>>> >
>>>> >
>>>> >
>>>> >         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>>>> >         for (Endpoint endpoint : endpoints) {
>>>> >             if (endpoint instanceof DefaultEndpoint) {
>>>> >                 final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
>>>> >                 endpoint1.setBridgeErrorHandler(true);
>>>> >                 final HashMap<String, Object> consumerProperties = new HashMap<>();
>>>> >                 consumerProperties.put("backoffMultiplier", 10);
>>>> >                 consumerProperties.put("backoffErrorThreshold", 5);
>>>> >                 endpoint1.setConsumerProperties(consumerProperties);
>>>> >             }
>>>> >         }
>>>> >
>>>> >
>>>> >
>>>> > I ran the main() and hoped to see the consumer stopping the attempts to
>>>> > connect to Kafka after 5 tries, but this did not work. I keep getting
>>>> > output messages of “Connection to node -1 could not be established.
>>>> > Broker may not be available.”
>>>> >
>>>> > Is this the right way to go? What am I doing wrong?
>>>> >
>>>> >
>>>> > Thanks.
>>>>
>>>>
>>>>
>>>> --
>>>> Claus Ibsen
>>>> -----------------
>>>> http://davsclaus.com @davsclaus
>>>> Camel in Action 2: https://www.manning.com/ibsen2
>>>>
>>>
>>>
>>>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
