Repository: camel Updated Branches: refs/heads/master 32b9bf2b8 -> 2bedc52ea
CAMEL-9958 : Create an ehcache based idempotent repository Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2bedc52e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2bedc52e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2bedc52e Branch: refs/heads/master Commit: 2bedc52ea8e40106ec004a30f1585bcc9c1dbd60 Parents: 32b9bf2 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed May 11 11:09:25 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed May 11 11:09:25 2016 +0200 ---------------------------------------------------------------------- .../idempotent/EhcacheIdempotentRepository.java | 78 +++++++++++ .../component/ehcache/EhcacheProducerTest.java | 18 +-- .../component/ehcache/EhcacheTestSupport.java | 12 +- .../EhcacheIdempotentRepositoryTest.java | 134 +++++++++++++++++++ .../test/resources/ehcache/ehcache-config.xml | 4 + 5 files changed, 236 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2bedc52e/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepository.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepository.java new file mode 100644 index 0000000..6018fb8 --- /dev/null +++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepository.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ehcache.processor.idempotent; + +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; +import org.ehcache.Cache; +import org.ehcache.CacheManager; + +public class EhcacheIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> { + + private String repositoryName; + private Cache<String, Boolean> cache; + private CacheManager cacheManager; + + public EhcacheIdempotentRepository(CacheManager cacheManager) { + this(cacheManager, EhcacheIdempotentRepository.class.getSimpleName()); + } + + public EhcacheIdempotentRepository(CacheManager cacheManager, String repositoryName) { + this.repositoryName = repositoryName; + this.cacheManager = cacheManager; + } + + @Override + protected void doStart() throws Exception { + cache = cacheManager.getCache(repositoryName, String.class, Boolean.class); + } + + @Override + protected void doStop() throws Exception { + // noop + } + + @Override + public boolean add(String key) { + return cache.putIfAbsent(key, false) == null; + } + + @Override + public boolean confirm(String key) { + return cache.replace(key, false, true); + } + + @Override + public boolean contains(String key) { + return this.cache.containsKey(key); + } + + @Override + public boolean remove(String key) { + cache.remove(key); + return true; + } + + @Override + public void clear() { + cache.clear(); + } + + public String getRepositoryName() { + return repositoryName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/2bedc52e/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheProducerTest.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheProducerTest.java index 395e8e7..ce1d0c4 100644 --- a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheProducerTest.java +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheProducerTest.java @@ -53,7 +53,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCachePut() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final String key = generateRandomString(); final String val = generateRandomString(); @@ -78,7 +78,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCachePutAll() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final Map<String, String> map = generateRandomMapOfString(3); MockEndpoint mock = getMockEndpoint("mock:result"); @@ -102,7 +102,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCachePutIfAbsent() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final String key = generateRandomString(); final String val1 = generateRandomString(); final String val2 = generateRandomString(); @@ -143,7 +143,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCacheGet() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final String key = generateRandomString(); final String val = generateRandomString(); @@ -167,7 +167,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCacheGetAll() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final Map<String, String> map = generateRandomMapOfString(3); final Set<String> keys = map.keySet().stream().limit(2).collect(Collectors.toSet()); @@ -199,7 +199,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCacheRemove() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final String key = generateRandomString(); final String val = generateRandomString(); @@ -223,7 +223,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCacheRemoveIf() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final String key = generateRandomString(); final String val1 = generateRandomString(); final String val2 = generateRandomString(); @@ -262,7 +262,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCacheRemoveAll() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final Map<String, String> map = generateRandomMapOfString(3); final Set<String> keys = map.keySet().stream().limit(2).collect(Collectors.toSet()); @@ -290,7 +290,7 @@ public class EhcacheProducerTest extends EhcacheTestSupport { @Test public void testCacheReplace() throws Exception { - final Cache<Object, Object> cache = getCache(TEST_CACHE_NAME); + final Cache<Object, Object> cache = getTestCache(); final String key = generateRandomString(); final String val1 = generateRandomString(); final String val2 = generateRandomString(); http://git-wip-us.apache.org/repos/asf/camel/blob/2bedc52e/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java index 82dfdd7..3899ad4 100644 --- a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheTestSupport.java @@ -42,10 +42,11 @@ public class EhcacheTestSupport extends CamelTestSupport { public static final Logger LOGGER = LoggerFactory.getLogger(EhcacheTestSupport.class); public static final String EHCACHE_CONFIG = "/ehcache/ehcache-config.xml"; public static final String TEST_CACHE_NAME = "mycache"; + public static final String IDEMPOTENT_TEST_CACHE_NAME = "idempotent"; @Rule public final TestName testName = new TestName(); - public CacheManager cacheManager; + protected CacheManager cacheManager; @Override protected void doPreSetup() throws Exception { @@ -77,6 +78,15 @@ public class EhcacheTestSupport extends CamelTestSupport { return cacheManager.getCache(name, Object.class, Object.class); } + protected Cache<Object, Object> getTestCache() { + return cacheManager.getCache(TEST_CACHE_NAME, Object.class, Object.class); + } + + + protected Cache<String, Boolean> getIdempotentCache() { + return cacheManager.getCache(IDEMPOTENT_TEST_CACHE_NAME, String.class, Boolean.class); + } + protected String generateRandomString() { return UUID.randomUUID().toString(); } http://git-wip-us.apache.org/repos/asf/camel/blob/2bedc52e/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepositoryTest.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepositoryTest.java new file mode 100644 index 0000000..e2fa324 --- /dev/null +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/processor/idempotent/EhcacheIdempotentRepositoryTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ehcache.processor.idempotent; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.ehcache.EhcacheTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.ehcache.Cache; +import org.junit.Test; + +public class EhcacheIdempotentRepositoryTest extends EhcacheTestSupport { + + private EhcacheIdempotentRepository repo; + private Cache<String, Boolean> cache; + private String key01; + private String key02; + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + repo = new EhcacheIdempotentRepository(cacheManager, IDEMPOTENT_TEST_CACHE_NAME); + + cache = getIdempotentCache(); + cache.clear(); + + key01 = generateRandomString(); + key02 = generateRandomString(); + } + + @Test + public void testAdd() throws Exception { + // add first key + assertTrue(repo.add(key01)); + assertTrue(cache.containsKey(key01)); + + // try to add the same key again + assertFalse(repo.add(key01)); + + // try to add an other one + assertTrue(repo.add(key02)); + assertTrue(cache.containsKey(key02)); + } + + @Test + public void testConfirm() throws Exception { + // add first key and confirm + assertTrue(repo.add(key01)); + assertTrue(repo.confirm(key01)); + + // try to confirm a key that isn't there + assertFalse(repo.confirm(key02)); + } + + @Test + public void testContains() throws Exception { + assertFalse(repo.contains(key01)); + + // add key and check again + assertTrue(repo.add(key01)); + assertTrue(repo.contains(key01)); + + } + + @Test + public void testRemove() throws Exception { + // add key to remove + assertTrue(repo.add(key01)); + assertTrue(repo.add(key02)); + assertTrue(cache.containsKey(key01)); + assertTrue(cache.containsKey(key02)); + + // clear repo + repo.clear(); + assertFalse(cache.containsKey(key01)); + assertFalse(cache.containsKey(key02)); + } + + @Test + public void testClear() throws Exception { + // add key to remove + assertTrue(repo.add(key01)); + assertTrue(repo.confirm(key01)); + + // remove key + assertTrue(repo.remove(key01)); + assertFalse(repo.confirm(key01)); + + // try to remove a key that isn't there + repo.remove(key02); + } + + @Test + public void testRepositoryInRoute() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:out"); + mock.expectedBodiesReceived("a", "b"); + // c is a duplicate + + // should be started + assertEquals("Should be started", true, repo.getStatus().isStarted()); + + // send 3 message with one duplicated key (key01) + template.sendBodyAndHeader("direct://in", "a", "messageId", key01); + template.sendBodyAndHeader("direct://in", "b", "messageId", key02); + template.sendBodyAndHeader("direct://in", "c", "messageId", key01); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct://in") + .idempotentConsumer(header("messageId"), repo) + .to("mock://out"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/2bedc52e/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml b/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml index c0e7822..dd61f90 100644 --- a/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml +++ b/components/camel-ehcache/src/test/resources/ehcache/ehcache-config.xml @@ -22,6 +22,10 @@ <ehcache:key-type>java.lang.Object</ehcache:key-type> <ehcache:value-type>java.lang.Object</ehcache:value-type> </ehcache:cache> + <ehcache:cache alias="idempotent" uses-template="default-template"> + <ehcache:key-type>java.lang.String</ehcache:key-type> + <ehcache:value-type>java.lang.Boolean</ehcache:value-type> + </ehcache:cache> <ehcache:cache-template name="default-template"> <ehcache:expiry>