CAMEL-9957 Let the KafkaEndpoint decide which kind of producer need to be return


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fb3728ce
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fb3728ce
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fb3728ce

Branch: refs/heads/camel-2.17.x
Commit: fb3728cef8f20889d0fa8c0878c0552690a53623
Parents: 11587eb
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue May 17 12:00:33 2016 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue May 17 12:09:03 2016 +0800

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaEndpoint.java  |  8 +++++++-
 .../apache/camel/component/kafka/KafkaProducer.java  | 15 +++++----------
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fb3728ce/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index d995475..999be5d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -72,7 +73,12 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
 
     @Override
     public Producer createProducer() throws Exception {
-        return createProducer(this);
+        KafkaProducer producer = createProducer(this);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(producer);
+        } else {
+            return producer;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/fb3728ce/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2138df1..1254e97 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -126,6 +126,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
     @Override
     @SuppressWarnings("unchecked")
+    // Camel calls this method if the endpoint isSynchronous(), as the 
KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
         ProducerRecord record = createRecorder(exchange);
         kafkaProducer.send(record).get();
@@ -135,19 +136,13 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("unchecked")
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            if (endpoint.isSynchronous()) {
-                // force process using synchronous call on kafka
-                process(exchange);
-            } else {
-                ProducerRecord record = createRecorder(exchange);
-                kafkaProducer.send(record, new KafkaProducerCallBack(exchange, 
callback));
-                // return false to process asynchronous
-                return false;
-            }
+            ProducerRecord record = createRecorder(exchange);
+            kafkaProducer.send(record, new KafkaProducerCallBack(exchange, 
callback));
+            // return false to process asynchronous
+            return false;
         } catch (Exception ex) {
             exchange.setException(ex);
         }
-
         callback.done(true);
         return true;
     }

Reply via email to