[FLINK-1201] [gelly] use vertex/edge instead of tuples in the test; fixed a few warnings in pagerank example
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60363727 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60363727 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60363727 Branch: refs/heads/master Commit: 60363727ccf276e804defec4ddcc013085dfbd1b Parents: cac0e1d Author: vasia <vasilikikala...@gmail.com> Authored: Tue Dec 9 20:18:52 2014 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:12 2015 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/flink/graph/Graph.java | 5 ++--- .../apache/flink/graph/example/PageRankExample.java | 16 +++++----------- .../apache/flink/graph/test/TestForeachEdge.java | 11 +++++------ 3 files changed, 12 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/60363727/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 2cb50f5..9433eb2 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -10,8 +10,7 @@ * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,K extends Serializablr & Comparable, - VV implements Serializable, + * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
@@ -815,7 +814,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return Graph.create(edges, mapper, env); } - public Graph<K,VV,EV> run (GraphAlgorithm algorithm) { + public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) { return algorithm.run(this); } http://git-wip-us.apache.org/repos/asf/flink/blob/60363727/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java index 48a7b15..0fc8084 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java @@ -3,20 +3,13 @@ package flink.graphs.example; import flink.graphs.*; import flink.graphs.library.PageRank; + import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.*; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; -import java.util.Collection; -import java.util.List; - - public class PageRankExample implements ProgramDescription { public static void main (String [] args) throws Exception { @@ -27,7 +20,7 @@ public class PageRankExample implements ProgramDescription { DataSet<Edge<Long,Double>> links = getLinksDataSet(env); - Graph<Long, Double, Double> network = new Graph(pages, links, env); + Graph<Long, Double, Double> network = new Graph<Long, Double, Double>(pages, links, env); 
DataSet<Vertex<Long,Double>> pageRanks = network.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices(); @@ -46,8 +39,8 @@ public class PageRankExample implements ProgramDescription { private static long numPages = 10; private static int maxIterations = 10; - - private static DataSet<Vertex<Long,Double>> getPagesDataSet(ExecutionEnvironment env) { + @SuppressWarnings("serial") + private static DataSet<Vertex<Long,Double>> getPagesDataSet(ExecutionEnvironment env) { return env.generateSequence(1, numPages) .map(new MapFunction<Long, Vertex<Long, Double>>() { @Override @@ -58,6 +51,7 @@ public class PageRankExample implements ProgramDescription { } + @SuppressWarnings("serial") private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) { return env.generateSequence(1, numPages) .flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() { http://git-wip-us.apache.org/repos/asf/flink/blob/60363727/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestForeachEdge.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestForeachEdge.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestForeachEdge.java index 4e17cc6..34f121f 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestForeachEdge.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestForeachEdge.java @@ -8,7 +8,6 @@ import java.util.LinkedList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; import org.junit.runner.RunWith; @@ -82,13 +81,13 @@ public class TestForeachEdge extends JavaProgramTestBase { long weight 
= Long.MAX_VALUE; long minNeighorId = 0; - for (Tuple3<Long, Long, Long> edge: outEdges) { - if (edge.f2 < weight) { - weight = edge.f2; - minNeighorId = edge.f1; + for (Edge<Long, Long> edge: outEdges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getTarget(); } } - return new Tuple2<Long, Long>(v.f0, minNeighorId); + return new Tuple2<Long, Long>(v.getId(), minNeighorId); } }); verticesWithLowestOutNeighbor.writeAsCsv(resultPath);