Hi,

From the code you provided, it is not clear to me when and where you are
calling the commit. It is also not clear which version of Camel you are
using and which kind of commit factory you are using (async [1] or sync
[2]).

That said, the problem here is that, as explained in the exception
message, the Kafka client cannot be accessed from a different thread.

So I am not entirely sure that the problem is related to SEDA or something
like that. Also, Camel will indeed create a different consumer for every
route.
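
To illustrate the kind of check behind that exception message, here is a
minimal sketch of a single-thread ownership guard. It is not the Kafka or
Camel code: the class SingleThreadGuardSketch and its poll()/commit()
methods are hypothetical stand-ins, and it only assumes that the real
client rejects calls from a thread other than the one currently using it,
which is what the stack trace suggests.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a client that only one thread may use at a time.
public class SingleThreadGuardSketch {
    private static final long NO_OWNER = -1L;

    // Id of the thread currently inside the client, or NO_OWNER.
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    // Number of nested calls made by the owning thread.
    private final AtomicInteger depth = new AtomicInteger(0);

    private void acquire() {
        long me = Thread.currentThread().getId();
        // Allowed if this thread already owns the client or nobody does.
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "client is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    private void release() {
        // Give up ownership only when the outermost call returns.
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Stand-in for a poll: holds the guard while the callback runs.
    public void poll(Runnable whilePolling) {
        acquire();
        try {
            whilePolling.run();
        } finally {
            release();
        }
    }

    // Stand-in for a synchronous commit.
    public void commit() {
        acquire();
        release();
    }

    // Commit on the same thread that is polling: accepted.
    public static String commitFromPollingThread() {
        SingleThreadGuardSketch client = new SingleThreadGuardSketch();
        AtomicReference<String> outcome = new AtomicReference<>("committed");
        client.poll(() -> {
            try {
                client.commit();
            } catch (ConcurrentModificationException e) {
                outcome.set("rejected");
            }
        });
        return outcome.get();
    }

    // Commit on a second thread while the first is still polling: rejected.
    public static String commitFromAnotherThreadWhilePolling() {
        SingleThreadGuardSketch client = new SingleThreadGuardSketch();
        AtomicReference<String> outcome = new AtomicReference<>("committed");
        client.poll(() -> {
            Thread other = new Thread(() -> {
                try {
                    client.commit();
                } catch (ConcurrentModificationException e) {
                    outcome.set("rejected");
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("same thread:  " + commitFromPollingThread());
        System.out.println("other thread: " + commitFromAnotherThreadWhilePolling());
    }
}
```

In this sketch the commit succeeds only when it runs on the thread that is
already using the client, which is why it matters where in the route the
commit is called.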

Could you please provide a bit more detail about the code you have?

1.
https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncCommitIT.java#L51-L53
2.
https://github.com/apache/camel/blob/main/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java#L51-L53

On Fri, Oct 21, 2022 at 9:12 AM Ivan Rododendro <ivan.rododen...@gmail.com>
wrote:

> Hello
> I'm really new to Camel concepts, our need is to create some identical
> routes, identical except for some parameters, from a Kafka topic to a http
> endpoint, with some processing in-between.
>
> Besides this we want to explicitly commit the message consumption only when
> the http endpoint has been successfully called.
>
> In order to achieve this we set up a route template that carries the Route
> parameterization and set it up to manually commit after having called the
> http endpoint :
> public void configure() throws Exception {
>         // @formatter:off
>         routeTemplate(Constantes.KAFKA_GENERIC_ROUTE)
>             .templateParameter(Constantes.JOB_NAME)
>             .templateParameter(Constantes.TOPIC)
>             .templateParameter(Constantes.PUBLISHER_ID)
>             .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>             .templateParameter(Constantes.JOB_NAME_PARAMETER)
>             .templateParameter(Constantes.CORRELATION_ID_PARAMETER)
>             .from(getKafkaEndpoint())
>             .messageHistory()
>             .filter(simple("${header.publisherId} == '{{publisherId}}'"))
>             .process(messageLoggerProcessor)
>             .process(modelMapperProcessor)
>             .process(jsonlToArrayProcessor)
>             .process(payloadProcessor)
>
> .resequence(header("dmlTimestamp")).batch().timeout(maximumRequestCount)
>             .setHeader(Exchange.HTTP_METHOD, simple("POST"))
>             .setHeader(Exchange.CONTENT_TYPE,
> constant("application/json;charset=UTF-8"))
>             .setHeader(Constantes.ACCEPT,constant("application/json"))
>             .setHeader(Constantes.API_KEY, constant(apiKey))
>
>
> .throttle(maximumRequestCount).timePeriodMillis(timePeriodMillis).asyncDelayed(true)
>             .process(apiConsumerProcessorLogger)
>             .to(this.url)
>             .process(kafkaOffsetProcessor);
>         // @formatter:on
>     }
>
>     private String getKafkaEndpoint() {
>         String endpoint =
> "kafka:{{topic}}?allowManualCommit=true&autoCommitEnable=false&brokers=" +
> kafkaBrokers;
>
>         if (securityEnabled()) {
>             endpoint += "&securityProtocol=SASL_SSL" +
> "&saslMechanism=PLAIN"
>                     +
> "&saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule
> required username=\""
>                     + username + "\" password=\"" + password + "\";" +
> "&sslTruststoreLocation=" + sslTrustStoreLocation
>                     + "&sslTruststorePassword=" + sslTruststorePassword;
>         }
>
>         return endpoint;
>     }
>
> The problem is that we systematically get this error when a message is
> consumed by a route :
>
> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>     at
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>     at
> org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
>
>
> My understanding is that the instance of KafkaConsumer is reused in
> multiple routes and therefore it generates the error, but it could be also
> related to using SEDA endpoint as stated here (
> https://issues.apache.org/jira/browse/CAMEL-12722), which we don't
> explicitly do.
>
> We tried injecting a KafkaComponent local bean in the route :
>
>
> .templateBean("myKafkaConfiguration").typeClass("org.apache.camel.component.kafka.KafkaConfiguration").property("topic",
> "{{" + Constantes.TOPIC +"}}").properties(kafkaConfiguration)
>             .end()
>
> .templateBean("myKafka").typeClass("org.apache.camel.component.kafka.KafkaComponent").property("configuration",
> "#{{myKafkaConfiguration}}")
>             .end()
>             .from("#{{myKafka}}")
>
> But it ends up with another error because you cannot consume a Bean
> endpoint (https://camel.apache.org/components/3.18.x/bean-component.html)
>
> How to use a different KafkaConsumer for every created route ? Or, if the
> issue is SEDA related, how to make this route a direct route?
>
> Thank you for your help
>


-- 
Otavio R. Piske
http://orpiske.net
