http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala ---------------------------------------------------------------------- diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala new file mode 100644 index 0000000..fda377e --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.net.{Inet4Address, InetAddress, NetworkInterface} + +import scala.collection.JavaConverters._ +import scala.sys.process._ +import scala.util.Try + +private[spark] object DockerUtils { + + def getDockerIp(): String = { + /** If docker-machine is setup on this box, attempts to find the ip from it. */ + def findFromDockerMachine(): Option[String] = { + sys.env.get("DOCKER_MACHINE_NAME").flatMap { name => + Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption + } + } + sys.env.get("DOCKER_IP") + .orElse(findFromDockerMachine()) + .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption) + .getOrElse { + // This block of code is based on Utils.findLocalInetAddress(), but is modified to blacklist + // certain interfaces. + val address = InetAddress.getLocalHost + // Address resolves to something like 127.0.1.1, which happens on Debian; try to find + // a better address using the local network interfaces + // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order + // on unix-like system. On windows, it returns in index order. + // It's more proper to pick ip address following system output order. + val blackListedIFs = Seq( + "vboxnet0", // Mac + "docker0" // Linux + ) + val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i => + !blackListedIFs.contains(i.getName) + } + val reOrderedNetworkIFs = activeNetworkIFs.reverse + for (ni <- reOrderedNetworkIFs) { + val addresses = ni.getInetAddresses.asScala + .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq + if (addresses.nonEmpty) { + val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) + // because of Inet6Address.toHostName may add interface at the end if it knows about it + val strippedAddress = InetAddress.getByAddress(addr.getAddress) + return strippedAddress.getHostAddress + } + } + address.getHostAddress + } + } +}
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/README.md ---------------------------------------------------------------------- diff --git a/external/docker/README.md b/external/docker/README.md new file mode 100644 index 0000000..40ba9c3 --- /dev/null +++ b/external/docker/README.md @@ -0,0 +1,7 @@ +Spark docker files +=========== + +Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles), +as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker). + +Tested with Docker version 0.8.1. http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/build ---------------------------------------------------------------------- diff --git a/external/docker/build b/external/docker/build new file mode 100755 index 0000000..253a2fc --- /dev/null +++ b/external/docker/build @@ -0,0 +1,22 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +docker images > /dev/null || { echo Please install docker in non-sudo mode. ; exit; } + +./spark-test/build \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-mesos/Dockerfile ---------------------------------------------------------------------- diff --git a/external/docker/spark-mesos/Dockerfile b/external/docker/spark-mesos/Dockerfile new file mode 100644 index 0000000..fb3f267 --- /dev/null +++ b/external/docker/spark-mesos/Dockerfile @@ -0,0 +1,30 @@ +# This is an example Dockerfile for creating a Spark image which can be +# references by the Spark property 'spark.mesos.executor.docker.image' +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM mesosphere/mesos:0.20.1 + +# Update the base ubuntu image with dependencies needed for Spark +RUN apt-get update && \ + apt-get install -y python libnss3 openjdk-7-jre-headless curl + +RUN mkdir /opt/spark && \ + curl http://www.apache.org/dyn/closer.lua/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.4.tgz \ + | tar -xzC /opt +ENV SPARK_HOME /opt/spark +ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/README.md ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/README.md b/external/docker/spark-test/README.md new file mode 100644 index 0000000..ec0baf6 --- /dev/null +++ b/external/docker/spark-test/README.md @@ -0,0 +1,11 @@ +Spark Docker files usable for testing and development purposes. + +These images are intended to be run like so: + + docker run -v $SPARK_HOME:/opt/spark spark-test-master + docker run -v $SPARK_HOME:/opt/spark spark-test-worker spark://<master_ip>:7077 + +Using this configuration, the containers will have their Spark directories +mounted to your actual `SPARK_HOME`, allowing you to modify and recompile +your Spark source and have them immediately usable in the docker images +(without rebuilding them). http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/base/Dockerfile ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/base/Dockerfile b/external/docker/spark-test/base/Dockerfile new file mode 100644 index 0000000..76f550f --- /dev/null +++ b/external/docker/spark-test/base/Dockerfile @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM ubuntu:precise + +# Upgrade package index +# install a few other useful packages plus Open Jdk 7 +# Remove unneeded /var/lib/apt/lists/* after install to reduce the +# docker image size (by ~30MB) +RUN apt-get update && \ + apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server && \ + rm -rf /var/lib/apt/lists/* + +ENV SCALA_VERSION 2.11.7 +ENV CDH_VERSION cdh4 +ENV SCALA_HOME /opt/scala-$SCALA_VERSION +ENV SPARK_HOME /opt/spark +ENV PATH $SPARK_HOME:$SCALA_HOME/bin:$PATH + +# Install Scala +ADD http://www.scala-lang.org/files/archive/scala-$SCALA_VERSION.tgz / +RUN (cd / && gunzip < scala-$SCALA_VERSION.tgz)|(cd /opt && tar -xvf -) +RUN rm /scala-$SCALA_VERSION.tgz http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/build ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/build b/external/docker/spark-test/build new file mode 100755 index 0000000..6f9e197 --- /dev/null +++ b/external/docker/spark-test/build @@ -0,0 +1,22 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +docker build -t spark-test-base spark-test/base/ +docker build -t spark-test-master spark-test/master/ +docker build -t spark-test-worker spark-test/worker/ http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/master/Dockerfile ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/master/Dockerfile b/external/docker/spark-test/master/Dockerfile new file mode 100644 index 0000000..f729534 --- /dev/null +++ b/external/docker/spark-test/master/Dockerfile @@ -0,0 +1,21 @@ +# Spark Master +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-test-base +ADD default_cmd /root/ +CMD ["/root/default_cmd"] http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/master/default_cmd ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/master/default_cmd b/external/docker/spark-test/master/default_cmd new file mode 100755 index 0000000..5a7da34 --- /dev/null +++ b/external/docker/spark-test/master/default_cmd @@ -0,0 +1,28 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }') +echo "CONTAINER_IP=$IP" +export SPARK_LOCAL_IP=$IP +export SPARK_PUBLIC_DNS=$IP + +# Avoid the default Docker behavior of mapping our IP address to an unreachable host name +umount /etc/hosts + +/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/worker/Dockerfile ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/worker/Dockerfile b/external/docker/spark-test/worker/Dockerfile new file mode 100644 index 0000000..890febe --- /dev/null +++ b/external/docker/spark-test/worker/Dockerfile @@ -0,0 +1,22 @@ +# Spark Worker +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-test-base +ENV SPARK_WORKER_PORT 8888 +ADD default_cmd /root/ +ENTRYPOINT ["/root/default_cmd"] http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/docker/spark-test/worker/default_cmd ---------------------------------------------------------------------- diff --git a/external/docker/spark-test/worker/default_cmd b/external/docker/spark-test/worker/default_cmd new file mode 100755 index 0000000..31b06cb --- /dev/null +++ b/external/docker/spark-test/worker/default_cmd @@ -0,0 +1,28 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }') +echo "CONTAINER_IP=$IP" +export SPARK_LOCAL_IP=$IP +export SPARK_PUBLIC_DNS=$IP + +# Avoid the default Docker behavior of mapping our IP address to an unreachable host name +umount /etc/hosts + +/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1 http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/java8-tests/README.md ---------------------------------------------------------------------- diff --git a/external/java8-tests/README.md b/external/java8-tests/README.md new file mode 100644 index 0000000..dc9e87f --- /dev/null +++ b/external/java8-tests/README.md @@ -0,0 +1,24 @@ +# Java 8 Test Suites + +These tests require having Java 8 installed and are isolated from the main Spark build. +If Java 8 is not your system's default Java version, you will need to point Spark's build +to your Java location. The set-up depends a bit on the build system: + +* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass + `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically + include the Java 8 test project. + + `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"` + +* For Maven users, + + Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not + automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests` + must be used. + + `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests` + `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite` + + Note that the above command can only be run from project root directory since this module + depends on core and the test-jars of core and streaming. This means an install step is + required to make the test dependencies visible to the Java 8 sub-project. http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/java8-tests/pom.xml ---------------------------------------------------------------------- diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml new file mode 100644 index 0000000..0ad9c53 --- /dev/null +++ b/external/java8-tests/pom.xml @@ -0,0 +1,161 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +~ Licensed to the Apache Software Foundation (ASF) under one or more +~ contributor license agreements. See the NOTICE file distributed with +~ this work for additional information regarding copyright ownership. +~ The ASF licenses this file to You under the Apache License, Version 2.0 +~ (the "License"); you may not use this file except in compliance with +~ the License. You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~ See the License for the specific language governing permissions and +~ limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>java8-tests_2.11</artifactId> + <packaging>pom</packaging> + <name>Spark Project Java8 Tests POM</name> + + <properties> + <sbt.project.name>java8-tests</sbt.project.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-test-tags_${scala.binary.version}</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>java8-tests</id> + </profile> + </profiles> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-install-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + <configuration> + <systemPropertyVariables> + <!-- For some reason surefire isn't setting this log4j file on the + test classpath automatically. So we add it manually. --> + <log4j.configuration> + file:src/test/resources/log4j.properties + </log4j.configuration> + </systemPropertyVariables> + <skipTests>false</skipTests> + <includes> + <include>**/Suite*.java</include> + <include>**/*Suite.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <id>test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <fork>true</fork> + <verbose>true</verbose> + <forceJavacCompilerUse>true</forceJavacCompilerUse> + <source>1.8</source> + <compilerVersion>1.8</compilerVersion> + <target>1.8</target> + <encoding>UTF-8</encoding> + <maxmem>1024m</maxmem> + </configuration> + </plugin> + <plugin> + <!-- disabled --> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <phase>none</phase> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>none</phase> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>none</phase> + </execution> + <execution> + <id>attach-scaladocs</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java new file mode 100644 index 0000000..c0b58e7 --- /dev/null +++ b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import java.io.File; +import java.io.Serializable; +import java.util.*; + +import scala.Tuple2; + +import com.google.common.collect.Iterables; +import com.google.common.io.Files; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaDoubleRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.*; +import org.apache.spark.util.Utils; + +/** + * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 + * lambda syntax. + */ +public class Java8APISuite implements Serializable { + static int foreachCalls = 0; + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaAPISuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void foreachWithAnonymousClass() { + foreachCalls = 0; + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); + rdd.foreach(new VoidFunction<String>() { + @Override + public void call(String s) { + foreachCalls++; + } + }); + Assert.assertEquals(2, foreachCalls); + } + + @Test + public void foreach() { + foreachCalls = 0; + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); + rdd.foreach(x -> foreachCalls++); + Assert.assertEquals(2, foreachCalls); + } + + @Test + public void groupBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function<Integer, Boolean> isOdd = x -> x % 2 == 0; + JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + + oddsAndEvens = rdd.groupBy(isOdd, 1); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + } + + @Test + public void leftOuterJoin() { + JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( + new Tuple2<>(1, 1), + new Tuple2<>(1, 2), + new Tuple2<>(2, 1), + new Tuple2<>(3, 1) + )); + JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( + new Tuple2<>(1, 'x'), + new Tuple2<>(2, 'y'), + new Tuple2<>(2, 'z'), + new Tuple2<>(4, 'w') + )); + List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = + rdd1.leftOuterJoin(rdd2).collect(); + Assert.assertEquals(5, joined.size()); + Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched = + rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); + Assert.assertEquals(3, firstUnmatched._1().intValue()); + } + + @Test + public void foldReduce() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function2<Integer, Integer, Integer> add = (a, b) -> a + b; + + int sum = rdd.fold(0, add); + Assert.assertEquals(33, sum); + + sum = rdd.reduce(add); + Assert.assertEquals(33, sum); + } + + @Test + public void foldByKey() { + List<Tuple2<Integer, Integer>> pairs = Arrays.asList( + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) + ); + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); + Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); + Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); + Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); + } + + @Test + public void reduceByKey() { + List<Tuple2<Integer, Integer>> pairs = Arrays.asList( + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) + ); + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); + Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); + Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); + Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); + + Map<Integer, Integer> localCounts = counts.collectAsMap(); + Assert.assertEquals(1, localCounts.get(1).intValue()); + Assert.assertEquals(2, localCounts.get(2).intValue()); + Assert.assertEquals(3, localCounts.get(3).intValue()); + + localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); + Assert.assertEquals(1, localCounts.get(1).intValue()); + Assert.assertEquals(2, localCounts.get(2).intValue()); + Assert.assertEquals(3, localCounts.get(3).intValue()); + } + + @Test + public void map() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); + doubles.collect(); + JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) + .cache(); + pairs.collect(); + JavaRDD<String> strings = rdd.map(Object::toString).cache(); + strings.collect(); + } + + @Test + public void flatMap() { + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", + "The quick brown fox jumps over the lazy dog.")); + JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" "))); + + Assert.assertEquals("Hello", words.first()); + Assert.assertEquals(11, words.count()); + + JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { + List<Tuple2<String, String>> pairs2 = new LinkedList<>(); + for (String word : s.split(" ")) { + pairs2.add(new Tuple2<>(word, word)); + } + return pairs2; + }); + + Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); + Assert.assertEquals(11, pairs.count()); + + JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { + List<Double> lengths = new LinkedList<>(); + for (String word : s.split(" ")) { + lengths.add((double) word.length()); + } + return lengths; + }); + + Assert.assertEquals(5.0, doubles.first(), 0.01); + Assert.assertEquals(11, pairs.count()); + } + + @Test + public void mapsFromPairsToPairs() { + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); + + // Regression test for SPARK-668: + JavaPairRDD<String, Integer> swapped = + pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap())); + swapped.collect(); + + // There was never a bug here, but it's worth testing: + pairRDD.map(Tuple2::swap).collect(); + } + + @Test + public void mapPartitions() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); + JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { + int sum = 0; + while (iter.hasNext()) { + sum += iter.next(); + } + return Collections.singletonList(sum); + }); + + Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); + } + + @Test + public void sequenceFile() { + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + String outputDir = new File(tempDir, "output").getAbsolutePath(); + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); + + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + + // Try reading the output back as an object file + JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) + .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); + Assert.assertEquals(pairs, readRDD.collect()); + Utils.deleteRecursively(tempDir); + } + + @Test + public void zip() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); + JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); + zipped.count(); + } + + @Test + public void zipPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); + JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); + FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = + (Iterator<Integer> i, Iterator<String> s) -> { + int sizeI = 0; + while (i.hasNext()) { + sizeI += 1; + i.next(); + } + int sizeS = 0; + while (s.hasNext()) { + sizeS += 1; + s.next(); + } + return Arrays.asList(sizeI, sizeS).iterator(); + }; + JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); + Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); + } + + @Test + public void accumulators() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + + Accumulator<Integer> intAccum = sc.intAccumulator(10); + rdd.foreach(intAccum::add); + Assert.assertEquals((Integer) 25, intAccum.value()); + + Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); + rdd.foreach(x -> doubleAccum.add((double) x)); + Assert.assertEquals((Double) 25.0, doubleAccum.value()); + + // Try a custom accumulator type + AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { + @Override + public Float addInPlace(Float r, Float t) { + return r + t; + } + @Override + public Float addAccumulator(Float r, Float t) { + return r + t; + } + @Override + public Float zero(Float initialValue) { + return 0.0f; + } + }; + + Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); + rdd.foreach(x -> floatAccum.add((float) x)); + Assert.assertEquals((Float) 25.0f, floatAccum.value()); + + // Test the setValue method + floatAccum.setValue(5.0f); + Assert.assertEquals((Float) 5.0f, floatAccum.value()); + } + + @Test + public void keyBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); + List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); + Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); + } + + @Test + public void mapOnPairRDD() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); + JavaPairRDD<Integer, Integer> rdd2 = + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + JavaPairRDD<Integer, Integer> rdd3 = + rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); + Assert.assertEquals(Arrays.asList( + new Tuple2<>(1, 1), + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new Tuple2<>(0, 4)), rdd3.collect()); + } + + @Test + public void collectPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); + + JavaPairRDD<Integer, Integer> rdd2 = + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); + Assert.assertEquals(Arrays.asList(1, 2), parts[0]); + + parts = rdd1.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(3, 4), parts[0]); + Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); + + Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), + rdd2.collectPartitions(new int[]{0})[0]); + + List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), + parts2[1]); + } + + @Test + public void collectAsMapWithIntArrayValues() { + // Regression test for SPARK-1040 + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); + JavaPairRDD<Integer, int[]> pairRDD = + rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); + pairRDD.collect(); // Works fine + pairRDD.collectAsMap(); // Used to crash with ClassCastException + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java new file mode 100644 index 0000000..604d818 --- /dev/null +++ b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -0,0 +1,905 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import java.io.Serializable; +import java.util.*; + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.spark.Accumulator; +import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; + +/** + * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 + * lambda syntax. + */ +@SuppressWarnings("unchecked") +public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { + + @Test + public void testMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(5, 5), + Arrays.asList(9, 4)); + + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(String::length); + JavaTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red sox")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> filtered = stream.filter(s -> s.contains("a")); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMapPartitions() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red sox")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOX")); + + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> mapped = stream.mapPartitions(in -> { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + }); + JavaTestUtils.attachTestOutputStream(mapped); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduce() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6), + Arrays.asList(7, 8, 9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y); + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6), + Arrays.asList(7, 8, 9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, + (x, y) -> x - y, new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); + + Assert.assertEquals(expected, result); + } + + @Test + public void testTransform() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6), + Arrays.asList(7, 8, 9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3, 4, 5), + Arrays.asList(6, 7, 8), + Arrays.asList(9, 10, 11)); + + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2)); + + JavaTestUtils.attachTestOutputStream(transformed); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testVariousTransform() { + // tests whether all variations of transform can be called from Java + + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + + List<List<Tuple2<String, Integer>>> pairInputData = + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); + + JavaDStream<Integer> transformed1 = stream.transform(in -> null); + JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null); + JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null); + JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null); + JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null); + JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null); + JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null); + JavaPairDStream<String, String> pairTransformed4 = + pairStream.transformToPair((x, time) -> null); + + } + + @Test + public void testTransformWith() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList( + new Tuple2<>("california", "dodgers"), + new Tuple2<>("new york", "yankees")), + Arrays.asList( + new Tuple2<>("california", "sharks"), + new Tuple2<>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList( + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "mets")), + Arrays.asList( + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "islanders"))); + + + List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Sets.newHashSet( + new Tuple2<>("california", + new Tuple2<>("dodgers", "giants")), + new Tuple2<>("new york", + new Tuple2<>("yankees", "mets"))), + Sets.newHashSet( + new Tuple2<>("california", + new Tuple2<>("sharks", "ducks")), + new Tuple2<>("new york", + new Tuple2<>("rangers", "islanders")))); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = + pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y)); + + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + for (List<Tuple2<String, Tuple2<String, String>>> res : result) { + unorderedResult.add(Sets.newHashSet(res)); + } + + Assert.assertEquals(expected, unorderedResult); + } + + + @Test + public void testVariousTransformWith() { + // tests whether all variations of transformWith can be called from Java + + List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); + List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); + JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); + + List<List<Tuple2<String, Integer>>> pairInputData1 = + Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); + List<List<Tuple2<Double, Character>>> pairInputData2 = + Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); + JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); + JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); + + JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null); + JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null); + + JavaPairDStream<Double, Double> transformed3 = + stream1.transformWithToPair(stream2,(x, y, z) -> null); + + JavaPairDStream<Double, Double> transformed4 = + stream1.transformWithToPair(pairStream1,(x, y, z) -> null); + + JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null); + + JavaDStream<Double> pairTransformed2_ = + pairStream1.transformWith(pairStream1,(x, y, z) -> null); + + JavaPairDStream<Double, Double> pairTransformed3 = + pairStream1.transformWithToPair(stream2,(x, y, z) -> null); + + JavaPairDStream<Double, Double> pairTransformed4 = + pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null); + } + + @Test + public void testStreamingContextTransform() { + List<List<Integer>> stream1input = Arrays.asList( + Arrays.asList(1), + Arrays.asList(2) + ); + + List<List<Integer>> stream2input = Arrays.asList( + Arrays.asList(3), + Arrays.asList(4) + ); + + List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( + Arrays.asList(new Tuple2<>(1, "x")), + Arrays.asList(new Tuple2<>(2, "y")) + ); + + List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), + Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) + ); + + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); + JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); + JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); + + List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); + + // This is just to test whether this transform to JavaStream compiles + JavaDStream<Long> transformed1 = ssc.transform( + listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> { + Assert.assertEquals(2, listOfRDDs.size()); + return null; + }); + + List<JavaDStream<?>> listOfDStreams2 = + Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); + + JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( + listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { + Assert.assertEquals(3, listOfRDDs.size()); + JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0); + JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1); + JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2); + JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction<Integer, Integer, Integer> mapToTuple = + (Integer i) -> new Tuple2<>(i, i); + return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); + }); + JavaTestUtils.attachTestOutputStream(transformed2); + List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = + JavaTestUtils.runStreams(ssc, 2, 2); + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("go", "giants"), + Arrays.asList("boo", "dodgers"), + Arrays.asList("athletics")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"), + Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"), + Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); + + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)"))); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testForeachRDD() { + final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); + final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,1,1), + Arrays.asList(1,1,1)); + + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output + + stream.foreachRDD(rdd -> { + accumRdd.add(1); + rdd.foreach(x -> accumEle.add(1)); + }); + + // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java + stream.foreachRDD((rdd, time) -> null); + + JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(2, accumRdd.value().intValue()); + Assert.assertEquals(6, accumEle.value().intValue()); + } + + @Test + public void testPairFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("dodgers"), + Arrays.asList("athletics")); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>(6, "g"), + new Tuple2<>(6, "i"), + new Tuple2<>(6, "a"), + new Tuple2<>(6, "n"), + new Tuple2<>(6, "t"), + new Tuple2<>(6, "s")), + Arrays.asList( + new Tuple2<>(7, "d"), + new Tuple2<>(7, "o"), + new Tuple2<>(7, "d"), + new Tuple2<>(7, "g"), + new Tuple2<>(7, "e"), + new Tuple2<>(7, "r"), + new Tuple2<>(7, "s")), + Arrays.asList( + new Tuple2<>(9, "a"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "h"), + new Tuple2<>(9, "l"), + new Tuple2<>(9, "e"), + new Tuple2<>(9, "t"), + new Tuple2<>(9, "i"), + new Tuple2<>(9, "c"), + new Tuple2<>(9, "s"))); + + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter : s.split("(?!^)")) { + out.add(new Tuple2<>(s.length(), letter)); + } + return out; + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + /* + * Performs an order-invariant comparison of lists representing two RDD streams. This allows + * us to account for ordering variation within individual RDD's which occurs during windowing. + */ + public static <T extends Comparable<T>> void assertOrderInvariantEquals( + List<List<T>> expected, List<List<T>> actual) { + expected.forEach(list -> Collections.sort(list)); + List<List<T>> sortedActual = new ArrayList<>(); + actual.forEach(list -> { + List<T> sortedList = new ArrayList<>(list); + Collections.sort(sortedList); + sortedActual.add(sortedList); + }); + Assert.assertEquals(expected, sortedActual); + } + + @Test + public void testPairFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red sox")); + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>("giants", 6)), + Arrays.asList(new Tuple2<>("yankees", 7))); + + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = + stream.mapToPair(x -> new Tuple2<>(x, x.length())); + JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a")); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( + Arrays.asList(new Tuple2<>("california", "dodgers"), + new Tuple2<>("california", "giants"), + new Tuple2<>("new york", "yankees"), + new Tuple2<>("new york", "mets")), + Arrays.asList(new Tuple2<>("california", "sharks"), + new Tuple2<>("california", "ducks"), + new Tuple2<>("new york", "rangers"), + new Tuple2<>("new york", "islanders"))); + + List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( + Arrays.asList( + new Tuple2<>("california", 1), + new Tuple2<>("california", 3), + new Tuple2<>("new york", 4), + new Tuple2<>("new york", 1)), + Arrays.asList( + new Tuple2<>("california", 5), + new Tuple2<>("california", 5), + new Tuple2<>("new york", 3), + new Tuple2<>("new york", 1))); + + @Test + public void testPairMap() { // Maps pair -> pair of different type + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), + Arrays.asList( + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap()); + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMapPartitions() { // Maps pair -> pair of different type + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>(1, "california"), + new Tuple2<>(3, "california"), + new Tuple2<>(4, "new york"), + new Tuple2<>(1, "new york")), + Arrays.asList( + new Tuple2<>(5, "california"), + new Tuple2<>(5, "california"), + new Tuple2<>(3, "new york"), + new Tuple2<>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { + LinkedList<Tuple2<Integer, String>> out = new LinkedList<>(); + while (in.hasNext()) { + Tuple2<String, Integer> next = in.next(); + out.add(next.swap()); + } + return out; + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMap2() { // Maps pair -> single + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1, 3, 4, 1), + Arrays.asList(5, 5, 3, 1)); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair + List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2)), + Arrays.asList( + new Tuple2<>("hi", 1), + new Tuple2<>("ho", 2))); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o")), + Arrays.asList( + new Tuple2<>(1, "h"), + new Tuple2<>(1, "i"), + new Tuple2<>(2, "h"), + new Tuple2<>(2, "o"))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { + List<Tuple2<Integer, String>> out = new LinkedList<>(); + for (Character s : in._1().toCharArray()) { + out.add(new Tuple2<>(in._2(), s.toString())); + } + return out; + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairReduceByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList( + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y); + + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCombineByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList( + new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i, + (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); + + JavaTestUtils.attachTestOutputStream(combined); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindow() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUpdateStateByKey() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v : values) { + out = out + v; + } + return Optional.of(out); + }); + + JavaTestUtils.attachTestOutputStream(updated); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByKeyAndWindowWithInverse() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>("california", 4), + new Tuple2<>("new york", 5)), + Arrays.asList(new Tuple2<>("california", 14), + new Tuple2<>("new york", 9)), + Arrays.asList(new Tuple2<>("california", 10), + new Tuple2<>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000), + new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairTransform() { + List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), + Arrays.asList( + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); + + List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5)), + Arrays.asList( + new Tuple2<>(1, 5), + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5))); + + JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey()); + + JavaTestUtils.attachTestOutputStream(sorted); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairToNormalRDDTransform() { + List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<>(3, 5), + new Tuple2<>(1, 5), + new Tuple2<>(4, 5), + new Tuple2<>(2, 5)), + Arrays.asList( + new Tuple2<>(2, 5), + new Tuple2<>(3, 5), + new Tuple2<>(4, 5), + new Tuple2<>(1, 5))); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3, 1, 4, 2), + Arrays.asList(2, 3, 4, 1)); + + JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1())); + JavaTestUtils.attachTestOutputStream(firstParts); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>("california", "DODGERS"), + new Tuple2<>("california", "GIANTS"), + new Tuple2<>("new york", "YANKEES"), + new Tuple2<>("new york", "METS")), + Arrays.asList(new Tuple2<>("california", "SHARKS"), + new Tuple2<>("california", "DUCKS"), + new Tuple2<>("new york", "RANGERS"), + new Tuple2<>("new york", "ISLANDERS"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); + JavaTestUtils.attachTestOutputStream(mapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<>("california", "dodgers1"), + new Tuple2<>("california", "dodgers2"), + new Tuple2<>("california", "giants1"), + new Tuple2<>("california", "giants2"), + new Tuple2<>("new york", "yankees1"), + new Tuple2<>("new york", "yankees2"), + new Tuple2<>("new york", "mets1"), + new Tuple2<>("new york", "mets2")), + Arrays.asList(new Tuple2<>("california", "sharks1"), + new Tuple2<>("california", "sharks2"), + new Tuple2<>("california", "ducks1"), + new Tuple2<>("california", "ducks2"), + new Tuple2<>("new york", "rangers1"), + new Tuple2<>("new york", "rangers2"), + new Tuple2<>("new york", "islanders1"), + new Tuple2<>("new york", "islanders2"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, String> flatMapped = + pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + Assert.assertEquals(expected, result); + } + + /** + * This test is only for testing the APIs. It's not necessary to run it. + */ + public void testMapWithStateAPI() { + JavaPairRDD<String, Boolean> initialRDD = null; + JavaPairDStream<String, Integer> wordsDstream = null; + + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = + wordsDstream.mapWithState( + StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> { + // Use all State's methods here + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return Optional.of(2.0); + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); + + JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = + wordsDstream.mapWithState( + StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { + state.exists(); + state.get(); + state.isTimingOut(); + state.remove(); + state.update(true); + return 2.0; + }).initialState(initialRDD) + .numPartitions(10) + .partitioner(new HashPartitioner(10)) + .timeout(Durations.seconds(10))); + + JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/java8-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/resources/log4j.properties b/external/java8-tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..eb3b1999 --- /dev/null +++ b/external/java8-tests/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN +org.spark-project.jetty.LEVEL=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala new file mode 100644 index 0000000..fa0681d --- /dev/null +++ b/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * Test cases where JDK8-compiled Scala user code is used with Spark. + */ +class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext { + test("basic RDD closure test (SPARK-6152)") { + sc.parallelize(1 to 1000).map(x => x * x).count() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml new file mode 100644 index 0000000..d1c38c7 --- /dev/null +++ b/external/kinesis-asl-assembly/pom.xml @@ -0,0 +1,181 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Project Kinesis Assembly</name> + <url>http://spark.apache.org/</url> + + <properties> + <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!-- + Demote already included in the Spark assembly. + --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <classifier>${avro.mapred.classifier}</classifier> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <configuration> + <shadedArtifactAttached>false</shadedArtifactAttached> + <artifactSet> + <includes> + <include>*:*</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>reference.conf</resource> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resource>log4j.properties</resource> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> +</build> +</project> + http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/pom.xml ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml new file mode 100644 index 0000000..935155e --- /dev/null +++ b/external/kinesis-asl/pom.xml @@ -0,0 +1,87 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +~ Licensed to the Apache Software Foundation (ASF) under one or more +~ contributor license agreements. See the NOTICE file distributed with +~ this work for additional information regarding copyright ownership. +~ The ASF licenses this file to You under the Apache License, Version 2.0 +~ (the "License"); you may not use this file except in compliance with +~ the License. You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~ See the License for the specific language governing permissions and +~ limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <!-- Kinesis integration is not included by default due to ASL-licensed code. --> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kinesis-asl_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Kinesis Integration</name> + + <properties> + <sbt.project.name>streaming-kinesis-asl</sbt.project.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>amazon-kinesis-client</artifactId> + <version>${aws.kinesis.client.version}</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>amazon-kinesis-producer</artifactId> + <version>${aws.kinesis.producer.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-test-tags_${scala.binary.version}</artifactId> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> +</project> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org