Repository: camel Updated Branches: refs/heads/master a29e33078 -> ff1c9598c
CAMEL-8750 Camel-Infinispan: Add Remove, RemoveAsync, Replace, ReplaceAsync operation for a specific value Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40ac1495 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40ac1495 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40ac1495 Branch: refs/heads/master Commit: 40ac14959f0ed5f37d7a7f9e8396a082ecb83d2b Parents: a29e330 Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed May 6 08:53:43 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed May 6 18:03:32 2015 +0200 ---------------------------------------------------------------------- .../infinispan/InfinispanConstants.java | 1 + .../infinispan/InfinispanOperation.java | 40 ++++- .../infinispan/InfinispanProducerTest.java | 162 +++++++++++++++++++ 3 files changed, 197 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/40ac1495/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java index 481446c..2dfe003 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java @@ -22,6 +22,7 @@ public interface InfinispanConstants { String CACHE_NAME = "CamelInfinispanCacheName"; String KEY = "CamelInfinispanKey"; String VALUE = "CamelInfinispanValue"; + String OLD_VALUE = "CamelInfinispanOldValue"; String MAP = "CamelInfinispanMap"; String OPERATION = "CamelInfinispanOperation"; String PUT = "CamelInfinispanOperationPut"; http://git-wip-us.apache.org/repos/asf/camel/blob/40ac1495/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java index 38b3e74..efbd876 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java @@ -214,12 +214,24 @@ public class InfinispanOperation { && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); - result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + if (ObjectHelper.isEmpty(getOldValue(exchange))) { + result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } } else { - result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + if (ObjectHelper.isEmpty(getOldValue(exchange))) { + result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } else { + result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } } } else { - result = cache.replace(getKey(exchange), getValue(exchange)); + if (ObjectHelper.isEmpty(getOldValue(exchange))) { + result = cache.replace(getKey(exchange), getValue(exchange)); + } else { + result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange)); + } } setResult(result, exchange); } @@ -234,12 +246,24 @@ public class InfinispanOperation { && !ObjectHelper.isEmpty(exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT))) { long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class); String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class); - result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + if (ObjectHelper.isEmpty(getOldValue(exchange))) { + result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } else { + result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit)); + } } else { - result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + if (ObjectHelper.isEmpty(getOldValue(exchange))) { + result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } else { + result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit)); + } } } else { - result = cache.replaceAsync(getKey(exchange), getValue(exchange)); + if (ObjectHelper.isEmpty(getOldValue(exchange))) { + result = cache.replaceAsync(getKey(exchange), getValue(exchange)); + } else { + result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange)); + } } setResult(result, exchange); } @@ -268,6 +292,10 @@ public class InfinispanOperation { return exchange.getIn().getHeader(InfinispanConstants.VALUE); } + Object getOldValue(Exchange exchange) { + return exchange.getIn().getHeader(InfinispanConstants.OLD_VALUE); + } + Map<? extends Object, ? extends Object> getMap(Exchange exchange) { return (Map<? extends Object, ? extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP); } http://git-wip-us.apache.org/repos/asf/camel/blob/40ac1495/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java index c806847..8910493 100644 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanProducerTest.java @@ -688,6 +688,89 @@ public class InfinispanProducerTest extends InfinispanTestSupport { String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); assertEquals(null, resultGet); } + + @Test + public void replaceAValueByKeyWithOldValue() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replace", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REPLACE); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + } + + @Test + public void replaceAValueByKeyWithLifespanWithOldValue() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replace", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REPLACE); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + + Thread.sleep(LIFESPAN_TIME * 1000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void replaceAValueByKeyWithLifespanAndMaxIdleTimeWithOldValue() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replace", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.OPERATION, InfinispanConstants.REPLACE); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + + Thread.sleep(10000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + @Test public void replaceAValueByKeyAsync() throws Exception { @@ -764,6 +847,85 @@ public class InfinispanProducerTest extends InfinispanTestSupport { String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); assertEquals(null, resultGet); } + + @Test + public void replaceAValueByKeyAsyncWithOldValue() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replaceasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + } + + @Test + public void replaceAValueByKeyWithLifespanAsyncWithOldValue() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replaceasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_TIME)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + + Thread.sleep(LIFESPAN_TIME * 1000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } + + @Test + public void replaceAValueByKeyWithLifespanAndMaxIdleTimeAsyncWithOldValue() throws Exception { + currentCache().put(KEY_ONE, VALUE_ONE); + + Exchange exchange = template.request("direct:replaceasync", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + exchange.getIn().setHeader(InfinispanConstants.VALUE, VALUE_TWO); + exchange.getIn().setHeader(InfinispanConstants.OLD_VALUE, VALUE_ONE); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME, new Long(LIFESPAN_FOR_MAX_IDLE)); + exchange.getIn().setHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.SECONDS.toString()); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME, new Long(MAX_IDLE_TIME)); + exchange.getIn().setHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.SECONDS.toString()); + } + }); + + assertEquals(exchange.getIn().getHeader(InfinispanConstants.RESULT, Boolean.class), true); + assertEquals(currentCache().get(KEY_ONE), VALUE_TWO); + + Thread.sleep(10000); + + exchange = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(InfinispanConstants.KEY, KEY_ONE); + } + }); + String resultGet = exchange.getIn().getHeader(InfinispanConstants.RESULT, String.class); + assertEquals(null, resultGet); + } @Test public void deletesExistingValueByKey() throws Exception {