Hi
If you keep getting this WARN from Kafka
WARN NetworkClient - [Consumer
clientId=consumer-1, groupId=kafkaGroup] Connection to node -1 could
not be established. Broker may not be available.
Then its because of this ticket where Kafka is looping forever:
https://issues.apache.org/jira/browse/KAFKA-3834
And as such there is not so much we can do in camel-kafka, as its
Apache Kafka itself that are hung/looping forever.
You can maybe vote for that KAFKA ticket to make the Kafka team aware
that its a important ticket.
On Sun, Jan 7, 2018 at 12:09 PM, Yacov Schondorf
<[email protected]> wrote:
> Just for the record, I have tried adding "bridgeErrorHandler=true" to the
> route . This does not help and Camel still tries to connect to the
> non-existing Kafka. Final route looks like this:
>
>
> from("kafka:{{consumer.topic}}?bridgeErrorHandler=true&brokers={{kafka.host}}:{{kafka.port}}"
> + "&maxPollRecords={{consumer.maxPollRecords}}"
> + "&consumersCount={{consumer.consumersCount}}"
> + "&seekTo={{consumer.seekTo}}"
> + "&groupId={{consumer.group}}")
>
> I was expecting an exception to be thrown but this does not happen. This is
> based on https://github.com/apache/camel/blob/master/examples/camel-e
> xample-kafka/src/main/java/org/apache/camel/example/kafka/
> MessageConsumerClient.java with just an additional
> "bridgeErrorHandler=true" added to the route.
>
>
> 2017-12-03 14:54 GMT+02:00 Yacov Schondorf <[email protected]>:
>
>> No solution by Camel for detecting connection errors? I gave a very clear
>> reproducible scenario...
>>
>> 2017-11-28 11:44 GMT+02:00 Yacov Schondorf <[email protected]>:
>>
>>> But this is exactly my point - there is no stack trace! I want there to
>>> be a stacktrace so that I could catch it using the regular error handler.
>>> This is the purpose of the call to endpoint1.setBridgeErrorHandler(true);
>>> However, the call does not work, no trace is printed and the polling
>>> continues. Here is the complete code based on https://github.com/apache/c
>>> amel/blob/master/examples/camel-example-kafka/src/main/java/
>>> org/apache/camel/example/kafka/MessageConsumerClient.java with my
>>> addition between the *// change start *and *// change end *blocks:
>>>
>>> package org.apache.camel.example.kafka;
>>>
>>> import org.apache.camel.CamelContext;
>>> import org.apache.camel.Endpoint;
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.Processor;
>>> import org.apache.camel.builder.RouteBuilder;
>>> import org.apache.camel.component.kafka.KafkaEndpoint;
>>> import org.apache.camel.component.properties.PropertiesComponent;
>>> import org.apache.camel.impl.DefaultCamelContext;
>>> import org.apache.camel.impl.DefaultEndpoint;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.util.Collection;
>>> import java.util.HashMap;
>>> import java.util.Iterator;
>>>
>>> public final class MessageConsumerClient {
>>>
>>> private static final Logger LOG = LoggerFactory.getLogger(Messag
>>> eConsumerClient.class);
>>>
>>> private MessageConsumerClient() {
>>> }
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>> LOG.info("About to run Kafka-camel integration...");
>>>
>>> CamelContext camelContext = new DefaultCamelContext();
>>>
>>> // Add route to send messages to Kafka
>>>
>>> camelContext.addRoutes(new RouteBuilder() {
>>> public void configure() {
>>> PropertiesComponent pc =
>>> getContext().getComponent("properties",
>>> PropertiesComponent.class);
>>> pc.setLocation("classpath:application.properties");
>>>
>>> log.info("About to start route: Kafka Server -> Log ");
>>> onException(Exception.class).process(new Processor() {
>>> @Override
>>> public void process(Exchange exchange) throws
>>> Exception {
>>> System.out.println("Exception occurred!!");
>>> }
>>> });
>>> from("kafka:{{consumer.topic}}
>>> ?brokers={{kafka.host}}:{{kafka.port}}"
>>> + "&maxPollRecords={{consumer.maxPollRecords}}"
>>> + "&consumersCount={{consumer.consumersCount}}"
>>> + "&seekTo={{consumer.seekTo}}"
>>> + "&groupId={{consumer.group}}")
>>> .routeId("FromKafka")
>>> .log("${body}");
>>> }
>>> });
>>>
>>> camelContext.start();
>>>
>>> //
>>> // change start
>>> //
>>> final Collection<Endpoint> endpoints =
>>> camelContext.getEndpoints();
>>> for (Endpoint endpoint : endpoints) {
>>> if (endpoint instanceof DefaultEndpoint) {
>>> final DefaultEndpoint endpoint1 = (DefaultEndpoint)
>>> endpoint;
>>> endpoint1.setBridgeErrorHandler(true);
>>> final HashMap<String, Object> consumerProperties = new
>>> HashMap<>();
>>> consumerProperties.put("backoffMultiplier", 10);
>>> consumerProperties.put("backoffErrorThreshold", 5);
>>> endpoint1.setConsumerProperties(consumerProperties);
>>> }
>>> }
>>> //
>>> // change end here
>>> //
>>>
>>> // let it run for 5 minutes before shutting down
>>> Thread.sleep(5 * 60 * 1000);
>>>
>>> camelContext.stop();
>>> }
>>>
>>> }
>>>
>>>
>>>
>>>
>>>
>>> 2017-11-25 19:18 GMT+02:00 Claus Ibsen <[email protected]>:
>>>
>>>> Post the stactrace so we can see from where the error is thrown.
>>>>
>>>>
>>>> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf
>>>> <[email protected]> wrote:
>>>> > I am trying to detect when Kafka is not available. I have modified the
>>>> > example -
>>>> >
>>>> > https://github.com/apache/camel/blob/master/examples/camel-e
>>>> xample-kafka/src/main/java/org/apache/camel/example/kafka/
>>>> MessageConsumerClient.java
>>>> > and
>>>> > added following code right after camelContext.start()
>>>> >
>>>> >
>>>> >
>>>> > final Collection<Endpoint> endpoints =
>>>> camelContext.getEndpoints();
>>>> >
>>>> > for (Endpoint endpoint : endpoints) {
>>>> >
>>>> > if (endpoint instanceof DefaultEndpoint) {
>>>> >
>>>> > final DefaultEndpoint endpoint1 = (DefaultEndpoint)
>>>> > endpoint;
>>>> >
>>>> > endpoint1.setBridgeErrorHandler(true);
>>>> >
>>>> > final HashMap<String, Object> consumerProperties = new
>>>> > HashMap<>();
>>>> >
>>>> > consumerProperties.put("backoffMultiplier", 10);
>>>> >
>>>> > consumerProperties.put("backoffErrorThreshold", 5);
>>>> >
>>>> > endpoint1.setConsumerProperties(consumerProperties);
>>>> >
>>>> > }
>>>> >
>>>> > }
>>>> >
>>>> >
>>>> >
>>>> > I ran the main() and hoped to see the consumer stopping the attempts to
>>>> > connect to Kafka after 5 tries, but this did not work. I keep getting
>>>> > output messages of “Connection to node -1 could not be established.
>>>> Broker
>>>> > may not be available.”
>>>> >
>>>> > Is this the right way to go? What am I doing wrong?
>>>> >
>>>> >
>>>> > Thanks.
>>>>
>>>>
>>>>
>>>> --
>>>> Claus Ibsen
>>>> -----------------
>>>> http://davsclaus.com @davsclaus
>>>> Camel in Action 2: https://www.manning.com/ibsen2
>>>>
>>>
>>>
>>>
--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2