This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 32fb1f1239b08371c1ff830bc7f993b6f5d390bc
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Thu Apr 11 14:15:19 2024 +0200

    CAMEL-20758: allow user-provided subscription adapters
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  31 +++++-
 .../camel/component/kafka/KafkaConstants.java      |   2 +
 .../camel/component/kafka/KafkaFetchRecords.java   |  17 +++-
 .../KafkaConsumerCustomSubscribeAdapterIT.java     | 105 +++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_6.adoc     |   4 +
 5 files changed, 157 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 42423f9ceb7..f48a4f5e9bd 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -494,7 +494,7 @@ static {
 }
 ----
 
-=== Batching Consumer
+== Batching Consumer
 
 To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true`.
 
@@ -634,6 +634,35 @@ public void configure() {
 }
 ----
 
+== Custom Subscription Adapters
+
+Applications with complex subscription logic may provide a custom bean to handle the subscription process.
+To do so, it is necessary to implement the `SubscribeAdapter` interface.
+
+[source,java]
+.Example subscriber adapter that subscribes to a set of Kafka topics or patterns
+----
+public class CustomSubscribeAdapter implements SubscribeAdapter {
+    @Override
+    public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) {
+        if (topicInfo.isPattern()) {
+            consumer.subscribe(topicInfo.getPattern(), reBalanceListener);
+        } else {
+            consumer.subscribe(topicInfo.getTopics(), reBalanceListener);
+        }
+    }
+}
+----
+
+Then, it is necessary to add it as a named bean instance to the registry:
+
+[source,java]
+.Add to registry example
+----
+context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
+----
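+
+Alternatively, when the adapter is a field of a Camel-managed class (for instance a `RouteBuilder` or a test class), it can be bound with the `@BindToRegistry` annotation, as the integration test added with this feature does. Below is a minimal sketch reusing the `CustomSubscribeAdapter` from the example above (the field name is illustrative):
+
+[source,java]
+.Bind via annotation example
+----
+@BindToRegistry(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER)
+private CustomSubscribeAdapter customSubscribeAdapter = new CustomSubscribeAdapter();
+----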
+
 include::spring-boot:partial$starter.adoc[]
 
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 150614ac7d9..5fc3e184bda 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -70,6 +70,8 @@ public final class KafkaConstants {
               javaType = "org.apache.camel.component.kafka.consumer.KafkaManualCommit")
     public static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
 
+    /** Name under which a user-provided {@code SubscribeAdapter} can be bound in the registry. */
+    public static final String KAFKA_SUBSCRIBE_ADAPTER = "subscribeAdapter";
+
     private KafkaConstants() {
         // Utility class
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 1fd61ff3479..26ee3cf3330 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
@@ -54,6 +55,8 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaFetchRecords implements Runnable {
     /*
     This keeps track of the state the record fetcher is. Because the Kafka consumer is not thread safe, it may take
@@ -305,10 +308,22 @@ public class KafkaFetchRecords implements Runnable {
 
         TopicInfo topicInfo = new TopicInfo(topicPattern, topicName);
 
-        SubscribeAdapter adapter = new DefaultSubscribeAdapter();
+        final CamelContext camelContext = kafkaConsumer.getEndpoint().getCamelContext();
+        LOG.info("Searching for a custom subscribe adapter on the registry");
+        final SubscribeAdapter adapter = resolveSubscribeAdapter(camelContext);
+
         adapter.subscribe(consumer, listener, topicInfo);
     }
 
+    private static SubscribeAdapter resolveSubscribeAdapter(CamelContext camelContext) {
+        SubscribeAdapter adapter = camelContext.getRegistry().lookupByNameAndType(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER,
+                SubscribeAdapter.class);
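+        // no custom adapter bound in the registry: fall back to the default subscribe behaviour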
+        if (adapter == null) {
+            adapter = new DefaultSubscribeAdapter();
+        }
+        return adapter;
+    }
+
     protected void startPolling() {
 
         try {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java
new file mode 100644
index 00000000000..421a92b590a
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerCustomSubscribeAdapterIT.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.support.subcription.DefaultSubscribeAdapter;
+import org.apache.camel.component.kafka.consumer.support.subcription.TopicInfo;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaConsumerCustomSubscribeAdapterIT extends BaseKafkaTestSupport {
+
+    public static final String TOPIC = "test-subscribe-adapter";
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
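+    // Delegates to the default subscribe logic while recording that the custom adapter was actually used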
+    private static class TestSubscribeAdapter extends DefaultSubscribeAdapter {
+        private volatile boolean subscribeCalled = false;
+
+        @Override
+        public void subscribe(Consumer<?, ?> consumer, ConsumerRebalanceListener reBalanceListener, TopicInfo topicInfo) {
+            try {
+                super.subscribe(consumer, reBalanceListener, topicInfo);
+            } finally {
+                subscribeCalled = true;
+            }
+        }
+
+        public boolean isSubscribeCalled() {
+            return subscribeCalled;
+        }
+    }
+
+    @BindToRegistry(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER)
+    private TestSubscribeAdapter testSubscribeAdapter = new TestSubscribeAdapter();
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                fromF("kafka:%s?brokers=%s&autoOffsetReset=earliest&consumersCount=1",
+                        TOPIC, service.getBootstrapServers())
+                        .routeId("subadapter").to(KafkaTestUtil.MOCK_RESULT);
+            }
+        };
+    }
+
+    @Test
+    public void kafkaMessagesIsConsumedByCamel() throws Exception {
+        MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
+
+        to.expectedBodiesReceivedInAnyOrder("m1", "m2");
+        for (int k = 1; k <= 2; k++) {
+            String msg = "m" + k;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+
+        to.assertIsSatisfied();
+
+        assertTrue(testSubscribeAdapter.isSubscribeCalled());
+    }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc
index 7fe55d34524..258d52658e5 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_6.adoc
@@ -62,3 +62,7 @@ If you are migrating from Camel 4.5, please ensure the following
 
 The `PlatformHttpEngine` class has changed the `createConsumer` method to return an `org.apache.camel.component.platform.http.spi.PlatformHttpConsumer` type,
 instead of `org.apache.camel.Consumer`.
+
+=== camel-kafka
+
+The Kafka component now supports custom subscription adapters for applications with very complex subscription logic.
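+
+Custom adapters implement the `SubscribeAdapter` interface and are looked up in the registry under the name defined by `KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER`. For example, binding an adapter such as the `CustomSubscribeAdapter` shown in the Kafka component documentation:
+
+[source,java]
+----
+context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
+----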
\ No newline at end of file
