http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java deleted file mode 100644 index 52efee7..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.wordcount.util; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Provides the default data sets used for the WordCount example program. - * The default data sets are used, if no parameters are given to the program. 
- * - */ -public class WordCountData { - - public static final String[] WORDS = new String[] { - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,", - "And by opposing end them?--To die,--to sleep,--", - "No more; and by a sleep to say we end", - "The heartache, and the thousand natural shocks", - "That flesh is heir to,--'tis a consummation", - "Devoutly to be wish'd. To die,--to sleep;--", - "To sleep! perchance to dream:--ay, there's the rub;", - "For in that sleep of death what dreams may come,", - "When we have shuffled off this mortal coil,", - "Must give us pause: there's the respect", - "That makes calamity of so long life;", - "For who would bear the whips and scorns of time,", - "The oppressor's wrong, the proud man's contumely,", - "The pangs of despis'd love, the law's delay,", - "The insolence of office, and the spurns", - "That patient merit of the unworthy takes,", - "When he himself might his quietus make", - "With a bare bodkin? who would these fardels bear,", - "To grunt and sweat under a weary life,", - "But that the dread of something after death,--", - "The undiscover'd country, from whose bourn", - "No traveller returns,--puzzles the will,", - "And makes us rather bear those ills we have", - "Than fly to others that we know not of?", - "Thus conscience does make cowards of us all;", - "And thus the native hue of resolution", - "Is sicklied o'er with the pale cast of thought;", - "And enterprises of great pith and moment,", - "With this regard, their currents turn awry,", - "And lose the name of action.--Soft you now!", - "The fair Ophelia!--Nymph, in thy orisons", - "Be all my sins remember'd." - }; - - public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) { - return env.fromElements(WORDS); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/resources/log4j-test.properties b/flink-examples/flink-java-examples/src/main/resources/log4j-test.properties deleted file mode 100644 index 65bd0b8..0000000 --- a/flink-examples/flink-java-examples/src/main/resources/log4j-test.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=INFO, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/resources/log4j.properties b/flink-examples/flink-java-examples/src/main/resources/log4j.properties deleted file mode 100644 index da32ea0..0000000 --- a/flink-examples/flink-java-examples/src/main/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=INFO, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/resources/logback.xml b/flink-examples/flink-java-examples/src/main/resources/logback.xml deleted file mode 100644 index 95f2d04..0000000 --- a/flink-examples/flink-java-examples/src/main/resources/logback.xml +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="INFO"> - <appender-ref ref="STDOUT"/> - </root> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml deleted file mode 100644 index 7cb5f29..0000000 --- a/flink-examples/flink-scala-examples/pom.xml +++ /dev/null @@ -1,487 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-examples</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-scala-examples</artifactId> - <name>flink-scala-examples</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - 
<downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <version>0.5.0</version> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <verbose>false</verbose> - <failOnViolation>true</failOnViolation> - 
<includeTestSourceDirectory>true</includeTestSourceDirectory> - <failOnWarning>false</failOnWarning> - <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> - <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> - <outputEncoding>UTF-8</outputEncoding> - </configuration> - </plugin> - - <!-- get default data from flink-java-examples package --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.9</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>unpack</id> - <phase>prepare-package</phase> - <goals> - <goal>unpack</goal> - </goals> - <configuration> - <artifactItems> - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - <type>jar</type> - <overWrite>false</overWrite> - <outputDirectory>${project.build.directory}/classes</outputDirectory> - <includes>**/util/*Data*.class</includes> - </artifactItem> - </artifactItems> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - - <!-- KMeans --> - <execution> - <id>KMeans</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - - <configuration> - <classifier>KMeans</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.clustering.KMeans</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/clustering/KMeans.class</include> - <include>**/scala/clustering/KMeans$*.class</include> - <include>**/java/clustering/util/KMeansDataGenerator.class</include> - <include>**/java/clustering/util/KMeansData.class</include> - </includes> - </configuration> - </execution> 
- - <!-- Transitive Closure --> - <execution> - <id>TransitiveClosure</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>TransitiveClosure</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/graph/TransitiveClosureNaive.class</include> - <include>**/scala/graph/TransitiveClosureNaive$*.class</include> - <include>**/java/graph/util/ConnectedComponentsData.class</include> - </includes> - </configuration> - </execution> - - <!-- Connected Components --> - <execution> - <id>ConnectedComponents</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>ConnectedComponents</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.graph.ConnectedComponents</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/graph/ConnectedComponents.class</include> - <include>**/scala/graph/ConnectedComponents$*.class</include> - <include>**/java/graph/util/ConnectedComponentsData.class</include> - </includes> - </configuration> - </execution> - - <!-- EnumTriangles Basic --> - <execution> - <id>EnumTrianglesBasic</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>EnumTrianglesBasic</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.graph.EnumTrianglesBasic</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/graph/EnumTrianglesBasic.class</include> - <include>**/scala/graph/EnumTrianglesBasic$*.class</include> - <include>**/java/graph/util/EnumTrianglesData.class</include> - </includes> - </configuration> - </execution> - - <!-- EnumTriangles Opt --> - <execution> - <id>EnumTrianglesOpt</id> - <phase>package</phase> - <goals> - <goal>jar</goal> 
- </goals> - <configuration> - <classifier>EnumTrianglesOpt</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.graph.EnumTrianglesOpt</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/graph/EnumTrianglesOpt.class</include> - <include>**/scala/graph/EnumTrianglesOpt$*.class</include> - <include>**/java/graph/util/EnumTrianglesData.class</include> - </includes> - </configuration> - </execution> - - <!-- PageRank Basic--> - <execution> - <id>PageRankBasic</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>PageRankBasic</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.graph.PageRankBasic</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/graph/PageRankBasic.class</include> - <include>**/scala/graph/PageRankBasic$*.class</include> - <include>**/java/graph/util/PageRankData.class</include> - </includes> - </configuration> - </execution> - - <!-- These queries are currently not self-contained --> - - <!-- TPC-H Query 10 --> - - <!-- - <execution> - <id>TPCHQuery10</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>TPCHQuery10</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.relational.TPCHQuery10</program-class> - </manifestEntries> - </archive> - <includes> - <include>**/scala/relational/TPCHQuery10.class</include> - <include>**/scala/relational/TPCHQuery10$*.class</include> - </includes> - </configuration> - </execution> --> - - <!-- TPC-H Query 3 --> - <!-- - <execution> - <id>TPCHQuery3</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>TPCHQuery3</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.relational.TPCHQuery3</program-class> - </manifestEntries> - </archive> - 
<includes> - <include>**/scala/relational/TPCHQuery3.class</include> - <include>**/scala/relational/TPCHQuery3$*.class</include> - </includes> - </configuration> - </execution> --> - - <!-- WebLogAnalysis --> - <execution> - <id>WebLogAnalysis</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WebLogAnalysis</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.relational.WebLogAnalysis</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/relational/WebLogAnalysis.class</include> - <include>**/scala/relational/WebLogAnalysis$*.class</include> - <include>**/java/relational/util/WebLogData.class</include> - <include>**/java/relational/util/WebLogDataGenerator.class</include> - </includes> - </configuration> - </execution> - - <!-- WordCount --> - <execution> - <id>WordCount</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WordCount</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.examples.scala.wordcount.WordCount</program-class> - </manifestEntries> - </archive> - - <includes> - <include>**/scala/wordcount/WordCount.class</include> - <include>**/scala/wordcount/WordCount$*.class</include> - <include>**/java/wordcount/util/WordCountData.class</include> - </includes> - </configuration> - </execution> - - </executions> - </plugin> - </plugins> - - <pluginManagement> - <plugins> - <!-- This plugin's configuration is used to store Eclipse m2e settings only. 
- It has no influence on the Maven build itself and simply suppresses errors in Eclipse.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <versionRange>[2.9,)</versionRange> - <goals> - <goal>unpack</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/java/org/apache/flink/examples/scala/Dummy.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/java/org/apache/flink/examples/scala/Dummy.java b/flink-examples/flink-scala-examples/src/main/java/org/apache/flink/examples/scala/Dummy.java deleted file mode 100644 index 1887a35..0000000 --- a/flink-examples/flink-scala-examples/src/main/java/org/apache/flink/examples/scala/Dummy.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.examples.scala; - -/** - * This dummy class exists only to create an empty - * javadoc.jar in the flink-scala-examples project. - * This is required for passing the maven central sync requirements. - */ -public class Dummy {} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala deleted file mode 100644 index 08a3e62..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.clustering - -import org.apache.flink.api.common.functions._ -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields -import org.apache.flink.api.scala._ -import org.apache.flink.configuration.Configuration -import org.apache.flink.examples.java.clustering.util.KMeansData - -import scala.collection.JavaConverters._ - -/** - * This example implements a basic K-Means clustering algorithm. - * - * K-Means is an iterative clustering algorithm and works as follows: - * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster - * centers. - * In each iteration, the algorithm computes the distance of each data point to each cluster center. - * Each point is assigned to the cluster center which is closest to it. - * Subsequently, each cluster center is moved to the center (''mean'') of all points that have - * been assigned to it. - * The moved cluster centers are fed into the next iteration. - * The algorithm terminates after a fixed number of iterations (as in this implementation) - * or if cluster centers do not (significantly) move in an iteration. - * This is the Wikipedia entry for the [[http://en.wikipedia - * .org/wiki/K-means_clustering K-Means Clustering algorithm]]. - * - * This implementation works on two-dimensional data points. - * It computes an assignment of data points to cluster centers, i.e., - * each data point is annotated with the id of the final cluster (center) it belongs to. 
- * - * Input files are plain text files and must be formatted as follows: - * - * - Data points are represented as two double values separated by a blank character. - * Data points are separated by newline characters. - * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3, - * y=7.2). - * - Cluster centers are represented by an integer id and a point value. - * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, - * y=3.2) and (id=2, x=2.9, y=5.7). - * - * Usage: - * {{{ - * KMeans <points path> <centers path> <result path> <num iterations> - * }}} - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.clustering.util.KMeansData]] - * and 10 iterations. - * - * This example shows how to use: - * - * - Bulk iterations - * - Broadcast variables in bulk iterations - * - Custom Java objects (PoJos) - */ -object KMeans { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - - val points: DataSet[Point] = getPointDataSet(env) - val centroids: DataSet[Centroid] = getCentroidDataSet(env) - - val finalCentroids = centroids.iterate(numIterations) { currentCentroids => - val newCentroids = points - .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids") - .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2") - .groupBy(0) - .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1") - .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id") - newCentroids - } - - val clusteredPoints: DataSet[(Int, Point)] = - points.map(new SelectNearestCenter).withBroadcastSet(finalCentroids, "centroids") - - if (fileOutput) { - clusteredPoints.writeAsCsv(outputPath, "\n", " ") - env.execute("Scala KMeans Example") - } - else { - clusteredPoints.print() - } - - } - - private def parseParameters(programArguments: 
Array[String]): Boolean = { - if (programArguments.length > 0) { - fileOutput = true - if (programArguments.length == 4) { - pointsPath = programArguments(0) - centersPath = programArguments(1) - outputPath = programArguments(2) - numIterations = Integer.parseInt(programArguments(3)) - - true - } - else { - System.err.println("Usage: KMeans <points path> <centers path> <result path> <num " + - "iterations>") - - false - } - } - else { - System.out.println("Executing K-Means example with default parameters and built-in default " + - "data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" We provide a data generator to create synthetic input files for this " + - "program.") - System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num " + - "iterations>") - - true - } - } - - private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { - if (fileOutput) { - env.readCsvFile[(Double, Double)]( - pointsPath, - fieldDelimiter = " ", - includedFields = Array(0, 1)) - .map { x => new Point(x._1, x._2)} - } - else { - val points = KMeansData.POINTS map { - case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double]) - } - env.fromCollection(points) - } - } - - private def getCentroidDataSet(env: ExecutionEnvironment): DataSet[Centroid] = { - if (fileOutput) { - env.readCsvFile[(Int, Double, Double)]( - centersPath, - fieldDelimiter = " ", - includedFields = Array(0, 1, 2)) - .map { x => new Centroid(x._1, x._2, x._3)} - } - else { - val centroids = KMeansData.CENTROIDS map { - case Array(id, x, y) => - new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double]) - } - env.fromCollection(centroids) - } - } - - private var fileOutput: Boolean = false - private var pointsPath: String = null - private var centersPath: String = null - private var outputPath: String = 
null - private var numIterations: Int = 10 - - /** - * A simple two-dimensional point. - */ - class Point(var x: Double, var y: Double) extends Serializable { - def this() { - this(0, 0) - } - - def add(other: Point): Point = { - x += other.x - y += other.y - this - } - - def div(other: Long): Point = { - x /= other - y /= other - this - } - - def euclideanDistance(other: Point): Double = { - Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y)) - } - - def clear(): Unit = { - x = 0 - y = 0 - } - - override def toString: String = { - x + " " + y - } - } - - /** - * A simple two-dimensional centroid, basically a point with an ID. - */ - class Centroid(var id: Int, x: Double, y: Double) extends Point(x, y) { - def this() { - this(0, 0, 0) - } - - def this(id: Int, p: Point) { - this(id, p.x, p.y) - } - - override def toString: String = { - id + " " + super.toString - } - } - - /** Determines the closest cluster center for a data point. */ - @ForwardedFields(Array("*->_2")) - final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] { - private var centroids: Traversable[Centroid] = null - - /** Reads the centroid values from a broadcast variable into a collection. 
*/ - override def open(parameters: Configuration) { - centroids = getRuntimeContext.getBroadcastVariable[Centroid]("centroids").asScala - } - - def map(p: Point): (Int, Point) = { - var minDistance: Double = Double.MaxValue - var closestCentroidId: Int = -1 - for (centroid <- centroids) { - val distance = p.euclideanDistance(centroid) - if (distance < minDistance) { - minDistance = distance - closestCentroidId = centroid.id - } - } - (closestCentroidId, p) - } - - } -} - - http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala deleted file mode 100644 index 38c7f40..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.examples.scala.graph - -import org.apache.flink.api.scala._ -import org.apache.flink.examples.java.graph.util.ConnectedComponentsData -import org.apache.flink.util.Collector - -/** - * An implementation of the connected components algorithm, using a delta iteration. - * - * Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the - * minimum of its own ID and its neighbors' IDs, as its new ID and tells its neighbors about its - * new ID. After the algorithm has completed, all vertices in the same component will have the same - * ID. - * - * A vertex whose component ID did not change needs not propagate its information in the next - * step. Because of that, the algorithm is easily expressible via a delta iteration. We here model - * the solution set as the vertices with their current component ids, and the workset as the changed - * vertices. Because we see all vertices initially as changed, the initial workset and the initial - * solution set are identical. Also, the delta to the solution set is consequently also the next - * workset. - * - * Input files are plain text files and must be formatted as follows: - * - * - Vertices represented as IDs and separated by new-line characters. For example, - * `"1\n2\n12\n42\n63"` gives five vertices (1), (2), (12), (42), and (63). - * - Edges are represented as pairs for vertex IDs which are separated by space characters. Edges - * are separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63"` - * gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63). - * - * Usage: - * {{{ - * ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations> - * }}} - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations. 
- * - * - * This example shows how to use: - * - * - Delta Iterations - * - Generic-typed Functions - * - */ -object ConnectedComponents { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // read vertex and edge data - // assign the initial components (equal to the vertex id) - val vertices = getVerticesDataSet(env).map { id => (id, id) }.withForwardedFields("*->_1;*->_2") - - // undirected edges by emitting for each input edge the input edges itself and an inverted - // version - val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) } - - // open a delta iteration - val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array("_1")) { - (s, ws) => - - // apply the step logic: join with the edges - val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) => - (edge._2, vertex._2) - }.withForwardedFieldsFirst("_2->_2").withForwardedFieldsSecond("_2->_1") - - // select the minimum neighbor - val minNeighbors = allNeighbors.groupBy(0).min(1) - - // update if the component of the candidate is smaller - val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) { - (newVertex, oldVertex, out: Collector[(Long, Long)]) => - if (newVertex._2 < oldVertex._2) out.collect(newVertex) - }.withForwardedFieldsFirst("*") - - // delta and new workset are identical - (updatedComponents, updatedComponents) - } - if (fileOutput) { - verticesWithComponents.writeAsCsv(outputPath, "\n", " ") - env.execute("Scala Connected Components Example") - } else { - verticesWithComponents.print() - } - - } - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 4) { - verticesPath = args(0) - edgesPath = args(1) - outputPath = args(2) - maxIterations = args(3).toInt - - true - } else { - System.err.println("Usage: ConnectedComponents 
<vertices path> <edges path> <result path>" + - " <max number of iterations>") - - false - } - } else { - System.out.println("Executing Connected Components example with built-in default data.") - System.out.println(" Provide parameters to read input data from a file.") - System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path>" + - " <max number of iterations>") - - true - } - } - - private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = { - if (fileOutput) { - env.readCsvFile[Tuple1[Long]]( - verticesPath, - includedFields = Array(0)) - .map { x => x._1 } - } - else { - env.fromCollection(ConnectedComponentsData.VERTICES) - } - } - - private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = { - if (fileOutput) { - env.readCsvFile[(Long, Long)]( - edgesPath, - fieldDelimiter = " ", - includedFields = Array(0, 1)) - .map { x => (x._1, x._2)} - } - else { - val edgeData = ConnectedComponentsData.EDGES map { - case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) - } - env.fromCollection(edgeData) - } - } - - private var fileOutput: Boolean = false - private var verticesPath: String = null - private var edgesPath: String = null - private var maxIterations: Int = 10 - private var outputPath: String = null -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala deleted file mode 100644 index ae8a982..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.graph - -import org.apache.flink.api.scala._ -import org.apache.flink.util.Collector -import org.apache.flink.core.fs.FileSystem.WriteMode - -object DeltaPageRank { - - private final val DAMPENING_FACTOR: Double = 0.85 - private final val NUM_VERTICES = 5 - private final val INITIAL_RANK = 1.0 / NUM_VERTICES - private final val RANDOM_JUMP = (1 - DAMPENING_FACTOR) / NUM_VERTICES - private final val THRESHOLD = 0.0001 / NUM_VERTICES - - type Page = (Long, Double) - type Adjacency = (Long, Array[Long]) - - def main(args: Array[String]) { - - val maxIterations = 100 - - val env = ExecutionEnvironment.getExecutionEnvironment - - val rawLines: DataSet[String] = env.fromElements( - "1 2 3 4", - "2 1", - "3 5", - "4 2 3", - "5 2 4") - val adjacency: DataSet[Adjacency] = rawLines - .map(str => { - val elements = str.split(' ') - val id = elements(0).toLong - val neighbors = elements.slice(1, elements.length).map(_.toLong) - (id, neighbors) - }) - - val initialRanks: DataSet[Page] = adjacency.flatMap { - (adj, out: Collector[Page]) => - { - val targets = adj._2 - val rankPerTarget = INITIAL_RANK * DAMPENING_FACTOR / targets.length - - // dampend 
fraction to targets - for (target <- targets) { - out.collect((target, rankPerTarget)) - } - - // random jump to self - out.collect((adj._1, RANDOM_JUMP)) - } - } - .groupBy(0).sum(1) - - val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) } - .withForwardedFields("_1") - - val iteration = initialRanks.iterateDelta(initialDeltas, maxIterations, Array(0)) { - - (solutionSet, workset) => - { - val deltas = workset.join(adjacency).where(0).equalTo(0) { - (lastDeltas, adj, out: Collector[Page]) => - { - val targets = adj._2 - val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length - - for (target <- targets) { - out.collect((target, deltaPerTarget)) - } - } - } - .groupBy(0).sum(1) - .filter(x => Math.abs(x._2) > THRESHOLD) - - val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) { - (current, delta) => (current._1, current._2 + delta._2) - }.withForwardedFieldsFirst("_1") - - (rankUpdates, deltas) - } - } - - iteration.print() - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala deleted file mode 100644 index 4c05fbb..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.graph - -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields -import org.apache.flink.api.scala._ -import scala.collection.JavaConverters._ -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.util.Collector -import org.apache.flink.examples.java.graph.util.EnumTrianglesData -import org.apache.flink.api.common.operators.Order - -import scala.collection.mutable - - -/** - * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. - * A triangle consists of three edges that connect three vertices with each other. - * - * The algorithm works as follows: - * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices - * that are connected by two edges. Finally, all triads are filtered for which no third edge exists - * that closes the triangle. - * - * Input files are plain text files and must be formatted as follows: - * - * - Edges are represented as pairs for vertex IDs which are separated by space - * characters. Edges are separated by new-line characters. 
- * For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12), - * (1)-(12), and (42)-(63) that include a triangle - * - * <pre> - * (1) - * / \ - * (2)-(12) - * </pre> - * - * Usage: - * {{{ - * EnumTriangleBasic <edge path> <result path> - * }}} - * <br> - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]] - * - * This example shows how to use: - * - * - Custom Java objects which extend Tuple - * - Group Sorting - * - */ -object EnumTrianglesBasic { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // read input data - val edges = getEdgeDataSet(env) - - // project edges by vertex id - val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) ) - - val triangles = edgesById - // build triads - .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) - // filter triads - .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t } - .withForwardedFieldsFirst("*") - - // emit result - if (fileOutput) { - triangles.writeAsCsv(outputPath, "\n", ",") - // execute program - env.execute("TriangleEnumeration Example") - } else { - triangles.print() - } - - - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Edge(v1: Int, v2: Int) extends Serializable - case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable - - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Builds triads (triples of vertices) from pairs of edges that share a vertex. 
The first vertex - * of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes - * that input edges share the first vertex and are in ascending order of the second vertex. - */ - @ForwardedFields(Array("v1->v1")) - class TriadBuilder extends GroupReduceFunction[Edge, Triad] { - - val vertices = mutable.MutableList[Integer]() - - override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = { - - // clear vertex list - vertices.clear() - - // build and emit triads - for(e <- edges.asScala) { - - // combine vertex with all previously read vertices - for(v <- vertices) { - out.collect(Triad(e.v1, v, e.v2)) - } - vertices += e.v2 - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 2) { - edgePath = args(0) - outputPath = args(1) - - true - } else { - System.err.println("Usage: EnumTriangleBasic <edge path> <result path>") - - false - } - } else { - System.out.println("Executing Enum Triangles Basic example with built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>") - - true - } - } - - private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { - if (fileOutput) { - env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1)) - } else { - val edges = EnumTrianglesData.EDGES.map { - case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int]) - } - env.fromCollection(edges) - } - } - - - private var fileOutput: Boolean = false - private var edgePath: String = null - private var outputPath: String = 
null - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala deleted file mode 100644 index ad7e3a4..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.examples.scala.graph - -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields -import org.apache.flink.api.scala._ -import scala.collection.JavaConverters._ -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.util.Collector -import org.apache.flink.examples.java.graph.util.EnumTrianglesData -import org.apache.flink.api.common.operators.Order -import scala.collection.mutable.MutableList - -import scala.collection.mutable - - -/** - * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. - * A triangle consists of three edges that connect three vertices with each other. - * - * The basic algorithm works as follows: - * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices - * that are connected by two edges. Finally, all triads are filtered for which no third edge exists - * that closes the triangle. - * - * For a group of ''i'' edges that share a common vertex, the number of built triads is quadratic - * ''(n*(n-1))/2)''. Therefore, an optimization of the algorithm is to group edges on the vertex - * with the smaller output degree to reduce the number of triads. - * This implementation extends the basic algorithm by computing output degrees of edge vertices and - * grouping on edges on the vertex with the smaller degree. - * - * Input files are plain text files and must be formatted as follows: - * - * - Edges are represented as pairs for vertex IDs which are separated by space - * characters. Edges are separated by new-line characters. 
- * For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12), - * (1)-(12), and (42)-(63) that include a triangle - * - * <pre> - * (1) - * / \ - * (2)-(12) - * </pre> - * - * Usage: - * {{{ - * EnumTriangleOpt <edge path> <result path> - * }}} - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]. - * - * This example shows how to use: - * - * - Custom Java objects which extend Tuple - * - Group Sorting - * - */ -object EnumTrianglesOpt { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // read input data - val edges = getEdgeDataSet(env) - - val edgesWithDegrees = edges - // duplicate and switch edges - .flatMap(e => Seq(e, Edge(e.v2, e.v1))) - // add degree of first vertex - .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new DegreeCounter()) - // join degrees of vertices - .groupBy("v1", "v2").reduce { - (e1, e2) => - if (e1.d2 == 0) { - new EdgeWithDegrees(e1.v1, e1.d1, e1.v2, e2.d2) - } else { - new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2) - } - }.withForwardedFields("v1;v2") - - // project edges by degrees, vertex with smaller degree comes first - val edgesByDegree = edgesWithDegrees - .map(e => if (e.d1 <= e.d2) Edge(e.v1, e.v2) else Edge(e.v2, e.v1)) - // project edges by Id, vertex with smaller Id comes first - val edgesById = edgesByDegree - .map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1)) - - val triangles = edgesByDegree - // build triads - .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) - // filter triads - .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t} - .withForwardedFieldsFirst("*") - - // emit result - if (fileOutput) { - triangles.writeAsCsv(outputPath, "\n", ",") - // execute program - env.execute("TriangleEnumeration 
Example") - } else { - triangles.print() - } - - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Edge(v1: Int, v2: Int) extends Serializable - - case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable - - case class EdgeWithDegrees(v1: Int, d1: Int, v2: Int, d2: Int) extends Serializable - - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Counts the number of edges that share a common vertex. - * Emits one edge for each input edge with a degree annotation for the shared vertex. - * For each emitted edge, the first vertex is the vertex with the smaller id. - */ - class DegreeCounter extends GroupReduceFunction[Edge, EdgeWithDegrees] { - - val vertices = mutable.MutableList[Integer]() - var groupVertex = 0 - - override def reduce(edges: java.lang.Iterable[Edge], out: Collector[EdgeWithDegrees]) = { - - // empty vertex list - vertices.clear() - - // collect all vertices - for (e <- edges.asScala) { - groupVertex = e.v1 - if (!vertices.contains(e.v2) && e.v1 != e.v2) { - vertices += e.v2 - } - } - - // count vertices to obtain degree of groupVertex - val degree = vertices.length - - // create and emit edges with degrees - for (v <- vertices) { - if (v < groupVertex) { - out.collect(new EdgeWithDegrees(v, 0, groupVertex, degree)) - } else { - out.collect(new EdgeWithDegrees(groupVertex, degree, v, 0)) - } - } - } - } - - /** - * Builds triads (triples of vertices) from pairs of edges that share a vertex. - * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by - * vertexId. - * Assumes that input edges share the first vertex and are in ascending order of the second - * vertex. 
- */ - @ForwardedFields(Array("v1")) - class TriadBuilder extends GroupReduceFunction[Edge, Triad] { - - val vertices = mutable.MutableList[Integer]() - - override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = { - - // clear vertex list - vertices.clear() - - // build and emit triads - for (e <- edges.asScala) { - // combine vertex with all previously read vertices - for (v <- vertices) { - out.collect(Triad(e.v1, v, e.v2)) - } - vertices += e.v2 - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 2) { - edgePath = args(0) - outputPath = args(1) - - true - } else { - System.err.println("Usage: EnumTriangleOpt <edge path> <result path>") - - false - } - } else { - System.out.println("Executing Enum Triangles Optimized example with built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>") - - true - } - } - - private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { - if (fileOutput) { - env.readCsvFile[Edge]( - edgePath, - fieldDelimiter = " ", - includedFields = Array(0, 1)) - } else { - val edges = EnumTrianglesData.EDGES.map { - case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])} - env.fromCollection(edges) - } - } - - - private var fileOutput: Boolean = false - private var edgePath: String = null - private var outputPath: String = null - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala 
---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala deleted file mode 100644 index e1d4af6..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.graph - -import java.lang.Iterable - -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.scala._ -import org.apache.flink.examples.java.graph.util.PageRankData -import org.apache.flink.api.java.aggregation.Aggregations.SUM - -import org.apache.flink.util.Collector - -import scala.collection.JavaConverters._ - -/** - * A basic implementation of the Page Rank algorithm using a bulk iteration. - * - * This implementation requires a set of pages and a set of directed links as input and works as - * follows. 
- * - * In each iteration, the rank of every page is evenly distributed to all pages it points to. Each - * page collects the partial ranks of all pages that point to it, sums them up, and applies a - * dampening factor to the sum. The result is the new rank of the page. A new iteration is started - * with the new ranks of all pages. This implementation terminates after a fixed number of - * iterations. This is the Wikipedia entry for the - * [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]] - * - * Input files are plain text files and must be formatted as follows: - * - * - Pages represented as an (long) ID separated by new-line characters. - * For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63. - * - Links are represented as pairs of page IDs which are separated by space characters. Links - * are separated by new-line characters. - * For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12), - * (1)->(12), and (42)->(63). For this simple implementation it is required that each page has - * at least one incoming and one outgoing link (a page can point to itself). - * - * Usage: - * {{{ - * PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations> - * }}} - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations. - * - * This example shows how to use: - * - * - Bulk Iterations - * - Default Join - * - Configure user-defined functions using constructor parameters. 
- *
- */
object PageRankBasic {

  /**
   * Weight given to the rank a page receives over its incoming links. The remaining
   * `1 - DAMPENING_FACTOR` is spread evenly over all pages.
   */
  private val DAMPENING_FACTOR: Double = 0.85

  /** A rank change larger than this on any page counts as a significant update. */
  private val EPSILON: Double = 0.0001

  /**
   * Runs the PageRank program.
   *
   * @param args either empty (built-in default data, result printed) or
   *             `<pages path> <links path> <output path> <num pages> <num iterations>`
   */
  def main(args: Array[String]): Unit = {
    if (!parseParameters(args)) {
      return
    }

    // Copy the page count into a local value before it is used inside user functions.
    // A closure that reads the field of this singleton object does not capture its value;
    // on a remote worker the freshly loaded object would still hold the initial 0.
    val pageCount = numPages

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read input data
    val pages = getPagesDataSet(env)
    val links = getLinksDataSet(env)

    // assign initial ranks to pages: every page starts with the same share
    val pagesWithRanks = pages.map(p => Page(p, 1.0 / pageCount)).withForwardedFields("*->pageId")

    // build adjacency list from link input
    val adjacencyLists = links
      .groupBy("sourceId").reduceGroup(new GroupReduceFunction[Link, AdjacencyList] {
        override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
          // The group is read in a single pass: the source id (identical for all links of
          // the group) is remembered while the target ids are gathered.
          var sourceId = -1L
          val targetIds = values.asScala map { link => sourceId = link.sourceId; link.targetId }
          out.collect(AdjacencyList(sourceId, targetIds.toArray))
        }
      })

    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages: each target gets an equal share
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              val targets = adjacent.targetIds
              val share = page.rank / targets.length
              targets foreach { t => out.collect(Page(t, share)) }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pageCount))
          }.withForwardedFields("pageId")

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }
        (newRanks, termination)
    }

    // emit result
    if (fileOutput) {
      finalRanks.writeAsCsv(outputPath, "\n", " ")
      // execute program
      env.execute("Basic PageRank Example")
    } else {
      finalRanks.print()
    }
  }

  // *************************************************************************
  //     USER TYPES
  // *************************************************************************

  /** A directed link from page `sourceId` to page `targetId`. */
  case class Link(sourceId: Long, targetId: Long)

  /** A page together with its current rank. */
  case class Page(pageId: Long, rank: Double)

  /** All link targets of one source page. */
  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************

  /** Prints the command line synopsis to standard error. */
  private def printUsage(): Unit = {
    System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
      "pages> <num iterations>")
  }

  /**
   * Reads the program configuration from the command line.
   *
   * Without arguments the built-in default data is used. Otherwise exactly five arguments
   * are required, and the last two must be numbers with a positive page count.
   *
   * @param args the command line arguments
   * @return `true` if the program can run, `false` if the arguments were rejected
   */
  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length > 0) {
      fileOutput = true
      if (args.length == 5) {
        pagesInputPath = args(0)
        linksInputPath = args(1)
        outputPath = args(2)
        try {
          numPages = args(3).toLong
          maxIterations = args(4).toInt

          if (numPages > 0) {
            true
          } else {
            // the page count is used as a divisor for the initial ranks
            System.err.println("The number of pages must be positive: " + numPages)
            printUsage()

            false
          }
        } catch {
          case e: NumberFormatException =>
            System.err.println("Invalid numeric argument: " + e.getMessage)
            printUsage()

            false
        }
      } else {
        printUsage()

        false
      }
    } else {
      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
        "default data.")
      System.out.println(" Provide parameters to read input data from files.")
      System.out.println(" See the documentation for the correct format of input files.")
      System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num " +
        "pages> <num iterations>")

      numPages = PageRankData.getNumberOfPages

      true
    }
  }

  /**
   * Returns the page ids, read from `pagesInputPath` in file mode. In default mode the ids
   * are the sequence 1 to `numPages`, so the page set always agrees with the page count
   * used for the rank computation.
   */
  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
    if (fileOutput) {
      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
        .map(x => x._1)
    } else {
      env.generateSequence(1, numPages)
    }
  }

  /**
   * Returns the links, read from `linksInputPath` in file mode or taken from the built-in
   * `PageRankData.EDGES` otherwise.
   */
  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
    if (fileOutput) {
      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
        includedFields = Array(0, 1))
    } else {
      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
        v2.asInstanceOf[Long])}
      env.fromCollection(edges)
    }
  }

  private var fileOutput: Boolean = false
  private var pagesInputPath: String = null
  private var linksInputPath: String = null
  private var outputPath: String = null
  private var numPages: Long = 0
  private var maxIterations: Int = 10

}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
deleted file mode 100644
index 3de0f2e..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.flink.examples.scala.graph

import org.apache.flink.api.scala._
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
import org.apache.flink.util.Collector

/**
 * Computes the transitive closure of a directed graph with a naive bulk iteration.
 *
 * The iteration starts with the edges as the known paths. Every round joins the known
 * paths with the edges to extend them by one hop, adds the previously known paths and
 * removes duplicates. The paths that are new in a round form the termination data set;
 * the number of rounds is additionally bounded by the maximum number of iterations.
 *
 * In file mode the edges are read as two blank-separated `Long` values per record.
 *
 * Usage: `TransitiveClosure <edges path> <result path> <max number of iterations>`
 */
object TransitiveClosureNaive {

  /**
   * Runs the transitive closure program.
   *
   * @param args either empty (built-in default data, result printed) or
   *             `<edges path> <result path> <max number of iterations>`
   */
  def main(args: Array[String]): Unit = {
    if (!parseParameters(args)) {
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment

    val edges = getEdgesDataSet(env)

    val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] =>

      // extend every known path by one edge, keep the known paths, drop duplicates
      val nextPaths = prevPaths
        .join(edges)
        .where(1).equalTo(0) {
          (left, right) => (left._1, right._2)
        }.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2")
        .union(prevPaths)
        .groupBy(0, 1)
        .reduce((l, r) => l).withForwardedFields("_1; _2")

      // emit the paths of this round that were not known before
      val terminate = prevPaths
        .coGroup(nextPaths)
        .where(0).equalTo(0) {
          (prev, next, out: Collector[(Long, Long)]) => {
            val knownPaths = prev.toSet
            for (candidate <- next)
              if (!knownPaths.contains(candidate)) out.collect(candidate)
          }
        }.withForwardedFieldsSecond("*")
      (nextPaths, terminate)
    }

    if (fileOutput) {
      paths.writeAsCsv(outputPath, "\n", " ")
      env.execute("Scala Transitive Closure Example")
    } else {
      paths.print()
    }
  }

  private var fileOutput: Boolean = false
  private var edgesPath: String = null
  private var outputPath: String = null
  private var maxIterations: Int = 10

  /** Prints the command line synopsis to standard error. */
  private def printUsage(): Unit = {
    System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of " +
      "iterations>")
  }

  /**
   * Reads the program configuration from the command line.
   *
   * Without arguments the built-in default data is used. Otherwise exactly three arguments
   * are required, and the last one must be an integer.
   *
   * @param programArguments the command line arguments
   * @return `true` if the program can run, `false` if the arguments were rejected
   */
  private def parseParameters(programArguments: Array[String]): Boolean = {
    if (programArguments.length > 0) {
      fileOutput = true
      if (programArguments.length == 3) {
        edgesPath = programArguments(0)
        outputPath = programArguments(1)
        try {
          maxIterations = Integer.parseInt(programArguments(2))
          true
        } catch {
          case _: NumberFormatException =>
            // report a malformed iteration count like any other usage error
            System.err.println("Invalid number of iterations: " + programArguments(2))
            printUsage()
            false
        }
      }
      else {
        printUsage()
        false
      }
    }
    else {
      System.out.println("Executing TransitiveClosure example with default parameters and " +
        "built-in default data.")
      System.out.println(" Provide parameters to read input data from files.")
      System.out.println(" See the documentation for the correct format of input files.")
      System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of " +
        "iterations>")
      true
    }
  }

  /**
   * Returns the edges, read from `edgesPath` in file mode or taken from the built-in
   * `ConnectedComponentsData.EDGES` otherwise.
   */
  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
    if (fileOutput) {
      // the reader already produces (Long, Long) tuples, no further mapping is needed
      env.readCsvFile[(Long, Long)](
        edgesPath,
        fieldDelimiter = " ",
        includedFields = Array(0, 1))
    }
    else {
      val edgeData = ConnectedComponentsData.EDGES map {
        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
      }
      env.fromCollection(edgeData)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
deleted file mode 100644
index 3453ee8..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */

package org.apache.flink.examples.scala.misc

import org.apache.flink.api.scala._

/**
 * Estimates Pi by random sampling.
 *
 * Random points are drawn from the unit square and the points that land inside the
 * quarter of the unit circle are counted. The number of samples is taken from the first
 * program argument and defaults to 1000000.
 */
object PiEstimation {

  def main(args: Array[String]): Unit = {

    val numSamples: Long = args.headOption.map(_.toLong).getOrElse(1000000)

    val env = ExecutionEnvironment.getExecutionEnvironment

    // one random point per sequence element: 1 if it falls into the upper right
    // quadrant of the unit circle, 0 otherwise; the reduce adds up the hits
    val hits = env
      .generateSequence(1, numSamples)
      .map { _ =>
        val x = Math.random()
        val y = Math.random()
        if (x * x + y * y < 1) 1L else 0L
      }
      .reduce((a, b) => a + b)

    // hits / samples approximates the area of the upper right quadrant;
    // four quadrants make up the whole unit circle, whose area is PI
    val pi = hits.map(inside => inside * 4.0 / numSamples)

    println("We estimate Pi to be:")

    pi.print()
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
deleted file mode 100644
index 2a7b786..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */

package org.apache.flink.examples.scala.ml

import org.apache.flink.api.common.functions._
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.examples.java.ml.util.LinearRegressionData

import scala.collection.JavaConverters._

/**
 * This example implements a basic linear regression that solves the y = theta0 + theta1*x
 * problem using the batch gradient descent (BGD) algorithm.
 *
 * Linear regression with BGD is an iterative algorithm and works as follows:
 *
 * Given a data set and a target set, BGD tries to find the parameters that make the data set
 * fit the target set best.
 * In each iteration, the algorithm computes the gradient of the cost function and uses it to
 * update all the parameters.
 * The algorithm terminates after a fixed number of iterations (as in this implementation).
 * With enough iterations, the algorithm can minimize the cost function and find the best
 * parameters.
 * See the Wikipedia entries for
 * [[http://en.wikipedia.org/wiki/Linear_regression Linear regression]] and
 * [[http://en.wikipedia.org/wiki/Gradient_descent Gradient descent algorithm]].
 *
 * This implementation works on one-dimensional data and finds the best two-dimensional theta to
 * fit the target.
 *
 * Input files are plain text files and must be formatted as follows:
 *
 *  - Data points are represented as two double values separated by a blank character. The first
 *    one represents X (the training data) and the second one represents Y (the target). Data
 *    points are separated by newline characters.
 *    For example `"-0.02 -0.04\n5.3 10.6\n"` gives two data points
 *    (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
 *
 * This example shows how to use:
 *
 *  - Bulk iterations
 *  - Broadcast variables in bulk iterations
 */
object LinearRegression {

  /**
   * Runs the linear regression program.
   *
   * @param args either empty (built-in default data, result printed) or
   *             `<data path> <result path> <num iterations>`
   */
  def main(args: Array[String]): Unit = {
    if (!parseParameters(args)) {
      return
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = getDataSet(env)
    val parameters = getParamsDataSet(env)

    val result = parameters.iterate(numIterations) { currentParameters =>
      data
        // one updated parameter pair per data point, based on the broadcast parameters
        .map(new SubUpdate).withBroadcastSet(currentParameters, "parameters")
        // add up the updated parameters and count how many were added
        .reduce { (p1, p2) => (p1._1 + p2._1, p1._2 + p2._2) }
        // the average of the per-point updates becomes the next parameter pair
        .map { x => x._1.div(x._2) }
    }

    if (fileOutput) {
      result.writeAsText(outputPath)
      env.execute("Scala Linear Regression example")
    }
    else {
      result.print()
    }
  }

  /**
   * A simple data sample, x means the input, and y means the target.
   */
  case class Data(var x: Double, var y: Double)

  /**
   * A set of parameters -- theta0, theta1.
   */
  case class Params(theta0: Double, theta1: Double) {

    /** Returns the parameters with both components divided by `a`. */
    def div(a: Int): Params = {
      Params(theta0 / a, theta1 / a)
    }

    /** Returns the component-wise sum of these parameters and `other`. */
    def + (other: Params): Params = {
      Params(theta0 + other.theta0, theta1 + other.theta1)
    }
  }

  // *************************************************************************
  //     USER FUNCTIONS
  // *************************************************************************

  /**
   * Computes a single BGD-style update of both parameters for one data point.
   *
   * The result pairs the updated parameters with the count 1, so that updates can be summed
   * and averaged afterwards.
   */
  class SubUpdate extends RichMapFunction[Data, (Params, Int)] {

    /** Step size applied to the gradient of a single data point. */
    private val learningRate: Double = 0.01

    private var parameter: Params = null

    /** Reads the current parameters from the broadcast variable "parameters". */
    override def open(parameters: Configuration): Unit = {
      val broadcastParams = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
      parameter = broadcastParams.head
    }

    override def map(in: Data): (Params, Int) = {
      // difference between the value predicted for in.x and the actual target in.y
      val error = (parameter.theta0 + (parameter.theta1 * in.x)) - in.y
      val theta0 = parameter.theta0 - learningRate * error
      val theta1 = parameter.theta1 - learningRate * (error * in.x)
      (Params(theta0, theta1), 1)
    }
  }

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************
  private var fileOutput: Boolean = false
  private var dataPath: String = null
  private var outputPath: String = null
  private var numIterations: Int = 10

  /** Prints the command line synopsis to standard error. */
  private def printUsage(): Unit = {
    System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")
  }

  /**
   * Reads the program configuration from the command line.
   *
   * Without arguments the built-in default data is used. Otherwise exactly three arguments
   * are required, and the last one must be an integer.
   *
   * @param programArguments the command line arguments
   * @return `true` if the program can run, `false` if the arguments were rejected
   */
  private def parseParameters(programArguments: Array[String]): Boolean = {
    if (programArguments.length > 0) {
      fileOutput = true
      if (programArguments.length == 3) {
        dataPath = programArguments(0)
        outputPath = programArguments(1)
        try {
          numIterations = programArguments(2).toInt

          true
        } catch {
          case _: NumberFormatException =>
            // report a malformed iteration count like any other usage error
            System.err.println("Invalid number of iterations: " + programArguments(2))
            printUsage()

            false
        }
      }
      else {
        printUsage()

        false
      }
    }
    else {
      System.out.println("Executing Linear Regression example with default parameters and " +
        "built-in default data.")
      System.out.println(" Provide parameters to read input data from files.")
      System.out.println(" See the documentation for the correct format of input files.")
      System.out.println(" We provide a data generator to create synthetic input files for this " +
        "program.")
      System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>")

      true
    }
  }

  /**
   * Returns the data points, read from `dataPath` in file mode or taken from the built-in
   * `LinearRegressionData.DATA` otherwise.
   */
  private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = {
    if (fileOutput) {
      env.readCsvFile[(Double, Double)](
        dataPath,
        fieldDelimiter = " ",
        includedFields = Array(0, 1))
        .map { t => Data(t._1, t._2) }
    }
    else {
      val data = LinearRegressionData.DATA map {
        case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
      }
      env.fromCollection(data)
    }
  }

  /** Returns the initial parameters taken from the built-in `LinearRegressionData.PARAMS`. */
  private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
    val params = LinearRegressionData.PARAMS map {
      case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
    }
    env.fromCollection(params)
  }
}