[FLINK-3219] [java scala] Implement DataSet.count and DataSet.collect using a single operator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a53587 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a53587 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a53587 Branch: refs/heads/master Commit: e9a535877906bd75df3e633e3c5dad556b9c925d Parents: 0937be0 Author: Greg Hogan <c...@greghogan.com> Authored: Mon Jan 11 17:04:31 2016 -0500 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Jan 15 11:44:21 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/DataSet.java | 7 ++--- .../java/org/apache/flink/api/java/Utils.java | 33 +++++++++++++------- .../org/apache/flink/api/scala/DataSet.scala | 7 ++--- .../jsonplan/JsonJobGraphGenerationTest.java | 2 +- 4 files changed, 28 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index c5a636c..be84032 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -46,7 +46,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.SelectByMaxFunction; import org.apache.flink.api.java.functions.SelectByMinFunction; import org.apache.flink.api.java.io.CsvOutputFormat; -import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter; @@ -387,8 +386,7 @@ public abstract class DataSet<T> { public long count() throws Exception { final String id = new AbstractID().toString(); - flatMap(new Utils.CountHelper<T>(id)).name("count()") - .output(new DiscardingOutputFormat<Long>()).name("count() sink"); + output(new Utils.CountHelper<T>(id)).name("count()"); JobExecutionResult res = getExecutionEnvironment().execute(); return res.<Long> getAccumulatorResult(id); @@ -405,8 +403,7 @@ public abstract class DataSet<T> { final String id = new AbstractID().toString(); final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig()); - this.flatMap(new Utils.CollectHelper<>(id, serializer)).name("collect()") - .output(new DiscardingOutputFormat<T>()).name("collect() sink"); + this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()"); JobExecutionResult res = getExecutionEnvironment().execute(); ArrayList<byte[]> accResult = res.getAccumulatorResult(id); http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-java/src/main/java/org/apache/flink/api/java/Utils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index 665f35f..cb10906 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -20,20 +20,19 @@ package org.apache.flink.api.java; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.List; import java.util.Random; -import org.apache.flink.api.common.functions.RichFlatMapFunction; - import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis; /** @@ -78,7 +77,7 @@ public final class Utils { } @SkipCodeAnalysis - public static class CountHelper<T> extends RichFlatMapFunction<T, Long> { + public static class CountHelper<T> extends RichOutputFormat<T> { private static final long serialVersionUID = 1L; @@ -91,18 +90,26 @@ public final class Utils { } @Override - public void flatMap(T value, Collector<Long> out) throws Exception { + public void configure(Configuration parameters) { + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + } + + @Override + public void writeRecord(T record) throws IOException { counter++; } @Override - public void close() throws Exception { + public void close() throws IOException { getRuntimeContext().getLongCounter(id).add(counter); } } @SkipCodeAnalysis - public static class CollectHelper<T> extends RichFlatMapFunction<T, T> { + public static class CollectHelper<T> extends RichOutputFormat<T> { private static final long serialVersionUID = 1L; @@ -117,17 +124,21 @@ public final class Utils { } @Override - public void open(Configuration parameters) throws Exception { + public void configure(Configuration parameters) { + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { this.accumulator = new SerializedListAccumulator<>(); } @Override - public void flatMap(T value, Collector<T> out) throws Exception { - accumulator.add(value, serializer); + public void writeRecord(T record) throws IOException { + accumulator.add(record, serializer); } @Override - public void close() throws Exception { + public void close() throws IOException { // Important: should only be added in close method to minimize traffic of accumulators getRuntimeContext().addAccumulator(id, accumulator); } http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index a3ce53c..396ee90 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.Utils.CountHelper import org.apache.flink.api.java.aggregation.Aggregations import org.apache.flink.api.java.functions.{FirstReducer, KeySelector} -import org.apache.flink.api.java.io.{DiscardingOutputFormat, PrintingOutputFormat, TextOutputFormat} +import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat} import org.apache.flink.api.java.operators.Keys.ExpressionKeys import org.apache.flink.api.java.operators._ import org.apache.flink.api.java.operators.join.JoinType @@ -521,7 +521,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { @throws(classOf[Exception]) def count(): Long = { val id = new AbstractID().toString - javaSet.flatMap(new CountHelper[T](id)).output(new DiscardingOutputFormat[java.lang.Long]) + javaSet.output(new CountHelper[T](id)) val res = getExecutionEnvironment.execute() res.getAccumulatorResult[Long](id) } @@ -539,8 +539,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { val id = new AbstractID().toString val serializer = getType().createSerializer(getExecutionEnvironment.getConfig) - javaSet.flatMap(new Utils.CollectHelper[T](id, serializer)) - .output(new DiscardingOutputFormat[T]) + javaSet.output(new Utils.CollectHelper[T](id, serializer)) val res = getExecutionEnvironment.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/e9a53587/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java index a862242..a9ade6a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java @@ -183,7 +183,7 @@ public class JsonJobGraphGenerationTest { // without arguments try { final int parallelism = 1; // some ops have DOP 1 forced - JsonValidator validator = new GenericValidator(parallelism, 10); + JsonValidator validator = new GenericValidator(parallelism, 9); TestingExecutionEnvironment.setAsNext(validator, parallelism); ConnectedComponents.main();