CAMEL-10271: Fixed camel-jt400 consumer to be scheduled so it reads from the
queue as the jt400 library is not event based.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77e417c2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77e417c2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77e417c2
Branch: refs/heads/camel-2.17.x
Commit: 77e417c2775b15f24cc6f47df7fbdfe7c4a436af
Parents: 1f7abfc
Author: Claus Ibsen
Authored: Mon Sep 5 18:22:28 2016 +0200
Committer: Claus Ibsen
Committed: Mon Sep 5 18:23:18 2016 +0200
--
.../component/jt400/Jt400Configuration.java | 14 +++
.../component/jt400/Jt400DataQueueConsumer.java | 42 ++--
.../camel/component/jt400/Jt400Endpoint.java| 22 +-
3 files changed, 54 insertions(+), 24 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
--
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
index b33a25d..9a2383f 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
@@ -113,6 +113,9 @@ public class Jt400Configuration {
@UriParam
private Integer[] outputFieldsLengthArray;
+@UriParam(label = "consumer", defaultValue = "3")
+private int readTimeout = 3;
+
public Jt400Configuration(String endpointUri, AS400ConnectionPool connectionPool) throws URISyntaxException {
ObjectHelper.notNull(endpointUri, "endpointUri", this);
ObjectHelper.notNull(connectionPool, "connectionPool", this);
@@ -301,6 +304,17 @@ public class Jt400Configuration {
this.outputFieldsLengthArray = outputFieldsLengthArray;
}
+public int getReadTimeout() {
+return readTimeout;
+}
+
+/**
+ * Timeout in millis the consumer will wait while trying to read a new message of the data queue.
+ */
+public void setReadTimeout(int readTimeout) {
+this.readTimeout = readTimeout;
+}
+
public void setOutputFieldsIdx(String outputFieldsIdx) {
if (outputFieldsIdx != null) {
String[] outputArray = outputFieldsIdx.split(",");
http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
--
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
index c3ff46e..6bf9788 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
@@ -22,16 +22,15 @@ import com.ibm.as400.access.DataQueueEntry;
import com.ibm.as400.access.KeyedDataQueue;
import com.ibm.as400.access.KeyedDataQueueEntry;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.impl.ScheduledPollConsumer;
/**
- * {@link org.apache.camel.PollingConsumer} that polls a data queue for data
+ * A scheduled {@link org.apache.camel.Consumer} that polls a data queue for data
*/
-public class Jt400DataQueueConsumer extends PollingConsumerSupport {
+public class Jt400DataQueueConsumer extends ScheduledPollConsumer {
-private final Jt400Endpoint endpoint;
-
/**
* Performs the lifecycle logic of this consumer.
*/
@@ -40,13 +39,28 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
/**
* Creates a new consumer instance
*/
-protected Jt400DataQueueConsumer(Jt400Endpoint endpoint) {
-super(endpoint);
-this.endpoint = endpoint;
+public Jt400DataQueueConsumer(Jt400Endpoint endpoint, Processor processor) {
+super(endpoint, processor);
this.queueService = new Jt400DataQueueService(endpoint);
}
@Override
+public Jt400Endpoint getEndpoint() {
+return (Jt400Endpoint) super.getEndpoint();
+}
+
+@Override
+protected int poll() throws Exception {
+Exchange exchange = receive(getEndpoint().getReadTimeout());
+if (exchange != null) {
+