This is an automated email from the ASF dual-hosted git repository.

claudio4j pushed a commit to branch kafka-tx
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot-examples.git

commit 29a01d8b97ef516e8ecf6c4646fdc4ea0394f9eb
Author: Claudio Miranda <clau...@claudius.com.br>
AuthorDate: Wed Jul 16 17:33:54 2025 +0100

    Kafka and SQL transactions quickstart
---
 README.adoc                                        |   4 +-
 kafka-sql-transaction/README.adoc                  | 228 +++++++++++++++++++++
 kafka-sql-transaction/pom.xml                      | 136 ++++++++++++
 .../org/apache/camel/example/kafka/KafkaRoute.java |  61 ++++++
 .../camel/example/kafka/MyCamelApplication.java    |  29 +++
 .../src/main/resources/application.yaml            |  44 ++++
 .../src/main/resources/schema.sql                  |   4 +
 pom.xml                                            |   1 +
 8 files changed, 506 insertions(+), 1 deletion(-)

diff --git a/README.adoc b/README.adoc
index bf279ec..f12b9c5 100644
--- a/README.adoc
+++ b/README.adoc
@@ -27,7 +27,7 @@ readme's instructions.
 === Examples
 
 // examples: START
-Number of Examples: 64 (0 deprecated)
+Number of Examples: 65 (0 deprecated)
 
 [width="100%",cols="4,2,4",options="header"]
 |===
@@ -132,6 +132,8 @@ Number of Examples: 64 (0 deprecated)
 
 | link:kafka-offsetrepository/README.adoc[Kafka Offsetrepository] 
(kafka-offsetrepository) | Messaging | An example for Kafka offsetrepository
 
+| link:kafka-sql-transaction/README.adoc[Kafka Sql Transaction] 
(kafka-sql-transaction) | Messaging | An example of Kafka and SQL transactions.
+
 | link:paho-mqtt5-shared-subscriptions/README.adoc[Paho Mqtt5 Shared 
Subscriptions] (paho-mqtt5-shared-subscriptions) | Messaging | An example 
showing  how to set up multiple mqtt5 consumers that use shared subscription 
feature of MQTT5
 
 | link:rabbitmq/readme.adoc[Rabbitmq] (rabbitmq) | Messaging | An example 
showing how to work with Camel and RabbitMQ
diff --git a/kafka-sql-transaction/README.adoc 
b/kafka-sql-transaction/README.adoc
new file mode 100644
index 0000000..e7a2536
--- /dev/null
+++ b/kafka-sql-transaction/README.adoc
@@ -0,0 +1,228 @@
+== Camel Kafka and SQL transactions example
+
+=== Introduction
+
+An example that shows how to integrate 
https://camel.apache.org/components/next/kafka-component.html[Camel Kafka] and 
https://camel.apache.org/components/next/sql-component.html[Camel SQL] whose 
data is managed by a transaction, there are four examples that shows how these 
integrations may be affected by the transaction support in Kafka and the SQL 
components.
+
+The camel 
https://camel.apache.org/components/next/kafka-component.html#_kafka_transaction[kafka
 component has support for transactions], since camel 4.13 you can use the 
`transacted=true` parameter in either the kafka endpoint or in the 
`application.properties`. If you use camel 4.12 or before, then you have to use 
the `additionalProperties[transactional.id]` parameter.
+
+The main use case of this quickstart is a route that sends to a kafka topic 
and inserts a row to ta sql table, then in case of failure in the SQL 
operation, there is a rollback and the message is not sent to the kafka topic.
+
+To simulate the rollback, the table `foo` has an unique constraint in the 
`name` column, so for the example we will try to insert a duplicate name, 
causing the sql `insert` operation to fail and the exchange route marked for 
rollback.
+
+This example requires docker and 
https://camel.apache.org/manual/camel-jbang.html#_installation[camel-jbang].
+
+NOTE: This example makes use of the local transaction manager in Kafka client 
and the JDBC driver, this is not a JTA managed transaction, given that kafka 
doesn't support a JTA transaction api.
+
+
+=== Start the Kafka and PostgreSQL server
+
+* Use camel-jbang
+
+In camel-jbang there is a `camel infra` command to start services, before 
camel 4.13 the postgresql and kafka services were bound to random ports, but 
since camel 4.13 the service is bound to a fixed port. Then we suggest to use 
the latest camel-jbang to launch the service with a fixed port, so you don't 
have to manually update the port in `src/main/resources/application.yaml`.
+
+To start the postgresql server
+```
+camel infra run postgres
+```
+It will output this:
+```
+Starting service postgres
+{
+  "getServiceAddress" : "localhost:5432",
+  "host" : "localhost",
+  "password" : "test",
+  "port" : 5432,
+  "userName" : "test"
+}
+Press any key to stop the execution
+```
+
+If the port is different than `5432` then you should update the 
`src/main/resources/application.yaml`.
+
+To start the kafka server
+```
+camel infra run kafka
+```
+
+It will output this:
+```
+Starting service kafka
+{
+  "brokers" : "localhost:9092",
+  "getBootstrapServers" : "localhost:9092"
+}
+Press any key to stop the execution
+```
+
+If the port is different than `9092` then you should update the 
`src/main/resources/application.yaml`.
+
+* Use docker
+
+You can use docker in case you don't want to use camel-jbang.
+
+To start the postgresql server
+```
+docker run --rm --name postgresql -e POSTGRES_USER=test -e 
POSTGRES_PASSWORD=test -p 5432:5432 mirror.gcr.io/library/postgres:latest
+```
+
+To start the kafka server
+```
+docker run --rm --name kafka -p 9092:9092  mirror.gcr.io/apache/kafka:3.9.1
+```
+
+* Log the messages sent to the topic
+
+You can follow the messages sent to the kafka topic to make sure which 
messages were commited to the topic, note the isolation level of the consumer 
to read only the commited messages.
+
+```
+docker exec -it `docker ps|grep '9092->9092'|awk '{print $1}'` 
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--topic foo --isolation-level read_committed
+```
+
+=== Build and run
+
+Build and run the quickstart.
+
+```
+mvn compile spring-boot:run
+```
+
+There are 4 routes represented by the HTTP endpoints:
+
+```
+1. http://localhost:8080/send/{word}
+2. http://localhost:8080/send2/{word}
+3. http://localhost:8080/sendtx/{word}
+4. http://localhost:8080/sendtx2/{word}
+```
+
+* 1 - No transaction - SQL Insert and send message to kafka topic
+
+The 1st route has no transaction support, once it receives the message from 
the consumer it executes the SQL insert, then sends the message to the kafka 
topic. If the SQL operation fails, the exception handling mechanism will 
interrupt the code execution and the kafka producer won't send the message to 
the kafka topic.
+
+Run the following command:
+```
+curl http://localhost:8080/send/bar1
+```
+
+You should see compreensive log in the example terminal with `http word: bar1` 
and the `{"foo": "bar1"}` in the kafka-consumer log terminal.
+
+Then if you re-run the curl command, it should show the error in the example 
terminal:
+```
+org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; 
ERROR: duplicate key value violates unique constraint "foo_name_key"
+  Detail: Key (name)=(bar1) already exists.
+```
+
+And no message show in the kafka-consumer terminal, as if you examine the 
route, the kafka producer is not run, since the SQL insert fails before.
+```
+from("platform-http:/send/{word}")
+    .log("http word: ${header.word}")
+    .to("sql:" + insert)
+    .setBody(simple("{\"foo\": \"${header.word}\"}"))
+    .to("kafka:foo");
+```
+
+* 2 - No transaction - Send message to kafka topic and SQL Insert
+
+The 2nd route has no transaction support, once it receives the message from 
the consumer it sends the message to the kafka topic then executes the SQL 
insert command. If the SQL operation fails, as there is no transaction the 
message is not marked for rollback.
+
+Run the following command:
+```
+curl http://localhost:8080/send2/bar2
+```
+
+You should see compreensive log in the example terminal with `http word: bar2` 
and the `{"foo": "bar2"}` in the kafka-consumer log terminal.
+
+Then if you re-run the curl command, it should show the error in the example 
terminal:
+```
+org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; 
ERROR: duplicate key value violates unique constraint "foo_name_key"
+  Detail: Key (name)=(bar2) already exists.
+```
+
+The `{"foo": "bar2"}` message show in the kafka-consumer terminal shows that 
the message is in the kafka topic, since there is no transaction in the kafka 
client, there is no rollback to perform.
+```
+from("platform-http:/send2/{word}")
+    .log("http word: ${header.word}")
+    .setBody(simple("{\"foo\": \"${header.word}\"}"))jiuredhat
+    .to("kafka:foo")
+    .to("sql:" + insert);
+```
+
+* 3 - With transaction - SQL Insert and send message to kafka topic
+
+The 3rd route has transaction support, once it receives the message from the 
consumer it executes the SQL insert, then sends the message to the kafka topic. 
If the SQL operation fails, the exception handling mechanism will interrupt the 
code execution and the kafka producer won't send the message to the kafka topic.
+
+Run the following command:
+```
+curl http://localhost:8080/sendtx/bar3
+```
+
+You should see compreensive log in the example terminal with `http word: bar3` 
message content and the kafka producer commit like `Commit kafka transaction 
endpoint27-route16 with exchange F865E9F937249D7-0000000000000001` and the 
`{"foo": "bar3"}` in the kafka-consumer log terminal.
+
+Then if you re-run the curl command, it should show the error in the example 
terminal:
+```
+org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; 
ERROR: duplicate key value violates unique constraint "foo_name_key"
+  Detail: Key (name)=(bar3) already exists.
+```
+
+And no message show in the kafka-consumer terminal, as if you examine the 
route, the kafka producer doesn't run, since the SQL insert fails before.
+
+You can see there are additional code in comparison to the first route, the 
`onException` that marks the route for rollback and the `transacted=true` 
parameter of the kafka endpoint.
+
+```
+from("platform-http:/sendtx/{word}")
+    .onException(Exception.class)
+        .handled(true)
+        .rollback("Expected error when trying to insert duplicate values in 
the unique column.")
+    .end()
+    .log("http word: ${header.word}")
+    .to("sql:" + insert)
+    .setBody(simple("{\"foo\": \"${header.word}\"}"))
+    .to("kafka:foo?transacted=true");
+```
+
+* 4 - With transaction - Send message to kafka topic and SQL Insert
+
+The 4th route has transaction support, once it receives the message from the 
consumer it sends the message to the kafka topic and executes the SQL insert. 
you can note the kafka delivery occurs before the SQL operation, so if the SQL 
operation fails, the `onException` handling mechanism will catch the error and 
will mark the route exchange to rollback, then cascades to the kafka client to 
rollback the message delivery to the topic.
+
+Run the following command:
+```
+curl http://localhost:8080/sendtx2/bar4
+```
+
+You should see compreensive log in the example terminal with `http word: bar4` 
message content and the kafka producer commit like `Commit kafka transaction 
endpoint3-route16 with exchange F865E9F937249D7-0000000000000001` and the 
`{"foo": "bar4"}` in the kafka-consumer log terminal.
+
+
+Then if you re-run the curl command, it should show the error in the example 
terminal:
+```
+org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; 
ERROR: duplicate key value violates unique constraint "foo_name_key"
+  Detail: Key (name)=(bar4) already exists.
+```
+
+And no message show in the kafka-consumer terminal, as if you examNine the 
route, the kafka producer runs but the route exchange is marked for rollback, 
so the message is not commited to the topic.
+
+You can see there are additional code in comparison to the first route, the 
`onException` that marks the route for rollback and the `transacted=true` 
parameter of the kafka endpoint.
+
+```
+from("platform-http:/sendtx2/{word}")
+    .onException(Exception.class)
+        .handled(true)
+        .rollback("Expected error when trying to insert duplicate values in 
the unique column.")
+    .end()
+    .log("http word: ${header.word}")
+    .setBody(simple("{\"foo\": \"${header.word}\"}"))
+    .to("kafka:foo?transacted=true")
+    .to("sql:" + insert);
+```
+
+Press `Ctrl-C` to exit.
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback,
+then please https://camel.apache.org/community/support/[let us know].
+
+We also love contributors,
+so https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
diff --git a/kafka-sql-transaction/pom.xml b/kafka-sql-transaction/pom.xml
new file mode 100644
index 0000000..9891421
--- /dev/null
+++ b/kafka-sql-transaction/pom.xml
@@ -0,0 +1,136 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.springboot.example</groupId>
+        <artifactId>examples</artifactId>
+        <version>4.14.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-example-spring-boot-kafka-sql-transaction</artifactId>
+    <name>Camel SB Examples :: Kafka :: SQL with transactions</name>
+    <description>An example of Kafka and SQL transactions.</description>
+
+    <properties>
+        <category>Messaging</category>
+    </properties>
+
+    <!-- Spring-Boot and Camel BOM -->
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>${spring-boot-version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.camel.springboot</groupId>
+                <artifactId>camel-spring-boot-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <!-- Spring Boot -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+                       <groupId>org.springframework.boot</groupId>
+                       <artifactId>spring-boot-starter-jdbc</artifactId>
+               </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-devtools</artifactId>
+        </dependency>
+
+        <!-- Camel -->
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-platform-http-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-kafka-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.springboot</groupId>
+            <artifactId>camel-sql-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jta</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.7.5</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                               <groupId>org.apache.camel</groupId>
+                               
<artifactId>camel-package-maven-plugin</artifactId>
+                               <version>${project.version}</version>
+                               <configuration>
+                                       <startingFolder></startingFolder>
+                                       <filter>spring-boot</filter>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                        <goals>
+                                                       
<goal>prepare-example</goal>
+                                               </goals>
+                                               <phase>process-resources</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring-boot-version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/kafka-sql-transaction/src/main/java/org/apache/camel/example/kafka/KafkaRoute.java
 
b/kafka-sql-transaction/src/main/java/org/apache/camel/example/kafka/KafkaRoute.java
new file mode 100644
index 0000000..8290b4d
--- /dev/null
+++ 
b/kafka-sql-transaction/src/main/java/org/apache/camel/example/kafka/KafkaRoute.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.kafka;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.springframework.stereotype.Component;
+
+@Component
+public class KafkaRoute extends RouteBuilder {
+
+    @Override
+    public void configure() throws Exception {
+        String insert = "insert into foo(name) values (:#word)";
+
+        from("platform-http:/send/{word}")
+            .log("http word: ${header.word}")
+            .to("sql:" + insert)
+            .setBody(simple("{\"foo\": \"${header.word}\"}"))
+            .to("kafka:foo");
+
+        from("platform-http:/send2/{word}")
+            .log("http word: ${header.word}")
+            .setBody(simple("{\"foo\": \"${header.word}\"}"))
+            .to("kafka:foo")
+            .to("sql:" + insert);
+
+        from("platform-http:/sendtx/{word}")
+            .onException(Exception.class)
+                .handled(true)
+                .rollback("Expected error when trying to insert duplicate 
values in the unique column.")
+            .end()
+            .log("http word: ${header.word}")
+            .to("sql:" + insert)
+            .setBody(simple("{\"foo\": \"${header.word}\"}"))
+            .to("kafka:foo?transacted=true");
+
+        from("platform-http:/sendtx2/{word}")
+            .onException(Exception.class)
+                .handled(true)
+                .rollback("Expected error when trying to insert duplicate 
values in the unique column.")
+            .end()
+            .log("http word: ${header.word}")
+            .setBody(simple("{\"foo\": \"${header.word}\"}"))
+            .to("kafka:foo?transacted=true")
+            .to("sql:" + insert);
+    }
+}
diff --git 
a/kafka-sql-transaction/src/main/java/org/apache/camel/example/kafka/MyCamelApplication.java
 
b/kafka-sql-transaction/src/main/java/org/apache/camel/example/kafka/MyCamelApplication.java
new file mode 100644
index 0000000..c4060de
--- /dev/null
+++ 
b/kafka-sql-transaction/src/main/java/org/apache/camel/example/kafka/MyCamelApplication.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.kafka;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class MyCamelApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(MyCamelApplication.class, args);
+    }
+
+}
diff --git a/kafka-sql-transaction/src/main/resources/application.yaml 
b/kafka-sql-transaction/src/main/resources/application.yaml
new file mode 100644
index 0000000..c2d96d0
--- /dev/null
+++ b/kafka-sql-transaction/src/main/resources/application.yaml
@@ -0,0 +1,44 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+camel:
+  component:
+    kafka:
+      brokers: localhost:9092
+      retries: 2
+spring:
+  datasource:
+    url: jdbc:postgresql://localhost:5432/test
+    username: test
+    password: test
+  sql:
+    init:
+      # run the schema.sql at initialization time
+      mode: always
+      # continue to run if the initialization fails
+      continue-on-error: true
+logging:
+  level:
+    org.apache.camel.component.kafka: trace
+    org.apache.kafka: trace
+    org.apache.kafka.clients.producer.KafkaProducer: trace
+    org.apache.kafka.clients.producer.ProducerConfig: info
+    org.apache.kafka.clients.NetworkClient: warn
+    org.apache.kafka.clients.Metadata: warn
+    org.apache.kafka.clients.producer: warn
+    org.apache.kafka.common.metrics: warn
+    org.apache.kafka.common.telemetry: warn
\ No newline at end of file
diff --git a/kafka-sql-transaction/src/main/resources/schema.sql 
b/kafka-sql-transaction/src/main/resources/schema.sql
new file mode 100644
index 0000000..78ea255
--- /dev/null
+++ b/kafka-sql-transaction/src/main/resources/schema.sql
@@ -0,0 +1,4 @@
+create table foo(
+  id SERIAL PRIMARY KEY,
+  name VARCHAR (10) NOT NULL unique
+);
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a209f56..6804134 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
                <module>jira</module>
                <module>jolokia</module>
                <module>kafka-avro</module>
+        <module>kafka-sql-transaction</module>
                <module>kafka-oauth</module>
                <module>kafka-offsetrepository</module>
                <module>kamelet-chucknorris</module>

Reply via email to