No solution by Camel for detecting connection errors? I gave a very clear reproducible scenario...
2017-11-28 11:44 GMT+02:00 Yacov Schondorf <[email protected]>: > But this is exactly my point - there is no stack trace! I want there to be > a stacktrace so that I could catch it using the regular error handler. This > is the purpose of the call to endpoint1.setBridgeErrorHandler(true); > However, the call does not work, no trace is printed and the polling > continues. Here is the complete code based on https://github.com/apache/c > amel/blob/master/examples/camel-example-kafka/src/main/java/ > org/apache/camel/example/kafka/MessageConsumerClient.java with my > addition between the *// change start *and *// change end *blocks: > > package org.apache.camel.example.kafka; > > import org.apache.camel.CamelContext; > import org.apache.camel.Endpoint; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.component.kafka.KafkaEndpoint; > import org.apache.camel.component.properties.PropertiesComponent; > import org.apache.camel.impl.DefaultCamelContext; > import org.apache.camel.impl.DefaultEndpoint; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > import java.util.Collection; > import java.util.HashMap; > import java.util.Iterator; > > public final class MessageConsumerClient { > > private static final Logger LOG = LoggerFactory.getLogger( > MessageConsumerClient.class); > > private MessageConsumerClient() { > } > > public static void main(String[] args) throws Exception { > > LOG.info("About to run Kafka-camel integration..."); > > CamelContext camelContext = new DefaultCamelContext(); > > // Add route to send messages to Kafka > > camelContext.addRoutes(new RouteBuilder() { > public void configure() { > PropertiesComponent pc = > getContext().getComponent("properties", > PropertiesComponent.class); > pc.setLocation("classpath:application.properties"); > > log.info("About to start route: Kafka Server -> Log "); > onException(Exception.class).process(new Processor() { > @Override > public void process(Exchange exchange) throws > Exception { > System.out.println("Exception occurred!!"); > } > }); > from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{ > kafka.port}}" > + "&maxPollRecords={{consumer.maxPollRecords}}" > + "&consumersCount={{consumer.consumersCount}}" > + "&seekTo={{consumer.seekTo}}" > + "&groupId={{consumer.group}}") > .routeId("FromKafka") > .log("${body}"); > } > }); > > camelContext.start(); > > // > // change start > // > final Collection<Endpoint> endpoints = camelContext.getEndpoints(); > for (Endpoint endpoint : endpoints) { > if (endpoint instanceof DefaultEndpoint) { > final DefaultEndpoint endpoint1 = (DefaultEndpoint) > endpoint; > endpoint1.setBridgeErrorHandler(true); > final HashMap<String, Object> consumerProperties = new > HashMap<>(); > consumerProperties.put("backoffMultiplier", 10); > consumerProperties.put("backoffErrorThreshold", 5); > endpoint1.setConsumerProperties(consumerProperties); > } > } > // > // change end here > // > > // let it run for 5 minutes before shutting down > Thread.sleep(5 * 60 * 1000); > > camelContext.stop(); > } > > } > > > > > > 2017-11-25 19:18 GMT+02:00 Claus Ibsen <[email protected]>: > >> Post the stactrace so we can see from where the error is thrown. >> >> >> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf >> <[email protected]> wrote: >> > I am trying to detect when Kafka is not available. I have modified the >> > example - >> > >> > https://github.com/apache/camel/blob/master/examples/camel- >> example-kafka/src/main/java/org/apache/camel/example/kafka >> /MessageConsumerClient.java >> > and >> > added following code right after camelContext.start() >> > >> > >> > >> > final Collection<Endpoint> endpoints = >> camelContext.getEndpoints(); >> > >> > for (Endpoint endpoint : endpoints) { >> > >> > if (endpoint instanceof DefaultEndpoint) { >> > >> > final DefaultEndpoint endpoint1 = (DefaultEndpoint) >> > endpoint; >> > >> > endpoint1.setBridgeErrorHandler(true); >> > >> > final HashMap<String, Object> consumerProperties = new >> > HashMap<>(); >> > >> > consumerProperties.put("backoffMultiplier", 10); >> > >> > consumerProperties.put("backoffErrorThreshold", 5); >> > >> > endpoint1.setConsumerProperties(consumerProperties); >> > >> > } >> > >> > } >> > >> > >> > >> > I ran the main() and hoped to see the consumer stopping the attempts to >> > connect to Kafka after 5 tries, but this did not work. I keep getting >> > output messages of “Connection to node -1 could not be established. >> Broker >> > may not be available.” >> > >> > Is this the right way to go? What am I doing wrong? >> > >> > >> > Thanks. >> >> >> >> -- >> Claus Ibsen >> ----------------- >> http://davsclaus.com @davsclaus >> Camel in Action 2: https://www.manning.com/ibsen2 >> > > > > -- > יעקב שיינדורף > מנכ"ל ומנהל פיתוח, > מילות מפתח לאתרים > 054-5750201 > http://keywords-4-websites.com <http://keywords-4-websites.com/home.jsp> > > יעקב שיינדורף > מנהל מוסיקלי StyleBrass > 054-5750201 > http://www.stylebrass.co.il > > -- יעקב שיינדורף מנכ"ל ומנהל פיתוח, מילות מפתח לאתרים 054-5750201 http://keywords-4-websites.com <http://keywords-4-websites.com/home.jsp> יעקב שיינדורף מנהל מוסיקלי StyleBrass 054-5750201 http://www.stylebrass.co.il
