IGNITE-2123: Need to add EntryProcessorExample to cache examples
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32cec994 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32cec994 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32cec994 Branch: refs/heads/ignite-gg-10837 Commit: 32cec99465e667e98010da87140a9eb80bfca743 Parents: 24eccb8 Author: Roman Shtykh <app...@gmail.com> Authored: Fri Dec 18 15:09:00 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Fri Dec 18 15:09:00 2015 +0300 ---------------------------------------------------------------------- RELEASE_NOTES.txt | 1 + .../datagrid/CacheEntryProcessorExample.java | 157 +++++++++++++++++++ .../datagrid/CacheEntryProcessorExample.java | 147 +++++++++++++++++ .../ScalarCacheEntryProcessorExample.scala | 125 +++++++++++++++ .../ignite/examples/CacheExamplesSelfTest.java | 8 + .../java8/examples/CacheExamplesSelfTest.java | 8 + .../tests/examples/ScalarExamplesSelfTest.scala | 5 + 7 files changed, 451 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/RELEASE_NOTES.txt ---------------------------------------------------------------------- diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt index 243ec18..b0822c9 100644 --- a/RELEASE_NOTES.txt +++ b/RELEASE_NOTES.txt @@ -18,6 +18,7 @@ Apache Ignite In-Memory Data Fabric 1.5 * Fixed and improved cache types configuration. * Fixed cache rebalancing. * Many stability and fault-tolerance fixes. +* Added example to demonstrate the usage of EntryProcessor. Complete list of closed issues: https://issues.apache.org/jira/issues/?jql=project%20%3D%20IGNITE%20AND%20fixVersion%20%3D%201.5%20AND%20status%20%3D%20closed http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java new file mode 100644 index 0000000..38d9631 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ExampleNodeStartup; + +/** + * This example demonstrates the simplest code that populates the distributed cache + * and co-locates simple closure execution with each key. The goal of this particular + * example is to provide the simplest code example of this logic using EntryProcessor. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. + */ +public class CacheEntryProcessorExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheEntryProcessorExample.class.getSimpleName(); + + /** Number of keys. */ + private static final int KEY_CNT = 20; + + /** Keys predefined set. */ + private static final Set<Integer> KEYS_SET; + + /** + * Initializes keys set that is used in bulked operations in the example. + */ + static { + KEYS_SET = new HashSet<>(); + + for (int i = 0; i < KEY_CNT; i++) + KEYS_SET.add(i); + } + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Entry processor example started."); + + try (IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE_NAME)) { + // Demonstrates usage of EntryProcessor.invoke(...) method. + populateEntriesWithInvoke(cache); + + // Demonstrates usage of EntryProcessor.invokeAll(...) method. + incrementEntriesWithInvokeAll(cache); + } + } + } + + /** + * Populates cache with values using {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method. + * + * @param cache Cache that must be populated. + */ + private static void populateEntriesWithInvoke(IgniteCache<Integer, Integer> cache) { + // Must be no entry in the cache at this point. + printCacheEntries(cache); + + System.out.println(""); + System.out.println(">> Populating the cache using EntryProcessor."); + + // Invokes EntryProcessor for every key sequentially. + for (int i = 0; i < KEY_CNT; i++) { + cache.invoke(i, new EntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, + Object... objects) throws EntryProcessorException { + // Initializes entry's value if it's not set. + if (entry.getValue() == null) + entry.setValue((entry.getKey() + 1) * 10); + + return null; + } + }); + } + + // Print outs entries that are set using the EntryProcessor above. + printCacheEntries(cache); + } + + /** + * Increments values of entries stored in the cache using + * {@link IgniteCache#invokeAll(Set, EntryProcessor, Object...)} method. + * + * @param cache Cache instance. + */ + private static void incrementEntriesWithInvokeAll(IgniteCache<Integer, Integer> cache) { + System.out.println(""); + System.out.println(">> Incrementing values in the cache using EntryProcessor."); + + // Using EntryProcessor.invokeAll to increment every value in place. + cache.invokeAll(KEYS_SET, new EntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, + Object... arguments) throws EntryProcessorException { + + entry.setValue(entry.getValue() + 5); + + return null; + } + }); + + // Print outs entries that are incremented using the EntryProcessor above. + printCacheEntries(cache); + } + + /** + * Prints out all the entries that are stored in a cache. + * + * @param cache Cache. + */ + private static void printCacheEntries(IgniteCache<Integer, Integer> cache) { + System.out.println(); + System.out.println(">>> Entries in the cache."); + + Map<Integer, Integer> entries = cache.getAll(KEYS_SET); + + if (entries.isEmpty()) + System.out.println("No entries in the cache."); + else { + for (Map.Entry<Integer, Integer> entry : entries.entrySet()) + System.out.println("Entry [key=" + entry.getKey() + ", value=" + entry.getValue() + ']'); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java new file mode 100644 index 0000000..fd07fa5 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.datagrid; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.cache.processor.EntryProcessor; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ExampleNodeStartup; + +/** + * This example demonstrates the simplest code that populates the distributed cache and co-locates simple closure + * execution with each key. The goal of this particular example is to provide the simplest code example of this logic + * using EntryProcessor. + * <p> + * Remote nodes should always be started with special configuration file which enables P2P + * class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with + * {@code examples/config/example-ignite.xml} configuration. + */ +public class CacheEntryProcessorExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheEntryProcessorExample.class.getSimpleName(); + + /** Number of keys. */ + private static final int KEY_CNT = 20; + + /** Set of predefined keys. */ + private static final Set<Integer> KEYS_SET; + + /** + * Initializes keys set that is used in bulked operations in the example. + */ + static { + KEYS_SET = new HashSet<>(); + + for (int i = 0; i < KEY_CNT; i++) + KEYS_SET.add(i); + } + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Entry processor example started."); + + try (IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE_NAME)) { + // Demonstrates usage of EntryProcessor.invoke(...) method. + populateEntriesWithInvoke(cache); + + // Demonstrates usage of EntryProcessor.invokeAll(...) method. + incrementEntriesWithInvokeAll(cache); + } + } + } + + /** + * Populates cache with values using {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method. + * + * @param cache Cache that must be populated. + */ + private static void populateEntriesWithInvoke(IgniteCache<Integer, Integer> cache) { + // Must be no entry in the cache at this point. + printCacheEntries(cache); + + System.out.println(""); + System.out.println(">> Populating the cache using EntryProcessor."); + + // Invokes EntryProcessor for every key sequentially. + for (int i = 0; i < KEY_CNT; i++) { + cache.invoke(i, (entry, object) -> { + // Initializes entry's value if it's not set. + if (entry.getValue() == null) + entry.setValue((entry.getKey() + 1) * 10); + return null; + }); + } + + // Print outs entries that are set using the EntryProcessor above. + printCacheEntries(cache); + } + + /** + * Increments values of entries stored in the cache using {@link IgniteCache#invokeAll(Set, EntryProcessor, + * Object...)} method. + * + * @param cache Cache instance. + */ + private static void incrementEntriesWithInvokeAll(IgniteCache<Integer, Integer> cache) { + System.out.println(""); + System.out.println(">> Incrementing values in the cache using EntryProcessor."); + + // Using EntryProcessor.invokeAll to increment every value in place. + cache.invokeAll(KEYS_SET, (entry, object) -> { + entry.setValue(entry.getValue() + 5); + + return null; + }); + + // Print outs entries that are incremented using the EntryProcessor above. + printCacheEntries(cache); + } + + /** + * Prints out all the entries that are stored in a cache. + * + * @param cache Cache. + */ + private static void printCacheEntries(IgniteCache<Integer, Integer> cache) { + System.out.println(); + System.out.println(">>> Entries in the cache."); + + Map<Integer, Integer> entries = cache.getAll(KEYS_SET); + + if (entries.isEmpty()) + System.out.println("No entries in the cache."); + else { + for (Map.Entry<Integer, Integer> entry : entries.entrySet()) + System.out.println("Entry [key=" + entry.getKey() + ", value=" + entry.getValue() + ']'); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala new file mode 100644 index 0000000..ffcbbfd --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.scalar.examples + +import javax.cache.processor.{EntryProcessor, MutableEntry} + +import org.apache.ignite.IgniteCache +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +/** + * This example demonstrates the simplest code that populates the distributed cache + * and co-locates simple closure execution with each key. The goal of this particular + * example is to provide the simplest code example of this logic using EntryProcessor. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. + */ +object ScalarCacheEntryProcessorExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Name of cache. */ + private val CACHE_NAME = ScalarCacheEntryProcessorExample.getClass.getSimpleName + + /** Number of keys. */ + private val KEY_CNT = 20 + + /** Type alias. */ + type Cache = IgniteCache[String, Int] + + /* + * Note that in case of `LOCAL` configuration, + * since there is no distribution, values may come back as `nulls`. + */ + scalar(CONFIG) { + println() + println(">>> Entry processor example started.") + + val cache = createCache$[String, Int](CACHE_NAME) + + try { + populateEntriesWithInvoke(cache) + + checkEntriesInCache(cache) + + incrementEntriesWithInvoke(cache) + + checkEntriesInCache(cache) + } + finally { + cache.destroy() + } + } + + private def checkEntriesInCache(cache: Cache) { + println() + println(">>> Entries in the cache.") + + (0 until KEY_CNT).foreach(i => + println("Entry: " + cache.get(i.toString))) + } + + /** + * Runs jobs on primary nodes with {@link IgniteCache#invoke(Object, CacheEntryProcessor, Object...)} to create + * entries when they don't exist. + * + * @param cache Cache to populate. + */ + private def populateEntriesWithInvoke(cache: Cache) { + (0 until KEY_CNT).foreach(i => + cache.invoke(i.toString, + new EntryProcessor[String, Int, Object]() { + override def process(e: MutableEntry[String, Int], args: AnyRef*): Object = { + if (e.getValue == null) + e.setValue(i) + + null + } + } + ) + ) + } + + /** + * Runs jobs on primary nodes with {@link IgniteCache#invoke(Object, CacheEntryProcessor, Object...)} to increment + * entries values. + * + * @param cache Cache to populate. + */ + private def incrementEntriesWithInvoke(cache: Cache) { + println() + println(">>> Incrementing values.") + + (0 until KEY_CNT).foreach(i => + cache.invoke(i.toString, + new EntryProcessor[String, Int, Object]() { + override def process(e: MutableEntry[String, Int], args: AnyRef*): Object = { + Option(e.getValue) foreach (v => e.setValue(v + 1)) + + null + } + } + ) + ) + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index 050c59f..39c2ea6 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.examples; import org.apache.ignite.examples.datagrid.CacheAffinityExample; +import org.apache.ignite.examples.datagrid.CacheEntryProcessorExample; import org.apache.ignite.examples.datagrid.CacheApiExample; import org.apache.ignite.examples.datagrid.CacheContinuousQueryExample; import org.apache.ignite.examples.datagrid.CacheDataStreamerExample; @@ -49,6 +50,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { /** * @throws Exception If failed. */ + public void testCacheEntryProcessorExample() throws Exception { + CacheEntryProcessorExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ public void testCacheAtomicLongExample() throws Exception { IgniteAtomicLongExample.main(EMPTY_ARGS); } http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java index 4446521..e44fec3 100644 --- a/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.java8.examples; import org.apache.ignite.examples.java8.datagrid.CacheAffinityExample; +import org.apache.ignite.examples.java8.datagrid.CacheEntryProcessorExample; import org.apache.ignite.examples.java8.datagrid.CacheApiExample; import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest; @@ -36,6 +37,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { CacheAffinityExample.main(EMPTY_ARGS); } + /** + * @throws Exception If failed. + */ + public void testCacheEntryProcessorExample() throws Exception { + CacheEntryProcessorExample.main(EMPTY_ARGS); + } + // TODO: IGNITE-711 next example(s) should be implemented for java 8 // or testing method(s) should be removed if example(s) does not applicable for java 8. // /** http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala index ef56434..94c41ad 100644 --- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala +++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala @@ -35,6 +35,11 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik } /** */ + def testScalarCacheEntryProcessorExample() { + ScalarCacheEntryProcessorExample.main(EMPTY_ARGS) + } + + /** */ def testScalarCacheExample() { ScalarCacheExample.main(EMPTY_ARGS) }