Hello I'm really new to Camel concepts, our need is to create some identical routes, identical except for some parameters, from a Kafka topic to a http endpoint, with some processing in-between.
Besides this we want to explicitly commit the message consumption only when the http endpoint has been successfully called. In order to achieve this we set up a route template that carries the Route parameterization and set it up to manually commit after having called the http endpoint : public void configure() throws Exception { // @formatter:off routeTemplate(Constantes.KAFKA_GENERIC_ROUTE) .templateParameter(Constantes.JOB_NAME) .templateParameter(Constantes.TOPIC) .templateParameter(Constantes.PUBLISHER_ID) .templateParameter(Constantes.CORRELATION_ID_PARAMETER) .templateParameter(Constantes.JOB_NAME_PARAMETER) .templateParameter(Constantes.CORRELATION_ID_PARAMETER) .from(getKafkaEndpoint()) .messageHistory() .filter(simple("${header.publisherId} == '{{publisherId}}'")) .process(messageLoggerProcessor) .process(modelMapperProcessor) .process(jsonlToArrayProcessor) .process(payloadProcessor) .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount) .setHeader(Exchange.HTTP_METHOD, simple("POST")) .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8")) .setHeader(Constantes.ACCEPT,constant("application/json")) .setHeader(Constantes.API_KEY, constant(apiKey)) .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true) .process(apiConsumerProcessorLogger) .to(this.url) .process(kafkaOffsetProcessor); // @formatter:on } private String getKafkaEndpoint() { String endpoint = "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" + kafkaBrokers; if (securityEnabled()) { endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN" + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";" + "&sslTruststoreLocation=" + sslTrustStoreLocation + "&sslTruststorePassword=" + sslTruststorePassword; } return endpoint; } The problem is that we systematically get this error when a message is consumed by a route : Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824) at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255) at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60) at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51) My understanding is that the instance of KafkaConsumer is reused in multiple routes and therefore it generates the error, but it could be also related to using SEDA endpoint as stated here ( https://issues.apache.org/jira/browse/CAMEL-12722), which we don't explicitly do. We tried injecting a KafkaComponent local bean in the route : .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration) .end() .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", "#{{myKafkaConfiguration}}") .end() .from("#{{myKafka}}") But it ends up with another error because you cannot consume a Bean endpoint (https://camel.apache.org/components/3.18.x/bean-component.html) How to use a different KafkaConsumer for every created route ? Or, if the issue is SEDA related, how to make this route a direct route? Thank you for your help