Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x e09eef591 -> 45317409d


CAMEL-9834 : WatchConsumer does not properly set watchIndex


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ed08a7a3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ed08a7a3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ed08a7a3

Branch: refs/heads/camel-2.17.x
Commit: ed08a7a34ade56cb177a9e5731a9926e95d6cc0c
Parents: e09eef5
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Thu Apr 7 14:14:27 2016 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Wed Apr 27 12:45:26 2016 +0200

----------------------------------------------------------------------
 components/camel-etcd/pom.xml                   |  37 +++++++
 .../camel/component/etcd/EtcdComponent.java     |  25 +----
 .../camel/component/etcd/EtcdConfiguration.java |  12 +++
 .../camel/component/etcd/EtcdWatchConsumer.java | 104 ++++++++++++++-----
 .../camel/component/etcd/EtcdKeysTest.java      |   3 +-
 .../camel/component/etcd/EtcdStatsTest.java     |   2 +-
 .../camel/component/etcd/EtcdWatchTest.java     |  74 ++++++++-----
 parent/pom.xml                                  |   2 +-
 8 files changed, 181 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-etcd/pom.xml b/components/camel-etcd/pom.xml
index 3e40304..f7a75ee 100644
--- a/components/camel-etcd/pom.xml
+++ b/components/camel-etcd/pom.xml
@@ -85,4 +85,41 @@
     </dependency>
 
   </dependencies>
+
+
+  <profiles>
+    <profile>
+      <id>etcd-skip-tests</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>etcd-tests</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>false</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java
index eab5d35..3b81579 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java
@@ -49,6 +49,7 @@ public class EtcdComponent extends UriEndpointComponent {
         }
 
         EtcdNamespace namespace = 
getCamelContext().getTypeConverter().mandatoryConvertTo(EtcdNamespace.class, 
ns);
+        EtcdConfiguration configuration = loadConfiguration(new 
EtcdConfiguration(), parameters);
 
         if (namespace != null) {
             // path must start with leading slash
@@ -58,29 +59,11 @@ public class EtcdComponent extends UriEndpointComponent {
 
             switch (namespace) {
             case stats:
-                return new EtcdStatsEndpoint(
-                    uri,
-                    this,
-                    loadConfiguration(new EtcdConfiguration(), parameters),
-                    namespace,
-                    path
-                );
+                return new EtcdStatsEndpoint(uri, this, configuration, 
namespace, path);
             case watch:
-                return new EtcdWatchEndpoint(
-                    uri,
-                    this,
-                    loadConfiguration(new EtcdConfiguration(), parameters),
-                    namespace,
-                    path
-                );
+                return new EtcdWatchEndpoint(uri, this, configuration, 
namespace, path);
             case keys:
-                return new EtcdKeysEndpoint(
-                    uri,
-                    this,
-                    loadConfiguration(new EtcdConfiguration(), parameters),
-                    namespace,
-                    path
-                );
+                return new EtcdKeysEndpoint(uri, this, configuration, 
namespace, path);
             default:
                 throw new IllegalStateException("No endpoint for " + 
remaining);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
index 9fe36cd..45d50f7 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
@@ -39,6 +39,8 @@ public class EtcdConfiguration {
     private Integer timeToLive;
     @UriParam
     private Long timeout;
+    @UriParam(label = "consumer,advance", defaultValue = "0")
+    private Long fromIndex = 0L;
 
     public String getUris() {
         return uris;
@@ -128,4 +130,14 @@ public class EtcdConfiguration {
         this.timeout = timeout;
     }
 
+    public Long getFromIndex() {
+        return fromIndex;
+    }
+
+    /**
+     * The index to watch from
+     */
+    public void setFromIndex(Long fromIndex) {
+        this.fromIndex = fromIndex;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
index 4c3e6fa..9f85a0a 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
@@ -16,12 +16,14 @@
  */
 package org.apache.camel.component.etcd;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import mousio.client.promises.ResponsePromise;
 import mousio.etcd4j.requests.EtcdKeyGetRequest;
+import mousio.etcd4j.responses.EtcdErrorCode;
+import mousio.etcd4j.responses.EtcdException;
 import mousio.etcd4j.responses.EtcdKeysResponse;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -30,15 +32,18 @@ import org.slf4j.LoggerFactory;
 
 public class EtcdWatchConsumer extends AbstractEtcdConsumer implements 
ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdWatchConsumer.class);
+    private static final String OUTDATED_EVENT_MSG = "requested index is 
outdated and cleared";
 
     private final EtcdWatchEndpoint endpoint;
     private final EtcdConfiguration configuration;
+    private final AtomicLong index;
 
     public EtcdWatchConsumer(EtcdWatchEndpoint endpoint, Processor processor, 
EtcdConfiguration configuration, EtcdNamespace namespace, String path) {
         super(endpoint, processor, configuration, namespace, path);
 
         this.endpoint = endpoint;
         this.configuration = configuration;
+        this.index = new AtomicLong(configuration.getFromIndex());
     }
 
     @Override
@@ -59,35 +64,78 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer 
implements ResponseP
             return;
         }
 
-        try {
-            EtcdKeysResponse response = promise.get();
-
-            Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, 
getNamespace());
-            exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, 
response.node.key);
-            exchange.getIn().setBody(response);
-
-            getProcessor().process(exchange);
-
-            watch();
-        } catch (TimeoutException e) {
-            LOGGER.debug("Timeout watching for {}", getPath());
-
-            if (configuration.isSendEmptyExchangeOnTimeout()) {
-                Exchange exchange = endpoint.createExchange();
-                try {
+        Exchange exchange = null;
+        Throwable throwable = promise.getException();
+
+        if (throwable != null && throwable instanceof EtcdException) {
+            EtcdException exception = (EtcdException) throwable;
+            // Etcd only keeps the responses of the most recent 1000 events
+            // across all etcd keys so if we wait for a cleared index, we
+            // get "index is outdated response" like:
+            //
+            // {
+            //     "errorCode" : 401,
+            //     "message"   : "The event in requested index is outdated and 
cleared",
+            //     "cause"     : "the requested history has been cleared 
[1008/8]",
+            //     "index"     : 2007
+            // }
+            //
+            // So we set the index to the one returned by the exception + 1
+            if (isOutdatedIndexException(exception)) {
+                LOGGER.debug("Outdated index, key: {}, cause={}", getPath(), 
exception.etcdCause);
+
+                // We set the index to the one returned by the exception + 1.
+                index.set(exception.index + 1);
+
+                // Clean-up the exception so it is not rethrown/handled
+                throwable = null;
+            }
+        } else {
+            try {
+                EtcdKeysResponse response = promise.get();
+
+                exchange = endpoint.createExchange();
+                exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, 
getNamespace());
+                exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, 
response.node.key);
+                exchange.getIn().setBody(response);
+
+                // Watch from the modifiedIndex + 1 of the node we got for 
ensuring
+                // no events are missed between watch commands
+                index.set(response.node.modifiedIndex + 1);
+            } catch (TimeoutException e) {
+                LOGGER.debug("Timeout watching for {}", getPath());
+
+                if (configuration.isSendEmptyExchangeOnTimeout()) {
+                    exchange = endpoint.createExchange();
                     exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, 
getNamespace());
                     exchange.getIn().setHeader(EtcdConstants.ETCD_TIMEOUT, 
true);
                     exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, 
getPath());
                     exchange.getIn().setBody(null);
+                }
+
+                throwable = null;
+            } catch (Exception e1) {
+                throwable = e1;
+            }
 
+            if (exchange != null) {
+                try {
+                    throwable = null;
                     getProcessor().process(exchange);
-                } catch (Exception e1) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, e1);
+                } catch (Exception e) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
                 }
             }
+        }
+
+        if (throwable != null){
+            handleException("Error processing etcd response", throwable);
+        }
+
+        try {
+            watch();
         } catch (Exception e) {
-            throw new IllegalArgumentException(e);
+            handleException("Error watching key " + getPath(), e);
         }
     }
 
@@ -96,7 +144,7 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer 
implements ResponseP
             return;
         }
 
-        EtcdKeyGetRequest request = getClient().get(getPath()).waitForChange();
+        EtcdKeyGetRequest request = 
getClient().get(getPath()).waitForChange(index.get());
         if (configuration.isRecursive()) {
             request.recursive();
         }
@@ -104,10 +152,14 @@ public class EtcdWatchConsumer extends 
AbstractEtcdConsumer implements ResponseP
             request.timeout(configuration.getTimeout(), TimeUnit.MILLISECONDS);
         }
 
-        try {
-            request.send().addListener(this);
-        } catch (IOException e) {
-            throw new IllegalArgumentException(e);
+        request.send().addListener(this);
+    }
+
+    private boolean isOutdatedIndexException(EtcdException exception) {
+        if (exception.isErrorCode(EtcdErrorCode.EventIndexCleared) && 
exception.etcdMessage != null) {
+            return 
exception.etcdMessage.toLowerCase().contains(OUTDATED_EVENT_MSG);
         }
+
+        return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
index eca2339..cf16017 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
@@ -27,10 +27,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore("Etcd must be started manually")
+//@Ignore("Etcd must be started manually")
 public class EtcdKeysTest extends EtcdTest {
 
     @Test(expected = EtcdException.class)

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
index b142355..762be01 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
@@ -26,7 +26,7 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore("Etcd must be started manually")
+//@Ignore("Etcd must be started manually")
 public class EtcdStatsTest extends EtcdTest {
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
index 0bc7ab9..60cea00 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
@@ -17,29 +17,52 @@
 package org.apache.camel.component.etcd;
 
 import mousio.etcd4j.EtcdClient;
-import org.apache.camel.Exchange;
-import org.apache.camel.Predicate;
+import mousio.etcd4j.responses.EtcdErrorCode;
+import mousio.etcd4j.responses.EtcdException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore("Etcd must be started manually")
+//@Ignore("Etcd must be started manually")
 public class EtcdWatchTest extends EtcdTest {
 
     @Test
     public void testWatchWithPath() throws Exception {
-        testWatch("mock:watch-with-path", "/myKey1", true);
+        testWatch("mock:watch-with-path", "/myKey1", 10);
     }
 
     @Test
     public void testWatchWithConfigPath() throws Exception {
-        testWatch("mock:watch-with-config-path", "/myKey2", true);
+        testWatch("mock:watch-with-config-path", "/myKey2", 10);
     }
 
     @Test
     public void testWatchRecursive() throws Exception {
-        testWatch("mock:watch-recursive", "/recursive/myKey1", true);
+        testWatch("mock:watch-recursive", "/recursive/myKey1", 10);
+    }
+
+    @Test
+    public void testWatchRecovery() throws Exception {
+        final String key = "/myKeyRecovery";
+        final EtcdClient client = getClient();
+
+        try {
+            // Delete the key if present
+            client.delete(key).send().get();
+        } catch (EtcdException e) {
+            if (!e.isErrorCode(EtcdErrorCode.KeyNotFound)) {
+                throw e;
+            }
+        }
+
+        // Fill the vent backlog ( > 1000)
+        for (int i = 0; i < 2000; i++) {
+            client.put(key, "v" + i).send().get();
+        }
+
+        context().startRoute("watchRecovery");
+
+        testWatch("mock:watch-recovery", key, 10);
     }
 
     @Test
@@ -49,33 +72,25 @@ public class EtcdWatchTest extends EtcdTest {
         mock.expectedHeaderReceived(EtcdConstants.ETCD_NAMESPACE, 
EtcdNamespace.watch.name());
         mock.expectedHeaderReceived(EtcdConstants.ETCD_PATH, "/timeoutKey");
         mock.expectedHeaderReceived(EtcdConstants.ETCD_TIMEOUT, true);
-        mock.expectedMessagesMatches(new Predicate() {
-            @Override
-            public boolean matches(Exchange exchange) {
-                return exchange.getIn().getBody() == null;
-            }
-        });
-
+        mock.allMessages().body().isNull();
         mock.assertIsSatisfied();
     }
 
-    private void testWatch(String mockEndpoint, final String key, boolean 
updateKey) throws Exception {
+    private void testWatch(String mockEndpoint, final String key, int updates) 
throws Exception {
+        final String[] values = new String[updates];
+        for (int i = 0; i< updates; i++) {
+            values[i] = key + "=myValue-" + i;
+        }
+
         MockEndpoint mock = getMockEndpoint(mockEndpoint);
         mock.expectedMessageCount(2);
         mock.expectedHeaderReceived(EtcdConstants.ETCD_NAMESPACE, 
EtcdNamespace.watch.name());
         mock.expectedHeaderReceived(EtcdConstants.ETCD_PATH, key);
-        mock.expectedMessagesMatches(new Predicate() {
-            @Override
-            public boolean matches(Exchange exchange) {
-                return exchange.getIn().getBody(String.class).startsWith(key + 
"=myValue-");
-            }
-        });
+        mock.expectedBodiesReceived(values);
 
-        if (updateKey) {
-            EtcdClient client = getClient();
-            client.put(key, "myValue-1").send().get();
-            Thread.sleep(250);
-            client.put(key, "myValue-2").send().get();
+        final EtcdClient client = getClient();
+        for (int i = 0; i< updates; i++) {
+            client.put(key, "myValue-" + i).send().get();
         }
 
         mock.assertIsSatisfied();
@@ -88,8 +103,13 @@ public class EtcdWatchTest extends EtcdTest {
                 from("etcd:watch/myKey1")
                     .process(NODE_TO_VALUE_IN)
                     .to("mock:watch-with-path");
-                from("etcd:watch/recursive?recursive=true")
+                fromF("etcd:watch/myKeyRecovery?timeout=%d&fromIndex=%d", 1000 
* 60 * 5, 1)
+                    .id("watchRecovery")
+                    .autoStartup(false)
                     .process(NODE_TO_VALUE_IN)
+                    .to("mock:watch-recovery");
+                from("etcd:watch/recursive?recursive=true")
+                   .process(NODE_TO_VALUE_IN)
                     .to("log:org.apache.camel.component.etcd?level=INFO")
                     .to("mock:watch-recursive");
                 from("etcd:watch/myKey2")

http://git-wip-us.apache.org/repos/asf/camel/blob/ed08a7a3/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index f52d59b..f8a2774 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -159,7 +159,7 @@
     <el-api-1.0-version>1.0.1</el-api-1.0-version>
     <!-- embedmongo 1.50.2 do not work -->
     <embedmongo-version>1.50.1</embedmongo-version>
-    <etcd4j-version>2.10.0</etcd4j-version>
+    <etcd4j-version>2.11.0</etcd4j-version>
     <exec-maven-plugin-version>1.4.0</exec-maven-plugin-version>
     <ezmorph-bundle-version>1.0.6_1</ezmorph-bundle-version>
     <facebook4j-core-version>2.4.2</facebook4j-core-version>

Reply via email to