Repository: incubator-fluo-recipes Updated Branches: refs/heads/master 573aeb788 -> 10ccb16ae
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java deleted file mode 100644 index 3f3a84c..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import java.io.File; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; -import org.apache.commons.io.FileUtils; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.core.serialization.SimpleSerializer; -import org.apache.fluo.recipes.core.types.StringEncoder; -import org.apache.fluo.recipes.core.types.TypeLayer; -import org.apache.fluo.recipes.core.types.TypedSnapshot; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * This test configures a small buffer size and verifies that multiple passes are made to process - * updates. - */ -public class BigUpdateIT { - private static final TypeLayer tl = new TypeLayer(new StringEncoder()); - - private MiniFluo miniFluo; - - private CollisionFreeMap<String, Long> wcMap; - - static final String MAP_ID = "bu"; - - public static class LongCombiner implements Combiner<String, Long> { - - @Override - public Optional<Long> combine(String key, Iterator<Long> updates) { - long[] count = new long[] {0}; - updates.forEachRemaining(l -> count[0] += l); - return Optional.of(count[0]); - } - } - - static final Column DSCOL = new Column("debug", "sum"); - - private static AtomicInteger globalUpdates = new AtomicInteger(0); - - public static class MyObserver extends UpdateObserver<String, Long> { - - @Override - public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) { - TypedTransactionBase ttx = tl.wrap(tx); - - Map<String, Long> expectedOld = new HashMap<>(); - - - while (updates.hasNext()) { - Update<String, Long> update = updates.next(); - - if (update.getOldValue().isPresent()) { - expectedOld.put("side:" + update.getKey(), update.getOldValue().get()); - } - - ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get()); - } - - // get last values set to verify same as passed in old value - Map<String, Long> actualOld = - Maps.transformValues( - ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL)) - .toStringMap(), m -> m.get(DSCOL).toLong()); - - MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld); - - Assert.assertTrue(diff.toString(), diff.areEqual()); - - globalUpdates.incrementAndGet(); - } - } - - @Before - public void setUpFluo() throws Exception { - FileUtils.deleteQuietly(new File("target/mini")); - - FluoConfiguration props = new FluoConfiguration(); - props.setApplicationName("eqt"); - props.setWorkerThreads(20); - props.setMiniDataDir("target/mini"); - - SimpleSerializer.setSerializer(props, TestSerializer.class); - - CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class, - MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10)); - - miniFluo = FluoFactory.newMiniFluo(props); - - wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration()); - - globalUpdates.set(0); - } - - @After - public void tearDownFluo() throws Exception { - if (miniFluo != null) { - miniFluo.close(); - } - } - - @Test - public void testBigUpdates() { - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - updateMany(fc); - - miniFluo.waitForObservers(); - - int numUpdates = 0; - - try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) { - checkUpdates(snap, 1, 1000); - numUpdates = globalUpdates.get(); - // there are two buckets, expect update processing at least twice per bucket - Assert.assertTrue(numUpdates >= 4); - } - - updateMany(fc); - updateMany(fc); - - miniFluo.waitForObservers(); - - try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) { - checkUpdates(snap, 3, 1000); - numUpdates = globalUpdates.get() - numUpdates; - Assert.assertTrue(numUpdates >= 4); - } - - for (int i = 0; i < 10; i++) { - updateMany(fc); - } - - miniFluo.waitForObservers(); - - try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) { - checkUpdates(snap, 13, 1000); - numUpdates = globalUpdates.get() - numUpdates; - Assert.assertTrue(numUpdates >= 4); - } - } - } - - private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) { - RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build(); - - int row = 0; - - for (ColumnScanner columns : rows) { - Assert.assertEquals(String.format("side:%06d", row++), columns.getsRow()); - - for (ColumnValue columnValue : columns) { - Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn()); - Assert - .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue()); - } - } - - Assert.assertEquals(expectedRows, row); - } - - private void updateMany(FluoClient fc) { - try (Transaction tx = fc.newTransaction()) { - Map<String, Long> updates = new HashMap<>(); - for (int i = 0; i < 1000; i++) { - updates.put(String.format("%06d", i), 1L); - } - - wcMap.update(tx, updates); - tx.commit(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java deleted file mode 100644 index f5c832a..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/CollisionFreeMapIT.java +++ /dev/null @@ -1,352 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import com.google.common.collect.ImmutableMap; -import org.apache.commons.io.FileUtils; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.LoaderExecutor; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.client.scanner.CellScanner; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.core.serialization.SimpleSerializer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class CollisionFreeMapIT { - - private MiniFluo miniFluo; - - private CollisionFreeMap<String, Long> wcMap; - - static final String MAP_ID = "wcm"; - - @Before - public void setUpFluo() throws Exception { - FileUtils.deleteQuietly(new File("target/mini")); - - FluoConfiguration props = new FluoConfiguration(); - props.setApplicationName("eqt"); - props.setWorkerThreads(20); - props.setMiniDataDir("target/mini"); - - props.addObserver(new ObserverSpecification(DocumentObserver.class.getName())); - - SimpleSerializer.setSerializer(props, TestSerializer.class); - - CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, WordCountCombiner.class, - WordCountObserver.class, String.class, Long.class, 17)); - - miniFluo = FluoFactory.newMiniFluo(props); - - wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration()); - } - - @After - public void tearDownFluo() throws Exception { - if (miniFluo != null) { - miniFluo.close(); - } - } - - private Map<String, Long> getComputedWordCounts(FluoClient fc) { - Map<String, Long> counts = new HashMap<>(); - - try (Snapshot snap = fc.newSnapshot()) { - - CellScanner scanner = snap.scanner().over(Span.prefix("iwc:")).build(); - - for (RowColumnValue rcv : scanner) { - String[] tokens = rcv.getsRow().split(":"); - String word = tokens[2]; - Long count = Long.valueOf(tokens[1]); - - Assert.assertFalse("Word seen twice in index " + word, counts.containsKey(word)); - - counts.put(word, count); - } - } - - return counts; - } - - private Map<String, Long> computeWordCounts(FluoClient fc) { - Map<String, Long> counts = new HashMap<>(); - - try (Snapshot snap = fc.newSnapshot()) { - - - CellScanner scanner = - snap.scanner().over(Span.prefix("d:")).fetch(new Column("content", "current")).build(); - - for (RowColumnValue rcv : scanner) { - String[] words = rcv.getsValue().split("\\s+"); - for (String word : words) { - if (word.isEmpty()) { - continue; - } - - counts.merge(word, 1L, Long::sum); - } - } - } - - return counts; - } - - @Test - public void testGet() { - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - try (Transaction tx = fc.newTransaction()) { - wcMap.update(tx, ImmutableMap.of("cat", 2L, "dog", 5L)); - tx.commit(); - } - - try (Transaction tx = fc.newTransaction()) { - wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L)); - tx.commit(); - } - - try (Transaction tx = fc.newTransaction()) { - wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L, "fish", 2L)); - tx.commit(); - } - - // try reading possibly before observer combines... will either see outstanding updates or a - // current value - try (Snapshot snap = fc.newSnapshot()) { - Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat")); - Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog")); - Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); - } - - miniFluo.waitForObservers(); - - // in this case there should be no updates, only a current value - try (Snapshot snap = fc.newSnapshot()) { - Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat")); - Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog")); - Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); - } - - Map<String, Long> expectedCounts = new HashMap<>(); - expectedCounts.put("cat", 4L); - expectedCounts.put("dog", 7L); - expectedCounts.put("fish", 2L); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - - try (Transaction tx = fc.newTransaction()) { - wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", -7L)); - tx.commit(); - } - - // there may be outstanding update and a current value for the key in this case - try (Snapshot snap = fc.newSnapshot()) { - Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat")); - Assert.assertNull(wcMap.get(snap, "dog")); - Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); - } - - miniFluo.waitForObservers(); - - try (Snapshot snap = fc.newSnapshot()) { - Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat")); - Assert.assertNull(wcMap.get(snap, "dog")); - Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); - } - - expectedCounts.put("cat", 5L); - expectedCounts.remove("dog"); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - } - } - - @Test - public void testBasic() { - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0001", "dog cat")); - loader.execute(new DocumentLoader("0002", "cat hamster")); - loader.execute(new DocumentLoader("0003", "milk bread cat food")); - loader.execute(new DocumentLoader("0004", "zoo big cat")); - } - - miniFluo.waitForObservers(); - - try (Snapshot snap = fc.newSnapshot()) { - Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat")); - Assert.assertEquals((Long) 1L, wcMap.get(snap, "milk")); - } - - Map<String, Long> expectedCounts = new HashMap<>(); - expectedCounts.put("dog", 1L); - expectedCounts.put("cat", 4L); - expectedCounts.put("hamster", 1L); - expectedCounts.put("milk", 1L); - expectedCounts.put("bread", 1L); - expectedCounts.put("food", 1L); - expectedCounts.put("zoo", 1L); - expectedCounts.put("big", 1L); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0001", "dog feline")); - } - - miniFluo.waitForObservers(); - - expectedCounts.put("cat", 3L); - expectedCounts.put("feline", 1L); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - // swap contents of two documents... should not change doc counts - loader.execute(new DocumentLoader("0003", "zoo big cat")); - loader.execute(new DocumentLoader("0004", "milk bread cat food")); - } - - miniFluo.waitForObservers(); - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0003", "zoo big cat")); - loader.execute(new DocumentLoader("0004", "zoo big cat")); - } - - miniFluo.waitForObservers(); - - expectedCounts.put("zoo", 2L); - expectedCounts.put("big", 2L); - expectedCounts.remove("milk"); - expectedCounts.remove("bread"); - expectedCounts.remove("food"); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0002", "cat cat hamster hamster")); - } - - miniFluo.waitForObservers(); - - expectedCounts.put("cat", 4L); - expectedCounts.put("hamster", 2L); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - loader.execute(new DocumentLoader("0002", "dog hamster")); - } - - miniFluo.waitForObservers(); - - expectedCounts.put("cat", 2L); - expectedCounts.put("hamster", 1L); - expectedCounts.put("dog", 2L); - - Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); - } - } - - private static String randDocId(Random rand) { - return String.format("%04d", rand.nextInt(5000)); - } - - private static String randomDocument(Random rand) { - StringBuilder sb = new StringBuilder(); - - String sep = ""; - for (int i = 2; i < rand.nextInt(18); i++) { - sb.append(sep); - sep = " "; - sb.append(String.format("%05d", rand.nextInt(50000))); - } - - return sb.toString(); - } - - public void diff(Map<String, Long> m1, Map<String, Long> m2) { - for (String word : m1.keySet()) { - Long v1 = m1.get(word); - Long v2 = m2.get(word); - - if (v2 == null || !v1.equals(v2)) { - System.out.println(word + " " + v1 + " != " + v2); - } - } - - for (String word : m2.keySet()) { - Long v1 = m1.get(word); - Long v2 = m2.get(word); - - if (v1 == null) { - System.out.println(word + " null != " + v2); - } - } - } - - @Test - public void testStress() throws Exception { - try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { - Random rand = new Random(); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - for (int i = 0; i < 1000; i++) { - loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand))); - } - } - - miniFluo.waitForObservers(); - assertWordCountsEqual(fc); - - try (LoaderExecutor loader = fc.newLoaderExecutor()) { - for (int i = 0; i < 100; i++) { - loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand))); - } - } - - miniFluo.waitForObservers(); - assertWordCountsEqual(fc); - } - } - - private void assertWordCountsEqual(FluoClient fc) { - Map<String, Long> expected = computeWordCounts(fc); - Map<String, Long> actual = getComputedWordCounts(fc); - if (!expected.equals(actual)) { - diff(expected, actual); - Assert.fail(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java deleted file mode 100644 index 54f5ee1..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentLoader.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import org.apache.fluo.recipes.core.types.TypedLoader; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; - -public class DocumentLoader extends TypedLoader { - - String docid; - String doc; - - DocumentLoader(String docid, String doc) { - this.docid = docid; - this.doc = doc; - } - - @Override - public void load(TypedTransactionBase tx, Context context) throws Exception { - tx.mutate().row("d:" + docid).fam("content").qual("new").set(doc); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java deleted file mode 100644 index 2c79f45..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/DocumentObserver.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import java.util.HashMap; -import java.util.Map; - -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.types.TypedObserver; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; - -public class DocumentObserver extends TypedObserver { - - CollisionFreeMap<String, Long> wcm; - - @Override - public void init(Context context) throws Exception { - wcm = CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, context.getAppConfiguration()); - } - - @Override - public ObservedColumn getObservedColumn() { - return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG); - } - - static Map<String, Long> getWordCounts(String doc) { - Map<String, Long> wordCounts = new HashMap<>(); - String[] words = doc.split(" "); - for (String word : words) { - if (word.isEmpty()) { - continue; - } - wordCounts.merge(word, 1L, Long::sum); - } - - return wordCounts; - } - - @Override - public void process(TypedTransactionBase tx, Bytes row, Column col) { - String newContent = tx.get().row(row).col(col).toString(); - String currentContent = tx.get().row(row).fam("content").qual("current").toString(""); - - Map<String, Long> newWordCounts = getWordCounts(newContent); - Map<String, Long> currentWordCounts = getWordCounts(currentContent); - - Map<String, Long> changes = calculateChanges(newWordCounts, currentWordCounts); - - wcm.update(tx, changes); - - tx.mutate().row(row).fam("content").qual("current").set(newContent); - } - - private static Map<String, Long> calculateChanges(Map<String, Long> newCounts, - Map<String, Long> currCounts) { - Map<String, Long> changes = new HashMap<>(); - - // guava Maps class - MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts); - - // compute the diffs for words that changed - changes.putAll(Maps.transformValues(diffs.entriesDiffering(), vDiff -> vDiff.rightValue() - - vDiff.leftValue())); - - // add all new words - changes.putAll(diffs.entriesOnlyOnRight()); - - // subtract all words no longer present - changes.putAll(Maps.transformValues(diffs.entriesOnlyOnLeft(), l -> l * -1)); - - return changes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java index 37a2443..30f93ff 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java @@ -25,6 +25,7 @@ import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.recipes.core.common.TableOptimizations; import org.apache.fluo.recipes.core.map.CollisionFreeMap.Options; +import org.apache.fluo.recipes.core.map.it.WordCountCombiner; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java deleted file mode 100644 index b59705a..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/TestSerializer.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.recipes.core.serialization.SimpleSerializer; - -public class TestSerializer implements SimpleSerializer { - - @Override - public <T> byte[] serialize(T obj) { - return obj.toString().getBytes(); - } - - @SuppressWarnings("unchecked") - @Override - public <T> T deserialize(byte[] serObj, Class<T> clazz) { - if (clazz.equals(Long.class)) { - return (T) Long.valueOf(new String(serObj)); - } - - if (clazz.equals(String.class)) { - return (T) new String(serObj); - } - - throw new IllegalArgumentException(); - } - - @Override - public void init(SimpleConfiguration appConfig) {} - -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java deleted file mode 100644 index f757c10..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountCombiner.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import java.util.Iterator; -import java.util.Optional; - -public class WordCountCombiner implements Combiner<String, Long> { - @Override - public Optional<Long> combine(String key, Iterator<Long> updates) { - long sum = 0; - - while (updates.hasNext()) { - sum += updates.next(); - } - - if (sum == 0) { - return Optional.empty(); - } else { - return Optional.of(sum); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java deleted file mode 100644 index 221083c..0000000 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/WordCountObserver.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.map; - -import java.util.Iterator; -import java.util.Optional; - -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; - -public class WordCountObserver extends UpdateObserver<String, Long> { - - @Override - public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) { - - while (updates.hasNext()) { - Update<String, Long> update = updates.next(); - - Optional<Long> oldVal = update.getOldValue(); - Optional<Long> newVal = update.getNewValue(); - - if (oldVal.isPresent()) { - String oldRow = String.format("iwc:%09d:%s", oldVal.get(), update.getKey()); - tx.delete(Bytes.of(oldRow), new Column(Bytes.EMPTY, Bytes.EMPTY)); - } - - if (newVal.isPresent()) { - String newRow = String.format("iwc:%09d:%s", newVal.get(), update.getKey()); - tx.set(Bytes.of(newRow), new Column(Bytes.EMPTY, Bytes.EMPTY), Bytes.EMPTY); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java new file mode 100644 index 0000000..4213cff --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import java.io.File; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.apache.commons.io.FileUtils; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.api.mini.MiniFluo; +import org.apache.fluo.recipes.core.map.CollisionFreeMap; +import org.apache.fluo.recipes.core.map.Combiner; +import org.apache.fluo.recipes.core.map.Update; +import org.apache.fluo.recipes.core.map.UpdateObserver; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; +import org.apache.fluo.recipes.core.types.StringEncoder; +import org.apache.fluo.recipes.core.types.TypeLayer; +import org.apache.fluo.recipes.core.types.TypedSnapshot; +import org.apache.fluo.recipes.core.types.TypedTransactionBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * This test configures a small buffer size and verifies that multiple passes are made to process + * updates. + */ +public class BigUpdateIT { + private static final TypeLayer tl = new TypeLayer(new StringEncoder()); + + private MiniFluo miniFluo; + + private CollisionFreeMap<String, Long> wcMap; + + static final String MAP_ID = "bu"; + + public static class LongCombiner implements Combiner<String, Long> { + + @Override + public Optional<Long> combine(String key, Iterator<Long> updates) { + long[] count = new long[] {0}; + updates.forEachRemaining(l -> count[0] += l); + return Optional.of(count[0]); + } + } + + static final Column DSCOL = new Column("debug", "sum"); + + private static AtomicInteger globalUpdates = new AtomicInteger(0); + + public static class MyObserver extends UpdateObserver<String, Long> { + + @Override + public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) { + TypedTransactionBase ttx = tl.wrap(tx); + + Map<String, Long> expectedOld = new HashMap<>(); + + + while (updates.hasNext()) { + Update<String, Long> update = updates.next(); + + if (update.getOldValue().isPresent()) { + expectedOld.put("side:" + update.getKey(), update.getOldValue().get()); + } + + ttx.mutate().row("side:" + update.getKey()).col(DSCOL).set(update.getNewValue().get()); + } + + // get last values set to verify same as passed in old value + Map<String, Long> actualOld = + Maps.transformValues( + ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL)) + .toStringMap(), m -> m.get(DSCOL).toLong()); + + MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld); + + Assert.assertTrue(diff.toString(), diff.areEqual()); + + globalUpdates.incrementAndGet(); + } + } + + @Before + public void setUpFluo() throws Exception { + FileUtils.deleteQuietly(new File("target/mini")); + + FluoConfiguration props = new FluoConfiguration(); + props.setApplicationName("eqt"); + props.setWorkerThreads(20); + props.setMiniDataDir("target/mini"); + + SimpleSerializer.setSerializer(props, TestSerializer.class); + + CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, LongCombiner.class, + MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10)); + + miniFluo = FluoFactory.newMiniFluo(props); + + wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration()); + + globalUpdates.set(0); + } + + @After + public void tearDownFluo() throws Exception { + if (miniFluo != null) { + miniFluo.close(); + } + } + + @Test + public void testBigUpdates() { + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + updateMany(fc); + + miniFluo.waitForObservers(); + + int numUpdates = 0; + + try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) { + checkUpdates(snap, 1, 1000); + numUpdates = globalUpdates.get(); + // there are two buckets, expect update processing at least twice per bucket + Assert.assertTrue(numUpdates >= 4); + } + + updateMany(fc); + updateMany(fc); + + miniFluo.waitForObservers(); + + try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) { + checkUpdates(snap, 3, 1000); + numUpdates = globalUpdates.get() - numUpdates; + Assert.assertTrue(numUpdates >= 4); + } + + for (int i = 0; i < 10; i++) { + updateMany(fc); + } + + miniFluo.waitForObservers(); + + try (TypedSnapshot snap = tl.wrap(fc.newSnapshot())) { + checkUpdates(snap, 13, 1000); + numUpdates = globalUpdates.get() - numUpdates; + Assert.assertTrue(numUpdates >= 4); + } + } + } + + private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) { + RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build(); + + int row = 0; + + for (ColumnScanner columns : rows) { + Assert.assertEquals(String.format("side:%06d", row++), columns.getsRow()); + + for (ColumnValue columnValue : columns) { + Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn()); + Assert + .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue()); + } + } + + Assert.assertEquals(expectedRows, row); + } + + private void updateMany(FluoClient fc) { + try (Transaction tx = fc.newTransaction()) { + Map<String, Long> updates = new HashMap<>(); + for (int i = 0; i < 1000; i++) { + updates.put(String.format("%06d", i), 1L); + } + + wcMap.update(tx, updates); + tx.commit(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java new file mode 100644 index 0000000..e22ddea --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.FileUtils; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.LoaderExecutor; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.api.mini.MiniFluo; +import org.apache.fluo.recipes.core.map.CollisionFreeMap; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CollisionFreeMapIT { + + private MiniFluo miniFluo; + + private CollisionFreeMap<String, Long> wcMap; + + static final String MAP_ID = "wcm"; + + @Before + public void setUpFluo() throws Exception { + FileUtils.deleteQuietly(new File("target/mini")); + + FluoConfiguration props = new FluoConfiguration(); + props.setApplicationName("eqt"); + props.setWorkerThreads(20); + props.setMiniDataDir("target/mini"); + + props.addObserver(new ObserverSpecification(DocumentObserver.class.getName())); + + SimpleSerializer.setSerializer(props, TestSerializer.class); + + CollisionFreeMap.configure(props, new CollisionFreeMap.Options(MAP_ID, WordCountCombiner.class, + WordCountObserver.class, String.class, Long.class, 17)); + + miniFluo = FluoFactory.newMiniFluo(props); + + wcMap = CollisionFreeMap.getInstance(MAP_ID, props.getAppConfiguration()); + } + + @After + public void tearDownFluo() throws Exception { + if (miniFluo != null) { + miniFluo.close(); + } + } + + private Map<String, Long> getComputedWordCounts(FluoClient fc) { + Map<String, Long> counts = new HashMap<>(); + + try (Snapshot snap = fc.newSnapshot()) { + + CellScanner scanner = snap.scanner().over(Span.prefix("iwc:")).build(); + + for (RowColumnValue rcv : scanner) { + String[] tokens = rcv.getsRow().split(":"); + String word = tokens[2]; + Long count = Long.valueOf(tokens[1]); + + Assert.assertFalse("Word seen twice in index " + word, counts.containsKey(word)); + + counts.put(word, count); + } + } + + return counts; + } + + private Map<String, Long> computeWordCounts(FluoClient fc) { + Map<String, Long> counts = new HashMap<>(); + + try (Snapshot snap = fc.newSnapshot()) { + + + CellScanner scanner = + snap.scanner().over(Span.prefix("d:")).fetch(new Column("content", "current")).build(); + + for (RowColumnValue rcv : scanner) { + String[] words = rcv.getsValue().split("\\s+"); + for (String word : words) { + if (word.isEmpty()) { + continue; + } + + counts.merge(word, 1L, Long::sum); + } + } + } + + return counts; + } + + @Test + public void testGet() { + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + try (Transaction tx = fc.newTransaction()) { + wcMap.update(tx, ImmutableMap.of("cat", 2L, "dog", 5L)); + tx.commit(); + } + + try (Transaction tx = fc.newTransaction()) { + wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L)); + tx.commit(); + } + + try (Transaction tx = fc.newTransaction()) { + wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", 1L, "fish", 2L)); + tx.commit(); + } + + // try reading possibly before observer combines... will either see outstanding updates or a + // current value + try (Snapshot snap = fc.newSnapshot()) { + Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat")); + Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog")); + Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); + } + + miniFluo.waitForObservers(); + + // in this case there should be no updates, only a current value + try (Snapshot snap = fc.newSnapshot()) { + Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat")); + Assert.assertEquals((Long) 7L, wcMap.get(snap, "dog")); + Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); + } + + Map<String, Long> expectedCounts = new HashMap<>(); + expectedCounts.put("cat", 4L); + expectedCounts.put("dog", 7L); + expectedCounts.put("fish", 2L); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + + try (Transaction tx = fc.newTransaction()) { + wcMap.update(tx, ImmutableMap.of("cat", 1L, "dog", -7L)); + tx.commit(); + } + + // there may be outstanding update and a current value for the key in this case + try (Snapshot snap = fc.newSnapshot()) { + Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat")); + Assert.assertNull(wcMap.get(snap, "dog")); + Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); + } + + miniFluo.waitForObservers(); + + try (Snapshot snap = fc.newSnapshot()) { + Assert.assertEquals((Long) 5L, wcMap.get(snap, "cat")); + Assert.assertNull(wcMap.get(snap, "dog")); + Assert.assertEquals((Long) 2L, wcMap.get(snap, "fish")); + } + + expectedCounts.put("cat", 5L); + expectedCounts.remove("dog"); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + } + } + + @Test + public void testBasic() { + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0001", "dog cat")); + loader.execute(new DocumentLoader("0002", "cat hamster")); + loader.execute(new DocumentLoader("0003", "milk bread cat food")); + loader.execute(new DocumentLoader("0004", "zoo big cat")); + } + + miniFluo.waitForObservers(); + + try (Snapshot snap = fc.newSnapshot()) { + Assert.assertEquals((Long) 4L, wcMap.get(snap, "cat")); + Assert.assertEquals((Long) 1L, wcMap.get(snap, "milk")); + } + + Map<String, Long> expectedCounts = new HashMap<>(); + expectedCounts.put("dog", 1L); + expectedCounts.put("cat", 4L); + expectedCounts.put("hamster", 1L); + expectedCounts.put("milk", 1L); + expectedCounts.put("bread", 1L); + expectedCounts.put("food", 1L); + expectedCounts.put("zoo", 1L); + expectedCounts.put("big", 1L); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0001", "dog feline")); + } + + miniFluo.waitForObservers(); + + expectedCounts.put("cat", 3L); + expectedCounts.put("feline", 1L); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + // swap contents of two documents... should not change doc counts + loader.execute(new DocumentLoader("0003", "zoo big cat")); + loader.execute(new DocumentLoader("0004", "milk bread cat food")); + } + + miniFluo.waitForObservers(); + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0003", "zoo big cat")); + loader.execute(new DocumentLoader("0004", "zoo big cat")); + } + + miniFluo.waitForObservers(); + + expectedCounts.put("zoo", 2L); + expectedCounts.put("big", 2L); + expectedCounts.remove("milk"); + expectedCounts.remove("bread"); + expectedCounts.remove("food"); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0002", "cat cat hamster hamster")); + } + + miniFluo.waitForObservers(); + + expectedCounts.put("cat", 4L); + expectedCounts.put("hamster", 2L); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + loader.execute(new DocumentLoader("0002", "dog hamster")); + } + + miniFluo.waitForObservers(); + + expectedCounts.put("cat", 2L); + expectedCounts.put("hamster", 1L); + expectedCounts.put("dog", 2L); + + Assert.assertEquals(expectedCounts, getComputedWordCounts(fc)); + } + } + + private static String randDocId(Random rand) { + return String.format("%04d", rand.nextInt(5000)); + } + + private static String randomDocument(Random rand) { + StringBuilder sb = new StringBuilder(); + + String sep = ""; + for (int i = 2; i < rand.nextInt(18); i++) { + sb.append(sep); + sep = " "; + sb.append(String.format("%05d", rand.nextInt(50000))); + } + + return sb.toString(); + } + + public void diff(Map<String, Long> m1, Map<String, Long> m2) { + for (String word : m1.keySet()) { + Long v1 = m1.get(word); + Long v2 = m2.get(word); + + if (v2 == null || !v1.equals(v2)) { + System.out.println(word + " " + v1 + " != " + v2); + } + } + + for (String word : m2.keySet()) { + Long v1 = m1.get(word); + Long v2 = m2.get(word); + + if (v1 == null) { + System.out.println(word + " null != " + v2); + } + } + } + + @Test + public void testStress() throws Exception { + try (FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) { + Random rand = new Random(); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + for (int i = 0; i < 1000; i++) { + loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand))); + } + } + + miniFluo.waitForObservers(); + assertWordCountsEqual(fc); + + try (LoaderExecutor loader = fc.newLoaderExecutor()) { + for (int i = 0; i < 100; i++) { + loader.execute(new DocumentLoader(randDocId(rand), randomDocument(rand))); + } + } + + miniFluo.waitForObservers(); + assertWordCountsEqual(fc); + } + } + + private void assertWordCountsEqual(FluoClient fc) { + Map<String, Long> expected = computeWordCounts(fc); + Map<String, Long> actual = getComputedWordCounts(fc); + if (!expected.equals(actual)) { + diff(expected, actual); + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java new file mode 100644 index 0000000..c68b3b1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentLoader.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import org.apache.fluo.recipes.core.types.TypedLoader; +import org.apache.fluo.recipes.core.types.TypedTransactionBase; + +public class DocumentLoader extends TypedLoader { + + String docid; + String doc; + + DocumentLoader(String docid, String doc) { + this.docid = docid; + this.doc = doc; + } + + @Override + public void load(TypedTransactionBase tx, Context context) throws Exception { + tx.mutate().row("d:" + docid).fam("content").qual("new").set(doc); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java new file mode 100644 index 0000000..c469151 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.recipes.core.map.CollisionFreeMap; +import org.apache.fluo.recipes.core.types.TypedObserver; +import org.apache.fluo.recipes.core.types.TypedTransactionBase; + +public class DocumentObserver extends TypedObserver { + + CollisionFreeMap<String, Long> wcm; + + @Override + public void init(Context context) throws Exception { + wcm = CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, context.getAppConfiguration()); + } + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(new Column("content", "new"), NotificationType.STRONG); + } + + static Map<String, Long> getWordCounts(String doc) { + Map<String, Long> wordCounts = new HashMap<>(); + String[] words = doc.split(" "); + for (String word : words) { + if (word.isEmpty()) { + continue; + } + wordCounts.merge(word, 1L, Long::sum); + } + + return wordCounts; + } + + @Override + public void process(TypedTransactionBase tx, Bytes row, Column col) { + String newContent = tx.get().row(row).col(col).toString(); + String currentContent = tx.get().row(row).fam("content").qual("current").toString(""); + + Map<String, Long> newWordCounts = getWordCounts(newContent); + Map<String, Long> currentWordCounts = getWordCounts(currentContent); + + Map<String, Long> changes = calculateChanges(newWordCounts, currentWordCounts); + + wcm.update(tx, changes); + + tx.mutate().row(row).fam("content").qual("current").set(newContent); + } + + private static Map<String, Long> calculateChanges(Map<String, Long> newCounts, + Map<String, Long> currCounts) { + Map<String, Long> changes = new HashMap<>(); + + // guava Maps class + MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts); + + // compute the diffs for words that changed + changes.putAll(Maps.transformValues(diffs.entriesDiffering(), vDiff -> vDiff.rightValue() + - vDiff.leftValue())); + + // add all new words + changes.putAll(diffs.entriesOnlyOnRight()); + + // subtract all words no longer present + changes.putAll(Maps.transformValues(diffs.entriesOnlyOnLeft(), l -> l * -1)); + + return changes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java new file mode 100644 index 0000000..0bc5ff6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/TestSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; + +public class TestSerializer implements SimpleSerializer { + + @Override + public <T> byte[] serialize(T obj) { + return obj.toString().getBytes(); + } + + @SuppressWarnings("unchecked") + @Override + public <T> T deserialize(byte[] serObj, Class<T> clazz) { + if (clazz.equals(Long.class)) { + return (T) Long.valueOf(new String(serObj)); + } + + if (clazz.equals(String.class)) { + return (T) new String(serObj); + } + + throw new IllegalArgumentException(); + } + + @Override + public void init(SimpleConfiguration appConfig) {} + +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java new file mode 100644 index 0000000..df1389f --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountCombiner.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import java.util.Iterator; +import java.util.Optional; + +import org.apache.fluo.recipes.core.map.Combiner; + +public class WordCountCombiner implements Combiner<String, Long> { + @Override + public Optional<Long> combine(String key, Iterator<Long> updates) { + long sum = 0; + + while (updates.hasNext()) { + sum += updates.next(); + } + + if (sum == 0) { + return Optional.empty(); + } else { + return Optional.of(sum); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java new file mode 100644 index 0000000..1230d99 --- /dev/null +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.map.it; + +import java.util.Iterator; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.recipes.core.map.Update; +import org.apache.fluo.recipes.core.map.UpdateObserver; + +public class WordCountObserver extends UpdateObserver<String, Long> { + + @Override + public void updatingValues(TransactionBase tx, Iterator<Update<String, Long>> updates) { + + while (updates.hasNext()) { + Update<String, Long> update = updates.next(); + + Optional<Long> oldVal = update.getOldValue(); + Optional<Long> newVal = update.getNewValue(); + + if (oldVal.isPresent()) { + String oldRow = String.format("iwc:%09d:%s", oldVal.get(), update.getKey()); + tx.delete(Bytes.of(oldRow), new Column(Bytes.EMPTY, Bytes.EMPTY)); + } + + if (newVal.isPresent()) { + String newRow = String.format("iwc:%09d:%s", newVal.get(), update.getKey()); + tx.set(Bytes.of(newRow), new Column(Bytes.EMPTY, Bytes.EMPTY), Bytes.EMPTY); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/pom.xml ---------------------------------------------------------------------- diff --git a/modules/kryo/pom.xml b/modules/kryo/pom.xml index fe00f96..eaed2ee 100644 --- a/modules/kryo/pom.xml +++ b/modules/kryo/pom.xml @@ -17,7 +17,7 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-parent</artifactId> + <artifactId>fluo-recipes</artifactId> <version>1.0.0-incubating-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java ---------------------------------------------------------------------- diff --git a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java b/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java index aff9e7c..e8d7997 100644 --- a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java +++ b/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java @@ -110,7 +110,7 @@ public class KryoSimplerSerializer implements SimpleSerializer, Serializable { public KryoSimplerSerializer() {} /** - * Can call this method to create a serializer w/o calling {@link #init(Configuration)} + * Can call this method to create a serializer w/o calling {@link #init(SimpleConfiguration)} */ public KryoSimplerSerializer(KryoFactory factory) { factoryType = factory.getClass().getName(); http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java ---------------------------------------------------------------------- diff --git a/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java b/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java deleted file mode 100644 index 95a26a9..0000000 --- a/modules/kryo/src/test/java/org/apache/fluo/recipes/core/serialization/KryoSimpleSerializerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.core.serialization; - -import com.esotericsoftware.kryo.pool.KryoFactory; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.kryo.KryoSimplerSerializer; -import org.junit.Assert; -import org.junit.Test; - -public class KryoSimpleSerializerTest { - - private static final KryoFactory KRYO_FACTORY = new KryoSimplerSerializer.DefaultFactory(); - - public void testColumn() { - SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY); - Column before = new Column("a", "b"); - byte[] barray = serializer.serialize(before); - Column after = serializer.deserialize(barray, Column.class); - Assert.assertEquals(before, after); - } - - @Test - public void testBytes() { - SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY); - Bytes before = Bytes.of("test"); - byte[] barray = serializer.serialize(before); - Bytes after = serializer.deserialize(barray, Bytes.class); - Assert.assertEquals(before, after); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java ---------------------------------------------------------------------- diff --git a/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java b/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java new file mode 100644 index 0000000..d2306d2 --- /dev/null +++ b/modules/kryo/src/test/java/org/apache/fluo/recipes/kryo/KryoSimpleSerializerTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.kryo; + +import com.esotericsoftware.kryo.pool.KryoFactory; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.recipes.core.serialization.SimpleSerializer; +import org.apache.fluo.recipes.kryo.KryoSimplerSerializer; +import org.junit.Assert; +import org.junit.Test; + +public class KryoSimpleSerializerTest { + + private static final KryoFactory KRYO_FACTORY = new KryoSimplerSerializer.DefaultFactory(); + + public void testColumn() { + SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY); + Column before = new Column("a", "b"); + byte[] barray = serializer.serialize(before); + Column after = serializer.deserialize(barray, Column.class); + Assert.assertEquals(before, after); + } + + @Test + public void testBytes() { + SimpleSerializer serializer = new KryoSimplerSerializer(KRYO_FACTORY); + Bytes before = Bytes.of("test"); + byte[] barray = serializer.serialize(before); + Bytes after = serializer.deserialize(barray, Bytes.class); + Assert.assertEquals(before, after); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index 1a0969e..5cb80a4 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -17,7 +17,7 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-parent</artifactId> + <artifactId>fluo-recipes</artifactId> <version>1.0.0-incubating-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java deleted file mode 100644 index 4ad25e2..0000000 --- a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/FluoSparkHelperIT.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.spark; - -import java.util.List; - -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.test.AccumuloExportITBase; -import org.apache.fluo.recipes.test.FluoITHelper; -import org.apache.hadoop.fs.Path; -import org.apache.spark.api.java.JavaSparkContext; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -public class FluoSparkHelperIT extends AccumuloExportITBase { - - static JavaSparkContext ctx; - - public FluoSparkHelperIT() { - super(false); - } - - @BeforeClass - public static void setupIT() { - ctx = FluoSparkTestUtil.newSparkContext("fluo-spark-helper"); - } - - @AfterClass - public static void teardownIT() { - ctx.stop(); - } - - private List<RowColumnValue> getData() { - return FluoITHelper.parse("arow|acf|acq|aval", "brow|bcf|bcq|bval", "crow|ccf|ccq|cval"); - } - - @Test - public void testAccumuloBulkImport() throws Exception { - FluoSparkHelper fsh = - new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/")); - List<RowColumnValue> expected = getData(); - final String accumuloTable = "table1"; - getAccumuloConnector().tableOperations().create(accumuloTable); - fsh.bulkImportRcvToAccumulo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)), - accumuloTable, new FluoSparkHelper.BulkImportOptions()); - Assert.assertTrue(FluoITHelper.verifyAccumuloTable(getAccumuloConnector(), accumuloTable, - expected)); - } - - @Test - public void testFluoBulkImport() throws Exception { - FluoSparkHelper fsh = - new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/")); - List<RowColumnValue> expected = getData(); - fsh.bulkImportRcvToFluo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)), - new FluoSparkHelper.BulkImportOptions()); - - try (MiniFluo miniFluo = FluoFactory.newMiniFluo(getFluoConfiguration())) { - Assert.assertTrue(FluoITHelper.verifyFluoTable(getFluoConfiguration(), expected)); - - List<RowColumnValue> actualRead = FluoSparkHelper.toRcvRDD(fsh.readFromFluo(ctx)).collect(); - Assert.assertTrue(FluoITHelper.verifyRowColumnValues(expected, actualRead)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java new file mode 100644 index 0000000..c2d512d --- /dev/null +++ b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.spark.it; + +import java.util.List; + +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.fluo.api.mini.MiniFluo; +import org.apache.fluo.recipes.spark.FluoSparkHelper; +import org.apache.fluo.recipes.spark.FluoSparkTestUtil; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.fluo.recipes.test.FluoITHelper; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class FluoSparkHelperIT extends AccumuloExportITBase { + + static JavaSparkContext ctx; + + public FluoSparkHelperIT() { + super(false); + } + + @BeforeClass + public static void setupIT() { + ctx = FluoSparkTestUtil.newSparkContext("fluo-spark-helper"); + } + + @AfterClass + public static void teardownIT() { + ctx.stop(); + } + + private List<RowColumnValue> getData() { + return FluoITHelper.parse("arow|acf|acq|aval", "brow|bcf|bcq|bval", "crow|ccf|ccq|cval"); + } + + @Test + public void testAccumuloBulkImport() throws Exception { + FluoSparkHelper fsh = + new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/")); + List<RowColumnValue> expected = getData(); + final String accumuloTable = "table1"; + getAccumuloConnector().tableOperations().create(accumuloTable); + fsh.bulkImportRcvToAccumulo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)), + accumuloTable, new FluoSparkHelper.BulkImportOptions()); + Assert.assertTrue(FluoITHelper.verifyAccumuloTable(getAccumuloConnector(), accumuloTable, + expected)); + } + + @Test + public void testFluoBulkImport() throws Exception { + FluoSparkHelper fsh = + new FluoSparkHelper(getFluoConfiguration(), ctx.hadoopConfiguration(), new Path("/tmp/")); + List<RowColumnValue> expected = getData(); + fsh.bulkImportRcvToFluo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)), + new FluoSparkHelper.BulkImportOptions()); + + try (MiniFluo miniFluo = FluoFactory.newMiniFluo(getFluoConfiguration())) { + Assert.assertTrue(FluoITHelper.verifyFluoTable(getFluoConfiguration(), expected)); + + List<RowColumnValue> actualRead = FluoSparkHelper.toRcvRDD(fsh.readFromFluo(ctx)).collect(); + Assert.assertTrue(FluoITHelper.verifyRowColumnValues(expected, actualRead)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/test/pom.xml ---------------------------------------------------------------------- diff --git a/modules/test/pom.xml b/modules/test/pom.xml index 8bfb466..86f5fbd 100644 --- a/modules/test/pom.xml +++ b/modules/test/pom.xml @@ -17,7 +17,7 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-parent</artifactId> + <artifactId>fluo-recipes</artifactId> <version>1.0.0-incubating-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java ---------------------------------------------------------------------- diff --git a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java b/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java index ba7b1e6..acc13b7 100644 --- a/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java +++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/AccumuloExportITBase.java @@ -31,7 +31,6 @@ import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.mini.MiniFluo; import org.apache.fluo.recipes.accumulo.ops.TableOperations; -import org.apache.fluo.recipes.core.common.TableOptimizations; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/10ccb16a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5c926c7..5be0541 100644 --- a/pom.xml +++ b/pom.xml @@ -20,8 +20,7 @@ <artifactId>fluo-parent</artifactId> <version>1-incubating</version> </parent> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-parent</artifactId> + <artifactId>fluo-recipes</artifactId> <version>1.0.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <name>Fluo Recipes Parent</name> @@ -51,7 +50,7 @@ <url>https://github.com/apache/incubator-fluo-recipes/issues</url> </issueManagement> <properties> - <accumulo.version>1.6.5</accumulo.version> + <accumulo.version>1.6.6</accumulo.version> <curator.version>2.7.1</curator.version> <findbugs.maxRank>13</findbugs.maxRank> <fluo.version>1.0.0-incubating</fluo.version> @@ -225,16 +224,6 @@ <pluginManagement> <plugins> <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - <excludes> - <exclude>README.md</exclude> - <exclude>docs/**.md</exclude> - </excludes> - </configuration> - </plugin> - <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> <configuration> @@ -244,29 +233,6 @@ </systemPropertyVariables> </configuration> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <archive> - <manifest> - <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries> - <addDefaultImplementationEntries>true</addDefaultImplementationEntries> - </manifest> - <manifestEntries> - <!-- sealing breaks ITs with shaded jar, which is used by this example --> - <Sealed>false</Sealed> - </manifestEntries> - </archive> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <configuration> - <excludePackageNames>*.impl.*</excludePackageNames> - </configuration> - </plugin> </plugins> </pluginManagement> <plugins>