Repository: flink Updated Branches: refs/heads/master fabc5f96e -> 7eb58773e
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java new file mode 100644 index 0000000..5036123 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.junit.Assert; +import org.junit.Test; + +public class CompensatedSumTest { + + /** + * When adding a series of numbers the order of the numbers should not impact the results. + * + * This test shows that a naive summation comes up with a different result than Kahan + * Summation when you start with either a smaller or larger number in some cases and + * helps prove our Kahan Summation is working. 
+ */ + @Test + public void testAdd1() throws Exception { + final CompensatedSum smallSum = new CompensatedSum(0.001, 0.0); + final CompensatedSum largeSum = new CompensatedSum(1000, 0.0); + + CompensatedSum compensatedResult1 = smallSum; + CompensatedSum compensatedResult2 = largeSum; + double naiveResult1 = smallSum.value(); + double naiveResult2 = largeSum.value(); + + for(int i = 0; i < 10; i++) { + compensatedResult1 = compensatedResult1.add(smallSum); + compensatedResult2 = compensatedResult2.add(smallSum); + naiveResult1 += smallSum.value(); + naiveResult2 += smallSum.value(); + } + + compensatedResult1 = compensatedResult1.add(largeSum); + compensatedResult2 = compensatedResult2.add(smallSum); + naiveResult1 += largeSum.value(); + naiveResult2 += smallSum.value(); + + // Kahan summation gave the same result no matter what order we added + Assert.assertEquals(1000.011, compensatedResult1.value(), 0.0); + Assert.assertEquals(1000.011, compensatedResult2.value(), 0.0); + + // naive addition gave a small floating point error + Assert.assertEquals(1000.011, naiveResult1, 0.0); + Assert.assertEquals(1000.0109999999997, naiveResult2, 0.0); + + Assert.assertEquals(compensatedResult1.value(), compensatedResult2.value(), 0.0); + Assert.assertEquals(naiveResult1, naiveResult2, 0.0001); + Assert.assertNotEquals(naiveResult1, naiveResult2, 0.0); + } + + @Test + public void testDelta() throws Exception { + CompensatedSum compensatedResult1 = new CompensatedSum(0.001, 0.0); + for(int i = 0; i < 10; i++) { + compensatedResult1 = compensatedResult1.add(0.001); + } + Assert.assertEquals(0.011, compensatedResult1.value(), 0.0); + Assert.assertEquals(new Double("8.673617379884035E-19"), compensatedResult1.delta(), 0.0); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java new file mode 100644 index 0000000..08fbe78 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + +public class DoubleSummaryAggregatorTest { + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. 
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetXValues() throws Exception { + + final Double[] q1x = { 10.0, 8.0, 13.0, 9.0, 11.0, 14.0, 6.0, 4.0, 12.0, 7.0, 5.0 }; + final Double[] q4x = { 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 19.0, 8.0, 8.0, 8.0 }; + + NumericColumnSummary<Double> q1 = summarize(q1x); + NumericColumnSummary<Double> q4 = summarize(q4x); + + Assert.assertEquals(9.0, q1.getMean().doubleValue(), 0.0); + Assert.assertEquals(9.0, q4.getMean().doubleValue(), 0.0); + + Assert.assertEquals(11.0, q1.getVariance().doubleValue(), 1e-10d); + Assert.assertEquals(11.0, q4.getVariance().doubleValue(), 1e-10d); + + double stddev = Math.sqrt(11.0); + Assert.assertEquals(stddev, q1.getStandardDeviation().doubleValue(), 1e-10d); + Assert.assertEquals(stddev, q4.getStandardDeviation().doubleValue(), 1e-10d); + } + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. 
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetYValues() throws Exception { + final Double[] q1y = { 8.04, 6.95, 7.58, 8.81, 8.33, 9.96, 7.24, 4.26, 10.84, 4.82, 5.68 }; + final Double[] q2y = { 9.14, 8.14, 8.74, 8.77, 9.26, 8.1, 6.13, 3.1, 9.13, 7.26, 4.74 }; + final Double[] q3y = { 7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73 }; + final Double[] q4y = { 6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.5, 5.56, 7.91, 6.89 }; + + NumericColumnSummary<Double> q1 = summarize(q1y); + NumericColumnSummary<Double> q2 = summarize(q2y); + NumericColumnSummary<Double> q3 = summarize(q3y); + NumericColumnSummary<Double> q4 = summarize(q4y); + + // the y values are have less precisely matching means and variances + + Assert.assertEquals(7.5, q1.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q2.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q3.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q4.getMean().doubleValue(), 0.001); + + Assert.assertEquals(4.12, q1.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q2.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q3.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q4.getVariance().doubleValue(), 0.01); + } + + @Test + public void testIsNan() throws Exception { + DoubleSummaryAggregator ag = new DoubleSummaryAggregator(); + Assert.assertFalse(ag.isNan(-1.0)); + Assert.assertFalse(ag.isNan(0.0)); + Assert.assertFalse(ag.isNan(23.0)); + Assert.assertFalse(ag.isNan(Double.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Double.MIN_VALUE)); + Assert.assertTrue(ag.isNan(Double.NaN)); + } + + @Test + public void testIsInfinite() throws Exception { + DoubleSummaryAggregator ag = new DoubleSummaryAggregator(); + Assert.assertFalse(ag.isInfinite(-1.0)); + Assert.assertFalse(ag.isInfinite(0.0)); + Assert.assertFalse(ag.isInfinite(23.0)); + Assert.assertFalse(ag.isInfinite(Double.MAX_VALUE)); 
+ Assert.assertFalse(ag.isInfinite(Double.MIN_VALUE)); + Assert.assertTrue(ag.isInfinite(Double.POSITIVE_INFINITY)); + Assert.assertTrue(ag.isInfinite(Double.NEGATIVE_INFINITY)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0.0, 100.0).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0.0, 0.0, 100.0).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0.0, 0.0, 100.0, 100.0).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0.0, 100.0, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100.0, summarize(0.0, 100.0).getSum().doubleValue(), 0.0); + Assert.assertEquals(15, summarize(1.0, 2.0, 3.0, 4.0, 5.0).getSum().doubleValue(), 0.0); + Assert.assertEquals(0, summarize(-100.0, 0.0, 100.0, null).getSum().doubleValue(), 0.0); + Assert.assertEquals(90, summarize(-10.0, 100.0, null).getSum().doubleValue(), 0.0); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001.0, summarize(-1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0).getMax().doubleValue(), 0.0); + Assert.assertEquals(11.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0).getMax().doubleValue(), 0.0); + Assert.assertEquals(11.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0).getMax().doubleValue(), 0.0); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0).getMin().doubleValue(), 0.0); + Assert.assertEquals(-2.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0).getMin().doubleValue(), 0.0); + Assert.assertEquals(-2.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0).getMin().doubleValue(), 0.0); + 
Assert.assertNull(summarize().getMin()); + } + + @Test + public void testCounts() throws Exception { + NumericColumnSummary<Double> summary = summarize(Double.NaN, 1.0, null, 123.0, -44.00001, Double.POSITIVE_INFINITY, 55.0, Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY, null, Double.NaN); + Assert.assertEquals(11, summary.getTotalCount()); + Assert.assertEquals(2, summary.getNullCount()); + Assert.assertEquals(9, summary.getNonNullCount()); + Assert.assertEquals(7, summary.getMissingCount()); + Assert.assertEquals(4, summary.getNonMissingCount()); + Assert.assertEquals(2, summary.getNanCount()); + Assert.assertEquals(3, summary.getInfinityCount()); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + protected NumericColumnSummary<Double> summarize(Double... values) { + return new AggregateCombineHarness<Double,NumericColumnSummary<Double>,DoubleSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Double> result1, NumericColumnSummary<Double> result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d); + } + + }.summarize(values); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java new file mode 100644 index 0000000..a30d6aa --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.DoubleValue; +import org.junit.Assert; + +public class DoubleValueSummaryAggregatorTest extends DoubleSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + protected NumericColumnSummary<Double> summarize(Double... 
values) { + + DoubleValue[] doubleValues = new DoubleValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + doubleValues[i] = new DoubleValue(values[i]); + } + } + + return new AggregateCombineHarness<DoubleValue,NumericColumnSummary<Double>,ValueSummaryAggregator.DoubleValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Double> result1, NumericColumnSummary<Double> result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d); + } + + }.summarize(doubleValues); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java new file mode 100644 index 0000000..c761fc2 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class FloatSummaryAggregatorTest { + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. + * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetXValues() throws Exception { + + final Float[] q1x = { 10.0f, 8.0f, 13.0f, 9.0f, 11.0f, 14.0f, 6.0f, 4.0f, 12.0f, 7.0f, 5.0f }; + final Float[] q4x = { 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 19.0f, 8.0f, 8.0f, 8.0f }; + + NumericColumnSummary<Float> q1 = summarize(q1x); + NumericColumnSummary<Float> q4 = summarize(q4x); + + Assert.assertEquals(9.0, q1.getMean().doubleValue(), 0.0f); + Assert.assertEquals(9.0, q4.getMean().doubleValue(), 0.0f); + + Assert.assertEquals(11.0, q1.getVariance().doubleValue(), 1e-10d); + Assert.assertEquals(11.0, q4.getVariance().doubleValue(), 1e-10d); + + double stddev = Math.sqrt(11.0f); + Assert.assertEquals(stddev, q1.getStandardDeviation().doubleValue(), 1e-10d); + Assert.assertEquals(stddev, q4.getStandardDeviation().doubleValue(), 1e-10d); + } + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance.
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetYValues() throws Exception { + final Float[] q1y = { 8.04f, 6.95f, 7.58f, 8.81f, 8.33f, 9.96f, 7.24f, 4.26f, 10.84f, 4.82f, 5.68f }; + final Float[] q2y = { 9.14f, 8.14f, 8.74f, 8.77f, 9.26f, 8.1f, 6.13f, 3.1f, 9.13f, 7.26f, 4.74f }; + final Float[] q3y = { 7.46f, 6.77f, 12.74f, 7.11f, 7.81f, 8.84f, 6.08f, 5.39f, 8.15f, 6.42f, 5.73f }; + final Float[] q4y = { 6.58f, 5.76f, 7.71f, 8.84f, 8.47f, 7.04f, 5.25f, 12.5f, 5.56f, 7.91f, 6.89f }; + + NumericColumnSummary<Float> q1 = summarize(q1y); + NumericColumnSummary<Float> q2 = summarize(q2y); + NumericColumnSummary<Float> q3 = summarize(q3y); + NumericColumnSummary<Float> q4 = summarize(q4y); + + // the y values are have less precisely matching means and variances + + Assert.assertEquals(7.5, q1.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q2.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q3.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q4.getMean().doubleValue(), 0.001); + + Assert.assertEquals(4.12, q1.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q2.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q3.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q4.getVariance().doubleValue(), 0.01); + } + + @Test + public void testIsNan() throws Exception { + FloatSummaryAggregator ag = new FloatSummaryAggregator(); + Assert.assertFalse(ag.isNan(-1.0f)); + Assert.assertFalse(ag.isNan(0.0f)); + Assert.assertFalse(ag.isNan(23.0f)); + Assert.assertFalse(ag.isNan(Float.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Float.MIN_VALUE)); + Assert.assertTrue(ag.isNan(Float.NaN)); + } + + @Test + public void testIsInfinite() throws Exception { + FloatSummaryAggregator ag = new FloatSummaryAggregator(); + Assert.assertFalse(ag.isInfinite(-1.0f)); + Assert.assertFalse(ag.isInfinite(0.0f)); + Assert.assertFalse(ag.isInfinite(23.0f)); + 
Assert.assertFalse(ag.isInfinite(Float.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Float.MIN_VALUE)); + Assert.assertTrue(ag.isInfinite(Float.POSITIVE_INFINITY)); + Assert.assertTrue(ag.isInfinite(Float.NEGATIVE_INFINITY)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0.0f, 100.0f).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0.0f, 0.0f, 100.0f).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0.0f, 0.0f, 100.0f, 100.0f).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0.0f, 100.0f, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100.0, summarize(0.0f, 100.0f).getSum().floatValue(), 0.0f); + Assert.assertEquals(15, summarize(1.0f, 2.0f, 3.0f, 4.0f, 5.0f).getSum().floatValue(), 0.0f); + Assert.assertEquals(0, summarize(-100.0f, 0.0f, 100.0f, null).getSum().floatValue(), 0.0f); + Assert.assertEquals(90, summarize(-10.0f, 100.0f, null).getSum().floatValue(), 0.0f); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001.0f, summarize(-1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f).getMax().floatValue(), 0.0f); + Assert.assertEquals(11.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f).getMax().floatValue(), 0.0f); + Assert.assertEquals(11.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f).getMax().floatValue(), 0.0f); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f).getMin().floatValue(), 0.0f); + Assert.assertEquals(-2.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f).getMin().floatValue(), 0.0f); + Assert.assertEquals(-2.0f, summarize(1.0f, 
8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f).getMin().floatValue(), 0.0f); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating + * and combining a bunch of different ways. + */ + protected NumericColumnSummary<Float> summarize(Float... values) { + + return new AggregateCombineHarness<Float,NumericColumnSummary<Float>,FloatSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Float> result1, NumericColumnSummary<Float> result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0f); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0f); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d); + } + + }.summarize(values); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java new file mode 100644 index 0000000..ff87946 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.FloatValue; +import org.junit.Assert; + +public class FloatValueSummaryAggregatorTest extends FloatSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating + * and combining a bunch of different ways. + */ + @Override + protected NumericColumnSummary<Float> summarize(Float... 
values) { + + FloatValue[] floatValues = new FloatValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + floatValues[i] = new FloatValue(values[i]); + } + } + + return new AggregateCombineHarness<FloatValue,NumericColumnSummary<Float>,ValueSummaryAggregator.FloatValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Float> result1, NumericColumnSummary<Float> result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0f); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0f); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-10d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-10d); + } + + }.summarize(floatValues); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java new file mode 100644 index 0000000..110d2cc --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + +public class IntegerSummaryAggregatorTest { + + @Test + public void testIsNan() throws Exception { + IntegerSummaryAggregator ag = new IntegerSummaryAggregator(); + // always false for Integer + Assert.assertFalse(ag.isNan(-1)); + Assert.assertFalse(ag.isNan(0)); + Assert.assertFalse(ag.isNan(23)); + Assert.assertFalse(ag.isNan(Integer.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Integer.MIN_VALUE)); + Assert.assertFalse(ag.isNan(null)); + } + + @Test + public void testIsInfinite() throws Exception { + IntegerSummaryAggregator ag = new IntegerSummaryAggregator(); + // always false for Integer + Assert.assertFalse(ag.isInfinite(-1)); + Assert.assertFalse(ag.isInfinite(0)); + Assert.assertFalse(ag.isInfinite(23)); + Assert.assertFalse(ag.isInfinite(Integer.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Integer.MIN_VALUE)); + Assert.assertFalse(ag.isInfinite(null)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100, summarize(0, 100).getSum().intValue()); + 
Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().intValue()); + Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().intValue()); + Assert.assertEquals(90, summarize(-10, 100, null).getSum().intValue()); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().intValue()); + Assert.assertEquals(0, summarize(Integer.MIN_VALUE, -1000, 0).getMax().intValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().intValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().intValue()); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().intValue()); + Assert.assertEquals(Integer.MIN_VALUE, summarize(Integer.MIN_VALUE, -1000, 0).getMin().intValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().intValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().intValue()); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary<Integer> summarize(Integer... 
values) { + + return new AggregateCombineHarness<Integer,NumericColumnSummary<Integer>,IntegerSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Integer> result1, NumericColumnSummary<Integer> result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().intValue(),result2.getMin().intValue()); + Assert.assertEquals(result1.getMax().intValue(), result2.getMax().intValue()); + Assert.assertEquals(result1.getSum().intValue(),result2.getSum().intValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(values); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java new file mode 100644 index 0000000..6ac5485 --- /dev/null +++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.IntValue; +import org.junit.Assert; + +public class IntegerValueSummaryAggregatorTest extends IntegerSummaryAggregatorTest { + + @Override + protected NumericColumnSummary<Integer> summarize(Integer... 
values) { + + IntValue[] intValues = new IntValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + intValues[i] = new IntValue(values[i]); + } + } + + return new AggregateCombineHarness<IntValue,NumericColumnSummary<Integer>,ValueSummaryAggregator.IntegerValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Integer> result1, NumericColumnSummary<Integer> result2) { + + Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().intValue(),result2.getMin().intValue()); + Assert.assertEquals(result1.getMax().intValue(), result2.getMax().intValue()); + Assert.assertEquals(result1.getSum().intValue(),result2.getSum().intValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(intValues); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java new file mode 100644 index 0000000..1905657 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + +public class LongSummaryAggregatorTest { + + @Test + public void testIsNan() throws Exception { + LongSummaryAggregator ag = new LongSummaryAggregator(); + // always false for Long + Assert.assertFalse(ag.isNan(-1L)); + Assert.assertFalse(ag.isNan(0L)); + Assert.assertFalse(ag.isNan(23L)); + Assert.assertFalse(ag.isNan(Long.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Long.MIN_VALUE)); + Assert.assertFalse(ag.isNan(null)); + } + + @Test + public void testIsInfinite() throws Exception { + LongSummaryAggregator ag = new LongSummaryAggregator(); + // always false for Long + Assert.assertFalse(ag.isInfinite(-1L)); + Assert.assertFalse(ag.isInfinite(0L)); + Assert.assertFalse(ag.isInfinite(23L)); + Assert.assertFalse(ag.isInfinite(Long.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Long.MIN_VALUE)); + Assert.assertFalse(ag.isInfinite(null)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0L, 100L).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0L, 0L, 100L).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0L, 0L, 100L, 100L).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0L, 100L, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100L, summarize(0L, 100L).getSum().longValue()); + Assert.assertEquals(15L, summarize(1L, 2L, 3L, 4L, 5L).getSum().longValue()); + Assert.assertEquals(0L, summarize(-100L, 0L, 100L, null).getSum().longValue()); + Assert.assertEquals(90L, summarize(-10L, 100L, null).getSum().longValue()); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMax().longValue()); + 
Assert.assertEquals(11L, summarize(1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L).getMax().longValue()); + Assert.assertEquals(11L, summarize(1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L).getMax().longValue()); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMin().longValue()); + Assert.assertEquals(-2L, summarize(1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L).getMin().longValue()); + Assert.assertEquals(-2L, summarize(1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L).getMin().longValue()); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary<Long> summarize(Long... values) { + return new AggregateCombineHarness<Long,NumericColumnSummary<Long>,LongSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Long> result1, NumericColumnSummary<Long> result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(), result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().longValue(),result2.getMin().longValue()); + Assert.assertEquals(result1.getMax().longValue(), result2.getMax().longValue()); + Assert.assertEquals(result1.getSum().longValue(),result2.getSum().longValue()); + Assert.assertEquals(result1.getMean().doubleValue(), 
result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(values); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java new file mode 100644 index 0000000..eecda69 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.LongValue; +import org.junit.Assert; + +public class LongValueSummaryAggregatorTest extends LongSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values + */ + @Override + protected NumericColumnSummary<Long> summarize(Long... values) { + + LongValue[] longValues = new LongValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + longValues[i] = new LongValue(values[i]); + } + } + + return new AggregateCombineHarness<LongValue,NumericColumnSummary<Long>,ValueSummaryAggregator.LongValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Long> result1, NumericColumnSummary<Long> result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(), result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().longValue(),result2.getMin().longValue()); + Assert.assertEquals(result1.getMax().longValue(), result2.getMax().longValue()); + Assert.assertEquals(result1.getSum().longValue(),result2.getSum().longValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + 
Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(longValues); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java new file mode 100644 index 0000000..ebbf627 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class ShortSummaryAggregatorTest { + + @Test + public void testIsNan() throws Exception { + ShortSummaryAggregator ag = new ShortSummaryAggregator(); + // always false for Short + Assert.assertFalse(ag.isNan((short) -1)); + Assert.assertFalse(ag.isNan((short) 0)); + Assert.assertFalse(ag.isNan((short) 23)); + Assert.assertFalse(ag.isNan(Short.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Short.MIN_VALUE)); + Assert.assertFalse(ag.isNan(null)); + } + + @Test + public void testIsInfinite() throws Exception { + ShortSummaryAggregator ag = new ShortSummaryAggregator(); + // always false for Short + Assert.assertFalse(ag.isInfinite((short) -1)); + Assert.assertFalse(ag.isInfinite((short) 0)); + Assert.assertFalse(ag.isInfinite((short) 23)); + Assert.assertFalse(ag.isInfinite(Short.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Short.MIN_VALUE)); + Assert.assertFalse(ag.isInfinite(null)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100, summarize(0, 100).getSum().shortValue()); + Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().shortValue()); + Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().shortValue()); + Assert.assertEquals(90, summarize(-10, 100, null).getSum().shortValue()); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 
1001).getMax().shortValue()); + Assert.assertEquals(0, summarize((int)Short.MIN_VALUE, -1000, 0).getMax().shortValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().shortValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().shortValue()); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().shortValue()); + Assert.assertEquals(Short.MIN_VALUE, summarize((int)Short.MIN_VALUE, -1000, 0).getMin().shortValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().shortValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().shortValue()); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary<Short> summarize(Integer... 
values) { + + // cast everything to short here + Short[] shortValues = new Short[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + shortValues[i] = values[i].shortValue(); + } + } + + return new AggregateCombineHarness<Short,NumericColumnSummary<Short>,ShortSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Short> result1, NumericColumnSummary<Short> result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().shortValue(),result2.getMin().shortValue()); + Assert.assertEquals(result1.getMax().shortValue(), result2.getMax().shortValue()); + Assert.assertEquals(result1.getSum().shortValue(),result2.getSum().shortValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(shortValues); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java new file mode 100644 index 0000000..8a8e7aa --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.ShortValue; +import org.junit.Assert; + +public class ShortValueSummaryAggregatorTest extends ShortSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary<Short> summarize(Integer... 
values) { + + ShortValue[] shortValues = new ShortValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + shortValues[i] = new ShortValue(values[i].shortValue()); + } + } + + return new AggregateCombineHarness<ShortValue,NumericColumnSummary<Short>,ValueSummaryAggregator.ShortValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary<Short> result1, NumericColumnSummary<Short> result2) { + + Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().shortValue(),result2.getMin().shortValue()); + Assert.assertEquals(result1.getMax().shortValue(), result2.getMax().shortValue()); + Assert.assertEquals(result1.getSum().shortValue(),result2.getSum().shortValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(shortValues); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java new file mode 100644 index 0000000..02fc125 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.StringColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class StringSummaryAggregatorTest { + + @Test + public void testMixedGroup() { + StringColumnSummary summary = summarize("abc", "", null, " ", "defghi", "foo", null, null, "", " "); + Assert.assertEquals(10, summary.getTotalCount()); + Assert.assertEquals(3, summary.getNullCount()); + Assert.assertEquals(7, summary.getNonNullCount()); + Assert.assertEquals(2, summary.getEmptyCount()); + Assert.assertEquals(0, summary.getMinLength().intValue()); + Assert.assertEquals(6, summary.getMaxLength().intValue()); + Assert.assertEquals(2.142857, summary.getMeanLength().doubleValue(), 0.001); + } + + @Test + public void testAllNullStrings() { + StringColumnSummary summary = summarize(null, null, null, null); + Assert.assertEquals(4, summary.getTotalCount()); + Assert.assertEquals(4, summary.getNullCount()); + Assert.assertEquals(0, summary.getNonNullCount()); + Assert.assertEquals(0, summary.getEmptyCount()); + Assert.assertNull(summary.getMinLength()); + Assert.assertNull(summary.getMaxLength()); + Assert.assertNull(summary.getMeanLength()); + } + + @Test + public void testAllWithValues() { + StringColumnSummary summary = summarize("cat", "hat", "dog", "frog"); + Assert.assertEquals(4, summary.getTotalCount()); + Assert.assertEquals(0, summary.getNullCount()); + Assert.assertEquals(4, summary.getNonNullCount()); + Assert.assertEquals(0, summary.getEmptyCount()); + Assert.assertEquals(3, summary.getMinLength().intValue()); + Assert.assertEquals(4, summary.getMaxLength().intValue()); + Assert.assertEquals(3.25, summary.getMeanLength().doubleValue(), 0.0); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. 
+ */ + protected StringColumnSummary summarize(String... values) { + + return new AggregateCombineHarness<String,StringColumnSummary,StringSummaryAggregator>(){ + + @Override + protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) { + Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount()); + Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength()); + Assert.assertEquals(result1.getMinLength(), result2.getMinLength()); + if (result1.getMeanLength() == null) { + Assert.assertEquals(result1.getMeanLength(), result2.getMeanLength()); + } + else { + Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d); + } + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount()); + } + + }.summarize(values); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java new file mode 100644 index 0000000..19bfd52 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.StringColumnSummary; +import org.apache.flink.types.StringValue; +import org.junit.Assert; + +public class StringValueSummaryAggregatorTest extends StringSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + @Override + protected StringColumnSummary summarize(String... values) { + + StringValue[] stringValues = new StringValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + stringValues[i] = new StringValue(values[i]); + } + } + + return new AggregateCombineHarness<StringValue,StringColumnSummary,ValueSummaryAggregator.StringValueSummaryAggregator>(){ + + @Override + protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) { + Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount()); + Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength()); + Assert.assertEquals(result1.getMinLength(), result2.getMinLength()); + if (result1.getMeanLength() == null) { + Assert.assertEquals(result1.getMeanLength(), result2.getMeanLength()); + } + else { + Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d); + } + + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getNonNullCount(), 
result2.getNonNullCount()); + } + + }.summarize(stringValues); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java new file mode 100644 index 0000000..8134a90 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.types.*; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** Tests that {@link SummaryAggregatorFactory#create} returns the expected aggregator class for each input type. */ +public class SummaryAggregatorFactoryTest { + + @Test + public void testCreate() throws Exception { + // supported boxed primitive types and String + Assert.assertEquals(StringSummaryAggregator.class, SummaryAggregatorFactory.create(String.class).getClass()); + Assert.assertEquals(ShortSummaryAggregator.class, SummaryAggregatorFactory.create(Short.class).getClass()); + Assert.assertEquals(IntegerSummaryAggregator.class, SummaryAggregatorFactory.create(Integer.class).getClass()); + Assert.assertEquals(LongSummaryAggregator.class, SummaryAggregatorFactory.create(Long.class).getClass()); + Assert.assertEquals(FloatSummaryAggregator.class, SummaryAggregatorFactory.create(Float.class).getClass()); + Assert.assertEquals(DoubleSummaryAggregator.class, SummaryAggregatorFactory.create(Double.class).getClass()); + Assert.assertEquals(BooleanSummaryAggregator.class, SummaryAggregatorFactory.create(Boolean.class).getClass()); + + // supported org.apache.flink.types value types, each mapped to a nested ValueSummaryAggregator class + Assert.assertEquals(ValueSummaryAggregator.StringValueSummaryAggregator.class, SummaryAggregatorFactory.create(StringValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.ShortValueSummaryAggregator.class, SummaryAggregatorFactory.create(ShortValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.IntegerValueSummaryAggregator.class, SummaryAggregatorFactory.create(IntValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.LongValueSummaryAggregator.class, SummaryAggregatorFactory.create(LongValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.FloatValueSummaryAggregator.class, SummaryAggregatorFactory.create(FloatValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.DoubleValueSummaryAggregator.class, SummaryAggregatorFactory.create(DoubleValue.class).getClass()); + 
Assert.assertEquals(ValueSummaryAggregator.BooleanValueSummaryAggregator.class, SummaryAggregatorFactory.create(BooleanValue.class).getClass()); + + // types without a type-specific aggregator - these fall back to ObjectSummaryAggregator + Assert.assertEquals(ObjectSummaryAggregator.class, SummaryAggregatorFactory.create(Object.class).getClass()); + Assert.assertEquals(ObjectSummaryAggregator.class, SummaryAggregatorFactory.create(List.class).getClass()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7eb58773/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java index afbcb89..1409848 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java @@ -22,9 +22,15 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.summarize.BooleanColumnSummary; +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.api.java.summarize.StringColumnSummary; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.types.DoubleValue; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -106,4 +112,76 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase { Assert.assertEquals(checksum.getCount(), 15); 
Assert.assertEquals(checksum.getChecksum(), 55); } + + @Test + public void testSummarize() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + List<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> data = new ArrayList<>(); /* eight rows; the assertions below expect field i of each row to be summarized in results.getField(i) */ + data.add(new Tuple8<>((short)1, 1, 100L, 0.1f, 1.012376, "hello", false, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)2, 2, 1000L, 0.2f, 2.003453, "hello", true, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)4, 10, 10000L, 0.2f, 75.00005, "null", true, new DoubleValue(50.0))); /* "null" here is a literal four-character string, not a null field */ + data.add(new Tuple8<>((short)10, 4, 100L, 0.9f, 79.5, "", true, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)5, 5, 1000L, 0.2f, 10.0000001, "a", false, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)6, 6, 10L, 0.1f, 0.0000000000023, "", true, new DoubleValue(100.0))); + data.add(new Tuple8<>((short)7, 7, 1L, 0.2f, Double.POSITIVE_INFINITY, "abcdefghijklmnop", true, new DoubleValue(100.0))); + data.add(new Tuple8<>((short)8, 8, -100L, 0.001f, Double.NaN, "abcdefghi", true, new DoubleValue(100.0))); + + Collections.shuffle(data); /* unseeded shuffle: every assertion below must hold for any input order */ + + DataSet<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> ds = env.fromCollection(data); + + // call method under test + Tuple results = DataSetUtils.summarize(ds); + + Assert.assertEquals(8, results.getArity()); + + NumericColumnSummary<Short> col0Summary = results.getField(0); + Assert.assertEquals(8, col0Summary.getNonMissingCount()); + Assert.assertEquals(1, col0Summary.getMin().shortValue()); + Assert.assertEquals(10, col0Summary.getMax().shortValue()); + Assert.assertEquals(5.375, col0Summary.getMean().doubleValue(), 0.0); /* (1+2+4+10+5+6+7+8) / 8 rows */ + + NumericColumnSummary<Integer> col1Summary = results.getField(1); + Assert.assertEquals(1, col1Summary.getMin().intValue()); + Assert.assertEquals(10, col1Summary.getMax().intValue()); + Assert.assertEquals(5.375, col1Summary.getMean().doubleValue(), 0.0); /* (1+2+10+4+5+6+7+8) / 8 rows */ + 
NumericColumnSummary<Long> col2Summary = results.getField(2); + Assert.assertEquals(-100L, col2Summary.getMin().longValue()); + Assert.assertEquals(10000L, col2Summary.getMax().longValue()); + + NumericColumnSummary<Float> col3Summary = results.getField(3); + Assert.assertEquals(8, col3Summary.getTotalCount()); + Assert.assertEquals(0.001000, col3Summary.getMin().doubleValue(), 0.0000001); + Assert.assertEquals(0.89999999, col3Summary.getMax().doubleValue(), 0.0000001); /* 0.9f widens to 0.8999999761581421 as a double, hence the non-zero tolerance */ + Assert.assertEquals(0.2376249988883501, col3Summary.getMean().doubleValue(), 0.000000000001); + Assert.assertEquals(0.0768965488108089, col3Summary.getVariance().doubleValue(), 0.00000001); + Assert.assertEquals(0.27730226975415995, col3Summary.getStandardDeviation().doubleValue(), 0.000000000001); + + NumericColumnSummary<Double> col4Summary = results.getField(4); + Assert.assertEquals(6, col4Summary.getNonMissingCount()); + Assert.assertEquals(2, col4Summary.getMissingCount()); /* the POSITIVE_INFINITY and NaN rows are expected to be counted as missing, so they also do not affect min/max */ + Assert.assertEquals(0.0000000000023, col4Summary.getMin().doubleValue(), 0.0); + Assert.assertEquals(79.5, col4Summary.getMax().doubleValue(), 0.000000000001); + + StringColumnSummary col5Summary = results.getField(5); + Assert.assertEquals(8, col5Summary.getTotalCount()); + Assert.assertEquals(0, col5Summary.getNullCount()); + Assert.assertEquals(8, col5Summary.getNonNullCount()); + Assert.assertEquals(2, col5Summary.getEmptyCount()); + Assert.assertEquals(0, col5Summary.getMinLength().intValue()); + Assert.assertEquals(16, col5Summary.getMaxLength().intValue()); + Assert.assertEquals(5.0, col5Summary.getMeanLength().doubleValue(), 0.0001); /* lengths 5+5+4+0+1+0+16+9 = 40, over 8 rows */ + + BooleanColumnSummary col6Summary = results.getField(6); + Assert.assertEquals(8, col6Summary.getTotalCount()); + Assert.assertEquals(2, 
col6Summary.getFalseCount()); + Assert.assertEquals(6, col6Summary.getTrueCount()); + Assert.assertEquals(0, col6Summary.getNullCount()); + + NumericColumnSummary<Double> col7Summary = results.getField(7); + Assert.assertEquals(100.0, 
col7Summary.getMax().doubleValue(), 0.00001); + Assert.assertEquals(50.0, col7Summary.getMin().doubleValue(), 0.00001); /* DoubleValue column: five rows of 50.0 and three of 100.0 */ + } }