This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 32fb1f1239b08371c1ff830bc7f993b6f5d390bc Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Apr 11 14:15:19 2024 +0200 CAMEL-20758: allow user-provided subscription adapters --- .../camel-kafka/src/main/docs/kafka-component.adoc | 31 +++++- .../camel/component/kafka/KafkaConstants.java | 2 + .../camel/component/kafka/KafkaFetchRecords.java | 17 +++- .../KafkaConsumerCustomSubscribeAdapterIT.java | 105 +++++++++++++++++++++ .../ROOT/pages/camel-4x-upgrade-guide-4_6.adoc | 4 + 5 files changed, 157 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 42423f9ceb7..f48a4f5e9bd 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -494,7 +494,7 @@ static { } ---- -=== Batching Consumer +== Batching Consumer To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true`. @@ -634,6 +634,35 @@ public void configure() { } ---- +== Custom Subscription Adapters + +Applications with complex subscription logic may provide a custom bean to handle the subscription process. To do so, it is +necessary to implement the interface `SubscribeAdapter`. 
+ +[source,java] +.Example subscriber adapter that subscribes to a set of Kafka topics or patterns +---- +public class CustomSubscribeAdapter implements SubscribeAdapter { + @Override + public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) { + if (topicInfo.isPattern()) { + consumer.subscribe(topicInfo.getPattern(), reBalanceListener); + } else { + consumer.subscribe(topicInfo.getTopics(), reBalanceListener); + } + } +} +---- + +Then, it is necessary to add it as a named bean instance to the registry: + + +[source,java] +.Add to registry example +---- +context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter()); +---- + include::spring-boot:partial$starter.adoc[] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 150614ac7d9..5fc3e184bda 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -70,6 +70,8 @@ public final class KafkaConstants { javaType = "org.apache.camel.component.kafka.consumer.KafkaManualCommit") public static final String MANUAL_COMMIT = "CamelKafkaManualCommit"; + public static final String KAFKA_SUBSCRIBE_ADAPTER = "subscribeAdapter"; + private KafkaConstants() { // Utility class } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 1fd61ff3479..26ee3cf3330 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -22,6 +22,7 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import org.apache.camel.CamelContext; import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.camel.component.kafka.consumer.CommitManagers; import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener; @@ -54,6 +55,8 @@ import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.rmi.registry.LocateRegistry.getRegistry; + public class KafkaFetchRecords implements Runnable { /* This keeps track of the state the record fetcher is. Because the Kafka consumer is not thread safe, it may take @@ -305,10 +308,22 @@ public class KafkaFetchRecords implements Runnable { TopicInfo topicInfo = new TopicInfo(topicPattern, topicName); - SubscribeAdapter adapter = new DefaultSubscribeAdapter(); + final CamelContext camelContext = kafkaConsumer.getEndpoint().getCamelContext(); + LOG.info("Searching for a custom subscribe adapter on the registry"); + final SubscribeAdapter adapter = resolveSubscribeAdapter(camelContext); + adapter.subscribe(consumer, listener, topicInfo); } + private static SubscribeAdapter resolveSubscribeAdapter(CamelContext camelContext) { + SubscribeAdapter adapter = camelContext.getRegistry().lookupByNameAndType(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, + SubscribeAdapter.class); + if (adapter == null) { + adapter = new DefaultSubscribeAdapter(); + } + return adapter; + } + protected void startPolling() { try { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java new file mode 100644 index 00000000000..421a92b590a --- /dev/null +++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.consumer.support.subcription.DefaultSubscribeAdapter; +import org.apache.camel.component.kafka.consumer.support.subcription.TopicInfo; +import org.apache.camel.component.kafka.integration.common.KafkaTestUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaConsumerCustomSubscribeAdapterIT extends BaseKafkaTestSupport { + + public static final 
String TOPIC = "test-subscribe-adapter"; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + private static class TestSubscribeAdapter extends DefaultSubscribeAdapter { + private volatile boolean subscribeCalled = false; + + @Override + public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) { + try { + super.subscribe(consumer, reBalanceListener, topicInfo); + } finally { + subscribeCalled = true; + } + } + + public boolean isSubscribeCalled() { + return subscribeCalled; + } + } + + @BindToRegistry(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER) + private TestSubscribeAdapter testSubscribeAdapter = new TestSubscribeAdapter(); + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + fromF("kafka:%s?brokers=%s&autoOffsetReset=earliest&consumersCount=1", + TOPIC, service.getBootstrapServers()) + .routeId("subadapter").to(KafkaTestUtil.MOCK_RESULT); + } + }; + } + + @Test + public void kafkaMessagesIsConsumedByCamel() throws Exception { + MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT); + + to.expectedBodiesReceivedInAnyOrder("m1", "m2"); + for (int k = 1; k <= 2; k++) { + String msg = "m" + k; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg); + producer.send(data); + } + + to.assertIsSatisfied(); + + assertTrue(testSubscribeAdapter.isSubscribeCalled()); + } +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc index 
7fe55d34524..258d52658e5 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc @@ -62,3 +62,7 @@ If you are migrating from Camel 4.5, please ensure the following The `PlatformHttpEngine` class has changed the `createConsumer` method to return a `org.apache.camel.component.platform.http.spi.PlatformHttpConsumer` type, instead of `org.apache.camel.Consumer`. + +=== camel-kafka + +The Kafka component now supports custom subscription adapters for applications with very complex subscription logic. \ No newline at end of file