CAMEL-9957 Let the KafkaEndpoint decide which kind of producer needs to be returned
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fb3728ce Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fb3728ce Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fb3728ce Branch: refs/heads/camel-2.17.x Commit: fb3728cef8f20889d0fa8c0878c0552690a53623 Parents: 11587eb Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue May 17 12:00:33 2016 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue May 17 12:09:03 2016 +0800 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaEndpoint.java | 8 +++++++- .../apache/camel/component/kafka/KafkaProducer.java | 15 +++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fb3728ce/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index d995475..999be5d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -72,7 +73,12 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS @Override public 
Producer createProducer() throws Exception { - return createProducer(this); + KafkaProducer producer = createProducer(this); + if (isSynchronous()) { + return new SynchronousDelegateProducer(producer); + } else { + return producer; + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/fb3728ce/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 2138df1..1254e97 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -126,6 +126,7 @@ public class KafkaProducer extends DefaultAsyncProducer { @Override @SuppressWarnings("unchecked") + // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it public void process(Exchange exchange) throws Exception { ProducerRecord record = createRecorder(exchange); kafkaProducer.send(record).get(); @@ -135,19 +136,13 @@ public class KafkaProducer extends DefaultAsyncProducer { @SuppressWarnings("unchecked") public boolean process(Exchange exchange, AsyncCallback callback) { try { - if (endpoint.isSynchronous()) { - // force process using synchronous call on kafka - process(exchange); - } else { - ProducerRecord record = createRecorder(exchange); - kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); - // return false to process asynchronous - return false; - } + ProducerRecord record = createRecorder(exchange); + kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback)); + // return false to process asynchronous + return false; } catch (Exception ex) { 
exchange.setException(ex); } - callback.done(true); return true; }