Hi,

I'm getting the following RuntimeException for an adaptation of the SingleSourceShortestPaths example using the Gelly API (see attachment). It has been adapted for unweighted graphs whose vertices have Long values.
As an input graph I'm using the social network graph (~200MB unpacked) from here: https://snap.stanford.edu/data/higgs-twitter.html
For the small SSSPDataUnweighted graph (also attached), the job terminates and computes the distances correctly.
03/16/2015 17:18:23 IterationHead(WorksetIteration (Vertex-centric iteration (org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted$VertexDistanceUpdater@dca6fe4 | org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted$MinDistanceMessenger@6577e8ce)))(2/4) switched to FAILED
java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 5 maxPartition: 8 number of overflow segments: 176 bucketSize: 217 Overall memory: 20316160 Partition memory: 7208960 Message: Index: 8, Size: 7
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.insert(CompactingHashTable.java:390)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTable(CompactingHashTable.java:337)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:216)
	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:278)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:205)
	at java.lang.Thread.run(Thread.java:745)

Best,
Mihail
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.SingleSourceShortestPathsDataUnweighted; import org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted; import org.apache.flink.types.NullValue; public class SingleSourceShortestPathsExampleUnweighted implements ProgramDescription { public static void main(String[] args) throws Exception { if (!parseParameters(args)) { return; } //Long tempLong = Long.MAX_VALUE; //System.out.println("DEBUG Long.MAX_VALUE: " + tempLong); System.out.println("DEBUG srcVertexId: " + srcVertexId); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Vertex<Long, Long>> vertices = getVerticesDataSet(env); DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env); Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, 
env); DataSet<Vertex<Long, Long>> singleSourceShortestPathsUnweighted = graph .run(new SingleSourceShortestPathsUnweighted<Long>(srcVertexId, maxIterations)).getVertices(); // emit result if (fileOutput) { singleSourceShortestPathsUnweighted.writeAsCsv(outputPath, "\n", " "); } else { singleSourceShortestPathsUnweighted.print(); } env.execute("Single Source Shortest Paths Example"); } @Override public String getDescription() { return "Single Source Shortest Paths"; } // ****************************************************************************************************************** // UTIL METHODS // ****************************************************************************************************************** private static boolean fileOutput = false; private static Long srcVertexId = null; private static String verticesInputPath = null; private static String edgesInputPath = null; private static String outputPath = null; private static int maxIterations = 5; private static boolean parseParameters(String[] args) { if (args.length > 0) { if (args.length == 5) { fileOutput = true; srcVertexId = Long.parseLong(args[0]); verticesInputPath = args[1]; edgesInputPath = args[2]; outputPath = args[3]; maxIterations = Integer.parseInt(args[4]); } else { System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + " <input vertices path> <input edges path> <output path> <num iterations>"); return false; } } return true; } @SuppressWarnings("serial") private static DataSet<Vertex<Long, Long>> getVerticesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(verticesInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") .types(Long.class, Long.class) .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() { @Override public Vertex<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception { return new Vertex<Long, Long>(tuple2.f0, tuple2.f1); } }); } else { System.err.println("Usage: SingleSourceShortestPathsUnweighted <source 
vertex id>" + " <input vertices path> <input edges path> <output path> <num iterations>"); return SingleSourceShortestPathsDataUnweighted.getDefaultVertexDataSet(env); } } @SuppressWarnings("serial") private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(edgesInputPath) .fieldDelimiter(" ") .lineDelimiter("\n") .types(Long.class, Long.class) .map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() { @Override public Edge<Long, NullValue> map(Tuple2<Long, Long> tuple2) throws Exception { return new Edge<Long, NullValue>(tuple2.f0, tuple2.f1, NullValue.getInstance()); } }); } else { System.err.println("Usage: SingleSourceShortestPathsUnweighted <source vertex id>" + " <input vertices path> <input edges path> <output path> <num iterations>"); return SingleSourceShortestPathsDataUnweighted.getDefaultEdgeDataSet(env); } } }
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.graph.library; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import java.io.Serializable; @SuppressWarnings("serial") public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> { private final K srcVertexId; private final Integer maxIterations; public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) { this.srcVertexId = srcVertexId; this.maxIterations = maxIterations; } @Override public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { Graph<K, Double, Double> mappedInput = input.mapVertices(new InitVerticesMapper<K>(srcVertexId)); VertexCentricIteration<K, Double, Double, Double> iteration = mappedInput.createVertexCentricIteration( new VertexDistanceUpdater<K>(), new 
MinDistanceMessenger<K>(), maxIterations); return mappedInput.runVertexCentricIteration(iteration); } public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> implements MapFunction<Vertex<K, Double>, Double> { private K srcVertexId; public InitVerticesMapper(K srcId) { this.srcVertexId = srcId; } public Double map(Vertex<K, Double> value) { if (value.f0.equals(srcVertexId)) { return 0.0; } else { return Double.MAX_VALUE; } } } /** * Function that updates the value of a vertex by picking the minimum * distance from all incoming messages. * * @param <K> */ public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable> extends VertexUpdateFunction<K, Double, Double> { @Override public void updateVertex(K vertexKey, Double vertexValue, MessageIterator<Double> inMessages) { Double minDistance = Double.MAX_VALUE; for (double msg : inMessages) { if (msg < minDistance) { minDistance = msg; } } if (vertexValue > minDistance) { setNewVertexValue(minDistance); } } } /** * Distributes the minimum distance associated with a given vertex among all * the target vertices summed up with the edge's value. * * @param <K> */ public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable> extends MessagingFunction<K, Double, Double, Double> { @Override public void sendMessages(K vertexKey, Double newDistance) throws Exception { for (Edge<K, Double> edge : getOutgoingEdges()) { sendMessageTo(edge.getTarget(), newDistance + edge.getValue()); } } } }
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.graph.example.utils; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Vertex; import org.apache.flink.types.NullValue; import java.util.ArrayList; import java.util.List; public class SingleSourceShortestPathsDataUnweighted { public static final int NUM_VERTICES = 5; public static final Long SRC_VERTEX_ID = 1L; public static final String VERTICES = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5"; public static DataSet<Vertex<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env) { List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>(); vertices.add(new Vertex<Long, Long>(1L, 1L)); vertices.add(new Vertex<Long, Long>(2L, 2L)); vertices.add(new Vertex<Long, Long>(3L, 3L)); vertices.add(new Vertex<Long, Long>(4L, 4L)); vertices.add(new Vertex<Long, Long>(5L, 5L)); return env.fromCollection(vertices); } public static final String EDGES = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1"; public static final DataSource<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) { List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>(); edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance())); edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance())); edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance())); edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance())); edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance())); edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance())); edges.add(new Edge<Long, NullValue>(5L, 1L, NullValue.getInstance())); return env.fromCollection(edges); } public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "1,0\n" + "2,1\n" + "3,1\n" + "4,2\n" + "5,2"; private SingleSourceShortestPathsDataUnweighted() {} }