But this is exactly my point: there is no stack trace! I want an exception
with a stack trace so that I can catch it using the regular error handler.
That is the purpose of the call to endpoint1.setBridgeErrorHandler(true).
However, the call has no effect: no trace is printed and the polling
continues. Here is the complete code, based on
https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
with my addition between the "// change start" and "// change end" comments:

package org.apache.camel.example.kafka;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;

public final class MessageConsumerClient {

    private static final Logger LOG =
            LoggerFactory.getLogger(MessageConsumerClient.class);

    private MessageConsumerClient() {
    }

    public static void main(String[] args) throws Exception {

        LOG.info("About to run Kafka-camel integration...");

        CamelContext camelContext = new DefaultCamelContext();

        // Add route to send messages to Kafka

        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                PropertiesComponent pc =
                        getContext().getComponent("properties", PropertiesComponent.class);
                pc.setLocation("classpath:application.properties");

                log.info("About to start route: Kafka Server -> Log ");
                onException(Exception.class).process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Exception occurred!!");
                    }
                });

                from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                        + "&maxPollRecords={{consumer.maxPollRecords}}"
                        + "&consumersCount={{consumer.consumersCount}}"
                        + "&seekTo={{consumer.seekTo}}"
                        + "&groupId={{consumer.group}}")
                        .routeId("FromKafka")
                        .log("${body}");
            }
        });

        camelContext.start();

        //
        // change start
        //
        final Collection<Endpoint> endpoints = camelContext.getEndpoints();
        for (Endpoint endpoint : endpoints) {
            if (endpoint instanceof DefaultEndpoint) {
                final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
                endpoint1.setBridgeErrorHandler(true);
                final HashMap<String, Object> consumerProperties = new HashMap<>();
                consumerProperties.put("backoffMultiplier", 10);
                consumerProperties.put("backoffErrorThreshold", 5);
                endpoint1.setConsumerProperties(consumerProperties);
            }
        }
        //
        // change end here
        //

        // let it run for 5 minutes before shutting down
        Thread.sleep(5 * 60 * 1000);

        camelContext.stop();
    }

}
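
One thing that may matter in the code above: the bridge flag is set only after camelContext.start(), by which point the consumer has most likely already been created from the endpoint. A minimal sketch of the alternative follows, assuming the Kafka endpoint accepts bridgeErrorHandler as a URI option so that it is in place before the route starts. KafkaUriSketch and its methods are hypothetical helpers, not Camel API; they only assemble the URI string that would be passed to from(...). Whether a broker outage then surfaces as an exception at all is a separate question that this sketch does not answer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Camel: it only assembles an endpoint URI string.
final class KafkaUriSketch {

    private KafkaUriSketch() {
    }

    // Builds "kafka:<topic>?key=value&..." and keeps the options in insertion order.
    static String build(String topic, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + topic + "?" + query;
    }

    // The URI that would be handed to from(...) inside RouteBuilder.configure().
    static String consumerUri() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "{{kafka.host}}:{{kafka.port}}");
        options.put("groupId", "{{consumer.group}}");
        // Assumption: the endpoint accepts bridgeErrorHandler as a URI option,
        // so the flag is already set when the consumer is created.
        options.put("bridgeErrorHandler", "true");
        return build("{{consumer.topic}}", options);
    }

    public static void main(String[] args) {
        System.out.println(consumerUri());
    }
}
```

The resulting string would replace the concatenated URI in the route, with the remaining options (maxPollRecords, consumersCount, seekTo) added the same way.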





2017-11-25 19:18 GMT+02:00 Claus Ibsen <[email protected]>:

> Post the stack trace so we can see where the error is thrown from.
>
>
> On Wed, Nov 22, 2017 at 11:01 AM, Yacov Schondorf
> <[email protected]> wrote:
> > I am trying to detect when Kafka is not available. I have modified the
> > example
> >
> > https://github.com/apache/camel/blob/master/examples/camel-example-kafka/src/main/java/org/apache/camel/example/kafka/MessageConsumerClient.java
> >
> > and added the following code right after camelContext.start():
> >
> >         final Collection<Endpoint> endpoints = camelContext.getEndpoints();
> >         for (Endpoint endpoint : endpoints) {
> >             if (endpoint instanceof DefaultEndpoint) {
> >                 final DefaultEndpoint endpoint1 = (DefaultEndpoint) endpoint;
> >                 endpoint1.setBridgeErrorHandler(true);
> >                 final HashMap<String, Object> consumerProperties = new HashMap<>();
> >                 consumerProperties.put("backoffMultiplier", 10);
> >                 consumerProperties.put("backoffErrorThreshold", 5);
> >                 endpoint1.setConsumerProperties(consumerProperties);
> >             }
> >         }
> >
> > I ran main() and hoped to see the consumer stop trying to connect to
> > Kafka after 5 attempts, but this did not work. I keep getting the output
> > message “Connection to node -1 could not be established. Broker may not
> > be available.”
> >
> > Is this the right way to go? What am I doing wrong?
> >
> >
> > Thanks.
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>
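
On the expectation quoted above that the consumer would stop after 5 tries: as far as the documentation for Camel's scheduled polling consumers describes them, backoffErrorThreshold is the number of consecutive failed polls before back-off starts, and backoffMultiplier is the number of polls then skipped before the next real attempt. Under that reading the options slow polling down but never stop it. The sketch below is a plain-Java model of that description, not Camel code; the class BackoffSketch and its methods are hypothetical, and whether the Kafka consumer honours these options at all is an assumption that would need checking.

```java
// Hypothetical model of the documented back-off options; this is not Camel code.
final class BackoffSketch {

    private final int errorThreshold; // models backoffErrorThreshold
    private final int multiplier;     // models backoffMultiplier
    private int consecutiveErrors;
    private int skipsLeft;

    BackoffSketch(int errorThreshold, int multiplier) {
        this.errorThreshold = errorThreshold;
        this.multiplier = multiplier;
    }

    // Returns true if this scheduled tick should perform a real poll.
    boolean shouldPoll() {
        if (skipsLeft > 0) {
            skipsLeft--;
            return false;
        }
        return true;
    }

    // Records the outcome of a real poll and arms the back-off when needed.
    void recordResult(boolean failed) {
        if (!failed) {
            consecutiveErrors = 0;
            return;
        }
        consecutiveErrors++;
        if (consecutiveErrors >= errorThreshold) {
            skipsLeft = multiplier;
            consecutiveErrors = 0;
        }
    }

    // Counts the real polls made over a number of ticks while every poll fails.
    static int countPolls(int ticks, int errorThreshold, int multiplier) {
        BackoffSketch backoff = new BackoffSketch(errorThreshold, multiplier);
        int polls = 0;
        for (int tick = 0; tick < ticks; tick++) {
            if (backoff.shouldPoll()) {
                polls++;
                backoff.recordResult(true); // broker assumed down the whole time
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Threshold 5, multiplier 10, 30 ticks: polls on ticks 1-5 and 16-20.
        System.out.println(countPolls(30, 5, 10)); // prints 10
    }
}
```

With the values from the thread (threshold 5, multiplier 10), the model still polls 10 times in 30 ticks, so stopping the route would need separate logic.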



-- 
Yacov Schondorf
CEO and Head of Development,
Keywords for Websites
054-5750201
http://keywords-4-websites.com <http://keywords-4-websites.com/home.jsp>

Yacov Schondorf
Musical Director, StyleBrass
054-5750201
http://www.stylebrass.co.il
