http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java index 57045a1..db87eb9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -52,7 +52,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> { .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(1) .equalTo(0) - .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>()) + .with(new JoinEdgeWithVertexDegree<>()) .setParallelism(parallelism) .name("Edge target degrees"); }
http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 06a7fd2..a703789 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -93,18 +93,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { throws Exception { // s, t, bitmask DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges() - .flatMap(new EmitAndFlipEdge<K, EV>()) + .flatMap(new EmitAndFlipEdge<>()) .setParallelism(parallelism) .name("Emit and flip edge") .groupBy(0, 1) - .reduceGroup(new ReduceBitmask<K>()) + .reduceGroup(new ReduceBitmask<>()) .setParallelism(parallelism) .name("Reduce bitmask"); // s, d(s) DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder .groupBy(0) - .reduceGroup(new DegreeCount<K>()) + .reduceGroup(new DegreeCount<>()) .setParallelism(parallelism) .name("Degree count"); @@ -113,7 +113,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> { .leftOuterJoin(vertexDegrees) .where(0) .equalTo(0) - .with(new JoinVertexWithVertexDegrees<K, VV>()) + .with(new JoinVertexWithVertexDegrees<>()) .setParallelism(parallelism) .name("Zero degree vertices"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index dc071cf..38c7995 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -84,14 +84,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // t DataSet<Vertex<K, LongValue>> targetIds = input .getEdges() - .map(new MapEdgeToTargetId<K, EV>()) + .map(new MapEdgeToTargetId<>()) .setParallelism(parallelism) .name("Edge to target ID"); // t, d(t) DataSet<Vertex<K, LongValue>> targetDegree = targetIds .groupBy(0) - .reduce(new DegreeCount<K>()) + .reduce(new DegreeCount<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); @@ -101,7 +101,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .leftOuterJoin(targetDegree) .where(0) .equalTo(0) - .with(new JoinVertexWithVertexDegree<K, VV>()) + .with(new JoinVertexWithVertexDegree<>()) .setParallelism(parallelism) .name("Zero degree vertices"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index 4a4689b..ef9c781 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -84,14 +84,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // s DataSet<Vertex<K, LongValue>> sourceIds = input .getEdges() - .map(new MapEdgeToSourceId<K, EV>()) + .map(new MapEdgeToSourceId<>()) .setParallelism(parallelism) .name("Edge to source ID"); // s, d(s) DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds .groupBy(0) - .reduce(new DegreeCount<K>()) + .reduce(new DegreeCount<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); @@ -101,7 +101,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .leftOuterJoin(sourceDegree) .where(0) .equalTo(0) - .with(new JoinVertexWithVertexDegree<K, VV>()) + .with(new JoinVertexWithVertexDegree<>()) .setParallelism(parallelism) .name("Zero degree vertices"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index ff4285f..6825295 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -89,7 +89,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(1) .equalTo(0) - .with(new JoinEdgeDegreeWithVertexDegree<K, EV, LongValue>()) + .with(new JoinEdgeDegreeWithVertexDegree<>()) .setParallelism(parallelism) .name("Edge target degree"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index bd8ce3d..3fe05d9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(0) .equalTo(0) - .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>()) + .with(new JoinEdgeWithVertexDegree<>()) .setParallelism(parallelism) .name("Edge source degree"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index cb18d2c..6020ba3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND) .where(1) .equalTo(0) - .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>()) + .with(new JoinEdgeWithVertexDegree<>()) .setParallelism(parallelism) .name("Edge target degree"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index d2fad18..fee58a3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -103,7 +103,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input) throws Exception { MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ? - new MapEdgeToTargetId<K, EV>() : new MapEdgeToSourceId<K, EV>(); + new MapEdgeToTargetId<>() : new MapEdgeToSourceId<>(); // v DataSet<Vertex<K, LongValue>> vertexIds = input @@ -115,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { // v, deg(v) DataSet<Vertex<K, LongValue>> degree = vertexIds .groupBy(0) - .reduce(new DegreeCount<K>()) + .reduce(new DegreeCount<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Degree count"); @@ -126,7 +126,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> { .leftOuterJoin(degree) .where(0) .equalTo(0) - .with(new JoinVertexWithVertexDegree<K, VV>()) + .with(new JoinVertexWithVertexDegree<>()) .setParallelism(parallelism) .name("Zero degree vertices"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java index 522d39c..41dc64b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -138,7 +138,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // u, d(u) if d(u) > maximumDegree DataSet<Tuple1<K>> highDegreeVertices = vertexDegree - .flatMap(new DegreeFilter<K>(maximumDegree)) + .flatMap(new DegreeFilter<>(maximumDegree)) .setParallelism(parallelism) .name("Filter high-degree vertices"); @@ -150,7 +150,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { .leftOuterJoin(highDegreeVertices, joinHint) .where(0) .equalTo(0) - .with(new ProjectVertex<K, VV>()) + .with(new ProjectVertex<>()) .setParallelism(parallelism) .name("Project low-degree vertices"); @@ -160,13 +160,13 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { .leftOuterJoin(highDegreeVertices, joinHint) .where(reduceOnTargetId.get() ? 1 : 0) .equalTo(0) - .with(new ProjectEdge<K, EV>()) + .with(new ProjectEdge<>()) .setParallelism(parallelism) .name("Project low-degree edges by " + (reduceOnTargetId.get() ? "target" : "source")) .leftOuterJoin(highDegreeVertices, joinHint) .where(reduceOnTargetId.get() ? 0 : 1) .equalTo(0) - .with(new ProjectEdge<K, EV>()) + .with(new ProjectEdge<>()) .setParallelism(parallelism) .name("Project low-degree edges by " + (reduceOnTargetId.get() ? "source" : "target")); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 511840a..0d4fa1e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -41,7 +41,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // Edges DataSet<Edge<K, EV>> edges = input .getEdges() - .filter(new RemoveSelfLoops<K, EV>()) + .filter(new RemoveSelfLoops<>()) .setParallelism(parallelism) .name("Remove self-loops") .distinct(0, 1) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index 21db233..f00a162 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -71,7 +71,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { // Edges DataSet<Edge<K, EV>> edges = input .getEdges() - .flatMap(new SymmetrizeAndRemoveSelfLoops<K, EV>(clipAndFlip)) + .flatMap(new SymmetrizeAndRemoveSelfLoops<>(clipAndFlip)) .setParallelism(parallelism) .name("Remove self-loops") .distinct(0, 1) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java index 4cb4e01..6dcf766 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java @@ -88,7 +88,7 @@ public class Translate { TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); return vertices - .map(new TranslateVertexId<OLD, NEW, VV>(translator)) + .map(new TranslateVertexId<>(translator)) .returns(returnType) .setParallelism(parallelism) .name("Translate vertex IDs"); @@ -172,7 +172,7 @@ public class Translate { TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType); return edges - .map(new TranslateEdgeId<OLD, NEW, EV>(translator)) + .map(new TranslateEdgeId<>(translator)) .returns(returnType) .setParallelism(parallelism) .name("Translate edge IDs"); @@ -257,7 +257,7 @@ public class Translate { TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType); return vertices - .map(new TranslateVertexValue<K, OLD, NEW>(translator)) + .map(new TranslateVertexValue<>(translator)) .returns(returnType) .setParallelism(parallelism) .name("Translate vertex values"); @@ -341,7 +341,7 @@ public class Translate { TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType); return edges - .map(new TranslateEdgeValue<K, OLD, NEW>(translator)) + .map(new TranslateEdgeValue<>(translator)) .returns(returnType) .setParallelism(parallelism) .name("Translate edge values"); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java index 029e2c4..97c93e2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java @@ -128,7 +128,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> { DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges = edges.join(edges) .where(1) .equalTo(1) - .with(new ProjectionTopSimple<KT, KB, EV>()) + .with(new ProjectionTopSimple<>()) .name("Simple top projection"); return Graph.fromDataSet(topVertices, newEdges, context); @@ -172,7 +172,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> { DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges = edges.join(edges) .where(0) .equalTo(0) - .with(new ProjectionBottomSimple<KT, KB, EV>()) + .with(new ProjectionBottomSimple<>()) .name("Simple bottom projection"); return Graph.fromDataSet(bottomVertices, newEdges, context); @@ -218,7 +218,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> { DataSet<Edge<KT, Projection<KB, VVB, VVT, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices) .where(1) .equalTo(1) - .with(new ProjectionTopFull<KT, KB, EV, VVT, VVB>()) + .with(new ProjectionTopFull<>()) .name("Full top projection"); return Graph.fromDataSet(topVertices, newEdges, context); @@ -284,7 +284,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> { DataSet<Edge<KB, Projection<KT, VVT, VVB, EV>>> newEdges = edgesWithVertices.join(edgesWithVertices) .where(0) .equalTo(0) - .with(new ProjectionBottomFull<KT, KB, EV, VVT, VVB>()) + .with(new ProjectionBottomFull<>()) .name("Full bottom projection"); return Graph.fromDataSet(bottomVertices, newEdges, context); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java index fca9d8b..d5a70f3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java @@ -91,7 +91,7 @@ public class GraphGeneratorUtils { */ public static <K, EV> DataSet<Vertex<K, NullValue>> vertexSet(DataSet<Edge<K, EV>> edges, int parallelism) { DataSet<Vertex<K, NullValue>> vertexSet = edges - .flatMap(new EmitSrcAndTarget<K, EV>()) + .flatMap(new EmitSrcAndTarget<>()) .setParallelism(parallelism) .name("Emit source and target labels"); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java index d14d32c..1960aa3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java @@ -156,7 +156,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> { .rebalance() .setParallelism(parallelism) .name("Rebalance") - .flatMap(new GenerateEdges<T>(vertexCount, scale, a, b, c, noiseEnabled, noise)) + .flatMap(new GenerateEdges<>(vertexCount, scale, a, b, c, noiseEnabled, noise)) .setParallelism(parallelism) .name("RMat graph edges"); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java index 72e18ae..f09a890 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java @@ -58,7 +58,7 @@ public class GSAConfiguration extends IterationConfiguration { * @param data The data set to be broadcasted. */ public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) { - this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data)); + this.bcVarsGather.add(new Tuple2<>(name, data)); } /** @@ -68,7 +68,7 @@ public class GSAConfiguration extends IterationConfiguration { * @param data The data set to be broadcasted. */ public void addBroadcastSetForSumFunction(String name, DataSet<?> data) { - this.bcVarsSum.add(new Tuple2<String, DataSet<?>>(name, data)); + this.bcVarsSum.add(new Tuple2<>(name, data)); } /** @@ -78,7 +78,7 @@ public class GSAConfiguration extends IterationConfiguration { * @param data The data set to be broadcasted. */ public void addBroadcastSetForApplyFunction(String name, DataSet<?> data) { - this.bcVarsApply.add(new Tuple2<String, DataSet<?>>(name, data)); + this.bcVarsApply.add(new Tuple2<>(name, data)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java index 12d4977..5f04b70 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -169,24 +169,24 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati case OUT: neighbors = iteration .getWorkset().join(edgeDataSet) - .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()); + .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<>()); break; case IN: neighbors = iteration .getWorkset().join(edgeDataSet) - .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()); + .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<>()); break; case ALL: neighbors = iteration .getWorkset().join(edgeDataSet) - .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration + .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<>()).union(iteration .getWorkset().join(edgeDataSet) - .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>())); + .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<>())); break; default: neighbors = iteration .getWorkset().join(edgeDataSet) - .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()); + .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<>()); break; } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java index 9846286..ccf2bb1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java @@ -73,14 +73,14 @@ public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Gr public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) { DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices() - .map(new AddScoreToVertexValuesMapper<K>()); + .map(new AddScoreToVertexValuesMapper<>()); Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices = Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected(); - return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<K>(), - new VertexLabelUpdater<K>(delta), maxIterations) - .mapVertices(new RemoveScoreFromVertexValuesMapper<K>()); + return graphWithScoredVertices.runScatterGatherIteration(new LabelMessenger<>(), + new VertexLabelUpdater<>(delta), maxIterations) + .mapVertices(new RemoveScoreFromVertexValuesMapper<>()); } @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java index a3110ab..5cb8abe 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -73,12 +72,12 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV> TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1); Graph<K, VV, NullValue> undirectedGraph = graph - .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance())) + .mapEdges(new MapTo<>(NullValue.getInstance())) .getUndirected(); return undirectedGraph.runScatterGatherIteration( - new CCMessenger<K, VV>(valueTypeInfo), - new CCUpdater<K, VV>(), + new CCMessenger<>(valueTypeInfo), + new CCUpdater<>(), maxIterations).getVertices(); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java index 37e5cab..230f88e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -74,13 +73,13 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV> TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1); Graph<K, VV, NullValue> undirectedGraph = graph - .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance())) + .mapEdges(new MapTo<>(NullValue.getInstance())) .getUndirected(); return undirectedGraph.runGatherSumApplyIteration( new GatherNeighborIds<>(valueTypeInfo), new SelectMinId<>(valueTypeInfo), - new UpdateComponentId<K, VV>(valueTypeInfo), + new UpdateComponentId<>(valueTypeInfo), maxIterations).getVertices(); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java index 2d0b8da..28e9168 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java @@ -51,9 +51,9 @@ public class GSASingleSourceShortestPaths<K, VV> implements @Override public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> input) { - return input.mapVertices(new InitVerticesMapper<K, VV>(srcVertexId)) + return input.mapVertices(new InitVerticesMapper<>(srcVertexId)) .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(), - new UpdateDistance<K>(), maxIterations) + new UpdateDistance<>(), maxIterations) .getVertices(); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index 1e700f4..880a67b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -77,9 +76,9 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV> TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1); // iteratively adopt the most frequent label among the neighbors of each vertex return input - .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance())) + .mapEdges(new MapTo<>(NullValue.getInstance())) .runScatterGatherIteration( - new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations) + new SendNewLabelToNeighbors<>(valueType), new UpdateVertexLabel<>(), maxIterations) .getVertices(); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 15f0a84..8f41fa0 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -51,8 +51,8 @@ public class SingleSourceShortestPaths<K, VV> implements GraphAlgorithm<K, VV, D @Override public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> input) { - return input.mapVertices(new InitVerticesMapper<K, VV>(srcVertexId)) - .runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(), + return input.mapVertices(new InitVerticesMapper<>(srcVertexId)) + .runScatterGatherIteration(new MinDistanceMessenger<>(), new VertexDistanceUpdater<>(), maxIterations).getVertices(); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java index 44ea988..a1498df 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java @@ -102,11 +102,11 @@ public class Summarization<K, VV, EV> // group vertices by value and create vertex group items DataSet<VertexGroupItem<K, VV>> vertexGroupItems = input.getVertices() .groupBy(1) - .reduceGroup(new VertexGroupReducer<K, VV>()); + .reduceGroup(new VertexGroupReducer<>()); // create super vertices DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems - .filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>()) - .map(new VertexGroupItemToSummarizedVertexMapper<K, VV>()); + .filter(new VertexGroupItemToSummarizedVertexFilter<>()) + .map(new VertexGroupItemToSummarizedVertexMapper<>()); // ------------------------- // build super edges @@ -114,22 +114,22 @@ public class Summarization<K, VV, EV> // create mapping between vertices and their representative DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems - .filter(new VertexGroupItemToRepresentativeFilter<K, VV>()) - .map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>()); + .filter(new VertexGroupItemToRepresentativeFilter<>()) + .map(new VertexGroupItemToVertexWithRepresentativeMapper<>()); // join edges with vertex representatives and update source and target identifiers DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges() .join(vertexToRepresentativeMap) .where(0) // source vertex id .equalTo(0) // vertex id - .with(new SourceVertexJoinFunction<K, EV>()) + .with(new SourceVertexJoinFunction<>()) .join(vertexToRepresentativeMap) .where(1) // target vertex id .equalTo(0) // vertex id - .with(new TargetVertexJoinFunction<K, EV>()); + .with(new TargetVertexJoinFunction<>()); // create super edges DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = edgesForGrouping .groupBy(0, 1, 2) // group by source id (0), target id (1) and edge value (2) - .reduceGroup(new EdgeGroupReducer<K, EV>()); + .reduceGroup(new EdgeGroupReducer<>()); return Graph.fromDataSet(summarizedVertices, summarizedEdges, input.getContext()); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java index 2ae6120..23f942c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java @@ -65,21 +65,21 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements DataSet<Edge<K, EV>> edges = input.getEdges(); // annotate edges with degrees - DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>()) - .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>()) - .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>()); + DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>()) + .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>()) + .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>()); // project edges by degrees - DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>()); + DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>()); // project edges by vertex id - DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>()); + DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>()); DataSet<Tuple3<K, K, K>> triangles = edgesByDegree // build triads .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING) - .reduceGroup(new TriadBuilder<K>()) + .reduceGroup(new TriadBuilder<>()) // filter triads - .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>()); + .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>()); return triangles; } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 981110f..55d3056 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -119,13 +119,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // u, edge count DataSet<Tuple2<K, LongValue>> triangleVertices = triangles - .flatMap(new SplitTriangles<K>()) + .flatMap(new SplitTriangles<>()) .name("Split triangle vertices"); // u, triangle count DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices .groupBy(0) - .reduce(new CountTriangles<K>()) + .reduce(new CountTriangles<>()) .setCombineHint(CombineHint.HASH) .name("Count triangles"); @@ -140,7 +140,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .leftOuterJoin(vertexTriangleCount) .where(0) .equalTo(0) - .with(new JoinVertexDegreeWithTriangleCount<K>()) + .with(new JoinVertexDegreeWithTriangleCount<>()) .setParallelism(parallelism) .name("Clustering coefficient"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 00b2210..52d3c10 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -84,11 +84,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { // u, v, bitmask where u < v DataSet<Tuple3<K, K, ByteValue>> filteredByID = input .getEdges() - .map(new OrderByID<K, EV>()) + .map(new OrderByID<>()) .setParallelism(parallelism) .name("Order by ID") .groupBy(0, 1) - .reduceGroup(new ReduceBitmask<K>()) + .reduceGroup(new ReduceBitmask<>()) .setParallelism(parallelism) .name("Flatten by ID"); @@ -99,11 +99,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { // u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees - .map(new OrderByDegree<K, EV>()) + .map(new OrderByDegree<>()) .setParallelism(parallelism) .name("Order by degree") .groupBy(0, 1) - .reduceGroup(new ReduceBitmask<K>()) + .reduceGroup(new ReduceBitmask<>()) .setParallelism(parallelism) .name("Flatten by degree"); @@ -111,7 +111,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree .groupBy(0) .sortGroup(1, Order.ASCENDING) - .reduceGroup(new GenerateTriplets<K>()) + .reduceGroup(new GenerateTriplets<>()) .name("Generate triplets"); // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph @@ -119,16 +119,16 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { .join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND) .where(1, 2) .equalTo(0, 1) - .with(new ProjectTriangles<K>()) + .with(new ProjectTriangles<>()) .name("Triangle listing"); if (permuteResults) { triangles = triangles - .flatMap(new PermuteResult<K>()) + .flatMap(new PermuteResult<>()) .name("Permute triangle vertices"); } else if (sortTriangleVertices.get()) { triangles = triangles - .map(new SortTriangleVertices<K>()) + .map(new SortTriangleVertices<>()) .name("Sort triangle vertices"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 0beb989..e1c7655 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -118,13 +118,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // u, 1 DataSet<Tuple2<K, LongValue>> triangleVertices = triangles - .flatMap(new SplitTriangles<K>()) + .flatMap(new SplitTriangles<>()) .name("Split triangle vertices"); // u, triangle count DataSet<Tuple2<K, LongValue>> vertexTriangleCount = triangleVertices .groupBy(0) - .reduce(new CountTriangles<K>()) + .reduce(new CountTriangles<>()) .setCombineHint(CombineHint.HASH) .name("Count triangles"); @@ -139,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .leftOuterJoin(vertexTriangleCount) .where(0) .equalTo(0) - .with(new JoinVertexDegreeWithTriangleCount<K>()) + .with(new JoinVertexDegreeWithTriangleCount<>()) .setParallelism(parallelism) .name("Clustering coefficient"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 2472744..3f1b00a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -83,7 +83,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { // u, v where u < v DataSet<Tuple2<K, K>> filteredByID = input .getEdges() - .flatMap(new FilterByID<K, EV>()) + .flatMap(new FilterByID<>()) .setParallelism(parallelism) .name("Filter by ID"); @@ -94,7 +94,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { // u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) DataSet<Tuple2<K, K>> filteredByDegree = pairDegree - .flatMap(new FilterByDegree<K, EV>()) + .flatMap(new FilterByDegree<>()) .setParallelism(parallelism) .name("Filter by degree"); @@ -102,7 +102,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { DataSet<Tuple3<K, K, K>> triplets = filteredByDegree .groupBy(0) .sortGroup(1, Order.ASCENDING) - .reduceGroup(new GenerateTriplets<K>()) + .reduceGroup(new GenerateTriplets<>()) .name("Generate triplets"); // u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w @@ -110,16 +110,16 @@ extends TriangleListingBase<K, VV, EV, Result<K>> { .join(filteredByID, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND) .where(1, 2) .equalTo(0, 1) - .with(new ProjectTriangles<K>()) + .with(new ProjectTriangles<>()) .name("Triangle listing"); if (permuteResults) { triangles = triangles - .flatMap(new PermuteResult<K>()) + .flatMap(new PermuteResult<>()) .name("Permute triangle vertices"); } else if (sortTriangleVertices.get()) { triangles = triangles - .map(new SortTriangleVertices<K>()) + .map(new SortTriangleVertices<>()) .name("Sort triangle vertices"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java index 6b41ee4..e59240b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java @@ -129,17 +129,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { throws Exception { DataSet<Tuple2<K, K>> edges = input .getEdges() - .map(new ExtractEdgeIDs<K, EV>()) + .map(new ExtractEdgeIDs<>()) .setParallelism(parallelism) .name("Extract edge IDs"); // ID, hub, authority DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges - .map(new InitializeScores<K>()) + .map(new InitializeScores<>()) .setParallelism(parallelism) .name("Initial scores") .groupBy(0) - .reduce(new SumScores<K>()) + .reduce(new SumScores<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); @@ -153,18 +153,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .coGroup(edges) .where(0) .equalTo(1) - .with(new Hubbiness<K>()) + .with(new Hubbiness<>()) .setParallelism(parallelism) .name("Hub") .groupBy(0) - .reduce(new SumScore<K>()) + .reduce(new SumScore<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); // sum-of-hubbiness-squared DataSet<DoubleValue> hubbinessSumSquared = hubbiness - .map(new Square<K>()) + .map(new Square<>()) .setParallelism(parallelism) .name("Square") .reduce(new Sum()) @@ -177,18 +177,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .coGroup(edges) .where(0) .equalTo(0) - .with(new Authority<K>()) + .with(new Authority<>()) .setParallelism(parallelism) .name("Authority") .groupBy(0) - .reduce(new SumScore<K>()) + .reduce(new SumScore<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); // sum-of-authority-squared DataSet<DoubleValue> authoritySumSquared = authority - .map(new Square<K>()) + .map(new Square<>()) .setParallelism(parallelism) .name("Square") .reduce(new Sum()) @@ -201,7 +201,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE) .where(0) .equalTo(0) - .with(new JoinAndNormalizeHubAndAuthority<K>()) + .with(new JoinAndNormalizeHubAndAuthority<>()) .withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED) .withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED) .setParallelism(parallelism) @@ -214,7 +214,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE) .where(0) .equalTo(0) - .with(new ChangeInScores<K>()) + .with(new ChangeInScores<>()) .setParallelism(parallelism) .name("Change in scores"); @@ -225,7 +225,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { return iterative .closeWith(passThrough) - .map(new TranslateResult<K>()) + .map(new TranslateResult<>()) .setParallelism(parallelism) .name("Map result"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java index af56e50..71c37aa 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java @@ -151,20 +151,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Edge<K, LongValue>> edgeSourceDegree = input .run(new EdgeSourceDegrees<K, VV, EV>() .setParallelism(parallelism)) - .map(new ExtractSourceDegree<K, EV>()) + .map(new ExtractSourceDegree<>()) .setParallelism(parallelism) .name("Extract source degree"); // vertices with zero in-edges DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree - .flatMap(new InitializeSourceVertices<K>()) + .flatMap(new InitializeSourceVertices<>()) .withBroadcastSet(vertexCount, VERTEX_COUNT) .setParallelism(parallelism) .name("Initialize source vertex scores"); // s, initial pagerank(s) DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree - .map(new InitializeVertexScores<K>()) + .map(new InitializeVertexScores<>()) .withBroadcastSet(vertexCount, VERTEX_COUNT) .setParallelism(parallelism) .name("Initialize scores"); @@ -178,18 +178,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .coGroup(edgeSourceDegree) .where(0) .equalTo(0) - .with(new SendScore<K>()) + .with(new SendScore<>()) .setParallelism(parallelism) .name("Send score") .groupBy(0) - .reduce(new SumScore<K>()) + .reduce(new SumScore<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum"); // ignored ID, total pagerank DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores - .reduce(new SumVertexScores<K>()) + .reduce(new SumVertexScores<>()) .setParallelism(parallelism) .name("Sum"); @@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .union(sourceVertices) .setParallelism(parallelism) .name("Union with source vertices") - .map(new AdjustScores<K>(dampingFactor)) + .map(new AdjustScores<>(dampingFactor)) .withBroadcastSet(sumOfScores, SUM_OF_SCORES) .withBroadcastSet(vertexCount, VERTEX_COUNT) .setParallelism(parallelism) @@ -211,7 +211,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .join(adjustedScores) .where(0) .equalTo(0) - .with(new ChangeInScores<K>()) + .with(new ChangeInScores<>()) .setParallelism(parallelism) .name("Change in scores"); @@ -222,7 +222,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { return iterative .closeWith(passThrough) - .map(new TranslateResult<K>()) + .map(new TranslateResult<>()) .setParallelism(parallelism) .name("Map result"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java index c88e401..7294fd1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java @@ -92,15 +92,15 @@ extends GraphAnalyticBase<K, VV, EV, Result> { // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair - .flatMap(new EdgeStats<K, EV>()) + .flatMap(new EdgeStats<>()) .setParallelism(parallelism) .name("Edge stats") .groupBy(0, 1) - .reduceGroup(new ReduceEdgeStats<K>()) + .reduceGroup(new ReduceEdgeStats<>()) .setParallelism(parallelism) .name("Reduce edge stats") .groupBy(0) - .reduce(new SumEdgeStats<K>()) + .reduce(new SumEdgeStats<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum edge stats"); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java index 4c0d654..8c520e6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java @@ -103,11 +103,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> { // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair - .map(new EdgeStats<K, EV>()) + .map(new EdgeStats<>()) .setParallelism(parallelism) .name("Edge stats") .groupBy(0) - .reduce(new SumEdgeStats<K>()) + .reduce(new SumEdgeStats<>()) .setCombineHint(CombineHint.HASH) .setParallelism(parallelism) .name("Sum edge stats"); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index d761f60..752e206 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -153,7 +153,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input .run(new VertexDegree<K, VV, EV>() .setParallelism(parallelism)) - .map(new VertexInverseLogDegree<K>()) + .map(new VertexInverseLogDegree<>()) .setParallelism(parallelism) .name("Vertex score"); @@ -172,7 +172,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple4<IntValue, K, K, FloatValue>> groupSpans = sourceInverseLogDegree .groupBy(0) .sortGroup(1, Order.ASCENDING) - .reduceGroup(new GenerateGroupSpans<K>()) + .reduceGroup(new GenerateGroupSpans<>()) .setParallelism(parallelism) .name("Generate group spans"); @@ -181,7 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .rebalance() .setParallelism(parallelism) .name("Rebalance") - .flatMap(new GenerateGroups<K>()) + .flatMap(new GenerateGroups<>()) .setParallelism(parallelism) .name("Generate groups"); @@ -189,19 +189,19 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple3<K, K, FloatValue>> twoPaths = groups .groupBy(0, 1) .sortGroup(2, Order.ASCENDING) - .reduceGroup(new GenerateGroupPairs<K>()) + .reduceGroup(new GenerateGroupPairs<>()) .name("Generate group pairs"); // t, u, adamic-adar score GroupReduceOperator<Tuple3<K, K, FloatValue>, Result<K>> scores = twoPaths .groupBy(0, 1) - .reduceGroup(new ComputeScores<K>(minimumScore, minimumRatio)) + .reduceGroup(new ComputeScores<>(minimumScore, minimumRatio)) .name("Compute scores"); if (minimumRatio > 0.0f) { // total score, number of pairs of neighbors DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree - .map(new ComputeScoreFromVertex<K>()) + .map(new ComputeScoreFromVertex<>()) .setParallelism(parallelism) .name("Average score") .sum(0) @@ -213,7 +213,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { if (mirrorResults) { return scores - .flatMap(new MirrorResult<K, Result<K>>()) + .flatMap(new MirrorResult<>()) .name("Mirror results"); } else { return scores; http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 8e820ac..92bf9e3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -199,7 +199,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans = neighborDegree .groupBy(0) .sortGroup(1, Order.ASCENDING) - .reduceGroup(new GenerateGroupSpans<K, EV>(groupSize)) + .reduceGroup(new GenerateGroupSpans<>(groupSize)) .setParallelism(parallelism) .name("Generate group spans"); @@ -208,7 +208,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .rebalance() .setParallelism(parallelism) .name("Rebalance") - .flatMap(new GenerateGroups<K>()) + .flatMap(new GenerateGroups<>()) .setParallelism(parallelism) .name("Generate groups"); @@ -216,20 +216,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { DataSet<Tuple3<K, K, IntValue>> twoPaths = groups .groupBy(0, 1) .sortGroup(2, Order.ASCENDING) - .reduceGroup(new GenerateGroupPairs<K>(groupSize)) + .reduceGroup(new GenerateGroupPairs<>(groupSize)) .name("Generate group pairs"); // t, u, intersection, union DataSet<Result<K>> scores = twoPaths .groupBy(0, 1) - .reduceGroup(new ComputeScores<K>(unboundedScores, + .reduceGroup(new ComputeScores<>(unboundedScores, minimumScoreNumerator, minimumScoreDenominator, maximumScoreNumerator, maximumScoreDenominator)) .name("Compute scores"); if (mirrorResults) { scores = scores - .flatMap(new MirrorResult<K, Result<K>>()) + .flatMap(new MirrorResult<>()) .name("Mirror results"); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java index 69fcc52..39b9bcf 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java @@ -48,7 +48,7 @@ public class VertexCentricConfiguration extends IterationConfiguration { * @param data The data set to be broadcasted. */ public void addBroadcastSet(String name, DataSet<?> data) { - this.bcVars.add(new Tuple2<String, DataSet<?>>(name, data)); + this.bcVars.add(new Tuple2<>(name, data)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java index c30b1a7..6c06a3a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java @@ -176,9 +176,9 @@ public class VertexCentricIteration<K, VV, EV, Message> DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs = iteration.getSolutionSet().join(iteration.getWorkset()) .where(0).equalTo(0) - .with(new AppendVertexState<K, VV, Message>()) - .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>( - vertexType, nullableMsgTypeInfo)); + .with(new AppendVertexState<>()) + .returns(new TupleTypeInfo<>( + vertexType, nullableMsgTypeInfo)); VertexComputeUdf<K, VV, EV, Message> vertexUdf = new VertexComputeUdf<>(computeFunction, intermediateTypeInfo); @@ -190,11 +190,11 @@ public class VertexCentricIteration<K, VV, EV, Message> // compute the solution set delta DataSet<Vertex<K, VV>> solutionSetDelta = superstepComputation.flatMap( - new ProjectNewVertexValue<K, VV, Message>()).returns(vertexType); + new ProjectNewVertexValue<>()).returns(vertexType); // compute the inbox of each vertex for the next superstep (new workset) DataSet<Tuple2<K, Either<NullValue, Message>>> allMessages = superstepComputation.flatMap( - new ProjectMessages<K, VV, Message>()).returns(workSetTypeInfo); + new ProjectMessages<>()).returns(workSetTypeInfo); DataSet<Tuple2<K, Either<NullValue, Message>>> newWorkSet = allMessages; http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java index 6a62847..0422f13 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java @@ -59,7 +59,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration { * @param data The data set to be broadcasted. */ public void addBroadcastSetForScatterFunction(String name, DataSet<?> data) { - this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, data)); + this.bcVarsScatter.add(new Tuple2<>(name, data)); } /** @@ -69,7 +69,7 @@ public class ScatterGatherConfiguration extends IterationConfiguration { * @param data The data set to be broadcasted. */ public void addBroadcastSetForGatherFunction(String name, DataSet<?> data) { - this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data)); + this.bcVarsGather.add(new Tuple2<>(name, data)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java index 3e2ac23..8082cd9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -46,7 +46,7 @@ public class GraphUtils { */ public static <T> DataSet<LongValue> count(DataSet<T> input) { return input - .map(new MapTo<T, LongValue>(new LongValue(1))) + .map(new MapTo<>(new LongValue(1))) .returns(LONG_VALUE_TYPE_INFO) .name("Emit 1") .reduce(new AddLongValue()) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java index 57aa987..b85b6e6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java @@ -48,11 +48,11 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, @Override public boolean validate(Graph<K, VV, EV> graph) throws Exception { DataSet<Tuple1<K>> edgeIds = graph.getEdges() - .flatMap(new MapEdgeIds<K, EV>()).distinct(); + .flatMap(new MapEdgeIds<>()).distinct(); DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0) - .equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1); + .equalTo(0).with(new GroupInvalidIds<>()).first(1); - return invalidIds.map(new KToTupleMap<K>()).count() == 0; + return invalidIds.map(new KToTupleMap<>()).count() == 0; } private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java index f89d4f5..1afb5da 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java @@ -120,7 +120,7 @@ public class AsmTestBase { return new RMatGraph<>(env, new JDKRandomGeneratorFactory(), vertexCount, edgeCount) .generate() - .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify<>()); } /** @@ -149,6 +149,6 @@ public class AsmTestBase { return new RMatGraph<>(env, new JDKRandomGeneratorFactory(), vertexCount, edgeCount) .generate() - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<>(false)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java index 22b47fe..63bf133 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java @@ -53,7 +53,7 @@ extends AsmTestBase { "(5,3,((null),(1,1,0),(4,2,2)))"; DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedSimpleGraph - .run(new EdgeDegreesPair<IntValue, NullValue, NullValue>()); + .run(new EdgeDegreesPair<>()); TestBaseUtils.compareResultAsText(degreesPair.collect(), expectedResult); } @@ -62,7 +62,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph(10, 16) - .run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()); + .run(new EdgeDegreesPair<>()); Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>>() .run(degreesPair) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java index f0d51d2..967cfb2 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java @@ -53,7 +53,7 @@ extends AsmTestBase { "(5,3,((null),(1,1,0)))"; DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedSimpleGraph - .run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>()); + .run(new EdgeSourceDegrees<>()); TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult); } @@ -62,7 +62,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph(10, 16) - .run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()); + .run(new EdgeSourceDegrees<>()); Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>() .run(sourceDegrees) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java index 6d58bb0..abb76c4 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java @@ -53,7 +53,7 @@ extends AsmTestBase { "(5,3,((null),(4,2,2)))"; DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedSimpleGraph - .run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>()); + .run(new EdgeTargetDegrees<>()); TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult); } @@ -62,7 +62,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph(10, 16) - .run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()); + .run(new EdgeTargetDegrees<>()); Checksum checksum = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, Degrees>>>() .run(targetDegrees) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java index 5214282..91f354f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java @@ -43,7 +43,7 @@ extends AsmTestBase { public void testWithSimpleDirectedGraph() throws Exception { DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph - .run(new VertexDegrees<IntValue, NullValue, NullValue>()); + .run(new VertexDegrees<>()); String expectedResult = "(0,(2,2,0))\n" + @@ -60,7 +60,7 @@ extends AsmTestBase { public void testWithSimpleUndirectedGraph() throws Exception { DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph - .run(new VertexDegrees<IntValue, NullValue, NullValue>()); + .run(new VertexDegrees<>()); String expectedResult = "(0,(2,2,2))\n" + @@ -100,7 +100,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph(10, 16) - .run(new VertexDegrees<LongValue, NullValue, NullValue>()); + .run(new VertexDegrees<>()); Checksum checksum = new ChecksumHashCode<Vertex<LongValue, Degrees>>() .run(degrees) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java index 5f492e4..1cae2e7 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java @@ -59,7 +59,7 @@ extends AsmTestBase { "(5,3,((null),1,4))"; DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph - .run(new EdgeDegreePair<IntValue, NullValue, NullValue>()); + .run(new EdgeDegreePair<>()); TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult); @@ -74,7 +74,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16) - .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()); + .run(new EdgeDegreePair<>()); Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>() .run(degreePairOnSourceId) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java index 393220d..2d8b2e3 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java @@ -59,7 +59,7 @@ extends AsmTestBase { "(5,3,((null),1))"; DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph - .run(new EdgeSourceDegree<IntValue, NullValue, NullValue>()); + .run(new EdgeSourceDegree<>()); TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult); @@ -74,7 +74,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph(10, 16) - .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()); + .run(new EdgeSourceDegree<>()); Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>() .run(sourceDegreeOnSourceId) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java index 782296a..a7c88a1 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java @@ -59,7 +59,7 @@ extends AsmTestBase { "(5,3,((null),4))"; DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph - .run(new EdgeTargetDegree<IntValue, NullValue, NullValue>()); + .run(new EdgeTargetDegree<>()); TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult); @@ -74,7 +74,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph(10, 16) - .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()); + .run(new EdgeSourceDegree<>()); Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>() .run(targetDegreeOnTargetId) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java index 192782d..bc76bff 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java @@ -50,7 +50,7 @@ extends AsmTestBase { "(5,1)"; DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph - .run(new VertexDegree<IntValue, NullValue, NullValue>()); + .run(new VertexDegree<>()); TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult); @@ -67,7 +67,7 @@ extends AsmTestBase { long expectedDegree = completeGraphVertexCount - 1; DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph - .run(new VertexDegree<LongValue, NullValue, NullValue>()); + .run(new VertexDegree<>()); for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) { assertEquals(expectedDegree, vertex.getValue().getValue()); @@ -109,7 +109,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph(10, 16) - .run(new VertexDegree<LongValue, NullValue, NullValue>()); + .run(new VertexDegree<>()); Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>() .run(degreeOnSourceId) http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java index f03d82c..51e7712 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java @@ -24,7 +24,6 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.metric.ChecksumHashCode; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.junit.Test; @@ -41,7 +40,7 @@ extends AsmTestBase { public void testWithSimpleGraph() throws Exception { Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph - .run(new MaximumDegree<IntValue, NullValue, NullValue>(3)); + .run(new MaximumDegree<>(3)); String expectedVerticesResult = "(0,(null))\n" + @@ -67,8 +66,8 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { Checksum checksum = undirectedRMatGraph(10, 16) - .run(new MaximumDegree<LongValue, NullValue, NullValue>(16)) - .run(new ChecksumHashCode<LongValue, NullValue, NullValue>()) + .run(new MaximumDegree<>(16)) + .run(new ChecksumHashCode<>()) .execute(); assertEquals(805, checksum.getCount()); http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java index a3aad4b..751d030 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java @@ -70,7 +70,7 @@ public class SimplifyTest { "(1,0,(null))"; Graph<IntValue, NullValue, NullValue> simpleGraph = graph - .run(new Simplify<IntValue, NullValue, NullValue>()); + .run(new Simplify<>()); TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult); } http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java index 6ff4292..68b4e0c 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java @@ -71,7 +71,7 @@ public class SimplifyTest { "(2,0,(null))"; Graph<IntValue, NullValue, NullValue> simpleGraph = graph - .run(new Simplify<IntValue, NullValue, NullValue>(false)); + .run(new Simplify<>(false)); TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult); } @@ -84,7 +84,7 @@ public class SimplifyTest { "(1,0,(null))"; Graph<IntValue, NullValue, NullValue> simpleGraph = graph - .run(new Simplify<IntValue, NullValue, NullValue>(true)); + .run(new Simplify<>(true)); TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult); }