No solution by Camel for detecting connection errors? I gave a very clear
reproducible scenario...

2017-11-28 11:44 GMT+02:00 Yacov Schondorf <[email protected]>:

> But this is exactly my point - there is no stack trace! I want there to be
> a stacktrace so that I could catch it using the regular error handler. This
> is the purpose of the call to endpoint1.setBridgeErrorHandler(true);
> However, the call does not work, no trace is printed and the polling
> continues. Here is the complete code based on https://github.com/apache/c
> amel/blob/master/examples/camel-example-kafka/src/main/java/
> org/apache/camel/example/kafka/MessageConsumerClient.java with my
> addition between the *// change start *and *// change end *blocks:
>
> package org.apache.camel.example.kafka;
>
> import org.apache.camel.CamelContext;
> import org.apache.camel.Endpoint;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.kafka.KafkaEndpoint;
> import org.apache.camel.component.properties.PropertiesComponent;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.impl.DefaultEndpoint;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.Collection;
> import java.util.HashMap;
> import java.util.Iterator;
>
> public final class MessageConsumerClient {
>
>     private static final Logger LOG = LoggerFactory.getLogger(
> MessageConsumerClient.class);
>
>     private MessageConsumerClient() {
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         LOG.info("About to run Kafka-camel integration...");
>
>         CamelContext camelContext = new DefaultCamelContext();
>
>         // Add route to send messages to Kafka
>
>         camelContext.addRoutes(new RouteBuilder() {
>             public void configure() {
>                 PropertiesComponent pc = 
> getContext().getComponent("properties",
> PropertiesComponent.class);
>                 pc.setLocation("classpath:application.properties");
>
>                 log.info("About to start route: Kafka Server -> Log ");
>                 onException(Exception.class).process(new Processor() {
>                     @Override
>                     public void process(Exchange exchange) throws
> Exception {
>                         System.out.println("Exception occurred!!");
>                     }
>                 });
>                 from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{
> kafka.port}}"
>                         + "&maxPollRecords={{consumer.maxPollRecords}}"
>                         + "&consumersCount={{consumer.consumersCount}}"
>                         + "&seekTo={{consumer.seekTo}}"
>                         + "&groupId={{consumer.group}}")
>                         .routeId("FromKafka")
>                     .log("${body}");
>             }
>         });
>
>         camelContext.start();
>
>         //
>         // change start
>         //
>         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
>         for (Endpoint endpoint : endpoints) {
>             if (endpoint instanceof DefaultEndpoint) {
>                 final DefaultEndpoint endpoint1 = (DefaultEndpoint)
> endpoint;
>                 endpoint1.setBridgeErrorHandler(true);
>                 final HashMap<String, Object> consumerProperties = new
> HashMap<>();
>                 consumerProperties.put("backoffMultiplier", 10);
>                 consumerProperties.put("backoffErrorThreshold", 5);
>                 endpoint1.setConsumerProperties(consumerProperties);
>             }
>         }
>         //
>         // change end here
>         //
>
>         // let it run for 5 minutes before shutting down
>         Thread.sleep(5 * 60 * 1000);
>
>         camelContext.stop();
>     }
>
> }
>
>
>
>
>
> 2017-11-25 19:18 GMT+02:00 Claus Ibsen <[email protected]>:
>
>> Post the stactrace so we can see from where the error is thrown.
>>
>>
>> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf
>> <[email protected]> wrote:
>> > I am trying to detect when Kafka is not available. I have modified the
>> > example -
>> >
>> > https://github.com/apache/camel/blob/master/examples/camel-
>> example-kafka/src/main/java/org/apache/camel/example/kafka
>> /MessageConsumerClient.java
>> > and
>> > added following code right after camelContext.start()
>> >
>> >
>> >
>> >         final Collection<Endpoint> endpoints =
>> camelContext.getEndpoints();
>> >
>> >         for (Endpoint endpoint : endpoints) {
>> >
>> >             if (endpoint instanceof DefaultEndpoint) {
>> >
>> >                 final DefaultEndpoint endpoint1 = (DefaultEndpoint)
>> > endpoint;
>> >
>> >                 endpoint1.setBridgeErrorHandler(true);
>> >
>> >                 final HashMap<String, Object> consumerProperties = new
>> > HashMap<>();
>> >
>> >                 consumerProperties.put("backoffMultiplier", 10);
>> >
>> >                 consumerProperties.put("backoffErrorThreshold", 5);
>> >
>> >                 endpoint1.setConsumerProperties(consumerProperties);
>> >
>> >             }
>> >
>> >         }
>> >
>> >
>> >
>> > I ran the main() and hoped to see the consumer stopping the attempts to
>> > connect to Kafka after 5 tries, but this did not work. I keep getting
>> > output messages of “Connection to node -1 could not be established.
>> Broker
>> > may not be available.”
>> >
>> > Is this the right way to go? What am I doing wrong?
>> >
>> >
>> > Thanks.
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://davsclaus.com @davsclaus
>> Camel in Action 2: https://www.manning.com/ibsen2
>>
>
>
>
> --
> יעקב שיינדורף
> מנכ"ל ומנהל פיתוח,
> מילות מפתח לאתרים
> 054-5750201
> http://keywords-4-websites.com <http://keywords-4-websites.com/home.jsp>
>
> יעקב שיינדורף
> מנהל מוסיקלי StyleBrass
> 054-5750201
> http://www.stylebrass.co.il
>
>

-- 
יעקב שיינדורף
מנכ"ל ומנהל פיתוח,
מילות מפתח לאתרים
054-5750201
http://keywords-4-websites.com <http://keywords-4-websites.com/home.jsp>

יעקב שיינדורף
מנהל מוסיקלי StyleBrass
054-5750201
http://www.stylebrass.co.il

Reply via email to