This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 1011c43  Kafka Batch Log Example: Added a little processor to show 
records content (#61)
1011c43 is described below

commit 1011c433cf0119525090792b2efdf54a184b03eb
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Feb 15 15:02:06 2024 +0100

    Kafka Batch Log Example: Added a little processor to show records content 
(#61)
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 jbang/kafka-batch-log/BatchLog.java        | 32 +++++++++++++++
 jbang/kafka-batch-log/README.adoc          | 62 +++++++++++-------------------
 jbang/kafka-batch-log/kafka-batch-log.yaml | 12 +++---
 3 files changed, 60 insertions(+), 46 deletions(-)

diff --git a/jbang/kafka-batch-log/BatchLog.java 
b/jbang/kafka-batch-log/BatchLog.java
new file mode 100644
index 0000000..54d89bd
--- /dev/null
+++ b/jbang/kafka-batch-log/BatchLog.java
@@ -0,0 +1,32 @@
+package camel.example;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchLog implements Processor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
+
+    @Override
+    public void process(Exchange e) throws Exception {
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        // The records from the batch are stored in a list of exchanges in the 
original exchange. To process, we iterate over that list
+        for (Object obj : exchanges) {
+            if (obj instanceof Exchange) {
+                LOG.info("Processing exchange with body {}", 
((Exchange)obj).getMessage().getBody(String.class));
+            }
+        }
+    }
+
+}
diff --git a/jbang/kafka-batch-log/README.adoc 
b/jbang/kafka-batch-log/README.adoc
index ae47d6e..31715d4 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -57,7 +57,7 @@ Then you can run this example using:
 
 [source,sh]
 ----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run 
--local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run 
--local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java 
kafka-batch-log.yaml
 ----
 
 === Consumer running
@@ -105,12 +105,8 @@ In the consumer log, once the pollTimeout of 40 s 
completes, you should see an o
 
 [source,sh]
 ----
-2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] log-sink             
               : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[]]
-]
+2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:07.909  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
 ----
 
 If you check the situation for the consumer group 'my-group' you could see 
that the commit happened manually by using the kafka-batch-manual-commit-action.
@@ -134,39 +130,25 @@ And you should immediately see the output in group of 10 
records
 
 [source,sh]
 ----
-2024-02-05 09:50:33.947  INFO 24182 --- [mer[test-topic]] log-sink             
               : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], 
Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:50:44.137  INFO 24182 --- [mer[test-topic]] log-sink             
               : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], 
Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:50:54.324  INFO 24182 --- [mer[test-topic]] log-sink             
               : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], 
Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:51:04.535  INFO 24182 --- [mer[test-topic]] log-sink             
               : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], 
Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-2024-02-05 09:51:14.747  INFO 24182 --- [mer[test-topic]] log-sink             
               : Exchange[
-  ExchangePattern: InOnly
-  Headers: {}
-  BodyType: java.util.ArrayList
-  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], 
Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
-]
-----
-
-For the aim of this example the payload of the records is not important.
+.
+.
+.
+.
+2024-02-05 09:42:40.908  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.909  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.913  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.914  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.920  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.928  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.930  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.940  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.950  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+2024-02-05 09:42:40.955  INFO 21666 --- [mer[test-topic]] 
camel.example.BatchLog              : Processing exchange with body hello there
+.
+.
+.
+.
+----
 
 If you check again the offset for the consumers of my-group group you'll 
notice we are at offset 52 now.
 
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml 
b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 11c25df..1a779bd 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,6 +17,10 @@
 
 # camel-k: dependency=camel:kafka
 
+- beans:
+  - name: batchLog
+    type: "#class:camel.example.BatchLog"
+
 - route:
     id: "kafka-to-log"
     from:
@@ -31,11 +35,7 @@
         autoCommitEnable: false
         allowManualCommit: true
       steps:
-        - to:
-            uri: "kamelet:log-sink"
-            parameters:
-              showStreams: true
-              showHeaders: true
-              multiline: true
+        - bean:
+            ref: batchLog
         - to:
             uri: "kamelet:kafka-batch-manual-commit-action"

Reply via email to