Hi,

From the code you provided, it's not clear to me when and where you are calling the commit. It's also not clear which version of Camel you are using and which kind of commit factory you are using (async [1] or sync [2]).
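
The calling thread matters because KafkaConsumer guards its methods so that only one thread can be inside the client at a time. Below is a simplified model of that kind of guard, only to illustrate the idea. The class and method names are hypothetical, and this is a sketch, not the actual Kafka client source:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a single-thread access guard.
// Hypothetical names; not the actual Kafka client source.
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;

    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    // Called on entry to every guarded method.
    void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entrant for the owning thread; any other thread is rejected
        // while the owner is still inside a guarded call.
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "guarded object is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    // Called on exit from every guarded method; the last exit frees the guard.
    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Demo: the calling thread stays "inside" the guard (think of a consumer
    // thread in the middle of a poll) while a second thread tries to enter
    // (think of a commit issued from another thread).
    static boolean secondThreadIsRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        AtomicBoolean rejected = new AtomicBoolean(false);

        guard.acquire();
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected.get();
    }
}
```

In the route you posted, steps such as resequence(...).batch() and throttle(...).asyncDelayed(true) may hand the exchange over to a different thread, so a commit issued after them could run into this kind of guard while the consumer thread is still polling. That is an assumption on my side until I see where the commit is actually called.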

That said, the problem here is that, as explained in the exception message, the Kafka client cannot be accessed from a different thread. So I am not entirely sure that the problem is related to SEDA or anything like that. Also, Camel will indeed create a different consumer for every route.

Could you please provide a bit more detail about the code you have?

[1] https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
[2] https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53

On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com> wrote:

> Hello
> I'm really new to Camel concepts, our need is to create some identical
> routes, identical except for some parameters, from a Kafka topic to a http
> endpoint, with some processing in-between.
>
> Besides this we want to explicitly commit the message consumption only when
> the http endpoint has been successfully called.
>
> In order to achieve this we set up a route template that carries the Route
> parameterization and set it up to manually commit after having called the
> http endpoint :
>
> public void configure() throws Exception {
>     // @formatter:off
>     routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
>         .templateParameter(Constantes.JOB_NAME)
>         .templateParameter(Constantes.TOPIC)
>         .templateParameter(Constantes.PUBLISHER_ID)
>         .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>         .templateParameter(Constantes.JOB_NAME_PARAMETER)
>         .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>         .from(getKafkaEndpoint())
>         .messageHistory()
>         .filter(simple("${header.publisherId} == '{{publisherId}}'"))
>         .process(messageLoggerProcessor)
>         .process(modelMapperProcessor)
>         .process(jsonlToArrayProcessor)
>         .process(payloadProcessor)
>         .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
>         .setHeader(Exchange.HTTP_METHOD, simple("POST"))
>         .setHeader(Exchange.CONTENT_TYPE, constant("application/json;charset=UTF-8"))
>         .setHeader(Constantes.ACCEPT,constant("application/json"))
>         .setHeader(Constantes.API_KEY, constant(apiKey))
>         .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
>         .process(apiConsumerProcessorLogger)
>         .to(this.url)
>         .process(kafkaOffsetProcessor);
>     // @formatter:on
> }
>
> private String getKafkaEndpoint() {
>     String endpoint = "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" + kafkaBrokers;
>
>     if (securityEnabled()) {
>         endpoint += "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN"
>             + "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
>             + username + "\" password=\"" + password + "\";" + "&sslTruststoreLocation=" + sslTrustStoreLocation
>             + "&sslTruststorePassword=" + sslTruststorePassword;
>     }
>
>     return endpoint;
> }
>
> The problem is that we systematically get this error when a message is
> consumed by a route :
>
> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>     at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>     at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
>
> My understanding is that the instance of KafkaConsumer is reused in
> multiple routes and therefore it generates the error, but it could be also
> related to using SEDA endpoint as stated here
> (https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> explicitly do.
>
> We tried injecting a KafkaComponent local bean in the route :
>
>     .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic", "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
>     .end()
>     .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration", "#{{myKafkaConfiguration}}")
>     .end()
>     .from("#{{myKafka}}")
>
> But it ends up with another error because you cannot consume a Bean
> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html)
>
> How to use a different KafkaConsumer for every created route ? Or, if the
> issue is SEDA related, how to make this route a direct route?
>
> Thank you for your help
>

--
Otavio R. Piske
http://orpiske.net