http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 57045a1..db87eb9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -52,7 +52,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, Degrees>>> {
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(1)
                        .equalTo(0)
-                       .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+                       .with(new JoinEdgeWithVertexDegree<>())
                                .setParallelism(parallelism)
                                .name("Edge target degrees");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 06a7fd2..a703789 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -93,18 +93,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
Degrees>> {
                        throws Exception {
                // s, t, bitmask
                DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = 
input.getEdges()
-                       .flatMap(new EmitAndFlipEdge<K, EV>())
+                       .flatMap(new EmitAndFlipEdge<>())
                                .setParallelism(parallelism)
                                .name("Emit and flip edge")
                        .groupBy(0, 1)
-                       .reduceGroup(new ReduceBitmask<K>())
+                       .reduceGroup(new ReduceBitmask<>())
                                .setParallelism(parallelism)
                                .name("Reduce bitmask");
 
                // s, d(s)
                DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
                        .groupBy(0)
-                       .reduceGroup(new DegreeCount<K>())
+                       .reduceGroup(new DegreeCount<>())
                                .setParallelism(parallelism)
                                .name("Degree count");
 
@@ -113,7 +113,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
Degrees>> {
                                .leftOuterJoin(vertexDegrees)
                                .where(0)
                                .equalTo(0)
-                               .with(new JoinVertexWithVertexDegrees<K, VV>())
+                               .with(new JoinVertexWithVertexDegrees<>())
                                        .setParallelism(parallelism)
                                        .name("Zero degree vertices");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index dc071cf..38c7995 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -84,14 +84,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                // t
                DataSet<Vertex<K, LongValue>> targetIds = input
                        .getEdges()
-                       .map(new MapEdgeToTargetId<K, EV>())
+                       .map(new MapEdgeToTargetId<>())
                                .setParallelism(parallelism)
                                .name("Edge to target ID");
 
                // t, d(t)
                DataSet<Vertex<K, LongValue>> targetDegree = targetIds
                        .groupBy(0)
-                       .reduce(new DegreeCount<K>())
+                       .reduce(new DegreeCount<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Degree count");
@@ -101,7 +101,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                                .leftOuterJoin(targetDegree)
                                .where(0)
                                .equalTo(0)
-                               .with(new JoinVertexWithVertexDegree<K, VV>())
+                               .with(new JoinVertexWithVertexDegree<>())
                                        .setParallelism(parallelism)
                                        .name("Zero degree vertices");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 4a4689b..ef9c781 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -84,14 +84,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                // s
                DataSet<Vertex<K, LongValue>> sourceIds = input
                        .getEdges()
-                       .map(new MapEdgeToSourceId<K, EV>())
+                       .map(new MapEdgeToSourceId<>())
                                .setParallelism(parallelism)
                                .name("Edge to source ID");
 
                // s, d(s)
                DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
                        .groupBy(0)
-                       .reduce(new DegreeCount<K>())
+                       .reduce(new DegreeCount<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Degree count");
@@ -101,7 +101,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                                .leftOuterJoin(sourceDegree)
                                .where(0)
                                .equalTo(0)
-                               .with(new JoinVertexWithVertexDegree<K, VV>())
+                               .with(new JoinVertexWithVertexDegree<>())
                                        .setParallelism(parallelism)
                                        .name("Zero degree vertices");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index ff4285f..6825295 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -89,7 +89,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple3<EV, LongValue, L
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(1)
                        .equalTo(0)
-                       .with(new JoinEdgeDegreeWithVertexDegree<K, EV, 
LongValue>())
+                       .with(new JoinEdgeDegreeWithVertexDegree<>())
                                .setParallelism(parallelism)
                                .name("Edge target degree");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index bd8ce3d..3fe05d9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, LongValue>>>
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(0)
                        .equalTo(0)
-                       .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
+                       .with(new JoinEdgeWithVertexDegree<>())
                                .setParallelism(parallelism)
                                .name("Edge source degree");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index cb18d2c..6020ba3 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, 
Tuple2<EV, LongValue>>>
                        .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
                        .where(1)
                        .equalTo(0)
-                       .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
+                       .with(new JoinEdgeWithVertexDegree<>())
                                .setParallelism(parallelism)
                                .name("Edge target degree");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index d2fad18..fee58a3 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -103,7 +103,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
        public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = 
reduceOnTargetId.get() ?
-                       new MapEdgeToTargetId<K, EV>() : new 
MapEdgeToSourceId<K, EV>();
+                       new MapEdgeToTargetId<>() : new MapEdgeToSourceId<>();
 
                // v
                DataSet<Vertex<K, LongValue>> vertexIds = input
@@ -115,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                // v, deg(v)
                DataSet<Vertex<K, LongValue>> degree = vertexIds
                        .groupBy(0)
-                       .reduce(new DegreeCount<K>())
+                       .reduce(new DegreeCount<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Degree count");
@@ -126,7 +126,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, 
LongValue>> {
                                .leftOuterJoin(degree)
                                .where(0)
                                .equalTo(0)
-                               .with(new JoinVertexWithVertexDegree<K, VV>())
+                               .with(new JoinVertexWithVertexDegree<>())
                                        .setParallelism(parallelism)
                                        .name("Zero degree vertices");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index 522d39c..41dc64b 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -138,7 +138,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
                // u, d(u) if d(u) > maximumDegree
                DataSet<Tuple1<K>> highDegreeVertices = vertexDegree
-                       .flatMap(new DegreeFilter<K>(maximumDegree))
+                       .flatMap(new DegreeFilter<>(maximumDegree))
                                .setParallelism(parallelism)
                                .name("Filter high-degree vertices");
 
@@ -150,7 +150,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
                        .leftOuterJoin(highDegreeVertices, joinHint)
                        .where(0)
                        .equalTo(0)
-                       .with(new ProjectVertex<K, VV>())
+                       .with(new ProjectVertex<>())
                                .setParallelism(parallelism)
                                .name("Project low-degree vertices");
 
@@ -160,13 +160,13 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> 
{
                        .leftOuterJoin(highDegreeVertices, joinHint)
                        .where(reduceOnTargetId.get() ? 1 : 0)
                        .equalTo(0)
-                               .with(new ProjectEdge<K, EV>())
+                               .with(new ProjectEdge<>())
                                .setParallelism(parallelism)
                                .name("Project low-degree edges by " + 
(reduceOnTargetId.get() ? "target" : "source"))
                        .leftOuterJoin(highDegreeVertices, joinHint)
                        .where(reduceOnTargetId.get() ? 0 : 1)
                        .equalTo(0)
-                       .with(new ProjectEdge<K, EV>())
+                       .with(new ProjectEdge<>())
                                .setParallelism(parallelism)
                                .name("Project low-degree edges by " + 
(reduceOnTargetId.get() ? "source" : "target"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 511840a..0d4fa1e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -41,7 +41,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
                // Edges
                DataSet<Edge<K, EV>> edges = input
                        .getEdges()
-                       .filter(new RemoveSelfLoops<K, EV>())
+                       .filter(new RemoveSelfLoops<>())
                                .setParallelism(parallelism)
                                .name("Remove self-loops")
                        .distinct(0, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 21db233..f00a162 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -71,7 +71,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
                // Edges
                DataSet<Edge<K, EV>> edges = input
                        .getEdges()
-                       .flatMap(new SymmetrizeAndRemoveSelfLoops<K, 
EV>(clipAndFlip))
+                       .flatMap(new 
SymmetrizeAndRemoveSelfLoops<>(clipAndFlip))
                                .setParallelism(parallelism)
                                .name("Remove self-loops")
                        .distinct(0, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 4cb4e01..6dcf766 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -88,7 +88,7 @@ public class Translate {
                TupleTypeInfo<Vertex<NEW, VV>> returnType = new 
TupleTypeInfo<>(vertexClass, newType, vertexValueType);
 
                return vertices
-                       .map(new TranslateVertexId<OLD, NEW, VV>(translator))
+                       .map(new TranslateVertexId<>(translator))
                        .returns(returnType)
                                .setParallelism(parallelism)
                                .name("Translate vertex IDs");
@@ -172,7 +172,7 @@ public class Translate {
                TupleTypeInfo<Edge<NEW, EV>> returnType = new 
TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
 
                return edges
-                       .map(new TranslateEdgeId<OLD, NEW, EV>(translator))
+                       .map(new TranslateEdgeId<>(translator))
                        .returns(returnType)
                                .setParallelism(parallelism)
                                .name("Translate edge IDs");
@@ -257,7 +257,7 @@ public class Translate {
                TupleTypeInfo<Vertex<K, NEW>> returnType = new 
TupleTypeInfo<>(vertexClass, idType, newType);
 
                return vertices
-                       .map(new TranslateVertexValue<K, OLD, NEW>(translator))
+                       .map(new TranslateVertexValue<>(translator))
                        .returns(returnType)
                                .setParallelism(parallelism)
                                .name("Translate vertex values");
@@ -341,7 +341,7 @@ public class Translate {
                TupleTypeInfo<Edge<K, NEW>> returnType = new 
TupleTypeInfo<>(edgeClass, idType, idType, newType);
 
                return edges
-                       .map(new TranslateEdgeValue<K, OLD, NEW>(translator))
+                       .map(new TranslateEdgeValue<>(translator))
                        .returns(returnType)
                                .setParallelism(parallelism)
                                .name("Translate edge values");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
index 029e2c4..97c93e2 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java
@@ -128,7 +128,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
                DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges = edges.join(edges)
                        .where(1)
                        .equalTo(1)
-                       .with(new ProjectionTopSimple<KT, KB, EV>())
+                       .with(new ProjectionTopSimple<>())
                                .name("Simple top projection");
 
                return Graph.fromDataSet(topVertices, newEdges, context);
@@ -172,7 +172,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
                DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
                        .where(0)
                        .equalTo(0)
-                       .with(new ProjectionBottomSimple<KT, KB, EV>())
+                       .with(new ProjectionBottomSimple<>())
                        .name("Simple bottom projection");
 
                return Graph.fromDataSet(bottomVertices, newEdges, context);
@@ -218,7 +218,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
                DataSet<Edge<KT, Projection<KB, VVB, VVT, EV>>> newEdges = 
edgesWithVertices.join(edgesWithVertices)
                        .where(1)
                        .equalTo(1)
-                       .with(new ProjectionTopFull<KT, KB, EV, VVT, VVB>())
+                       .with(new ProjectionTopFull<>())
                                .name("Full top projection");
 
                return Graph.fromDataSet(topVertices, newEdges, context);
@@ -284,7 +284,7 @@ public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
                DataSet<Edge<KB, Projection<KT, VVT, VVB, EV>>> newEdges = 
edgesWithVertices.join(edgesWithVertices)
                        .where(0)
                        .equalTo(0)
-                       .with(new ProjectionBottomFull<KT, KB, EV, VVT, VVB>())
+                       .with(new ProjectionBottomFull<>())
                                .name("Full bottom projection");
 
                return Graph.fromDataSet(bottomVertices, newEdges, context);

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index fca9d8b..d5a70f3 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -91,7 +91,7 @@ public class GraphGeneratorUtils {
         */
        public static <K, EV> DataSet<Vertex<K, NullValue>> 
vertexSet(DataSet<Edge<K, EV>> edges, int parallelism) {
                DataSet<Vertex<K, NullValue>> vertexSet = edges
-                       .flatMap(new EmitSrcAndTarget<K, EV>())
+                       .flatMap(new EmitSrcAndTarget<>())
                                .setParallelism(parallelism)
                                .name("Emit source and target labels");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index d14d32c..1960aa3 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -156,7 +156,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> 
{
                        .rebalance()
                                .setParallelism(parallelism)
                                .name("Rebalance")
-                       .flatMap(new GenerateEdges<T>(vertexCount, scale, a, b, 
c, noiseEnabled, noise))
+                       .flatMap(new GenerateEdges<>(vertexCount, scale, a, b, 
c, noiseEnabled, noise))
                                .setParallelism(parallelism)
                                .name("RMat graph edges");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index 72e18ae..f09a890 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -58,7 +58,7 @@ public class GSAConfiguration extends IterationConfiguration {
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSetForGatherFunction(String name, DataSet<?> 
data) {
-               this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, 
data));
+               this.bcVarsGather.add(new Tuple2<>(name, data));
        }
 
        /**
@@ -68,7 +68,7 @@ public class GSAConfiguration extends IterationConfiguration {
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSetForSumFunction(String name, DataSet<?> data) 
{
-               this.bcVarsSum.add(new Tuple2<String, DataSet<?>>(name, data));
+               this.bcVarsSum.add(new Tuple2<>(name, data));
        }
 
        /**
@@ -78,7 +78,7 @@ public class GSAConfiguration extends IterationConfiguration {
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSetForApplyFunction(String name, DataSet<?> 
data) {
-               this.bcVarsApply.add(new Tuple2<String, DataSet<?>>(name, 
data));
+               this.bcVarsApply.add(new Tuple2<>(name, data));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 12d4977..5f04b70 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -169,24 +169,24 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                        case OUT:
                                neighbors = iteration
                                .getWorkset().join(edgeDataSet)
-                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<K, VV, EV>());
+                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<>());
                                break;
                        case IN:
                                neighbors = iteration
                                .getWorkset().join(edgeDataSet)
-                               .where(0).equalTo(1).with(new 
ProjectKeyWithNeighborIN<K, VV, EV>());
+                               .where(0).equalTo(1).with(new 
ProjectKeyWithNeighborIN<>());
                                break;
                        case ALL:
                                neighbors =  iteration
                                                .getWorkset().join(edgeDataSet)
-                                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+                                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<>()).union(iteration
                                                                
.getWorkset().join(edgeDataSet)
-                                                               
.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+                                                               
.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<>()));
                                break;
                        default:
                                neighbors = iteration
                                                .getWorkset().join(edgeDataSet)
-                                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<K, VV, EV>());
+                                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<>());
                                break;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 9846286..ccf2bb1 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -73,14 +73,14 @@ public class CommunityDetection<K> implements 
GraphAlgorithm<K, Long, Double, Gr
        public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 
                DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = 
graph.getVertices()
-                       .map(new AddScoreToVertexValuesMapper<K>());
+                       .map(new AddScoreToVertexValuesMapper<>());
 
                Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
                        Graph.fromDataSet(initializedVertices, 
graph.getEdges(), graph.getContext()).getUndirected();
 
-               return graphWithScoredVertices.runScatterGatherIteration(new 
LabelMessenger<K>(),
-                       new VertexLabelUpdater<K>(delta), maxIterations)
-                               .mapVertices(new 
RemoveScoreFromVertexValuesMapper<K>());
+               return graphWithScoredVertices.runScatterGatherIteration(new 
LabelMessenger<>(),
+                       new VertexLabelUpdater<>(delta), maxIterations)
+                               .mapVertices(new 
RemoveScoreFromVertexValuesMapper<>());
        }
 
        @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index a3110ab..5cb8abe 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -73,12 +72,12 @@ public class ConnectedComponents<K, VV extends 
Comparable<VV>, EV>
                TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) 
graph.getVertices().getType()).getTypeAt(1);
 
                Graph<K, VV, NullValue> undirectedGraph = graph
-                       .mapEdges(new MapTo<Edge<K, EV>, 
NullValue>(NullValue.getInstance()))
+                       .mapEdges(new MapTo<>(NullValue.getInstance()))
                        .getUndirected();
 
                return undirectedGraph.runScatterGatherIteration(
-                       new CCMessenger<K, VV>(valueTypeInfo),
-                       new CCUpdater<K, VV>(),
+                       new CCMessenger<>(valueTypeInfo),
+                       new CCUpdater<>(),
                        maxIterations).getVertices();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index 37e5cab..230f88e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -74,13 +73,13 @@ public class GSAConnectedComponents<K, VV extends 
Comparable<VV>, EV>
                TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) 
graph.getVertices().getType()).getTypeAt(1);
 
                Graph<K, VV, NullValue> undirectedGraph = graph
-                       .mapEdges(new MapTo<Edge<K, EV>, 
NullValue>(NullValue.getInstance()))
+                       .mapEdges(new MapTo<>(NullValue.getInstance()))
                        .getUndirected();
 
                return undirectedGraph.runGatherSumApplyIteration(
                        new GatherNeighborIds<>(valueTypeInfo),
                        new SelectMinId<>(valueTypeInfo),
-                       new UpdateComponentId<K, VV>(valueTypeInfo),
+                       new UpdateComponentId<>(valueTypeInfo),
                        maxIterations).getVertices();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 2d0b8da..28e9168 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -51,9 +51,9 @@ public class GSASingleSourceShortestPaths<K, VV> implements
        @Override
        public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> input) {
 
-               return input.mapVertices(new InitVerticesMapper<K, 
VV>(srcVertexId))
+               return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
                                .runGatherSumApplyIteration(new 
CalculateDistances(), new ChooseMinDistance(),
-                                               new UpdateDistance<K>(), 
maxIterations)
+                                       new UpdateDistance<>(), maxIterations)
                                                .getVertices();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 1e700f4..880a67b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -77,9 +76,9 @@ public class LabelPropagation<K, VV extends Comparable<VV>, 
EV>
                TypeInformation<VV> valueType = ((TupleTypeInfo<?>) 
input.getVertices().getType()).getTypeAt(1);
                // iteratively adopt the most frequent label among the 
neighbors of each vertex
                return input
-                       .mapEdges(new MapTo<Edge<K, EV>, 
NullValue>(NullValue.getInstance()))
+                       .mapEdges(new MapTo<>(NullValue.getInstance()))
                        .runScatterGatherIteration(
-                               new SendNewLabelToNeighbors<K, VV>(valueType), 
new UpdateVertexLabel<K, VV>(), maxIterations)
+                               new SendNewLabelToNeighbors<>(valueType), new 
UpdateVertexLabel<>(), maxIterations)
                        .getVertices();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 15f0a84..8f41fa0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -51,8 +51,8 @@ public class SingleSourceShortestPaths<K, VV> implements 
GraphAlgorithm<K, VV, D
        @Override
        public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> input) {
 
-               return input.mapVertices(new InitVerticesMapper<K, 
VV>(srcVertexId))
-                               .runScatterGatherIteration(new 
MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
+               return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
+                               .runScatterGatherIteration(new 
MinDistanceMessenger<>(), new VertexDistanceUpdater<>(),
                                maxIterations).getVertices();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
index 44ea988..a1498df 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -102,11 +102,11 @@ public class Summarization<K, VV, EV>
                // group vertices by value and create vertex group items
                DataSet<VertexGroupItem<K, VV>> vertexGroupItems = 
input.getVertices()
                                .groupBy(1)
-                               .reduceGroup(new VertexGroupReducer<K, VV>());
+                               .reduceGroup(new VertexGroupReducer<>());
                // create super vertices
                DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = 
vertexGroupItems
-                               .filter(new 
VertexGroupItemToSummarizedVertexFilter<K, VV>())
-                               .map(new 
VertexGroupItemToSummarizedVertexMapper<K, VV>());
+                               .filter(new 
VertexGroupItemToSummarizedVertexFilter<>())
+                               .map(new 
VertexGroupItemToSummarizedVertexMapper<>());
 
                // -------------------------
                // build super edges
@@ -114,22 +114,22 @@ public class Summarization<K, VV, EV>
 
                // create mapping between vertices and their representative
                DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap 
= vertexGroupItems
-                       .filter(new VertexGroupItemToRepresentativeFilter<K, 
VV>())
-                       .map(new 
VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+                       .filter(new VertexGroupItemToRepresentativeFilter<>())
+                       .map(new 
VertexGroupItemToVertexWithRepresentativeMapper<>());
                // join edges with vertex representatives and update source and 
target identifiers
                DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
                                .join(vertexToRepresentativeMap)
                                .where(0)       // source vertex id
                                .equalTo(0) // vertex id
-                               .with(new SourceVertexJoinFunction<K, EV>())
+                               .with(new SourceVertexJoinFunction<>())
                                .join(vertexToRepresentativeMap)
                                .where(1)       // target vertex id
                                .equalTo(0) // vertex id
-                               .with(new TargetVertexJoinFunction<K, EV>());
+                               .with(new TargetVertexJoinFunction<>());
                // create super edges
                DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = 
edgesForGrouping
                                .groupBy(0, 1, 2) // group by source id (0), 
target id (1) and edge value (2)
-                               .reduceGroup(new EdgeGroupReducer<K, EV>());
+                               .reduceGroup(new EdgeGroupReducer<>());
 
                return Graph.fromDataSet(summarizedVertices, summarizedEdges, 
input.getContext());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 2ae6120..23f942c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -65,21 +65,21 @@ public class TriangleEnumerator<K extends Comparable<K>, 
VV, EV> implements
                DataSet<Edge<K, EV>> edges = input.getEdges();
 
                // annotate edges with degrees
-               DataSet<EdgeWithDegrees<K>> edgesWithDegrees = 
edges.flatMap(new EdgeDuplicator<K, EV>())
-                               .groupBy(0).sortGroup(1, 
Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
-                               .groupBy(EdgeWithDegrees.V1, 
EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
+               DataSet<EdgeWithDegrees<K>> edgesWithDegrees = 
edges.flatMap(new EdgeDuplicator<>())
+                               .groupBy(0).sortGroup(1, 
Order.ASCENDING).reduceGroup(new DegreeCounter<>())
+                               .groupBy(EdgeWithDegrees.V1, 
EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
 
                // project edges by degrees
-               DataSet<Edge<K, NullValue>> edgesByDegree = 
edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
+               DataSet<Edge<K, NullValue>> edgesByDegree = 
edgesWithDegrees.map(new EdgeByDegreeProjector<>());
                // project edges by vertex id
-               DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new 
EdgeByIdProjector<K>());
+               DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new 
EdgeByIdProjector<>());
 
                DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
                                // build triads
                                
.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
-                               .reduceGroup(new TriadBuilder<K>())
+                               .reduceGroup(new TriadBuilder<>())
                                // filter triads
-                               .join(edgesById, 
JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 
1).with(new TriadFilter<K>());
+                               .join(edgesById, 
JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 
1).with(new TriadFilter<>());
 
                return triangles;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 981110f..55d3056 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -119,13 +119,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
 
                // u, edge count
                DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
-                       .flatMap(new SplitTriangles<K>())
+                       .flatMap(new SplitTriangles<>())
                                .name("Split triangle vertices");
 
                // u, triangle count
                DataSet<Tuple2<K, LongValue>> vertexTriangleCount = 
triangleVertices
                        .groupBy(0)
-                       .reduce(new CountTriangles<K>())
+                       .reduce(new CountTriangles<>())
                        .setCombineHint(CombineHint.HASH)
                                .name("Count triangles");
 
@@ -140,7 +140,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .leftOuterJoin(vertexTriangleCount)
                        .where(0)
                        .equalTo(0)
-                       .with(new JoinVertexDegreeWithTriangleCount<K>())
+                       .with(new JoinVertexDegreeWithTriangleCount<>())
                                .setParallelism(parallelism)
                                .name("Clustering coefficient");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 00b2210..52d3c10 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -84,11 +84,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
                // u, v, bitmask where u < v
                DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
                        .getEdges()
-                       .map(new OrderByID<K, EV>())
+                       .map(new OrderByID<>())
                                .setParallelism(parallelism)
                                .name("Order by ID")
                        .groupBy(0, 1)
-                       .reduceGroup(new ReduceBitmask<K>())
+                       .reduceGroup(new ReduceBitmask<>())
                                .setParallelism(parallelism)
                                .name("Flatten by ID");
 
@@ -99,11 +99,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 
                // u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and 
u < v)
                DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
-                       .map(new OrderByDegree<K, EV>())
+                       .map(new OrderByDegree<>())
                                .setParallelism(parallelism)
                                .name("Order by degree")
                        .groupBy(0, 1)
-                       .reduceGroup(new ReduceBitmask<K>())
+                       .reduceGroup(new ReduceBitmask<>())
                                .setParallelism(parallelism)
                                .name("Flatten by degree");
 
@@ -111,7 +111,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
                DataSet<Tuple4<K, K, K, ByteValue>> triplets = filteredByDegree
                        .groupBy(0)
                        .sortGroup(1, Order.ASCENDING)
-                       .reduceGroup(new GenerateTriplets<K>())
+                       .reduceGroup(new GenerateTriplets<>())
                                .name("Generate triplets");
 
                // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges 
in graph
@@ -119,16 +119,16 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
                        .join(filteredByID, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
                        .where(1, 2)
                        .equalTo(0, 1)
-                       .with(new ProjectTriangles<K>())
+                       .with(new ProjectTriangles<>())
                                .name("Triangle listing");
 
                if (permuteResults) {
                        triangles = triangles
-                               .flatMap(new PermuteResult<K>())
+                               .flatMap(new PermuteResult<>())
                                        .name("Permute triangle vertices");
                } else if (sortTriangleVertices.get()) {
                        triangles = triangles
-                               .map(new SortTriangleVertices<K>())
+                               .map(new SortTriangleVertices<>())
                                        .name("Sort triangle vertices");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 0beb989..e1c7655 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -118,13 +118,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
 
                // u, 1
                DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
-                       .flatMap(new SplitTriangles<K>())
+                       .flatMap(new SplitTriangles<>())
                                .name("Split triangle vertices");
 
                // u, triangle count
                DataSet<Tuple2<K, LongValue>> vertexTriangleCount = 
triangleVertices
                        .groupBy(0)
-                       .reduce(new CountTriangles<K>())
+                       .reduce(new CountTriangles<>())
                        .setCombineHint(CombineHint.HASH)
                                .name("Count triangles");
 
@@ -139,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .leftOuterJoin(vertexTriangleCount)
                        .where(0)
                        .equalTo(0)
-                       .with(new JoinVertexDegreeWithTriangleCount<K>())
+                       .with(new JoinVertexDegreeWithTriangleCount<>())
                                .setParallelism(parallelism)
                                .name("Clustering coefficient");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 2472744..3f1b00a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -83,7 +83,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
                // u, v where u < v
                DataSet<Tuple2<K, K>> filteredByID = input
                        .getEdges()
-                       .flatMap(new FilterByID<K, EV>())
+                       .flatMap(new FilterByID<>())
                                .setParallelism(parallelism)
                                .name("Filter by ID");
 
@@ -94,7 +94,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 
                // u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
                DataSet<Tuple2<K, K>> filteredByDegree = pairDegree
-                       .flatMap(new FilterByDegree<K, EV>())
+                       .flatMap(new FilterByDegree<>())
                                .setParallelism(parallelism)
                                .name("Filter by degree");
 
@@ -102,7 +102,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
                DataSet<Tuple3<K, K, K>> triplets = filteredByDegree
                        .groupBy(0)
                        .sortGroup(1, Order.ASCENDING)
-                       .reduceGroup(new GenerateTriplets<K>())
+                       .reduceGroup(new GenerateTriplets<>())
                                .name("Generate triplets");
 
                // u, v, w where (u, v), (u, w), and (v, w) are edges in graph, 
v < w
@@ -110,16 +110,16 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
                        .join(filteredByID, 
JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND)
                        .where(1, 2)
                        .equalTo(0, 1)
-                       .with(new ProjectTriangles<K>())
+                       .with(new ProjectTriangles<>())
                                .name("Triangle listing");
 
                if (permuteResults) {
                        triangles = triangles
-                               .flatMap(new PermuteResult<K>())
+                               .flatMap(new PermuteResult<>())
                                        .name("Permute triangle vertices");
                } else if (sortTriangleVertices.get()) {
                        triangles = triangles
-                               .map(new SortTriangleVertices<K>())
+                               .map(new SortTriangleVertices<>())
                                        .name("Sort triangle vertices");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index 6b41ee4..e59240b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -129,17 +129,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                        throws Exception {
                DataSet<Tuple2<K, K>> edges = input
                        .getEdges()
-                       .map(new ExtractEdgeIDs<K, EV>())
+                       .map(new ExtractEdgeIDs<>())
                                .setParallelism(parallelism)
                                .name("Extract edge IDs");
 
                // ID, hub, authority
                DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = 
edges
-                       .map(new InitializeScores<K>())
+                       .map(new InitializeScores<>())
                                .setParallelism(parallelism)
                                .name("Initial scores")
                        .groupBy(0)
-                       .reduce(new SumScores<K>())
+                       .reduce(new SumScores<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
@@ -153,18 +153,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                        .coGroup(edges)
                        .where(0)
                        .equalTo(1)
-                       .with(new Hubbiness<K>())
+                       .with(new Hubbiness<>())
                                .setParallelism(parallelism)
                                .name("Hub")
                        .groupBy(0)
-                       .reduce(new SumScore<K>())
+                       .reduce(new SumScore<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
                // sum-of-hubbiness-squared
                DataSet<DoubleValue> hubbinessSumSquared = hubbiness
-                       .map(new Square<K>())
+                       .map(new Square<>())
                                .setParallelism(parallelism)
                                .name("Square")
                        .reduce(new Sum())
@@ -177,18 +177,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                        .coGroup(edges)
                        .where(0)
                        .equalTo(0)
-                       .with(new Authority<K>())
+                       .with(new Authority<>())
                                .setParallelism(parallelism)
                                .name("Authority")
                        .groupBy(0)
-                       .reduce(new SumScore<K>())
+                       .reduce(new SumScore<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
                // sum-of-authority-squared
                DataSet<DoubleValue> authoritySumSquared = authority
-                       .map(new Square<K>())
+                       .map(new Square<>())
                                .setParallelism(parallelism)
                                .name("Square")
                        .reduce(new Sum())
@@ -201,7 +201,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .fullOuterJoin(authority, 
JoinHint.REPARTITION_SORT_MERGE)
                        .where(0)
                        .equalTo(0)
-                       .with(new JoinAndNormalizeHubAndAuthority<K>())
+                       .with(new JoinAndNormalizeHubAndAuthority<>())
                        .withBroadcastSet(hubbinessSumSquared, 
HUBBINESS_SUM_SQUARED)
                        .withBroadcastSet(authoritySumSquared, 
AUTHORITY_SUM_SQUARED)
                                .setParallelism(parallelism)
@@ -214,7 +214,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .fullOuterJoin(scores, 
JoinHint.REPARTITION_SORT_MERGE)
                                .where(0)
                                .equalTo(0)
-                               .with(new ChangeInScores<K>())
+                               .with(new ChangeInScores<>())
                                        .setParallelism(parallelism)
                                        .name("Change in scores");
 
@@ -225,7 +225,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
 
                return iterative
                        .closeWith(passThrough)
-                       .map(new TranslateResult<K>())
+                       .map(new TranslateResult<>())
                                .setParallelism(parallelism)
                                .name("Map result");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index af56e50..71c37aa 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -151,20 +151,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                DataSet<Edge<K, LongValue>> edgeSourceDegree = input
                        .run(new EdgeSourceDegrees<K, VV, EV>()
                                .setParallelism(parallelism))
-                       .map(new ExtractSourceDegree<K, EV>())
+                       .map(new ExtractSourceDegree<>())
                                .setParallelism(parallelism)
                                .name("Extract source degree");
 
                // vertices with zero in-edges
                DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
-                       .flatMap(new InitializeSourceVertices<K>())
+                       .flatMap(new InitializeSourceVertices<>())
                        .withBroadcastSet(vertexCount, VERTEX_COUNT)
                                .setParallelism(parallelism)
                                .name("Initialize source vertex scores");
 
                // s, initial pagerank(s)
                DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
-                       .map(new InitializeVertexScores<K>())
+                       .map(new InitializeVertexScores<>())
                        .withBroadcastSet(vertexCount, VERTEX_COUNT)
                                .setParallelism(parallelism)
                                .name("Initialize scores");
@@ -178,18 +178,18 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                        .coGroup(edgeSourceDegree)
                        .where(0)
                        .equalTo(0)
-                       .with(new SendScore<K>())
+                       .with(new SendScore<>())
                                .setParallelism(parallelism)
                                .name("Send score")
                        .groupBy(0)
-                       .reduce(new SumScore<K>())
+                       .reduce(new SumScore<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum");
 
                // ignored ID, total pagerank
                DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
-                       .reduce(new SumVertexScores<K>())
+                       .reduce(new SumVertexScores<>())
                                .setParallelism(parallelism)
                                .name("Sum");
 
@@ -198,7 +198,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .union(sourceVertices)
                                .setParallelism(parallelism)
                                .name("Union with source vertices")
-                       .map(new AdjustScores<K>(dampingFactor))
+                       .map(new AdjustScores<>(dampingFactor))
                                .withBroadcastSet(sumOfScores, SUM_OF_SCORES)
                                .withBroadcastSet(vertexCount, VERTEX_COUNT)
                                        .setParallelism(parallelism)
@@ -211,7 +211,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                                .join(adjustedScores)
                                .where(0)
                                .equalTo(0)
-                               .with(new ChangeInScores<K>())
+                               .with(new ChangeInScores<>())
                                        .setParallelism(parallelism)
                                        .name("Change in scores");
 
@@ -222,7 +222,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
 
                return iterative
                        .closeWith(passThrough)
-                       .map(new TranslateResult<K>())
+                       .map(new TranslateResult<>())
                                .setParallelism(parallelism)
                                .name("Map result");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index c88e401..7294fd1 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -92,15 +92,15 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
                // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
                DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = 
edgeDegreesPair
-                       .flatMap(new EdgeStats<K, EV>())
+                       .flatMap(new EdgeStats<>())
                                .setParallelism(parallelism)
                                .name("Edge stats")
                        .groupBy(0, 1)
-                       .reduceGroup(new ReduceEdgeStats<K>())
+                       .reduceGroup(new ReduceEdgeStats<>())
                                .setParallelism(parallelism)
                                .name("Reduce edge stats")
                        .groupBy(0)
-                       .reduce(new SumEdgeStats<K>())
+                       .reduce(new SumEdgeStats<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum edge stats");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 4c0d654..8c520e6 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -103,11 +103,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
                // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
                DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = 
edgeDegreePair
-                       .map(new EdgeStats<K, EV>())
+                       .map(new EdgeStats<>())
                                .setParallelism(parallelism)
                                .name("Edge stats")
                        .groupBy(0)
-                       .reduce(new SumEdgeStats<K>())
+                       .reduce(new SumEdgeStats<>())
                        .setCombineHint(CombineHint.HASH)
                                .setParallelism(parallelism)
                                .name("Sum edge stats");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index d761f60..752e206 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -153,7 +153,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = 
input
                        .run(new VertexDegree<K, VV, EV>()
                                .setParallelism(parallelism))
-                       .map(new VertexInverseLogDegree<K>())
+                       .map(new VertexInverseLogDegree<>())
                                .setParallelism(parallelism)
                                .name("Vertex score");
 
@@ -172,7 +172,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                DataSet<Tuple4<IntValue, K, K, FloatValue>> groupSpans = 
sourceInverseLogDegree
                        .groupBy(0)
                        .sortGroup(1, Order.ASCENDING)
-                       .reduceGroup(new GenerateGroupSpans<K>())
+                       .reduceGroup(new GenerateGroupSpans<>())
                                .setParallelism(parallelism)
                                .name("Generate group spans");
 
@@ -181,7 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .rebalance()
                                .setParallelism(parallelism)
                                .name("Rebalance")
-                       .flatMap(new GenerateGroups<K>())
+                       .flatMap(new GenerateGroups<>())
                                .setParallelism(parallelism)
                                .name("Generate groups");
 
@@ -189,19 +189,19 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                DataSet<Tuple3<K, K, FloatValue>> twoPaths = groups
                        .groupBy(0, 1)
                        .sortGroup(2, Order.ASCENDING)
-                       .reduceGroup(new GenerateGroupPairs<K>())
+                       .reduceGroup(new GenerateGroupPairs<>())
                                .name("Generate group pairs");
 
                // t, u, adamic-adar score
                GroupReduceOperator<Tuple3<K, K, FloatValue>, Result<K>> scores 
= twoPaths
                        .groupBy(0, 1)
-                       .reduceGroup(new ComputeScores<K>(minimumScore, minimumRatio))
+                       .reduceGroup(new ComputeScores<>(minimumScore, minimumRatio))
                                .name("Compute scores");
 
                if (minimumRatio > 0.0f) {
                        // total score, number of pairs of neighbors
                        DataSet<Tuple2<FloatValue, LongValue>> 
sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
-                               .map(new ComputeScoreFromVertex<K>())
+                               .map(new ComputeScoreFromVertex<>())
                                        .setParallelism(parallelism)
                                        .name("Average score")
                                .sum(0)
@@ -213,7 +213,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
 
                if (mirrorResults) {
                        return scores
-                               .flatMap(new MirrorResult<K, Result<K>>())
+                               .flatMap(new MirrorResult<>())
                                        .name("Mirror results");
                } else {
                        return scores;

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 8e820ac..92bf9e3 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -199,7 +199,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans = 
neighborDegree
                        .groupBy(0)
                        .sortGroup(1, Order.ASCENDING)
-                       .reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
+                       .reduceGroup(new GenerateGroupSpans<>(groupSize))
                                .setParallelism(parallelism)
                                .name("Generate group spans");
 
@@ -208,7 +208,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        .rebalance()
                                .setParallelism(parallelism)
                                .name("Rebalance")
-                       .flatMap(new GenerateGroups<K>())
+                       .flatMap(new GenerateGroups<>())
                                .setParallelism(parallelism)
                                .name("Generate groups");
 
@@ -216,20 +216,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                DataSet<Tuple3<K, K, IntValue>> twoPaths = groups
                        .groupBy(0, 1)
                        .sortGroup(2, Order.ASCENDING)
-                       .reduceGroup(new GenerateGroupPairs<K>(groupSize))
+                       .reduceGroup(new GenerateGroupPairs<>(groupSize))
                                .name("Generate group pairs");
 
                // t, u, intersection, union
                DataSet<Result<K>> scores = twoPaths
                        .groupBy(0, 1)
-                       .reduceGroup(new ComputeScores<K>(unboundedScores,
+                       .reduceGroup(new ComputeScores<>(unboundedScores,
                                        minimumScoreNumerator, 
minimumScoreDenominator,
                                        maximumScoreNumerator, 
maximumScoreDenominator))
                                .name("Compute scores");
 
                if (mirrorResults) {
                        scores = scores
-                               .flatMap(new MirrorResult<K, Result<K>>())
+                               .flatMap(new MirrorResult<>())
                                        .name("Mirror results");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index 69fcc52..39b9bcf 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -48,7 +48,7 @@ public class VertexCentricConfiguration extends 
IterationConfiguration {
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSet(String name, DataSet<?> data) {
-               this.bcVars.add(new Tuple2<String, DataSet<?>>(name, data));
+               this.bcVars.add(new Tuple2<>(name, data));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index c30b1a7..6c06a3a 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -176,9 +176,9 @@ public class VertexCentricIteration<K, VV, EV, Message>
                DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> 
verticesWithMsgs =
                                
iteration.getSolutionSet().join(iteration.getWorkset())
                                .where(0).equalTo(0)
-                               .with(new AppendVertexState<K, VV, Message>())
-                               .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(
-                                               vertexType, nullableMsgTypeInfo));
+                               .with(new AppendVertexState<>())
+                               .returns(new TupleTypeInfo<>(
+                                       vertexType, nullableMsgTypeInfo));
 
                VertexComputeUdf<K, VV, EV, Message> vertexUdf =
                        new VertexComputeUdf<>(computeFunction, 
intermediateTypeInfo);
@@ -190,11 +190,11 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
                // compute the solution set delta
                DataSet<Vertex<K, VV>> solutionSetDelta = 
superstepComputation.flatMap(
-                               new ProjectNewVertexValue<K, VV, Message>()).returns(vertexType);
+                       new ProjectNewVertexValue<>()).returns(vertexType);
 
                // compute the inbox of each vertex for the next superstep (new workset)
                DataSet<Tuple2<K, Either<NullValue, Message>>> allMessages = 
superstepComputation.flatMap(
-                               new ProjectMessages<K, VV, Message>()).returns(workSetTypeInfo);
+                       new ProjectMessages<>()).returns(workSetTypeInfo);
 
                DataSet<Tuple2<K, Either<NullValue, Message>>> newWorkSet = 
allMessages;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
index 6a62847..0422f13 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherConfiguration.java
@@ -59,7 +59,7 @@ public class ScatterGatherConfiguration extends 
IterationConfiguration {
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSetForScatterFunction(String name, DataSet<?> 
data) {
-               this.bcVarsScatter.add(new Tuple2<String, DataSet<?>>(name, data));
+               this.bcVarsScatter.add(new Tuple2<>(name, data));
        }
 
        /**
@@ -69,7 +69,7 @@ public class ScatterGatherConfiguration extends 
IterationConfiguration {
         * @param data The data set to be broadcasted.
         */
        public void addBroadcastSetForGatherFunction(String name, DataSet<?> 
data) {
-               this.bcVarsGather.add(new Tuple2<String, DataSet<?>>(name, data));
+               this.bcVarsGather.add(new Tuple2<>(name, data));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 3e2ac23..8082cd9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -46,7 +46,7 @@ public class GraphUtils {
         */
        public static <T> DataSet<LongValue> count(DataSet<T> input) {
                return input
-                       .map(new MapTo<T, LongValue>(new LongValue(1)))
+                       .map(new MapTo<>(new LongValue(1)))
                                .returns(LONG_VALUE_TYPE_INFO)
                                .name("Emit 1")
                        .reduce(new AddLongValue())

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 57aa987..b85b6e6 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -48,11 +48,11 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends 
GraphValidator<K, VV,
        @Override
        public boolean validate(Graph<K, VV, EV> graph) throws Exception {
                DataSet<Tuple1<K>> edgeIds = graph.getEdges()
-                               .flatMap(new MapEdgeIds<K, EV>()).distinct();
+                               .flatMap(new MapEdgeIds<>()).distinct();
                DataSet<K> invalidIds = 
graph.getVertices().coGroup(edgeIds).where(0)
-                               .equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
+                               .equalTo(0).with(new GroupInvalidIds<>()).first(1);
 
-               return invalidIds.map(new KToTupleMap<K>()).count() == 0;
+               return invalidIds.map(new KToTupleMap<>()).count() == 0;
        }
 
        private static final class MapEdgeIds<K, EV> implements 
FlatMapFunction<Edge<K, EV>, Tuple1<K>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index f89d4f5..1afb5da 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -120,7 +120,7 @@ public class AsmTestBase {
 
                return new RMatGraph<>(env, new JDKRandomGeneratorFactory(), 
vertexCount, edgeCount)
                        .generate()
-                       .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+                       .run(new org.apache.flink.graph.asm.simple.directed.Simplify<>());
        }
 
        /**
@@ -149,6 +149,6 @@ public class AsmTestBase {
 
                return new RMatGraph<>(env, new JDKRandomGeneratorFactory(), 
vertexCount, edgeCount)
                        .generate()
-                       .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+                       .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<>(false));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 22b47fe..63bf133 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -53,7 +53,7 @@ extends AsmTestBase {
                        "(5,3,((null),(1,1,0),(4,2,2)))";
 
                DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> 
degreesPair = directedSimpleGraph
-                       .run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
+                       .run(new EdgeDegreesPair<>());
 
                TestBaseUtils.compareResultAsText(degreesPair.collect(), 
expectedResult);
        }
@@ -62,7 +62,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> 
degreesPair = directedRMatGraph(10, 16)
-                       .run(new EdgeDegreesPair<LongValue, NullValue, NullValue>());
+                       .run(new EdgeDegreesPair<>());
 
                Checksum checksum = new ChecksumHashCode<Edge<LongValue, 
Tuple3<NullValue, Degrees, Degrees>>>()
                        .run(degreesPair)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index f0d51d2..967cfb2 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -53,7 +53,7 @@ extends AsmTestBase {
                        "(5,3,((null),(1,1,0)))";
 
                DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> 
sourceDegrees = directedSimpleGraph
-                               .run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
+                               .run(new EdgeSourceDegrees<>());
 
                TestBaseUtils.compareResultAsText(sourceDegrees.collect(), 
expectedResult);
        }
@@ -62,7 +62,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
sourceDegrees = directedRMatGraph(10, 16)
-                       .run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>());
+                       .run(new EdgeSourceDegrees<>());
 
                Checksum checksum = new ChecksumHashCode<Edge<LongValue, 
Tuple2<NullValue, Degrees>>>()
                        .run(sourceDegrees)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 6d58bb0..abb76c4 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -53,7 +53,7 @@ extends AsmTestBase {
                        "(5,3,((null),(4,2,2)))";
 
                DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> 
targetDegrees = directedSimpleGraph
-                               .run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
+                               .run(new EdgeTargetDegrees<>());
 
                TestBaseUtils.compareResultAsText(targetDegrees.collect(), 
expectedResult);
        }
@@ -62,7 +62,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> 
targetDegrees = directedRMatGraph(10, 16)
-                       .run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>());
+                       .run(new EdgeTargetDegrees<>());
 
                Checksum checksum = new ChecksumHashCode<Edge<LongValue, 
Tuple2<NullValue, Degrees>>>()
                        .run(targetDegrees)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index 5214282..91f354f 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -43,7 +43,7 @@ extends AsmTestBase {
        public void testWithSimpleDirectedGraph()
                        throws Exception {
                DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
-                       .run(new VertexDegrees<IntValue, NullValue, NullValue>());
+                       .run(new VertexDegrees<>());
 
                String expectedResult =
                        "(0,(2,2,0))\n" +
@@ -60,7 +60,7 @@ extends AsmTestBase {
        public void testWithSimpleUndirectedGraph()
                        throws Exception {
                DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph
-                       .run(new VertexDegrees<IntValue, NullValue, NullValue>());
+                       .run(new VertexDegrees<>());
 
                String expectedResult =
                        "(0,(2,2,2))\n" +
@@ -100,7 +100,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
        throws Exception {
                DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph(10, 16)
-                       .run(new VertexDegrees<LongValue, NullValue, NullValue>());
+                       .run(new VertexDegrees<>());
 
                Checksum checksum = new ChecksumHashCode<Vertex<LongValue, Degrees>>()
                        .run(degrees)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 5f492e4..1cae2e7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -59,7 +59,7 @@ extends AsmTestBase {
                        "(5,3,((null),1,4))";
 
                DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
-                       .run(new EdgeDegreePair<IntValue, NullValue, NullValue>());
+                       .run(new EdgeDegreePair<>());
 
                TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult);
 
@@ -74,7 +74,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16)
-                       .run(new EdgeDegreePair<LongValue, NullValue, NullValue>());
+                       .run(new EdgeDegreePair<>());
 
                Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
                        .run(degreePairOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 393220d..2d8b2e3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -59,7 +59,7 @@ extends AsmTestBase {
                        "(5,3,((null),1))";
 
                DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph
-                       .run(new EdgeSourceDegree<IntValue, NullValue, NullValue>());
+                       .run(new EdgeSourceDegree<>());
 
                TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult);
 
@@ -74,7 +74,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph(10, 16)
-                       .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>());
+                       .run(new EdgeSourceDegree<>());
 
                Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
                        .run(sourceDegreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index 782296a..a7c88a1 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -59,7 +59,7 @@ extends AsmTestBase {
                        "(5,3,((null),4))";
 
                DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph
-                               .run(new EdgeTargetDegree<IntValue, NullValue, NullValue>());
+                               .run(new EdgeTargetDegree<>());
 
                TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult);
 
@@ -74,7 +74,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph(10, 16)
-                       .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>());
+                       .run(new EdgeSourceDegree<>());
 
                Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
                        .run(targetDegreeOnTargetId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index 192782d..bc76bff 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -50,7 +50,7 @@ extends AsmTestBase {
                        "(5,1)";
 
                DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
-                       .run(new VertexDegree<IntValue, NullValue, NullValue>());
+                       .run(new VertexDegree<>());
 
                TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);
 
@@ -67,7 +67,7 @@ extends AsmTestBase {
                long expectedDegree = completeGraphVertexCount - 1;
 
                DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph
-                       .run(new VertexDegree<LongValue, NullValue, NullValue>());
+                       .run(new VertexDegree<>());
 
                for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) {
                        assertEquals(expectedDegree, vertex.getValue().getValue());
@@ -109,7 +109,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph(10, 16)
-                       .run(new VertexDegree<LongValue, NullValue, NullValue>());
+                       .run(new VertexDegree<>());
 
                Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
                        .run(degreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index f03d82c..51e7712 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.metric.ChecksumHashCode;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
 import org.junit.Test;
@@ -41,7 +40,7 @@ extends AsmTestBase {
        public void testWithSimpleGraph()
                        throws Exception {
                Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph
-                       .run(new MaximumDegree<IntValue, NullValue, NullValue>(3));
+                       .run(new MaximumDegree<>(3));
 
                String expectedVerticesResult =
                        "(0,(null))\n" +
@@ -67,8 +66,8 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                Checksum checksum = undirectedRMatGraph(10, 16)
-                       .run(new MaximumDegree<LongValue, NullValue, NullValue>(16))
-                       .run(new ChecksumHashCode<LongValue, NullValue, NullValue>())
+                       .run(new MaximumDegree<>(16))
+                       .run(new ChecksumHashCode<>())
                        .execute();
 
                assertEquals(805, checksum.getCount());

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index a3aad4b..751d030 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -70,7 +70,7 @@ public class SimplifyTest {
                        "(1,0,(null))";
 
                Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-                       .run(new Simplify<IntValue, NullValue, NullValue>());
+                       .run(new Simplify<>());
 
                TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
index 6ff4292..68b4e0c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -71,7 +71,7 @@ public class SimplifyTest {
                        "(2,0,(null))";
 
                Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-                       .run(new Simplify<IntValue, NullValue, NullValue>(false));
+                       .run(new Simplify<>(false));
 
                TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
        }
@@ -84,7 +84,7 @@ public class SimplifyTest {
                        "(1,0,(null))";
 
                Graph<IntValue, NullValue, NullValue> simpleGraph = graph
-                       .run(new Simplify<IntValue, NullValue, NullValue>(true));
+                       .run(new Simplify<>(true));
 
                TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
        }

Reply via email to