Repository: flink
Updated Branches:
  refs/heads/master fabc5f96e -> 7eb58773e


http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
new file mode 100644
index 0000000..5036123
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompensatedSumTest {
+
+       /**
+        * When adding a series of numbers the order of the numbers should not impact the results.
+        *
+        * This test shows that a naive summation comes up with a different result than Kahan
+        * Summation when you start with either a smaller or larger number in some cases and
+        * helps prove our Kahan Summation is working.
+        */
+       @Test
+       public void testAdd1() throws Exception {
+               final CompensatedSum smallSum = new CompensatedSum(0.001, 0.0);
+               final CompensatedSum largeSum = new CompensatedSum(1000, 0.0);
+
+               CompensatedSum compensatedResult1 = smallSum;
+               CompensatedSum compensatedResult2 = largeSum;
+               double naiveResult1 = smallSum.value();
+               double naiveResult2 = largeSum.value();
+
+               for(int i = 0; i < 10; i++) {
+                       compensatedResult1 = compensatedResult1.add(smallSum);
+                       compensatedResult2 = compensatedResult2.add(smallSum);
+                       naiveResult1 += smallSum.value();
+                       naiveResult2 += smallSum.value();
+               }
+
+               // series 1 adds the large value last, series 2 started with it
+               compensatedResult1 = compensatedResult1.add(largeSum);
+               compensatedResult2 = compensatedResult2.add(smallSum);
+               naiveResult1 += largeSum.value();
+               naiveResult2 += smallSum.value();
+
+               // Kahan summation gave the same result no matter what order we added
+               Assert.assertEquals(1000.011, compensatedResult1.value(), 0.0);
+               Assert.assertEquals(1000.011, compensatedResult2.value(), 0.0);
+
+               // naive addition gave a small floating point error when starting from the large value
+               Assert.assertEquals(1000.011, naiveResult1, 0.0);
+               Assert.assertEquals(1000.0109999999997, naiveResult2, 0.0);
+
+               Assert.assertEquals(compensatedResult1.value(), compensatedResult2.value(), 0.0);
+               Assert.assertEquals(naiveResult1, naiveResult2, 0.0001);
+               Assert.assertNotEquals(naiveResult1, naiveResult2, 0.0);
+       }
+
+       /**
+        * Adds 0.001 to a starting value of 0.001 ten times and checks both the compensated value
+        * (exactly 0.011) and the residual held in {@code delta()}.
+        */
+       @Test
+       public void testDelta() throws Exception {
+               CompensatedSum compensatedResult1 = new CompensatedSum(0.001, 0.0);
+               for(int i = 0; i < 10; i++) {
+                       compensatedResult1 = compensatedResult1.add(0.001);
+               }
+               Assert.assertEquals(0.011, compensatedResult1.value(), 0.0);
+               // a plain literal parses to the same double as the deprecated Double(String) constructor
+               Assert.assertEquals(8.673617379884035E-19, compensatedResult1.delta(), 0.0);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
new file mode 100644
index 0000000..08fbe78
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DoubleSummaryAggregatorTest {
+
+       /**
+        * Verifies mean, variance and standard deviation using the x values of Anscombe's Quartet.
+        *
+        * The data set has no special meaning here; it is used only because its means and
+        * variances are well known.
+        *
+        * https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+        */
+       @Test
+       public void testAnscomesQuartetXValues() throws Exception {
+               final NumericColumnSummary<Double> x1 = summarize(
+                       10.0, 8.0, 13.0, 9.0, 11.0, 14.0, 6.0, 4.0, 12.0, 7.0, 5.0);
+               final NumericColumnSummary<Double> x4 = summarize(
+                       8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 19.0, 8.0, 8.0, 8.0);
+
+               // the x means match exactly, so no tolerance is allowed there
+               assertAllNear(9.0, 0.0, x1.getMean(), x4.getMean());
+               assertAllNear(11.0, 1e-10d, x1.getVariance(), x4.getVariance());
+               assertAllNear(Math.sqrt(11.0), 1e-10d,
+                       x1.getStandardDeviation(), x4.getStandardDeviation());
+       }
+
+       /**
+        * Verifies mean and variance using the y values of Anscombe's Quartet.
+        *
+        * The data set has no special meaning here; it is used only because its means and
+        * variances are well known.
+        *
+        * https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+        */
+       @Test
+       public void testAnscomesQuartetYValues() throws Exception {
+               final NumericColumnSummary<Double> y1 = summarize(
+                       8.04, 6.95, 7.58, 8.81, 8.33, 9.96, 7.24, 4.26, 10.84, 4.82, 5.68);
+               final NumericColumnSummary<Double> y2 = summarize(
+                       9.14, 8.14, 8.74, 8.77, 9.26, 8.1, 6.13, 3.1, 9.13, 7.26, 4.74);
+               final NumericColumnSummary<Double> y3 = summarize(
+                       7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73);
+               final NumericColumnSummary<Double> y4 = summarize(
+                       6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.5, 5.56, 7.91, 6.89);
+
+               // the y values have less precisely matching means and variances, hence the looser tolerances
+               assertAllNear(7.5, 0.001, y1.getMean(), y2.getMean(), y3.getMean(), y4.getMean());
+               assertAllNear(4.12, 0.01,
+                       y1.getVariance(), y2.getVariance(), y3.getVariance(), y4.getVariance());
+       }
+
+       @Test
+       public void testIsNan() throws Exception {
+               final DoubleSummaryAggregator aggregator = new DoubleSummaryAggregator();
+               final double[] ordinary = { -1.0, 0.0, 23.0, Double.MAX_VALUE, Double.MIN_VALUE };
+               for (double candidate : ordinary) {
+                       Assert.assertFalse(aggregator.isNan(candidate));
+               }
+               Assert.assertTrue(aggregator.isNan(Double.NaN));
+       }
+
+       @Test
+       public void testIsInfinite() throws Exception {
+               final DoubleSummaryAggregator aggregator = new DoubleSummaryAggregator();
+               final double[] finite = { -1.0, 0.0, 23.0, Double.MAX_VALUE, Double.MIN_VALUE };
+               for (double candidate : finite) {
+                       Assert.assertFalse(aggregator.isInfinite(candidate));
+               }
+               final double[] infinite = { Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY };
+               for (double candidate : infinite) {
+                       Assert.assertTrue(aggregator.isInfinite(candidate));
+               }
+       }
+
+       @Test
+       public void testMean() throws Exception {
+               Assert.assertEquals(50.0, summarize(0.0, 100.0).getMean(), 0.0);
+               Assert.assertEquals(33.333333, summarize(0.0, 0.0, 100.0).getMean(), 0.00001);
+               Assert.assertEquals(50.0, summarize(0.0, 0.0, 100.0, 100.0).getMean(), 0.0);
+               // a null entry does not shift the mean
+               Assert.assertEquals(50.0, summarize(0.0, 100.0, null).getMean(), 0.0);
+               // nothing summarized means no mean
+               Assert.assertNull(summarize().getMean());
+       }
+
+       @Test
+       public void testSum() throws Exception {
+               final Double[] pair = { 0.0, 100.0 };
+               final Double[] oneToFive = { 1.0, 2.0, 3.0, 4.0, 5.0 };
+               final Double[] cancelling = { -100.0, 0.0, 100.0, null };
+               final Double[] mixedSigns = { -10.0, 100.0, null };
+
+               Assert.assertEquals(100.0, summarize(pair).getSum().doubleValue(), 0.0);
+               Assert.assertEquals(15, summarize(oneToFive).getSum().doubleValue(), 0.0);
+               Assert.assertEquals(0, summarize(cancelling).getSum().doubleValue(), 0.0);
+               Assert.assertEquals(90, summarize(mixedSigns).getSum().doubleValue(), 0.0);
+               Assert.assertNull(summarize().getSum());
+       }
+
+       @Test
+       public void testMax() throws Exception {
+               final Double[] spread = { -1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0 };
+               final Double[] unordered = { 1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0 };
+               final Double[] withNulls = {
+                       1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0 };
+
+               Assert.assertEquals(1001.0, summarize(spread).getMax().doubleValue(), 0.0);
+               Assert.assertEquals(11.0, summarize(unordered).getMax().doubleValue(), 0.0);
+               Assert.assertEquals(11.0, summarize(withNulls).getMax().doubleValue(), 0.0);
+               Assert.assertNull(summarize().getMax());
+       }
+
+       @Test
+       public void testMin() throws Exception {
+               final Double[] spread = { -1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0 };
+               final Double[] unordered = { 1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0 };
+               final Double[] withNulls = {
+                       1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0 };
+
+               Assert.assertEquals(-1000, summarize(spread).getMin().doubleValue(), 0.0);
+               Assert.assertEquals(-2.0, summarize(unordered).getMin().doubleValue(), 0.0);
+               Assert.assertEquals(-2.0, summarize(withNulls).getMin().doubleValue(), 0.0);
+               Assert.assertNull(summarize().getMin());
+       }
+
+       @Test
+       public void testCounts() throws Exception {
+               final NumericColumnSummary<Double> summary = summarize(
+                       Double.NaN, 1.0, null, 123.0, -44.00001, Double.POSITIVE_INFINITY, 55.0,
+                       Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY, null, Double.NaN);
+
+               Assert.assertEquals(11, summary.getTotalCount());
+               Assert.assertEquals(2, summary.getNullCount());
+               Assert.assertEquals(9, summary.getNonNullCount());
+               // 2 nulls + 2 NaNs + 3 infinities
+               Assert.assertEquals(7, summary.getMissingCount());
+               Assert.assertEquals(4, summary.getNonMissingCount());
+               Assert.assertEquals(2, summary.getNanCount());
+               Assert.assertEquals(3, summary.getInfinityCount());
+       }
+
+       /**
+        * Asserts that each of the given numbers lies within {@code delta} of {@code expected}.
+        */
+       private static void assertAllNear(double expected, double delta, Number... actuals) {
+               for (Number actual : actuals) {
+                       Assert.assertEquals(expected, actual.doubleValue(), delta);
+               }
+       }
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * The harness aggregates and combines the values a bunch of different ways, so this
+        * deliberately breaks the rule of "testing only one thing". {@code compareResults}
+        * presumably is the hook the harness uses to check two of those results against each other.
+        */
+       protected NumericColumnSummary<Double> summarize(Double... values) {
+               final AggregateCombineHarness<Double, NumericColumnSummary<Double>, DoubleSummaryAggregator> harness =
+                       new AggregateCombineHarness<Double, NumericColumnSummary<Double>, DoubleSummaryAggregator>() {
+
+                               @Override
+                               protected void compareResults(NumericColumnSummary<Double> expected,
+                                               NumericColumnSummary<Double> actual) {
+                                       Assert.assertEquals(expected.getMin(), actual.getMin(), 0.0);
+                                       Assert.assertEquals(expected.getMax(), actual.getMax(), 0.0);
+                                       Assert.assertEquals(expected.getMean(), actual.getMean(), 1e-12d);
+                                       Assert.assertEquals(expected.getVariance(), actual.getVariance(), 1e-9d);
+                                       Assert.assertEquals(expected.getStandardDeviation(), actual.getStandardDeviation(), 1e-12d);
+                               }
+                       };
+               return harness.summarize(values);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..a30d6aa
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.DoubleValue;
+import org.junit.Assert;
+
+public class DoubleValueSummaryAggregatorTest extends DoubleSummaryAggregatorTest {
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * Wraps every non-null {@code Double} in a {@link DoubleValue} (null entries stay null) and
+        * summarizes them with {@code ValueSummaryAggregator.DoubleValueSummaryAggregator}, so the
+        * inherited tests that call this method exercise the DoubleValue aggregator instead.
+        *
+        * This method breaks the rule of "testing only one thing" by aggregating and combining
+        * a bunch of different ways.
+        */
+       @Override
+       protected NumericColumnSummary<Double> summarize(Double... values) {
+
+               DoubleValue[] doubleValues = new DoubleValue[values.length];
+               for(int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               doubleValues[i] = new DoubleValue(values[i]);
+                       }
+               }
+
+               return new AggregateCombineHarness<DoubleValue,NumericColumnSummary<Double>,ValueSummaryAggregator.DoubleValueSummaryAggregator>() {
+
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Double> result1, NumericColumnSummary<Double> result2) {
+                               Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0);
+                               Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0);
+                               Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+
+               }.summarize(doubleValues);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
new file mode 100644
index 0000000..c761fc2
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class FloatSummaryAggregatorTest {
+
+       /**
+        * Verifies mean, variance and standard deviation using the x values of Anscombe's Quartet.
+        *
+        * The data set has no special meaning here; it is used only because its means and
+        * variances are well known.
+        *
+        * https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+        */
+       @Test
+       public void testAnscomesQuartetXValues() throws Exception {
+               final NumericColumnSummary<Float> x1 = summarize(
+                       10.0f, 8.0f, 13.0f, 9.0f, 11.0f, 14.0f, 6.0f, 4.0f, 12.0f, 7.0f, 5.0f);
+               final NumericColumnSummary<Float> x4 = summarize(
+                       8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 19.0f, 8.0f, 8.0f, 8.0f);
+
+               // the x means match exactly, so no tolerance is allowed there
+               assertAllNear(9.0, 0.0, x1.getMean(), x4.getMean());
+               assertAllNear(11.0, 1e-10d, x1.getVariance(), x4.getVariance());
+               assertAllNear(Math.sqrt(11.0), 1e-10d,
+                       x1.getStandardDeviation(), x4.getStandardDeviation());
+       }
+
+       /**
+        * Verifies mean and variance using the y values of Anscombe's Quartet.
+        *
+        * The data set has no special meaning here; it is used only because its means and
+        * variances are well known.
+        *
+        * https://en.wikipedia.org/wiki/Anscombe%27s_quartet
+        */
+       @Test
+       public void testAnscomesQuartetYValues() throws Exception {
+               final NumericColumnSummary<Float> y1 = summarize(
+                       8.04f, 6.95f, 7.58f, 8.81f, 8.33f, 9.96f, 7.24f, 4.26f, 10.84f, 4.82f, 5.68f);
+               final NumericColumnSummary<Float> y2 = summarize(
+                       9.14f, 8.14f, 8.74f, 8.77f, 9.26f, 8.1f, 6.13f, 3.1f, 9.13f, 7.26f, 4.74f);
+               final NumericColumnSummary<Float> y3 = summarize(
+                       7.46f, 6.77f, 12.74f, 7.11f, 7.81f, 8.84f, 6.08f, 5.39f, 8.15f, 6.42f, 5.73f);
+               final NumericColumnSummary<Float> y4 = summarize(
+                       6.58f, 5.76f, 7.71f, 8.84f, 8.47f, 7.04f, 5.25f, 12.5f, 5.56f, 7.91f, 6.89f);
+
+               // the y values have less precisely matching means and variances, hence the looser tolerances
+               assertAllNear(7.5, 0.001, y1.getMean(), y2.getMean(), y3.getMean(), y4.getMean());
+               assertAllNear(4.12, 0.01,
+                       y1.getVariance(), y2.getVariance(), y3.getVariance(), y4.getVariance());
+       }
+
+       @Test
+       public void testIsNan() throws Exception {
+               final FloatSummaryAggregator aggregator = new FloatSummaryAggregator();
+               final float[] ordinary = { -1.0f, 0.0f, 23.0f, Float.MAX_VALUE, Float.MIN_VALUE };
+               for (float candidate : ordinary) {
+                       Assert.assertFalse(aggregator.isNan(candidate));
+               }
+               Assert.assertTrue(aggregator.isNan(Float.NaN));
+       }
+
+       @Test
+       public void testIsInfinite() throws Exception {
+               final FloatSummaryAggregator aggregator = new FloatSummaryAggregator();
+               final float[] finite = { -1.0f, 0.0f, 23.0f, Float.MAX_VALUE, Float.MIN_VALUE };
+               for (float candidate : finite) {
+                       Assert.assertFalse(aggregator.isInfinite(candidate));
+               }
+               final float[] infinite = { Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY };
+               for (float candidate : infinite) {
+                       Assert.assertTrue(aggregator.isInfinite(candidate));
+               }
+       }
+
+       @Test
+       public void testMean() throws Exception {
+               Assert.assertEquals(50.0, summarize(0.0f, 100.0f).getMean(), 0.0);
+               Assert.assertEquals(33.333333, summarize(0.0f, 0.0f, 100.0f).getMean(), 0.00001);
+               Assert.assertEquals(50.0, summarize(0.0f, 0.0f, 100.0f, 100.0f).getMean(), 0.0);
+               // a null entry does not shift the mean
+               Assert.assertEquals(50.0, summarize(0.0f, 100.0f, null).getMean(), 0.0);
+               // nothing summarized means no mean
+               Assert.assertNull(summarize().getMean());
+       }
+
+       @Test
+       public void testSum() throws Exception {
+               final Float[] pair = { 0.0f, 100.0f };
+               final Float[] oneToFive = { 1.0f, 2.0f, 3.0f, 4.0f, 5.0f };
+               final Float[] cancelling = { -100.0f, 0.0f, 100.0f, null };
+               final Float[] mixedSigns = { -10.0f, 100.0f, null };
+
+               Assert.assertEquals(100.0, summarize(pair).getSum().floatValue(), 0.0f);
+               Assert.assertEquals(15, summarize(oneToFive).getSum().floatValue(), 0.0f);
+               Assert.assertEquals(0, summarize(cancelling).getSum().floatValue(), 0.0f);
+               Assert.assertEquals(90, summarize(mixedSigns).getSum().floatValue(), 0.0f);
+               Assert.assertNull(summarize().getSum());
+       }
+
+       @Test
+       public void testMax() throws Exception {
+               final Float[] spread = { -1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f };
+               final Float[] unordered = {
+                       1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f };
+               final Float[] withNulls = {
+                       1.0f, 8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f };
+
+               Assert.assertEquals(1001.0f, summarize(spread).getMax().floatValue(), 0.0f);
+               Assert.assertEquals(11.0f, summarize(unordered).getMax().floatValue(), 0.0f);
+               Assert.assertEquals(11.0f, summarize(withNulls).getMax().floatValue(), 0.0f);
+               Assert.assertNull(summarize().getMax());
+       }
+
+       @Test
+       public void testMin() throws Exception {
+               final Float[] spread = { -1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f };
+               final Float[] unordered = {
+                       1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f };
+               final Float[] withNulls = {
+                       1.0f, 8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f };
+
+               Assert.assertEquals(-1000, summarize(spread).getMin().floatValue(), 0.0f);
+               Assert.assertEquals(-2.0f, summarize(unordered).getMin().floatValue(), 0.0f);
+               Assert.assertEquals(-2.0f, summarize(withNulls).getMin().floatValue(), 0.0f);
+               Assert.assertNull(summarize().getMin());
+       }
+
+       /**
+        * Asserts that each of the given numbers lies within {@code delta} of {@code expected}.
+        */
+       private static void assertAllNear(double expected, double delta, Number... actuals) {
+               for (Number actual : actuals) {
+                       Assert.assertEquals(expected, actual.doubleValue(), delta);
+               }
+       }
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * The harness aggregates and combines the values a bunch of different ways, so this
+        * deliberately breaks the rule of "testing only one thing". {@code compareResults}
+        * presumably is the hook the harness uses to check two of those results against each other.
+        */
+       protected NumericColumnSummary<Float> summarize(Float... values) {
+               final AggregateCombineHarness<Float, NumericColumnSummary<Float>, FloatSummaryAggregator> harness =
+                       new AggregateCombineHarness<Float, NumericColumnSummary<Float>, FloatSummaryAggregator>() {
+
+                               @Override
+                               protected void compareResults(NumericColumnSummary<Float> expected,
+                                               NumericColumnSummary<Float> actual) {
+                                       Assert.assertEquals(expected.getMin(), actual.getMin(), 0.0f);
+                                       Assert.assertEquals(expected.getMax(), actual.getMax(), 0.0f);
+                                       Assert.assertEquals(expected.getMean(), actual.getMean(), 1e-12d);
+                                       Assert.assertEquals(expected.getVariance(), actual.getVariance(), 1e-9d);
+                                       Assert.assertEquals(expected.getStandardDeviation(), actual.getStandardDeviation(), 1e-12d);
+                               }
+                       };
+               return harness.summarize(values);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..ff87946
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.FloatValue;
+import org.junit.Assert;
+
+public class FloatValueSummaryAggregatorTest extends 
FloatSummaryAggregatorTest {
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * This method breaks the rule of "testing only one thing" by 
aggregating
+        * and combining a bunch of different ways.
+        */
+       @Override
+       protected NumericColumnSummary<Float> summarize(Float... values) {
+
+               FloatValue[] floatValues = new FloatValue[values.length];
+               for(int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               floatValues[i] = new FloatValue(values[i]);
+                       }
+               }
+
+               return new 
AggregateCombineHarness<FloatValue,NumericColumnSummary<Float>,ValueSummaryAggregator.FloatValueSummaryAggregator>()
 {
+
+                       @Override
+                       protected void 
compareResults(NumericColumnSummary<Float> result1, NumericColumnSummary<Float> 
result2) {
+                               Assert.assertEquals(result1.getMin(), 
result2.getMin(), 0.0f);
+                               Assert.assertEquals(result1.getMax(), 
result2.getMax(), 0.0f);
+                               Assert.assertEquals(result1.getMean(), 
result2.getMean(), 1e-10d);
+                               Assert.assertEquals(result1.getVariance(), 
result2.getVariance(), 1e-9d);
+                               
Assert.assertEquals(result1.getStandardDeviation(), 
result2.getStandardDeviation(), 1e-10d);
+                       }
+
+               }.summarize(floatValues);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
new file mode 100644
index 0000000..110d2cc
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link IntegerSummaryAggregator}.
+ */
+public class IntegerSummaryAggregatorTest {
+
+       @Test
+       public void testIsNan() throws Exception {
+               IntegerSummaryAggregator ag = new IntegerSummaryAggregator();
+               // always false for Integer
+               Assert.assertFalse(ag.isNan(-1));
+               Assert.assertFalse(ag.isNan(0));
+               Assert.assertFalse(ag.isNan(23));
+               Assert.assertFalse(ag.isNan(Integer.MAX_VALUE));
+               Assert.assertFalse(ag.isNan(Integer.MIN_VALUE));
+               Assert.assertFalse(ag.isNan(null));
+       }
+
+       @Test
+       public void testIsInfinite() throws Exception {
+               IntegerSummaryAggregator ag = new IntegerSummaryAggregator();
+               // always false for Integer
+               Assert.assertFalse(ag.isInfinite(-1));
+               Assert.assertFalse(ag.isInfinite(0));
+               Assert.assertFalse(ag.isInfinite(23));
+               Assert.assertFalse(ag.isInfinite(Integer.MAX_VALUE));
+               Assert.assertFalse(ag.isInfinite(Integer.MIN_VALUE));
+               Assert.assertFalse(ag.isInfinite(null));
+       }
+
+       @Test
+       public void testMean() throws Exception {
+               Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0);
+               Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001);
+               Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0);
+               Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0);
+               Assert.assertNull(summarize().getMean());
+       }
+
+       @Test
+       public void testSum() throws Exception {
+               Assert.assertEquals(100, summarize(0, 100).getSum().intValue());
+               Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().intValue());
+               Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().intValue());
+               Assert.assertEquals(90, summarize(-10, 100, null).getSum().intValue());
+               Assert.assertNull(summarize().getSum());
+       }
+
+       @Test
+       public void testMax() throws Exception {
+               Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().intValue());
+               Assert.assertEquals(0, summarize(Integer.MIN_VALUE, -1000, 0).getMax().intValue());
+               Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().intValue());
+               Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().intValue());
+               Assert.assertNull(summarize().getMax());
+       }
+
+       @Test
+       public void testMin() throws Exception {
+               Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().intValue());
+               Assert.assertEquals(Integer.MIN_VALUE, summarize(Integer.MIN_VALUE, -1000, 0).getMin().intValue());
+               Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().intValue());
+               Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().intValue());
+               Assert.assertNull(summarize().getMin());
+       }
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * <p>The values are aggregated through an {@link AggregateCombineHarness}, which
+        * calls {@code compareResults} to check that summaries it builds from the same
+        * input agree.
+        *
+        * @param values the values to summarize; {@code null} entries are allowed
+        * @return the summary of {@code values}
+        */
+       protected NumericColumnSummary<Integer> summarize(Integer... values) {
+
+               return new AggregateCombineHarness<Integer, NumericColumnSummary<Integer>, IntegerSummaryAggregator>() {
+
+                       /**
+                        * Checks that two summaries of the same input agree. A statistic that
+                        * is undefined (null) must be null on both sides rather than causing a
+                        * NullPointerException during unboxing.
+                        */
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Integer> result1, NumericColumnSummary<Integer> result2) {
+
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+                               Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+                               Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+                               Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+
+                               Assert.assertEquals(result1.containsNull(), result2.containsNull());
+                               Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+                               // compare the boxed values: equal when both are null or both hold the same int
+                               Assert.assertEquals(result1.getMin(), result2.getMin());
+                               Assert.assertEquals(result1.getMax(), result2.getMax());
+                               Assert.assertEquals(result1.getSum(), result2.getSum());
+                               assertNullableEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               assertNullableEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               assertNullableEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+               }.summarize(values);
+       }
+
+       /**
+        * Asserts that two possibly-null statistics agree: either both are {@code null},
+        * or both are non-null and their double values differ by at most {@code delta}.
+        */
+       private static void assertNullableEquals(Number expected, Number actual, double delta) {
+               if (expected == null || actual == null) {
+                       Assert.assertEquals(expected, actual);
+               } else {
+                       Assert.assertEquals(expected.doubleValue(), actual.doubleValue(), delta);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..6ac5485
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.IntValue;
+import org.junit.Assert;
+
+/**
+ * Runs the {@link IntegerSummaryAggregatorTest} test cases against
+ * {@link ValueSummaryAggregator.IntegerValueSummaryAggregator}, using {@link IntValue} input.
+ */
+public class IntegerValueSummaryAggregatorTest extends IntegerSummaryAggregatorTest {
+
+       /**
+        * Wraps the values in {@link IntValue}s ({@code null} entries stay {@code null})
+        * and summarizes them through an {@link AggregateCombineHarness}.
+        */
+       @Override
+       protected NumericColumnSummary<Integer> summarize(Integer... values) {
+
+               IntValue[] intValues = new IntValue[values.length];
+               for (int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               intValues[i] = new IntValue(values[i]);
+                       }
+               }
+
+               return new AggregateCombineHarness<IntValue, NumericColumnSummary<Integer>, ValueSummaryAggregator.IntegerValueSummaryAggregator>() {
+
+                       /**
+                        * Checks that two summaries of the same input agree. A statistic that
+                        * is undefined (null) must be null on both sides rather than causing a
+                        * NullPointerException during unboxing.
+                        */
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Integer> result1, NumericColumnSummary<Integer> result2) {
+
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+                               Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+                               Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+                               Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+
+                               Assert.assertEquals(result1.containsNull(), result2.containsNull());
+                               Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+                               // compare the boxed values: equal when both are null or both hold the same int
+                               Assert.assertEquals(result1.getMin(), result2.getMin());
+                               Assert.assertEquals(result1.getMax(), result2.getMax());
+                               Assert.assertEquals(result1.getSum(), result2.getSum());
+                               assertNullableEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               assertNullableEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               assertNullableEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+               }.summarize(intValues);
+       }
+
+       /**
+        * Asserts that two possibly-null statistics agree: either both are {@code null},
+        * or both are non-null and their double values differ by at most {@code delta}.
+        */
+       private static void assertNullableEquals(Number expected, Number actual, double delta) {
+               if (expected == null || actual == null) {
+                       Assert.assertEquals(expected, actual);
+               } else {
+                       Assert.assertEquals(expected.doubleValue(), actual.doubleValue(), delta);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
new file mode 100644
index 0000000..1905657
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link LongSummaryAggregator}.
+ */
+public class LongSummaryAggregatorTest {
+
+       @Test
+       public void testIsNan() throws Exception {
+               LongSummaryAggregator ag = new LongSummaryAggregator();
+               // always false for Long
+               Assert.assertFalse(ag.isNan(-1L));
+               Assert.assertFalse(ag.isNan(0L));
+               Assert.assertFalse(ag.isNan(23L));
+               Assert.assertFalse(ag.isNan(Long.MAX_VALUE));
+               Assert.assertFalse(ag.isNan(Long.MIN_VALUE));
+               Assert.assertFalse(ag.isNan(null));
+       }
+
+       @Test
+       public void testIsInfinite() throws Exception {
+               LongSummaryAggregator ag = new LongSummaryAggregator();
+               // always false for Long
+               Assert.assertFalse(ag.isInfinite(-1L));
+               Assert.assertFalse(ag.isInfinite(0L));
+               Assert.assertFalse(ag.isInfinite(23L));
+               Assert.assertFalse(ag.isInfinite(Long.MAX_VALUE));
+               Assert.assertFalse(ag.isInfinite(Long.MIN_VALUE));
+               Assert.assertFalse(ag.isInfinite(null));
+       }
+
+       @Test
+       public void testMean() throws Exception {
+               Assert.assertEquals(50.0, summarize(0L, 100L).getMean(), 0.0);
+               Assert.assertEquals(33.333333, summarize(0L, 0L, 100L).getMean(), 0.00001);
+               Assert.assertEquals(50.0, summarize(0L, 0L, 100L, 100L).getMean(), 0.0);
+               Assert.assertEquals(50.0, summarize(0L, 100L, null).getMean(), 0.0);
+               Assert.assertNull(summarize().getMean());
+       }
+
+       @Test
+       public void testSum() throws Exception {
+               Assert.assertEquals(100L, summarize(0L, 100L).getSum().longValue());
+               Assert.assertEquals(15L, summarize(1L, 2L, 3L, 4L, 5L).getSum().longValue());
+               Assert.assertEquals(0L, summarize(-100L, 0L, 100L, null).getSum().longValue());
+               Assert.assertEquals(90L, summarize(-10L, 100L, null).getSum().longValue());
+               Assert.assertNull(summarize().getSum());
+       }
+
+       @Test
+       public void testMax() throws Exception {
+               Assert.assertEquals(1001L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMax().longValue());
+               Assert.assertEquals(11L, summarize(1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L).getMax().longValue());
+               Assert.assertEquals(11L, summarize(1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L).getMax().longValue());
+               Assert.assertNull(summarize().getMax());
+       }
+
+       @Test
+       public void testMin() throws Exception {
+               Assert.assertEquals(-1000L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMin().longValue());
+               Assert.assertEquals(-2L, summarize(1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L).getMin().longValue());
+               Assert.assertEquals(-2L, summarize(1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L).getMin().longValue());
+               Assert.assertNull(summarize().getMin());
+       }
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * <p>The values are aggregated through an {@link AggregateCombineHarness}, which
+        * calls {@code compareResults} to check that summaries it builds from the same
+        * input agree.
+        *
+        * @param values the values to summarize; {@code null} entries are allowed
+        * @return the summary of {@code values}
+        */
+       protected NumericColumnSummary<Long> summarize(Long... values) {
+               return new AggregateCombineHarness<Long, NumericColumnSummary<Long>, LongSummaryAggregator>() {
+
+                       /**
+                        * Checks that two summaries of the same input agree. A statistic that
+                        * is undefined (null) must be null on both sides rather than causing a
+                        * NullPointerException during unboxing.
+                        */
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Long> result1, NumericColumnSummary<Long> result2) {
+
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+                               Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+                               Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+                               Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+
+                               Assert.assertEquals(result1.containsNull(), result2.containsNull());
+                               Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+                               // compare the boxed values: equal when both are null or both hold the same long
+                               Assert.assertEquals(result1.getMin(), result2.getMin());
+                               Assert.assertEquals(result1.getMax(), result2.getMax());
+                               Assert.assertEquals(result1.getSum(), result2.getSum());
+                               assertNullableEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               assertNullableEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               assertNullableEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+               }.summarize(values);
+       }
+
+       /**
+        * Asserts that two possibly-null statistics agree: either both are {@code null},
+        * or both are non-null and their double values differ by at most {@code delta}.
+        */
+       private static void assertNullableEquals(Number expected, Number actual, double delta) {
+               if (expected == null || actual == null) {
+                       Assert.assertEquals(expected, actual);
+               } else {
+                       Assert.assertEquals(expected.doubleValue(), actual.doubleValue(), delta);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..eecda69
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.LongValue;
+import org.junit.Assert;
+
+/**
+ * Runs the {@link LongSummaryAggregatorTest} test cases against
+ * {@link ValueSummaryAggregator.LongValueSummaryAggregator}, using {@link LongValue} input.
+ */
+public class LongValueSummaryAggregatorTest extends LongSummaryAggregatorTest {
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * <p>Wraps the values in {@link LongValue}s ({@code null} entries stay {@code null})
+        * and summarizes them through an {@link AggregateCombineHarness}.
+        */
+       @Override
+       protected NumericColumnSummary<Long> summarize(Long... values) {
+
+               LongValue[] longValues = new LongValue[values.length];
+               for (int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               longValues[i] = new LongValue(values[i]);
+                       }
+               }
+
+               return new AggregateCombineHarness<LongValue, NumericColumnSummary<Long>, ValueSummaryAggregator.LongValueSummaryAggregator>() {
+
+                       /**
+                        * Checks that two summaries of the same input agree. A statistic that
+                        * is undefined (null) must be null on both sides rather than causing a
+                        * NullPointerException during unboxing.
+                        */
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Long> result1, NumericColumnSummary<Long> result2) {
+
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+                               Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+                               Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+                               Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+
+                               Assert.assertEquals(result1.containsNull(), result2.containsNull());
+                               Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+                               // compare the boxed values: equal when both are null or both hold the same long
+                               Assert.assertEquals(result1.getMin(), result2.getMin());
+                               Assert.assertEquals(result1.getMax(), result2.getMax());
+                               Assert.assertEquals(result1.getSum(), result2.getSum());
+                               assertNullableEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               assertNullableEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               assertNullableEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+               }.summarize(longValues);
+       }
+
+       /**
+        * Asserts that two possibly-null statistics agree: either both are {@code null},
+        * or both are non-null and their double values differ by at most {@code delta}.
+        */
+       private static void assertNullableEquals(Number expected, Number actual, double delta) {
+               if (expected == null || actual == null) {
+                       Assert.assertEquals(expected, actual);
+               } else {
+                       Assert.assertEquals(expected.doubleValue(), actual.doubleValue(), delta);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
new file mode 100644
index 0000000..ebbf627
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link ShortSummaryAggregator}.
+ */
+public class ShortSummaryAggregatorTest {
+
+       @Test
+       public void testIsNan() throws Exception {
+               ShortSummaryAggregator ag = new ShortSummaryAggregator();
+               // always false for Short
+               Assert.assertFalse(ag.isNan((short) -1));
+               Assert.assertFalse(ag.isNan((short) 0));
+               Assert.assertFalse(ag.isNan((short) 23));
+               Assert.assertFalse(ag.isNan(Short.MAX_VALUE));
+               Assert.assertFalse(ag.isNan(Short.MIN_VALUE));
+               Assert.assertFalse(ag.isNan(null));
+       }
+
+       @Test
+       public void testIsInfinite() throws Exception {
+               ShortSummaryAggregator ag = new ShortSummaryAggregator();
+               // always false for Short
+               Assert.assertFalse(ag.isInfinite((short) -1));
+               Assert.assertFalse(ag.isInfinite((short) 0));
+               Assert.assertFalse(ag.isInfinite((short) 23));
+               Assert.assertFalse(ag.isInfinite(Short.MAX_VALUE));
+               Assert.assertFalse(ag.isInfinite(Short.MIN_VALUE));
+               Assert.assertFalse(ag.isInfinite(null));
+       }
+
+       @Test
+       public void testMean() throws Exception {
+               Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0);
+               Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001);
+               Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0);
+               Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0);
+               Assert.assertNull(summarize().getMean());
+       }
+
+       @Test
+       public void testSum() throws Exception {
+               Assert.assertEquals(100, summarize(0, 100).getSum().shortValue());
+               Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().shortValue());
+               Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().shortValue());
+               Assert.assertEquals(90, summarize(-10, 100, null).getSum().shortValue());
+               Assert.assertNull(summarize().getSum());
+       }
+
+       @Test
+       public void testMax() throws Exception {
+               Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().shortValue());
+               Assert.assertEquals(0, summarize((int) Short.MIN_VALUE, -1000, 0).getMax().shortValue());
+               Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().shortValue());
+               Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().shortValue());
+               Assert.assertNull(summarize().getMax());
+       }
+
+       @Test
+       public void testMin() throws Exception {
+               Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().shortValue());
+               Assert.assertEquals(Short.MIN_VALUE, summarize((int) Short.MIN_VALUE, -1000, 0).getMin().shortValue());
+               Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().shortValue());
+               Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().shortValue());
+               Assert.assertNull(summarize().getMin());
+       }
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * <p>The values are given as {@link Integer}s for convenient literals and narrowed
+        * to {@link Short}; a value outside the short range fails the test instead of
+        * silently wrapping around. The shorts are aggregated through an
+        * {@link AggregateCombineHarness}, which calls {@code compareResults} to check that
+        * summaries it builds from the same input agree.
+        *
+        * @param values the values to summarize; {@code null} entries are allowed
+        * @return the summary of {@code values}
+        */
+       protected NumericColumnSummary<Short> summarize(Integer... values) {
+
+               // cast everything to short here
+               Short[] shortValues = new Short[values.length];
+               for (int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               Assert.assertTrue("value out of short range: " + values[i],
+                                       values[i] >= Short.MIN_VALUE && values[i] <= Short.MAX_VALUE);
+                               shortValues[i] = values[i].shortValue();
+                       }
+               }
+
+               return new AggregateCombineHarness<Short, NumericColumnSummary<Short>, ShortSummaryAggregator>() {
+
+                       /**
+                        * Checks that two summaries of the same input agree. A statistic that
+                        * is undefined (null) must be null on both sides rather than causing a
+                        * NullPointerException during unboxing.
+                        */
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Short> result1, NumericColumnSummary<Short> result2) {
+
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+                               Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+                               Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+                               Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+
+                               Assert.assertEquals(result1.containsNull(), result2.containsNull());
+                               Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+                               // compare the boxed values: equal when both are null or both hold the same short
+                               Assert.assertEquals(result1.getMin(), result2.getMin());
+                               Assert.assertEquals(result1.getMax(), result2.getMax());
+                               Assert.assertEquals(result1.getSum(), result2.getSum());
+                               assertNullableEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               assertNullableEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               assertNullableEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+               }.summarize(shortValues);
+       }
+
+       /**
+        * Asserts that two possibly-null statistics agree: either both are {@code null},
+        * or both are non-null and their double values differ by at most {@code delta}.
+        */
+       private static void assertNullableEquals(Number expected, Number actual, double delta) {
+               if (expected == null || actual == null) {
+                       Assert.assertEquals(expected, actual);
+               } else {
+                       Assert.assertEquals(expected.doubleValue(), actual.doubleValue(), delta);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..8a8e7aa
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.types.ShortValue;
+import org.junit.Assert;
+
+/**
+ * Runs the {@link ShortSummaryAggregatorTest} test cases against
+ * {@link ValueSummaryAggregator.ShortValueSummaryAggregator}, using {@link ShortValue} input.
+ */
+public class ShortValueSummaryAggregatorTest extends ShortSummaryAggregatorTest {
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * <p>The values are narrowed to short and wrapped in {@link ShortValue}s
+        * ({@code null} entries stay {@code null}); a value outside the short range fails
+        * the test instead of silently wrapping around.
+        */
+       @Override
+       protected NumericColumnSummary<Short> summarize(Integer... values) {
+
+               ShortValue[] shortValues = new ShortValue[values.length];
+               for (int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               Assert.assertTrue("value out of short range: " + values[i],
+                                       values[i] >= Short.MIN_VALUE && values[i] <= Short.MAX_VALUE);
+                               shortValues[i] = new ShortValue(values[i].shortValue());
+                       }
+               }
+
+               return new AggregateCombineHarness<ShortValue, NumericColumnSummary<Short>, ValueSummaryAggregator.ShortValueSummaryAggregator>() {
+
+                       /**
+                        * Checks that two summaries of the same input agree. A statistic that
+                        * is undefined (null) must be null on both sides rather than causing a
+                        * NullPointerException during unboxing.
+                        */
+                       @Override
+                       protected void compareResults(NumericColumnSummary<Short> result1, NumericColumnSummary<Short> result2) {
+
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getMissingCount(), result2.getMissingCount());
+                               Assert.assertEquals(result1.getNonMissingCount(), result2.getNonMissingCount());
+                               Assert.assertEquals(result1.getInfinityCount(), result2.getInfinityCount());
+                               Assert.assertEquals(result1.getNanCount(), result2.getNanCount());
+
+                               Assert.assertEquals(result1.containsNull(), result2.containsNull());
+                               Assert.assertEquals(result1.containsNonNull(), result2.containsNonNull());
+
+                               // compare the boxed values: equal when both are null or both hold the same short
+                               Assert.assertEquals(result1.getMin(), result2.getMin());
+                               Assert.assertEquals(result1.getMax(), result2.getMax());
+                               Assert.assertEquals(result1.getSum(), result2.getSum());
+                               assertNullableEquals(result1.getMean(), result2.getMean(), 1e-12d);
+                               assertNullableEquals(result1.getVariance(), result2.getVariance(), 1e-9d);
+                               assertNullableEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d);
+                       }
+               }.summarize(shortValues);
+       }
+
+       /**
+        * Asserts that two possibly-null statistics agree: either both are {@code null},
+        * or both are non-null and their double values differ by at most {@code delta}.
+        */
+       private static void assertNullableEquals(Number expected, Number actual, double delta) {
+               if (expected == null || actual == null) {
+                       Assert.assertEquals(expected, actual);
+               } else {
+                       Assert.assertEquals(expected.doubleValue(), actual.doubleValue(), delta);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
new file mode 100644
index 0000000..02fc125
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class StringSummaryAggregatorTest {
+
+       @Test
+       public void testMixedGroup() {
+               StringColumnSummary summary = summarize("abc", "", null, "  ", 
"defghi", "foo", null, null, "", " ");
+               Assert.assertEquals(10, summary.getTotalCount());
+               Assert.assertEquals(3, summary.getNullCount());
+               Assert.assertEquals(7, summary.getNonNullCount());
+               Assert.assertEquals(2, summary.getEmptyCount());
+               Assert.assertEquals(0, summary.getMinLength().intValue());
+               Assert.assertEquals(6, summary.getMaxLength().intValue());
+               Assert.assertEquals(2.142857, 
summary.getMeanLength().doubleValue(), 0.001);
+       }
+
+       @Test
+       public void testAllNullStrings() {
+               StringColumnSummary summary = summarize(null, null, null, null);
+               Assert.assertEquals(4, summary.getTotalCount());
+               Assert.assertEquals(4, summary.getNullCount());
+               Assert.assertEquals(0, summary.getNonNullCount());
+               Assert.assertEquals(0, summary.getEmptyCount());
+               Assert.assertNull(summary.getMinLength());
+               Assert.assertNull(summary.getMaxLength());
+               Assert.assertNull(summary.getMeanLength());
+       }
+
+       @Test
+       public void testAllWithValues() {
+               StringColumnSummary summary = summarize("cat", "hat", "dog", 
"frog");
+               Assert.assertEquals(4, summary.getTotalCount());
+               Assert.assertEquals(0, summary.getNullCount());
+               Assert.assertEquals(4, summary.getNonNullCount());
+               Assert.assertEquals(0, summary.getEmptyCount());
+               Assert.assertEquals(3, summary.getMinLength().intValue());
+               Assert.assertEquals(4, summary.getMaxLength().intValue());
+               Assert.assertEquals(3.25, 
summary.getMeanLength().doubleValue(), 0.0);
+       }
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * This method breaks the rule of "testing only one thing" by 
aggregating and combining
+        * a bunch of different ways.
+        */
+       protected StringColumnSummary summarize(String... values) {
+
+               return new 
AggregateCombineHarness<String,StringColumnSummary,StringSummaryAggregator>(){
+
+                       @Override
+                       protected void compareResults(StringColumnSummary 
result1, StringColumnSummary result2) {
+                               Assert.assertEquals(result1.getEmptyCount(), 
result2.getEmptyCount());
+                               Assert.assertEquals(result1.getMaxLength(), 
result2.getMaxLength());
+                               Assert.assertEquals(result1.getMinLength(), 
result2.getMinLength());
+                               if (result1.getMeanLength() == null) {
+                                       
Assert.assertEquals(result1.getMeanLength(), result2.getMeanLength());
+                               }
+                               else {
+                                       
Assert.assertEquals(result1.getMeanLength().doubleValue(), 
result2.getMeanLength().doubleValue(), 1e-5d);
+                               }
+                               Assert.assertEquals(result1.getNullCount(), 
result2.getNullCount());
+                               Assert.assertEquals(result1.getNonNullCount(), 
result2.getNonNullCount());
+                       }
+
+               }.summarize(values);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java
new file mode 100644
index 0000000..19bfd52
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.apache.flink.types.StringValue;
+import org.junit.Assert;
+
+/**
+ * Runs the inherited {@link StringSummaryAggregatorTest} cases against
+ * {@link ValueSummaryAggregator.StringValueSummaryAggregator} by wrapping each non-null input
+ * string in a {@link StringValue}.
+ */
+public class StringValueSummaryAggregatorTest extends StringSummaryAggregatorTest {
+
+       /**
+        * Helper method for summarizing a list of values.
+        *
+        * <p>Non-null strings are wrapped in {@link StringValue}; null entries stay null.
+        *
+        * <p>This method breaks the rule of "testing only one thing" by aggregating and combining
+        * a bunch of different ways. Every field of the resulting summaries is compared, so the
+        * aggregate/combine paths must agree with each other.
+        *
+        * @param values the strings to summarize; may contain nulls
+        * @return the summary produced by the harness
+        */
+       @Override
+       protected StringColumnSummary summarize(String... values) {
+
+               StringValue[] stringValues = new StringValue[values.length];
+               for (int i = 0; i < values.length; i++) {
+                       if (values[i] != null) {
+                               stringValues[i] = new StringValue(values[i]);
+                       }
+               }
+
+               return new AggregateCombineHarness<StringValue, StringColumnSummary, ValueSummaryAggregator.StringValueSummaryAggregator>() {
+
+                       @Override
+                       protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) {
+                               Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount());
+                               Assert.assertEquals(result1.getNullCount(), result2.getNullCount());
+                               Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount());
+                               Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount());
+                               Assert.assertEquals(result1.getMinLength(), result2.getMinLength());
+                               Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength());
+                               if (result1.getMeanLength() == null || result2.getMeanLength() == null) {
+                                       // checking both sides turns a one-sided null into an assertion
+                                       // failure instead of a NullPointerException below
+                                       Assert.assertEquals(result1.getMeanLength(), result2.getMeanLength());
+                               } else {
+                                       Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d);
+                               }
+                       }
+
+               }.summarize(stringValues);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java
new file mode 100644
index 0000000..8134a90
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.types.*;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+public class SummaryAggregatorFactoryTest {
+
+       @Test
+       public void testCreate() throws Exception {
+               // supported primitive types
+               Assert.assertEquals(StringSummaryAggregator.class, 
SummaryAggregatorFactory.create(String.class).getClass());
+               Assert.assertEquals(ShortSummaryAggregator.class, 
SummaryAggregatorFactory.create(Short.class).getClass());
+               Assert.assertEquals(IntegerSummaryAggregator.class, 
SummaryAggregatorFactory.create(Integer.class).getClass());
+               Assert.assertEquals(LongSummaryAggregator.class, 
SummaryAggregatorFactory.create(Long.class).getClass());
+               Assert.assertEquals(FloatSummaryAggregator.class, 
SummaryAggregatorFactory.create(Float.class).getClass());
+               Assert.assertEquals(DoubleSummaryAggregator.class, 
SummaryAggregatorFactory.create(Double.class).getClass());
+               Assert.assertEquals(BooleanSummaryAggregator.class, 
SummaryAggregatorFactory.create(Boolean.class).getClass());
+
+               // supported value types
+               
Assert.assertEquals(ValueSummaryAggregator.StringValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(StringValue.class).getClass());
+               
Assert.assertEquals(ValueSummaryAggregator.ShortValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(ShortValue.class).getClass());
+               
Assert.assertEquals(ValueSummaryAggregator.IntegerValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(IntValue.class).getClass());
+               
Assert.assertEquals(ValueSummaryAggregator.LongValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(LongValue.class).getClass());
+               
Assert.assertEquals(ValueSummaryAggregator.FloatValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(FloatValue.class).getClass());
+               
Assert.assertEquals(ValueSummaryAggregator.DoubleValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(DoubleValue.class).getClass());
+               
Assert.assertEquals(ValueSummaryAggregator.BooleanValueSummaryAggregator.class, 
SummaryAggregatorFactory.create(BooleanValue.class).getClass());
+
+               // some not well supported types - these fallback to 
ObjectSummaryAggregator
+               Assert.assertEquals(ObjectSummaryAggregator.class, 
SummaryAggregatorFactory.create(Object.class).getClass());
+               Assert.assertEquals(ObjectSummaryAggregator.class, 
SummaryAggregatorFactory.create(List.class).getClass());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index afbcb89..1409848 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -22,9 +22,15 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.summarize.BooleanColumnSummary;
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.DoubleValue;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -106,4 +112,76 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
                Assert.assertEquals(checksum.getCount(), 15);
                Assert.assertEquals(checksum.getChecksum(), 55);
        }
+
+       /**
+        * Summarizes an 8-column tuple DataSet (short, int, long, float, double, String, boolean and
+        * DoubleValue columns) and checks the per-column statistics.
+        *
+        * <p>The input is shuffled before summarizing; the expected values do not depend on the
+        * element order.
+        */
+       @Test
+       public void testSummarize() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               List<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> data = new ArrayList<>();
+               data.add(new Tuple8<>((short) 1, 1, 100L, 0.1f, 1.012376, "hello", false, new DoubleValue(50.0)));
+               data.add(new Tuple8<>((short) 2, 2, 1000L, 0.2f, 2.003453, "hello", true, new DoubleValue(50.0)));
+               data.add(new Tuple8<>((short) 4, 10, 10000L, 0.2f, 75.00005, "null", true, new DoubleValue(50.0)));
+               data.add(new Tuple8<>((short) 10, 4, 100L, 0.9f, 79.5, "", true, new DoubleValue(50.0)));
+               data.add(new Tuple8<>((short) 5, 5, 1000L, 0.2f, 10.0000001, "a", false, new DoubleValue(50.0)));
+               data.add(new Tuple8<>((short) 6, 6, 10L, 0.1f, 0.0000000000023, "", true, new DoubleValue(100.0)));
+               data.add(new Tuple8<>((short) 7, 7, 1L, 0.2f, Double.POSITIVE_INFINITY, "abcdefghijklmnop", true, new DoubleValue(100.0)));
+               data.add(new Tuple8<>((short) 8, 8, -100L, 0.001f, Double.NaN, "abcdefghi", true, new DoubleValue(100.0)));
+
+               Collections.shuffle(data);
+
+               DataSet<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> ds = env.fromCollection(data);
+
+               // call method under test
+               Tuple results = DataSetUtils.summarize(ds);
+
+               Assert.assertEquals(8, results.getArity());
+
+               NumericColumnSummary<Short> col0Summary = results.getField(0);
+               Assert.assertEquals(8, col0Summary.getNonMissingCount());
+               Assert.assertEquals(1, col0Summary.getMin().shortValue());
+               Assert.assertEquals(10, col0Summary.getMax().shortValue());
+               // (1 + 2 + 4 + 10 + 5 + 6 + 7 + 8) / 8
+               Assert.assertEquals(5.375, col0Summary.getMean().doubleValue(), 0.0);
+
+               NumericColumnSummary<Integer> col1Summary = results.getField(1);
+               Assert.assertEquals(8, col1Summary.getNonMissingCount());
+               Assert.assertEquals(1, col1Summary.getMin().intValue());
+               Assert.assertEquals(10, col1Summary.getMax().intValue());
+               // (1 + 2 + 10 + 4 + 5 + 6 + 7 + 8) / 8
+               Assert.assertEquals(5.375, col1Summary.getMean().doubleValue(), 0.0);
+
+               NumericColumnSummary<Long> col2Summary = results.getField(2);
+               Assert.assertEquals(-100L, col2Summary.getMin().longValue());
+               Assert.assertEquals(10000L, col2Summary.getMax().longValue());
+               // (100 + 1000 + 10000 + 100 + 1000 + 10 + 1 - 100) / 8
+               Assert.assertEquals(1513.875, col2Summary.getMean().doubleValue(), 1e-9);
+
+               NumericColumnSummary<Float> col3Summary = results.getField(3);
+               Assert.assertEquals(8, col3Summary.getTotalCount());
+               Assert.assertEquals(0.001000, col3Summary.getMin().doubleValue(), 0.0000001);
+               Assert.assertEquals(0.89999999, col3Summary.getMax().doubleValue(), 0.0000001);
+               Assert.assertEquals(0.2376249988883501, col3Summary.getMean().doubleValue(), 0.000000000001);
+               Assert.assertEquals(0.0768965488108089, col3Summary.getVariance().doubleValue(), 0.00000001);
+               Assert.assertEquals(0.27730226975415995, col3Summary.getStandardDeviation().doubleValue(), 0.000000000001);
+
+               NumericColumnSummary<Double> col4Summary = results.getField(4);
+               Assert.assertEquals(6, col4Summary.getNonMissingCount());
+               // the two missing values are exactly one NaN and one positive infinity
+               Assert.assertEquals(2, col4Summary.getMissingCount());
+               Assert.assertEquals(1, col4Summary.getNanCount());
+               Assert.assertEquals(1, col4Summary.getInfinityCount());
+               Assert.assertEquals(0.0000000000023, col4Summary.getMin().doubleValue(), 0.0);
+               Assert.assertEquals(79.5, col4Summary.getMax().doubleValue(), 0.000000000001);
+               // mean over the six finite values only: NaN and infinity must not leak into it
+               Assert.assertEquals(167.5158791000023 / 6, col4Summary.getMean().doubleValue(), 1e-9);
+
+               StringColumnSummary col5Summary = results.getField(5);
+               Assert.assertEquals(8, col5Summary.getTotalCount());
+               Assert.assertEquals(0, col5Summary.getNullCount());
+               Assert.assertEquals(8, col5Summary.getNonNullCount());
+               Assert.assertEquals(2, col5Summary.getEmptyCount());
+               Assert.assertEquals(0, col5Summary.getMinLength().intValue());
+               Assert.assertEquals(16, col5Summary.getMaxLength().intValue());
+               // (5 + 5 + 4 + 0 + 1 + 0 + 16 + 9) / 8
+               Assert.assertEquals(5.0, col5Summary.getMeanLength().doubleValue(), 0.0001);
+
+               BooleanColumnSummary col6Summary = results.getField(6);
+               Assert.assertEquals(8, col6Summary.getTotalCount());
+               Assert.assertEquals(2, col6Summary.getFalseCount());
+               Assert.assertEquals(6, col6Summary.getTrueCount());
+               Assert.assertEquals(0, col6Summary.getNullCount());
+
+               NumericColumnSummary<Double> col7Summary = results.getField(7);
+               Assert.assertEquals(100.0, col7Summary.getMax().doubleValue(), 0.00001);
+               Assert.assertEquals(50.0, col7Summary.getMin().doubleValue(), 0.00001);
+               // (5 * 50.0 + 3 * 100.0) / 8
+               Assert.assertEquals(68.75, col7Summary.getMean().doubleValue(), 1e-9);
+       }
 }

Reply via email to