[FLINK-1201] [gelly] fix ClassCastException and Type errors in mapVertices; fixes #41 and #46
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b751df28 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b751df28 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b751df28 Branch: refs/heads/master Commit: b751df28863af4a6216d967067dbb6f3729b66da Parents: 3cf734f Author: vasia <vasilikikala...@gmail.com> Authored: Tue Jan 6 20:54:25 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 51 ++++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b751df28/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 03fbf94..1cd5c90 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -106,7 +106,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return a new graph */ public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) { - DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper)); + TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); + DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper, + keyType)); return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context); } @@ -115,19 +117,24 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab <Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>> { private MapFunction<Vertex<K, VV>, NV> innerMapper; - public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper) { + private transient TypeInformation<K> keyType; + public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper, TypeInformation<K> keyType) { this.innerMapper = theMapper; + this.keyType = keyType; } public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { return new Vertex<K, NV>(value.f0, innerMapper.map(value)); } + @SuppressWarnings("unchecked") @Override public TypeInformation<Vertex<K, NV>> getProducedType() { - return new TupleTypeInfo<Vertex<K, NV>>( - ((TupleTypeInfo<?>)(TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null))).getTypeAt(0), - TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null)); + TypeInformation<NV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null); + @SuppressWarnings("rawtypes") + TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType); + return (TypeInformation<Vertex<K, NV>>) returnType; } } @@ -137,7 +144,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return */ public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) { - DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K, EV, NV>(mapper)); + TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); + DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K, EV, NV>(mapper, + keyType)); return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context); } @@ -146,21 +155,25 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab <Edge<K, EV>, Edge<K, NV>>, ResultTypeQueryable<Edge<K, NV>> { private MapFunction<Edge<K, EV>, NV> innerMapper; + private transient TypeInformation<K> keyType; - public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper) { + public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper, TypeInformation<K> keyType) { this.innerMapper = theMapper; + this.keyType = keyType; } public Edge<K, NV> map(Edge<K, EV> value) throws Exception { return new Edge<K, NV>(value.f0, value.f1, innerMapper.map(value)); } + @SuppressWarnings("unchecked") @Override public TypeInformation<Edge<K, NV>> getProducedType() { - TypeInformation<K> keyType = ((TupleTypeInfo<?>) - (TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null))).getTypeAt(0); - return new TupleTypeInfo<Edge<K, NV>>(keyType, keyType, - TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null)); + TypeInformation<NV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null); + @SuppressWarnings("rawtypes") + TypeInformation<?> returnType = new TupleTypeInfo<Edge>(Edge.class, keyType, keyType, valueType); + return (TypeInformation<Edge<K, NV>>) returnType; } } @@ -604,9 +617,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { + TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); DataSet<Vertex<K, VV>> vertices = edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) - .distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper)); + .distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper, keyType)); return new Graph<K, VV, EV>(vertices, edges, context); } @@ -615,20 +629,25 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab <Tuple1<K>, Vertex<K, VV>>, ResultTypeQueryable<Vertex<K, VV>> { private MapFunction<K, VV> innerMapper; + private transient TypeInformation<K> keyType; - public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper) { + public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper, TypeInformation<K> keyType) { this.innerMapper = theMapper; + this.keyType = keyType; } public Vertex<K, VV> map(Tuple1<K> value) throws Exception { return new Vertex<K, VV>(value.f0, innerMapper.map(value.f0)); } + @SuppressWarnings("unchecked") @Override public TypeInformation<Vertex<K, VV>> getProducedType() { - return new TupleTypeInfo<Vertex<K, VV>>( - TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, null, null), - TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null)); + TypeInformation<VV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null); + @SuppressWarnings("rawtypes") + TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType); + return (TypeInformation<Vertex<K, VV>>) returnType; } }