Hello
I'm really new to Camel concepts, our need is to create some identical
routes, identical except for some parameters, from a Kafka topic to a http
endpoint, with some processing in-between.

Besides this we want to explicitly commit the message consumption only when
the http endpoint has been successfully called.

In order to achieve this we set up a route template that carries the Route
parameterization and set it up to manually commit after having called the
http endpoint :
public void configure() throws Exception {
        // @formatter:off
        routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
            .templateParameter(Constantes.JOB_NAME)
            .templateParameter(Constantes.TOPIC)
            .templateParameter(Constantes.PUBLISHER_ID)
            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
            .templateParameter(Constantes.JOB_NAME_PARAMETER)
            .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
            .from(getKafkaEndpoint())
            .messageHistory()
            .filter(simple("${header.publisherId} == '{{publisherId}}'"))
            .process(messageLoggerProcessor)
            .process(modelMapperProcessor)
            .process(jsonlToArrayProcessor)
            .process(payloadProcessor)

.resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
            .setHeader(Exchange.HTTP_METHOD, simple("POST"))
            .setHeader(Exchange.CONTENT_TYPE,
constant("application/json;charset=UTF-8"))
            .setHeader(Constantes.ACCEPT,constant("application/json"))
            .setHeader(Constantes.API_KEY, constant(apiKey))

.throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
            .process(apiConsumerProcessorLogger)
            .to(this.url)
            .process(kafkaOffsetProcessor);
        // @formatter:on
    }

    private String getKafkaEndpoint() {
        String endpoint =
"kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" +
kafkaBrokers;

        if (securityEnabled()) {
            endpoint += "&securityProtocol=SASL_SSL" +
"&saslMechanism=PLAIN"
                    +
"&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
required username=\""
                    + username + "\" password=\"" + password + "\";" +
"&sslTruststoreLocation=" + sslTrustStoreLocation
                    + "&sslTruststorePassword=" + sslTruststorePassword;
        }

        return endpoint;
    }

The problem is that we systematically get this error when a message is
consumed by a route :

Trace: java.util.ConcurrentModificationException: KafkaConsumer is not
safe for multi-threaded access
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
    at 
org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
    at 
org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)


My understanding is that the instance of KafkaConsumer is reused in
multiple routes and therefore it generates the error, but it could be also
related to using SEDA endpoint as stated here (
https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
explicitly do.

We tried injecting a KafkaComponent local bean in the route :

.templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
"{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
            .end()
            
.templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
"#{{myKafkaConfiguration}}")
            .end()
            .from("#{{myKafka}}")

But it ends up with another error because you cannot consume a Bean
endpoint (https://camel.apache.org/components/3.18.x/bean-component.html)

How to use a different KafkaConsumer for every created route ? Or, if the
issue is SEDA related, how to make this route a direct route?

Thank you for your help

Reply via email to