http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java deleted file mode 100644 index 7bebe06..0000000 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ /dev/null @@ -1,1836 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark; - -import java.io.*; -import java.nio.channels.FileChannel; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.*; - -import scala.Tuple2; -import scala.Tuple3; -import scala.Tuple4; -import scala.collection.JavaConverters; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.base.Throwables; -import com.google.common.io.Files; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaDoubleRDD; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.input.PortableDataStream; -import org.apache.spark.partial.BoundedDouble; -import org.apache.spark.partial.PartialResult; -import org.apache.spark.rdd.RDD; -import org.apache.spark.serializer.KryoSerializer; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.util.LongAccumulator; -import 
org.apache.spark.util.StatCounter; - -// The test suite itself is Serializable so that anonymous Function implementations can be -// serialized, as an alternative to converting these anonymous classes to static inner classes; -// see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaSparkContext sc; - private transient File tempDir; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaAPISuite"); - tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - } - - @SuppressWarnings("unchecked") - @Test - public void sparkContextUnion() { - // Union of non-specialized JavaRDDs - List<String> strings = Arrays.asList("Hello", "World"); - JavaRDD<String> s1 = sc.parallelize(strings); - JavaRDD<String> s2 = sc.parallelize(strings); - // Varargs - JavaRDD<String> sUnion = sc.union(s1, s2); - assertEquals(4, sUnion.count()); - // List - List<JavaRDD<String>> list = new ArrayList<>(); - list.add(s2); - sUnion = sc.union(s1, list); - assertEquals(4, sUnion.count()); - - // Union of JavaDoubleRDDs - List<Double> doubles = Arrays.asList(1.0, 2.0); - JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles); - JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles); - JavaDoubleRDD dUnion = sc.union(d1, d2); - assertEquals(4, dUnion.count()); - - // Union of JavaPairRDDs - List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); - pairs.add(new Tuple2<>(1, 2)); - pairs.add(new Tuple2<>(3, 4)); - JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2); - assertEquals(4, pUnion.count()); - } - - @SuppressWarnings("unchecked") - @Test - public void intersection() { - List<Integer> ints1 = Arrays.asList(1, 10, 2, 3, 4, 5); - List<Integer> ints2 = Arrays.asList(1, 6, 2, 3, 7, 8); - JavaRDD<Integer> s1 = 
sc.parallelize(ints1); - JavaRDD<Integer> s2 = sc.parallelize(ints2); - - JavaRDD<Integer> intersections = s1.intersection(s2); - assertEquals(3, intersections.count()); - - JavaRDD<Integer> empty = sc.emptyRDD(); - JavaRDD<Integer> emptyIntersection = empty.intersection(s2); - assertEquals(0, emptyIntersection.count()); - - List<Double> doubles = Arrays.asList(1.0, 2.0); - JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles); - JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles); - JavaDoubleRDD dIntersection = d1.intersection(d2); - assertEquals(2, dIntersection.count()); - - List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); - pairs.add(new Tuple2<>(1, 2)); - pairs.add(new Tuple2<>(3, 4)); - JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> pIntersection = p1.intersection(p2); - assertEquals(2, pIntersection.count()); - } - - @Test - public void sample() { - List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - JavaRDD<Integer> rdd = sc.parallelize(ints); - // the seeds here are "magic" to make this work out nicely - JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 8); - assertEquals(2, sample20.count()); - JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 2); - assertEquals(2, sample20WithoutReplacement.count()); - } - - @Test - public void randomSplit() { - List<Integer> ints = new ArrayList<>(1000); - for (int i = 0; i < 1000; i++) { - ints.add(i); - } - JavaRDD<Integer> rdd = sc.parallelize(ints); - JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 31); - // the splits aren't perfect -- not enough data for them to be -- just check they're about right - assertEquals(3, splits.length); - long s0 = splits[0].count(); - long s1 = splits[1].count(); - long s2 = splits[2].count(); - assertTrue(s0 + " not within expected range", s0 > 150 && s0 < 250); - assertTrue(s1 + " not within expected 
range", s1 > 250 && s1 < 350); - assertTrue(s2 + " not within expected range", s2 > 430 && s2 < 570); - } - - @Test - public void sortByKey() { - List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); - pairs.add(new Tuple2<>(0, 4)); - pairs.add(new Tuple2<>(3, 2)); - pairs.add(new Tuple2<>(-1, 1)); - - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - - // Default comparator - JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey(); - assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); - List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect(); - assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); - assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); - - // Custom comparator - sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false); - assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); - sortedPairs = sortedRDD.collect(); - assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); - assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); - } - - @SuppressWarnings("unchecked") - @Test - public void repartitionAndSortWithinPartitions() { - List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); - pairs.add(new Tuple2<>(0, 5)); - pairs.add(new Tuple2<>(3, 8)); - pairs.add(new Tuple2<>(2, 6)); - pairs.add(new Tuple2<>(0, 8)); - pairs.add(new Tuple2<>(3, 8)); - pairs.add(new Tuple2<>(1, 3)); - - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - - Partitioner partitioner = new Partitioner() { - @Override - public int numPartitions() { - return 2; - } - @Override - public int getPartition(Object key) { - return (Integer) key % 2; - } - }; - - JavaPairRDD<Integer, Integer> repartitioned = - rdd.repartitionAndSortWithinPartitions(partitioner); - assertTrue(repartitioned.partitioner().isPresent()); - assertEquals(repartitioned.partitioner().get(), partitioner); - List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect(); - assertEquals(partitions.get(0), - Arrays.asList(new Tuple2<>(0, 5), 
new Tuple2<>(0, 8), new Tuple2<>(2, 6))); - assertEquals(partitions.get(1), - Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8))); - } - - @Test - public void emptyRDD() { - JavaRDD<String> rdd = sc.emptyRDD(); - assertEquals("Empty RDD shouldn't have any values", 0, rdd.count()); - } - - @Test - public void sortBy() { - List<Tuple2<Integer, Integer>> pairs = new ArrayList<>(); - pairs.add(new Tuple2<>(0, 4)); - pairs.add(new Tuple2<>(3, 2)); - pairs.add(new Tuple2<>(-1, 1)); - - JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs); - - // compare on first value - JavaRDD<Tuple2<Integer, Integer>> sortedRDD = - rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() { - @Override - public Integer call(Tuple2<Integer, Integer> t) { - return t._1(); - } - }, true, 2); - - assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); - List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect(); - assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); - assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); - - // compare on second value - sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() { - @Override - public Integer call(Tuple2<Integer, Integer> t) { - return t._2(); - } - }, true, 2); - assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); - sortedPairs = sortedRDD.collect(); - assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1)); - assertEquals(new Tuple2<>(0, 4), sortedPairs.get(2)); - } - - @Test - public void foreach() { - final LongAccumulator accum = sc.sc().longAccumulator(); - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction<String>() { - @Override - public void call(String s) { - accum.add(1); - } - }); - assertEquals(2, accum.value().intValue()); - } - - @Test - public void foreachPartition() { - final LongAccumulator accum = sc.sc().longAccumulator(); - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - 
rdd.foreachPartition(new VoidFunction<Iterator<String>>() { - @Override - public void call(Iterator<String> iter) { - while (iter.hasNext()) { - iter.next(); - accum.add(1); - } - } - }); - assertEquals(2, accum.value().intValue()); - } - - @Test - public void toLocalIterator() { - List<Integer> correct = Arrays.asList(1, 2, 3, 4); - JavaRDD<Integer> rdd = sc.parallelize(correct); - List<Integer> result = Lists.newArrayList(rdd.toLocalIterator()); - assertEquals(correct, result); - } - - @Test - public void zipWithUniqueId() { - List<Integer> dataArray = Arrays.asList(1, 2, 3, 4); - JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId(); - JavaRDD<Long> indexes = zip.values(); - assertEquals(4, new HashSet<>(indexes.collect()).size()); - } - - @Test - public void zipWithIndex() { - List<Integer> dataArray = Arrays.asList(1, 2, 3, 4); - JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex(); - JavaRDD<Long> indexes = zip.values(); - List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L); - assertEquals(correctIndexes, indexes.collect()); - } - - @SuppressWarnings("unchecked") - @Test - public void lookup() { - JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Apples", "Fruit"), - new Tuple2<>("Oranges", "Fruit"), - new Tuple2<>("Oranges", "Citrus") - )); - assertEquals(2, categories.lookup("Oranges").size()); - assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0))); - } - - @Test - public void groupBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() { - @Override - public Boolean call(Integer x) { - return x % 2 == 0; - } - }; - JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); - assertEquals(2, oddsAndEvens.count()); - assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - assertEquals(5, 
Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - - oddsAndEvens = rdd.groupBy(isOdd, 1); - assertEquals(2, oddsAndEvens.count()); - assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - } - - @Test - public void groupByOnPairRDD() { - // Regression test for SPARK-4459 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Tuple2<Integer, Integer>, Boolean> areOdd = - new Function<Tuple2<Integer, Integer>, Boolean>() { - @Override - public Boolean call(Tuple2<Integer, Integer> x) { - return (x._1() % 2 == 0) && (x._2() % 2 == 0); - } - }; - JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); - JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd); - assertEquals(2, oddsAndEvens.count()); - assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - - oddsAndEvens = pairRDD.groupBy(areOdd, 1); - assertEquals(2, oddsAndEvens.count()); - assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - } - - @SuppressWarnings("unchecked") - @Test - public void keyByOnPairRDD() { - // Regression test for SPARK-4459 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Tuple2<Integer, Integer>, String> sumToString = - new Function<Tuple2<Integer, Integer>, String>() { - @Override - public String call(Tuple2<Integer, Integer> x) { - return String.valueOf(x._1() + x._2()); - } - }; - JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); - JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString); - assertEquals(7, keyed.count()); - assertEquals(1, (long) keyed.lookup("2").get(0)._1()); - } - - @SuppressWarnings("unchecked") - @Test - public 
void cogroup() { - JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Apples", "Fruit"), - new Tuple2<>("Oranges", "Fruit"), - new Tuple2<>("Oranges", "Citrus") - )); - JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Oranges", 2), - new Tuple2<>("Apples", 3) - )); - JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = - categories.cogroup(prices); - assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); - assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); - - cogrouped.collect(); - } - - @SuppressWarnings("unchecked") - @Test - public void cogroup3() { - JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Apples", "Fruit"), - new Tuple2<>("Oranges", "Fruit"), - new Tuple2<>("Oranges", "Citrus") - )); - JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Oranges", 2), - new Tuple2<>("Apples", 3) - )); - JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Oranges", 21), - new Tuple2<>("Apples", 42) - )); - - JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped = - categories.cogroup(prices, quantities); - assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); - assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); - assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3())); - - - cogrouped.collect(); - } - - @SuppressWarnings("unchecked") - @Test - public void cogroup4() { - JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Apples", "Fruit"), - new Tuple2<>("Oranges", "Fruit"), - new Tuple2<>("Oranges", "Citrus") - )); - JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new 
Tuple2<>("Oranges", 2), - new Tuple2<>("Apples", 3) - )); - JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Oranges", 21), - new Tuple2<>("Apples", 42) - )); - JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList( - new Tuple2<>("Oranges", "BR"), - new Tuple2<>("Apples", "US") - )); - - JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, - Iterable<String>>> cogrouped = categories.cogroup(prices, quantities, countries); - assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); - assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); - assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3())); - assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4())); - - cogrouped.collect(); - } - - @SuppressWarnings("unchecked") - @Test - public void leftOuterJoin() { - JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(1, 2), - new Tuple2<>(2, 1), - new Tuple2<>(3, 1) - )); - JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 'x'), - new Tuple2<>(2, 'y'), - new Tuple2<>(2, 'z'), - new Tuple2<>(4, 'w') - )); - List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined = - rdd1.leftOuterJoin(rdd2).collect(); - assertEquals(5, joined.size()); - Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter( - new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() { - @Override - public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) { - return !tup._2()._2().isPresent(); - } - }).first(); - assertEquals(3, firstUnmatched._1().intValue()); - } - - @Test - public void foldReduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2<Integer, Integer, 
Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; - - int sum = rdd.fold(0, add); - assertEquals(33, sum); - - sum = rdd.reduce(add); - assertEquals(33, sum); - } - - @Test - public void treeReduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10); - Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; - for (int depth = 1; depth <= 10; depth++) { - int sum = rdd.treeReduce(add, depth); - assertEquals(-5, sum); - } - } - - @Test - public void treeAggregate() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10); - Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; - for (int depth = 1; depth <= 10; depth++) { - int sum = rdd.treeAggregate(0, add, add, depth); - assertEquals(-5, sum); - } - } - - @SuppressWarnings("unchecked") - @Test - public void aggregateByKey() { - JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs( - Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(5, 1), - new Tuple2<>(5, 3)), 2); - - Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(), - new Function2<Set<Integer>, Integer, Set<Integer>>() { - @Override - public Set<Integer> call(Set<Integer> a, Integer b) { - a.add(b); - return a; - } - }, - new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() { - @Override - public Set<Integer> call(Set<Integer> a, Set<Integer> b) { - a.addAll(b); - return a; - } - }).collectAsMap(); - assertEquals(3, sets.size()); - assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1)); - assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3)); - assertEquals(new 
HashSet<>(Arrays.asList(1, 3)), sets.get(5)); - } - - @SuppressWarnings("unchecked") - @Test - public void foldByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); - assertEquals(1, sums.lookup(1).get(0).intValue()); - assertEquals(2, sums.lookup(2).get(0).intValue()); - assertEquals(3, sums.lookup(3).get(0).intValue()); - } - - @SuppressWarnings("unchecked") - @Test - public void reduceByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); - assertEquals(1, counts.lookup(1).get(0).intValue()); - assertEquals(2, counts.lookup(2).get(0).intValue()); - assertEquals(3, counts.lookup(3).get(0).intValue()); - - Map<Integer, Integer> localCounts = counts.collectAsMap(); - assertEquals(1, localCounts.get(1).intValue()); - assertEquals(2, localCounts.get(2).intValue()); - assertEquals(3, localCounts.get(3).intValue()); - - localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); - assertEquals(1, localCounts.get(1).intValue()); - assertEquals(2, localCounts.get(2).intValue()); - assertEquals(3, localCounts.get(3).intValue()); - } - - @Test - public void approximateResults() { - JavaRDD<Integer> rdd = 
sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Map<Integer, Long> countsByValue = rdd.countByValue(); - assertEquals(2, countsByValue.get(1).longValue()); - assertEquals(1, countsByValue.get(13).longValue()); - - PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1); - Map<Integer, BoundedDouble> finalValue = approx.getFinalValue(); - assertEquals(2.0, finalValue.get(1).mean(), 0.01); - assertEquals(1.0, finalValue.get(13).mean(), 0.01); - } - - @Test - public void take() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - assertEquals(1, rdd.first().intValue()); - rdd.take(2); - rdd.takeSample(false, 2, 42); - } - - @Test - public void isEmpty() { - assertTrue(sc.emptyRDD().isEmpty()); - assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty()); - assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty()); - assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter( - new Function<Integer,Boolean>() { - @Override - public Boolean call(Integer i) { - return i < 0; - } - }).isEmpty()); - assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter( - new Function<Integer, Boolean>() { - @Override - public Boolean call(Integer i) { - return i > 1; - } - }).isEmpty()); - } - - @Test - public void cartesian() { - JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); - JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World")); - JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD); - assertEquals(new Tuple2<>("Hello", 1.0), cartesian.first()); - } - - @Test - public void javaDoubleRDD() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); - JavaDoubleRDD distinct = rdd.distinct(); - assertEquals(5, distinct.count()); - JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() { - @Override - public Boolean call(Double x) { - return x > 2.0; - } - }); - assertEquals(3, filter.count()); - 
JavaDoubleRDD union = rdd.union(rdd); - assertEquals(12, union.count()); - union = union.cache(); - assertEquals(12, union.count()); - - assertEquals(20, rdd.sum(), 0.01); - StatCounter stats = rdd.stats(); - assertEquals(20, stats.sum(), 0.01); - assertEquals(20/6.0, rdd.mean(), 0.01); - assertEquals(20/6.0, stats.mean(), 0.01); - assertEquals(6.22222, rdd.variance(), 0.01); - assertEquals(rdd.variance(), rdd.popVariance(), 1e-14); - assertEquals(7.46667, rdd.sampleVariance(), 0.01); - assertEquals(2.49444, rdd.stdev(), 0.01); - assertEquals(rdd.stdev(), rdd.popStdev(), 1e-14); - assertEquals(2.73252, rdd.sampleStdev(), 0.01); - - rdd.first(); - rdd.take(5); - } - - @Test - public void javaDoubleRDDHistoGram() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - // Test using generated buckets - Tuple2<double[], long[]> results = rdd.histogram(2); - double[] expected_buckets = {1.0, 2.5, 4.0}; - long[] expected_counts = {2, 2}; - assertArrayEquals(expected_buckets, results._1(), 0.1); - assertArrayEquals(expected_counts, results._2()); - // Test with provided buckets - long[] histogram = rdd.histogram(expected_buckets); - assertArrayEquals(expected_counts, histogram); - // SPARK-5744 - assertArrayEquals( - new long[] {0}, - sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0})); - } - - private static class DoubleComparator implements Comparator<Double>, Serializable { - @Override - public int compare(Double o1, Double o2) { - return o1.compareTo(o2); - } - } - - @Test - public void max() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - double max = rdd.max(new DoubleComparator()); - assertEquals(4.0, max, 0.001); - } - - @Test - public void min() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - double min = rdd.min(new DoubleComparator()); - assertEquals(1.0, min, 0.001); - } - - @Test - public void naturalMax() { - JavaDoubleRDD rdd = 
sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - double max = rdd.max(); - assertEquals(4.0, max, 0.0); - } - - @Test - public void naturalMin() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - double max = rdd.min(); - assertEquals(1.0, max, 0.0); - } - - @Test - public void takeOrdered() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2, new DoubleComparator())); - assertEquals(Arrays.asList(1.0, 2.0), rdd.takeOrdered(2)); - } - - @Test - public void top() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - List<Integer> top2 = rdd.top(2); - assertEquals(Arrays.asList(4, 3), top2); - } - - private static class AddInts implements Function2<Integer, Integer, Integer> { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - } - - @Test - public void reduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - int sum = rdd.reduce(new AddInts()); - assertEquals(10, sum); - } - - @Test - public void reduceOnJavaDoubleRDD() { - JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - double sum = rdd.reduce(new Function2<Double, Double, Double>() { - @Override - public Double call(Double v1, Double v2) { - return v1 + v2; - } - }); - assertEquals(10.0, sum, 0.001); - } - - @Test - public void fold() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - int sum = rdd.fold(0, new AddInts()); - assertEquals(10, sum); - } - - @Test - public void aggregate() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - int sum = rdd.aggregate(0, new AddInts(), new AddInts()); - assertEquals(10, sum); - } - - @Test - public void map() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() { - @Override - public double call(Integer x) { - 
return x.doubleValue(); - } - }).cache(); - doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer x) { - return new Tuple2<>(x, x); - } - }).cache(); - pairs.collect(); - JavaRDD<String> strings = rdd.map(new Function<Integer, String>() { - @Override - public String call(Integer x) { - return x.toString(); - } - }).cache(); - strings.collect(); - } - - @Test - public void flatMap() { - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", - "The quick brown fox jumps over the lazy dog.")); - JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }); - assertEquals("Hello", words.first()); - assertEquals(11, words.count()); - - JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair( - new PairFlatMapFunction<String, String, String>() { - @Override - public Iterator<Tuple2<String, String>> call(String s) { - List<Tuple2<String, String>> pairs = new LinkedList<>(); - for (String word : s.split(" ")) { - pairs.add(new Tuple2<>(word, word)); - } - return pairs.iterator(); - } - } - ); - assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first()); - assertEquals(11, pairsRDD.count()); - - JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() { - @Override - public Iterator<Double> call(String s) { - List<Double> lengths = new LinkedList<>(); - for (String word : s.split(" ")) { - lengths.add((double) word.length()); - } - return lengths.iterator(); - } - }); - assertEquals(5.0, doubles.first(), 0.01); - assertEquals(11, doubles.count()); - } - - @SuppressWarnings("unchecked") - @Test - public void mapsFromPairsToPairs() { - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - 
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); - - // Regression test for SPARK-668: - JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair( - new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() { - @Override - public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) { - return Collections.singletonList(item.swap()).iterator(); - } - }); - swapped.collect(); - - // There was never a bug here, but it's worth testing: - pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { - @Override - public Tuple2<String, Integer> call(Tuple2<Integer, String> item) { - return item.swap(); - } - }).collect(); - } - - @Test - public void mapPartitions() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitions( - new FlatMapFunction<Iterator<Integer>, Integer>() { - @Override - public Iterator<Integer> call(Iterator<Integer> iter) { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum).iterator(); - } - }); - assertEquals("[3, 7]", partitionSums.collect().toString()); - } - - - @Test - public void mapPartitionsWithIndex() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex( - new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() { - @Override - public Iterator<Integer> call(Integer index, Iterator<Integer> iter) { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum).iterator(); - } - }, false); - assertEquals("[3, 7]", partitionSums.collect().toString()); - } - - @Test - public void getNumPartitions(){ - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3); - JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2); - JavaPairRDD<String, Integer> rdd3 = 
sc.parallelizePairs(Arrays.asList( - new Tuple2<>("a", 1), - new Tuple2<>("aa", 2), - new Tuple2<>("aaa", 3) - ), 2); - assertEquals(3, rdd1.getNumPartitions()); - assertEquals(2, rdd2.getNumPartitions()); - assertEquals(2, rdd3.getNumPartitions()); - } - - @Test - public void repartition() { - // Shrinking number of partitions - JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2); - JavaRDD<Integer> repartitioned1 = in1.repartition(4); - List<List<Integer>> result1 = repartitioned1.glom().collect(); - assertEquals(4, result1.size()); - for (List<Integer> l : result1) { - assertFalse(l.isEmpty()); - } - - // Growing number of partitions - JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4); - JavaRDD<Integer> repartitioned2 = in2.repartition(2); - List<List<Integer>> result2 = repartitioned2.glom().collect(); - assertEquals(2, result2.size()); - for (List<Integer> l: result2) { - assertFalse(l.isEmpty()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void persist() { - JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); - doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY()); - assertEquals(20, doubleRDD.sum(), 0.1); - - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); - pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY()); - assertEquals("a", pairRDD.first()._2()); - - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - rdd = rdd.persist(StorageLevel.DISK_ONLY()); - assertEquals(1, rdd.first().intValue()); - } - - @Test - public void iterator() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = TaskContext$.MODULE$.empty(); - assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue()); - } - - @Test - public void 
glom() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - assertEquals("[1, 2]", rdd.glom().first().toString()); - } - - // File input / output tests are largely adapted from FileSuite: - - @Test - public void textFiles() throws IOException { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - rdd.saveAsTextFile(outputDir); - // Read the plain text file and check it's OK - File outputFile = new File(outputDir, "part-00000"); - String content = Files.toString(outputFile, StandardCharsets.UTF_8); - assertEquals("1\n2\n3\n4\n", content); - // Also try reading it in as a text file RDD - List<String> expected = Arrays.asList("1", "2", "3", "4"); - JavaRDD<String> readRDD = sc.textFile(outputDir); - assertEquals(expected, readRDD.collect()); - } - - @Test - public void wholeTextFiles() throws Exception { - byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); - byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8); - - String tempDirName = tempDir.getAbsolutePath(); - String path1 = new Path(tempDirName, "part-00000").toUri().getPath(); - String path2 = new Path(tempDirName, "part-00001").toUri().getPath(); - - Files.write(content1, new File(path1)); - Files.write(content2, new File(path2)); - - Map<String, String> container = new HashMap<>(); - container.put(path1, new Text(content1).toString()); - container.put(path2, new Text(content2).toString()); - - JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3); - List<Tuple2<String, String>> result = readRDD.collect(); - - for (Tuple2<String, String> res : result) { - // Note that the paths from `wholeTextFiles` are in URI format on Windows, - // for example, file:/C:/a/b/c. 
- assertEquals(res._2(), container.get(new Path(res._1()).toUri().getPath())); - } - } - - @Test - public void textFilesCompressed() throws IOException { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - rdd.saveAsTextFile(outputDir, DefaultCodec.class); - - // Try reading it in as a text file RDD - List<String> expected = Arrays.asList("1", "2", "3", "4"); - JavaRDD<String> readRDD = sc.textFile(outputDir); - assertEquals(expected, readRDD.collect()); - } - - @SuppressWarnings("unchecked") - @Test - public void sequenceFile() { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - // Try reading the output back as an object file - JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, - Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() { - @Override - public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) { - return new Tuple2<>(pair._1().get(), pair._2().toString()); - } - }); - assertEquals(pairs, readRDD.collect()); - } - - @Test - public void binaryFiles() throws Exception { - // Reusing the wholeText files example - byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); - - String tempDirName = tempDir.getAbsolutePath(); - File file1 = new File(tempDirName + "/part-00000"); - - FileOutputStream fos1 = new 
FileOutputStream(file1); - - FileChannel channel1 = fos1.getChannel(); - ByteBuffer bbuf = ByteBuffer.wrap(content1); - channel1.write(bbuf); - channel1.close(); - JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName, 3); - List<Tuple2<String, PortableDataStream>> result = readRDD.collect(); - for (Tuple2<String, PortableDataStream> res : result) { - assertArrayEquals(content1, res._2().toArray()); - } - } - - @Test - public void binaryFilesCaching() throws Exception { - // Reusing the wholeText files example - byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8); - - String tempDirName = tempDir.getAbsolutePath(); - File file1 = new File(tempDirName + "/part-00000"); - - FileOutputStream fos1 = new FileOutputStream(file1); - - FileChannel channel1 = fos1.getChannel(); - ByteBuffer bbuf = ByteBuffer.wrap(content1); - channel1.write(bbuf); - channel1.close(); - - JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache(); - readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() { - @Override - public void call(Tuple2<String, PortableDataStream> pair) { - pair._2().toArray(); // force the file to read - } - }); - - List<Tuple2<String, PortableDataStream>> result = readRDD.collect(); - for (Tuple2<String, PortableDataStream> res : result) { - assertArrayEquals(content1, res._2().toArray()); - } - } - - @Test - public void binaryRecords() throws Exception { - // Reusing the wholeText files example - byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8); - int numOfCopies = 10; - String tempDirName = tempDir.getAbsolutePath(); - File file1 = new File(tempDirName + "/part-00000"); - - FileOutputStream fos1 = new FileOutputStream(file1); - - FileChannel channel1 = fos1.getChannel(); - - for (int i = 0; i < numOfCopies; i++) { - ByteBuffer bbuf = ByteBuffer.wrap(content1); - channel1.write(bbuf); - } - channel1.close(); - - JavaRDD<byte[]> readRDD = 
sc.binaryRecords(tempDirName, content1.length); - assertEquals(numOfCopies,readRDD.count()); - List<byte[]> result = readRDD.collect(); - for (byte[] res : result) { - assertArrayEquals(content1, res); - } - } - - @SuppressWarnings("unchecked") - @Test - public void writeWithNewAPIHadoopFile() { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsNewAPIHadoopFile( - outputDir, IntWritable.class, Text.class, - org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); - - JavaPairRDD<IntWritable, Text> output = - sc.sequenceFile(outputDir, IntWritable.class, Text.class); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); - } - - @SuppressWarnings("unchecked") - @Test - public void readWithNewAPIHadoopFile() throws IOException { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - 
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir, - org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, - IntWritable.class, Text.class, Job.getInstance().getConfiguration()); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); - } - - @Test - public void objectFilesOfInts() { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - rdd.saveAsObjectFile(outputDir); - // Try reading the output back as an object file - List<Integer> expected = Arrays.asList(1, 2, 3, 4); - JavaRDD<Integer> readRDD = sc.objectFile(outputDir); - assertEquals(expected, readRDD.collect()); - } - - @SuppressWarnings("unchecked") - @Test - public void objectFilesOfComplexTypes() { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.saveAsObjectFile(outputDir); - // Try reading the output back as an object file - JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir); - assertEquals(pairs, readRDD.collect()); - } - - @SuppressWarnings("unchecked") - @Test - public void hadoopFile() { - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - 
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, - SequenceFileInputFormat.class, IntWritable.class, Text.class); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); - } - - @SuppressWarnings("unchecked") - @Test - public void hadoopFileCompressed() { - String outputDir = new File(tempDir, "output_compressed").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, - DefaultCodec.class); - - JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, - SequenceFileInputFormat.class, IntWritable.class, Text.class); - - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); - } - - @Test - public void zip() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() { - @Override - public double call(Integer x) { - return x.doubleValue(); - } - }); - JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); - zipped.count(); - } - - @Test - public void zipPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); - 
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); - FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = - new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() { - @Override - public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) { - return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator(); - } - }; - - JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); - assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); - } - - @SuppressWarnings("deprecation") - @Test - public void accumulators() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - - final Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer x) { - intAccum.add(x); - } - }); - assertEquals((Integer) 25, intAccum.value()); - - final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer x) { - doubleAccum.add((double) x); - } - }); - assertEquals((Double) 25.0, doubleAccum.value()); - - // Try a custom accumulator type - AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { - @Override - public Float addInPlace(Float r, Float t) { - return r + t; - } - - @Override - public Float addAccumulator(Float r, Float t) { - return r + t; - } - - @Override - public Float zero(Float initialValue) { - return 0.0f; - } - }; - - final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer x) { - floatAccum.add((float) x); - } - }); - assertEquals((Float) 25.0f, floatAccum.value()); - - // Test the setValue method - floatAccum.setValue(5.0f); - assertEquals((Float) 5.0f, floatAccum.value()); - } - - @Test - public void keyBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); - 
List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() { - @Override - public String call(Integer t) { - return t.toString(); - } - }).collect(); - assertEquals(new Tuple2<>("1", 1), s.get(0)); - assertEquals(new Tuple2<>("2", 2), s.get(1)); - } - - @Test - public void checkpointAndComputation() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - sc.setCheckpointDir(tempDir.getAbsolutePath()); - assertFalse(rdd.isCheckpointed()); - rdd.checkpoint(); - rdd.count(); // Forces the DAG to cause a checkpoint - assertTrue(rdd.isCheckpointed()); - assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect()); - } - - @Test - public void checkpointAndRestore() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - sc.setCheckpointDir(tempDir.getAbsolutePath()); - assertFalse(rdd.isCheckpointed()); - rdd.checkpoint(); - rdd.count(); // Forces the DAG to cause a checkpoint - assertTrue(rdd.isCheckpointed()); - - assertTrue(rdd.getCheckpointFile().isPresent()); - JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get()); - assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect()); - } - - @Test - public void combineByKey() { - JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)); - Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() { - @Override - public Integer call(Integer v1) { - return v1 % 3; - } - }; - Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() { - @Override - public Integer call(Integer v1) { - return v1; - } - }; - - Function2<Integer, Integer, Integer> mergeValueFunction = - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer v1, Integer v2) { - return v1 + v2; - } - }; - - JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction) - .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction); - Map<Integer, Integer> results 
= combinedRDD.collectAsMap(); - ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7); - assertEquals(expected, results); - - Partitioner defaultPartitioner = Partitioner.defaultPartitioner( - combinedRDD.rdd(), - JavaConverters.collectionAsScalaIterableConverter( - Collections.<RDD<?>>emptyList()).asScala().toSeq()); - combinedRDD = originalRDD.keyBy(keyFunction) - .combineByKey( - createCombinerFunction, - mergeValueFunction, - mergeValueFunction, - defaultPartitioner, - false, - new KryoSerializer(new SparkConf())); - results = combinedRDD.collectAsMap(); - assertEquals(expected, results); - } - - @SuppressWarnings("unchecked") - @Test - public void mapOnPairRDD() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i, i % 2); - } - }); - JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair( - new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) { - return new Tuple2<>(in._2(), in._1()); - } - }); - assertEquals(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(0, 2), - new Tuple2<>(1, 3), - new Tuple2<>(0, 4)), rdd3.collect()); - - } - - @SuppressWarnings("unchecked") - @Test - public void collectPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); - - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i, i % 2); - } - }); - - List<Integer>[] parts = rdd1.collectPartitions(new int[] {0}); - assertEquals(Arrays.asList(1, 2), parts[0]); - - parts = rdd1.collectPartitions(new int[] {1, 2}); - assertEquals(Arrays.asList(3, 4), parts[0]); - assertEquals(Arrays.asList(5, 6, 
7), parts[1]); - - assertEquals(Arrays.asList(new Tuple2<>(1, 1), - new Tuple2<>(2, 0)), - rdd2.collectPartitions(new int[] {0})[0]); - - List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2}); - assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); - assertEquals(Arrays.asList(new Tuple2<>(5, 1), - new Tuple2<>(6, 0), - new Tuple2<>(7, 1)), - parts2[1]); - } - - @Test - public void countApproxDistinct() { - List<Integer> arrayData = new ArrayList<>(); - int size = 100; - for (int i = 0; i < 100000; i++) { - arrayData.add(i % size); - } - JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10); - assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1); - } - - @Test - public void countApproxDistinctByKey() { - List<Tuple2<Integer, Integer>> arrayData = new ArrayList<>(); - for (int i = 10; i < 100; i++) { - for (int j = 0; j < i; j++) { - arrayData.add(new Tuple2<>(i, j)); - } - } - double relativeSD = 0.001; - JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData); - List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect(); - for (Tuple2<Integer, Long> resItem : res) { - double count = resItem._1(); - long resCount = resItem._2(); - double error = Math.abs((resCount - count) / count); - assertTrue(error < 0.1); - } - - } - - @Test - public void collectAsMapWithIntArrayValues() { - // Regression test for SPARK-1040 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); - JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair( - new PairFunction<Integer, Integer, int[]>() { - @Override - public Tuple2<Integer, int[]> call(Integer x) { - return new Tuple2<>(x, new int[]{x}); - } - }); - pairRDD.collect(); // Works fine - pairRDD.collectAsMap(); // Used to crash with ClassCastException - } - - @SuppressWarnings("unchecked") - @Test - public void collectAsMapAndSerialize() throws Exception { - JavaPairRDD<String,Integer> rdd = - 
sc.parallelizePairs(Arrays.asList(new Tuple2<>("foo", 1))); - Map<String,Integer> map = rdd.collectAsMap(); - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - new ObjectOutputStream(bytes).writeObject(map); - Map<String,Integer> deserializedMap = (Map<String,Integer>) - new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject(); - assertEquals(1, deserializedMap.get("foo").intValue()); - } - - @Test - @SuppressWarnings("unchecked") - public void sampleByKey() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i % 2, 1); - } - }); - Map<Integer, Double> fractions = new HashMap<>(); - fractions.put(0, 0.5); - fractions.put(1, 1.0); - JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L); - Map<Integer, Long> wrCounts = wr.countByKey(); - assertEquals(2, wrCounts.size()); - assertTrue(wrCounts.get(0) > 0); - assertTrue(wrCounts.get(1) > 0); - JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L); - Map<Integer, Long> worCounts = wor.countByKey(); - assertEquals(2, worCounts.size()); - assertTrue(worCounts.get(0) > 0); - assertTrue(worCounts.get(1) > 0); - } - - @Test - @SuppressWarnings("unchecked") - public void sampleByKeyExact() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i % 2, 1); - } - }); - Map<Integer, Double> fractions = new HashMap<>(); - fractions.put(0, 0.5); - fractions.put(1, 1.0); - JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L); - Map<Integer, Long> wrExactCounts = wrExact.countByKey(); - 
assertEquals(2, wrExactCounts.size()); - assertTrue(wrExactCounts.get(0) == 2); - assertTrue(wrExactCounts.get(1) == 4); - JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L); - Map<Integer, Long> worExactCounts = worExact.countByKey(); - assertEquals(2, worExactCounts.size()); - assertTrue(worExactCounts.get(0) == 2); - assertTrue(worExactCounts.get(1) == 4); - } - - private static class SomeCustomClass implements Serializable { - SomeCustomClass() { - // Intentionally left blank - } - } - - @Test - public void collectUnderlyingScalaRDD() { - List<SomeCustomClass> data = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - data.add(new SomeCustomClass()); - } - JavaRDD<SomeCustomClass> rdd = sc.parallelize(data); - SomeCustomClass[] collected = - (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect(); - assertEquals(data.size(), collected.length); - } - - private static final class BuggyMapFunction<T> implements Function<T, T> { - - @Override - public T call(T x) { - throw new IllegalStateException("Custom exception!"); - } - } - - @Test - public void collectAsync() throws Exception { - List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); - JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<List<Integer>> future = rdd.collectAsync(); - List<Integer> result = future.get(); - assertEquals(data, result); - assertFalse(future.isCancelled()); - assertTrue(future.isDone()); - assertEquals(1, future.jobIds().size()); - } - - @Test - public void takeAsync() throws Exception { - List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); - JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<List<Integer>> future = rdd.takeAsync(1); - List<Integer> result = future.get(); - assertEquals(1, result.size()); - assertEquals((Integer) 1, result.get(0)); - assertFalse(future.isCancelled()); - assertTrue(future.isDone()); - assertEquals(1, future.jobIds().size()); - } - - @Test - public void foreachAsync() throws 
Exception { - List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); - JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Void> future = rdd.foreachAsync( - new VoidFunction<Integer>() { - @Override - public void call(Integer integer) { - // intentionally left blank. - } - } - ); - future.get(); - assertFalse(future.isCancelled()); - assertTrue(future.isDone()); - assertEquals(1, future.jobIds().size()); - } - - @Test - public void countAsync() throws Exception { - List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); - JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Long> future = rdd.countAsync(); - long count = future.get(); - assertEquals(data.size(), count); - assertFalse(future.isCancelled()); - assertTrue(future.isDone()); - assertEquals(1, future.jobIds().size()); - } - - @Test - public void testAsyncActionCancellation() throws Exception { - List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); - JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() { - @Override - public void call(Integer integer) throws InterruptedException { - Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled. 
- } - }); - future.cancel(true); - assertTrue(future.isCancelled()); - assertTrue(future.isDone()); - try { - future.get(2000, TimeUnit.MILLISECONDS); - fail("Expected future.get() for cancelled job to throw CancellationException"); - } catch (CancellationException ignored) { - // pass - } - } - - @Test - public void testAsyncActionErrorWrapping() throws Exception { - List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); - JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync(); - try { - future.get(2, TimeUnit.SECONDS); - fail("Expected future.get() for failed job to throw ExcecutionException"); - } catch (ExecutionException ee) { - assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!")); - } - assertTrue(future.isDone()); - } - - static class Class1 {} - static class Class2 {} - - @Test - public void testRegisterKryoClasses() { - SparkConf conf = new SparkConf(); - conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class }); - assertEquals( - Class1.class.getName() + "," + Class2.class.getName(), - conf.get("spark.kryo.classesToRegister")); - } - - @Test - public void testGetPersistentRDDs() { - java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs(); - assertTrue(cachedRddsMap.isEmpty()); - JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache(); - JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache(); - cachedRddsMap = sc.getPersistentRDDs(); - assertEquals(2, cachedRddsMap.size()); - assertEquals("RDD1", cachedRddsMap.get(0).name()); - assertEquals("RDD2", cachedRddsMap.get(1).name()); - } - -}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java new file mode 100644 index 0000000..e22ad89 --- /dev/null +++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark; + +import java.io.File; +import java.io.Serializable; +import java.util.*; + +import scala.Tuple2; + +import com.google.common.collect.Iterables; +import com.google.common.io.Files; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaDoubleRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.*; +import org.apache.spark.util.Utils; + +/** + * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 + * lambda syntax. + */ +public class Java8RDDAPISuite implements Serializable { + private static int foreachCalls = 0; + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaAPISuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void foreachWithAnonymousClass() { + foreachCalls = 0; + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); + rdd.foreach(new VoidFunction<String>() { + @Override + public void call(String s) { + foreachCalls++; + } + }); + Assert.assertEquals(2, foreachCalls); + } + + @Test + public void foreach() { + foreachCalls = 0; + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); + rdd.foreach(x -> foreachCalls++); + Assert.assertEquals(2, foreachCalls); + } + + @Test + public void groupBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function<Integer, Boolean> isOdd = x -> x % 2 == 0; + JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); + Assert.assertEquals(2, oddsAndEvens.count()); + 
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + + oddsAndEvens = rdd.groupBy(isOdd, 1); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + } + + @Test + public void leftOuterJoin() { + JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( + new Tuple2<>(1, 1), + new Tuple2<>(1, 2), + new Tuple2<>(2, 1), + new Tuple2<>(3, 1) + )); + JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( + new Tuple2<>(1, 'x'), + new Tuple2<>(2, 'y'), + new Tuple2<>(2, 'z'), + new Tuple2<>(4, 'w') + )); + List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = + rdd1.leftOuterJoin(rdd2).collect(); + Assert.assertEquals(5, joined.size()); + Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched = + rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); + Assert.assertEquals(3, firstUnmatched._1().intValue()); + } + + @Test + public void foldReduce() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function2<Integer, Integer, Integer> add = (a, b) -> a + b; + + int sum = rdd.fold(0, add); + Assert.assertEquals(33, sum); + + sum = rdd.reduce(add); + Assert.assertEquals(33, sum); + } + + @Test + public void foldByKey() { + List<Tuple2<Integer, Integer>> pairs = Arrays.asList( + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) + ); + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); + Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); + Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); + Assert.assertEquals(3, 
sums.lookup(3).get(0).intValue()); + } + + @Test + public void reduceByKey() { + List<Tuple2<Integer, Integer>> pairs = Arrays.asList( + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) + ); + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); + Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); + Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); + Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); + + Map<Integer, Integer> localCounts = counts.collectAsMap(); + Assert.assertEquals(1, localCounts.get(1).intValue()); + Assert.assertEquals(2, localCounts.get(2).intValue()); + Assert.assertEquals(3, localCounts.get(3).intValue()); + + localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); + Assert.assertEquals(1, localCounts.get(1).intValue()); + Assert.assertEquals(2, localCounts.get(2).intValue()); + Assert.assertEquals(3, localCounts.get(3).intValue()); + } + + @Test + public void map() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); + doubles.collect(); + JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) + .cache(); + pairs.collect(); + JavaRDD<String> strings = rdd.map(Object::toString).cache(); + strings.collect(); + } + + @Test + public void flatMap() { + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", + "The quick brown fox jumps over the lazy dog.")); + JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); + + Assert.assertEquals("Hello", words.first()); + Assert.assertEquals(11, words.count()); + + JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { + List<Tuple2<String, String>> pairs2 = new LinkedList<>(); + for (String word : s.split(" ")) { + pairs2.add(new Tuple2<>(word, word)); + } + return pairs2.iterator(); + 
}); + + Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); + Assert.assertEquals(11, pairs.count()); + + JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { + List<Double> lengths = new LinkedList<>(); + for (String word : s.split(" ")) { + lengths.add((double) word.length()); + } + return lengths.iterator(); + }); + + Assert.assertEquals(5.0, doubles.first(), 0.01); + Assert.assertEquals(11, pairs.count()); + } + + @Test + public void mapsFromPairsToPairs() { + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); + + // Regression test for SPARK-668: + JavaPairRDD<String, Integer> swapped = + pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator()); + swapped.collect(); + + // There was never a bug here, but it's worth testing: + pairRDD.map(Tuple2::swap).collect(); + } + + @Test + public void mapPartitions() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); + JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { + int sum = 0; + while (iter.hasNext()) { + sum += iter.next(); + } + return Collections.singletonList(sum).iterator(); + }); + + Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); + } + + @Test + public void sequenceFile() { + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + String outputDir = new File(tempDir, "output").getAbsolutePath(); + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); + + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + + // Try reading the output back as an object file + JavaPairRDD<Integer, String> readRDD = 
sc.sequenceFile(outputDir, IntWritable.class, Text.class) + .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); + Assert.assertEquals(pairs, readRDD.collect()); + Utils.deleteRecursively(tempDir); + } + + @Test + public void zip() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); + JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); + zipped.count(); + } + + @Test + public void zipPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); + JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); + FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = + (Iterator<Integer> i, Iterator<String> s) -> { + int sizeI = 0; + while (i.hasNext()) { + sizeI += 1; + i.next(); + } + int sizeS = 0; + while (s.hasNext()) { + sizeS += 1; + s.next(); + } + return Arrays.asList(sizeI, sizeS).iterator(); + }; + JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); + Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); + } + + @Test + public void keyBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); + List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); + Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); + } + + @Test + public void mapOnPairRDD() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); + JavaPairRDD<Integer, Integer> rdd2 = + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + JavaPairRDD<Integer, Integer> rdd3 = + rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); + Assert.assertEquals(Arrays.asList( + new Tuple2<>(1, 1), + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new Tuple2<>(0, 4)), rdd3.collect()); + } + + @Test + public void collectPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); + + JavaPairRDD<Integer, Integer> 
rdd2 = + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); + Assert.assertEquals(Arrays.asList(1, 2), parts[0]); + + parts = rdd1.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(3, 4), parts[0]); + Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); + + Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), + rdd2.collectPartitions(new int[]{0})[0]); + + List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), + parts2[1]); + } + + @Test + public void collectAsMapWithIntArrayValues() { + // Regression test for SPARK-1040 + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); + JavaPairRDD<Integer, int[]> pairRDD = + rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); + pairRDD.collect(); // Works fine + pairRDD.collectAsMap(); // Used to crash with ClassCastException + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org