Repository: flink
Updated Branches:
refs/heads/master b2be80ddc - 05bff2295
[streaming] Refactor the accessing of the user specified field by sum, min, etc.
SumAggregator and ComparableAggregator had some code duplication for
accessing the specified field for different kinds of types (Tuple, Pojo,
Array, simple).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05bff229
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05bff229
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05bff229
Branch: refs/heads/master
Commit: 05bff2295661e21544f03c3048c44a09506d986e
Parents: b2be80d
Author: Gabor Gevay gga...@gmail.com
Authored: Sun May 24 13:14:11 2015 +0200
Committer: mbalassi mbala...@apache.org
Committed: Sat Jun 20 20:37:03 2015 +0200
--
.../api/java/typeutils/TupleTypeInfoBase.java | 4 +
.../api/scala/typeutils/CaseClassTypeInfo.scala | 2 +
.../streaming/api/datastream/DataStream.java| 103 ++--
.../api/datastream/WindowedDataStream.java | 38 ++-
.../aggregation/ComparableAggregator.java | 216 +++-
.../functions/aggregation/SumAggregator.java| 159 ++--
.../windowbuffer/SlidingGroupedPreReducer.java | 2 +-
.../windowbuffer/SlidingPreReducer.java | 2 +-
.../flink/streaming/util/FieldAccessor.java | 249 +++
.../streaming/api/AggregationFunctionTest.java | 74 +++---
.../flink/streaming/util/FieldAccessorTest.java | 75 ++
.../scala/ScalaStreamingAggregator.java | 111 -
.../flink/streaming/api/scala/DataStream.scala | 24 +-
.../api/scala/WindowedDataStream.scala | 18 +-
14 files changed, 474 insertions(+), 603 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/flink/blob/05bff229/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
--
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 5051449..3314ca9 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -69,6 +69,10 @@ public abstract class TupleTypeInfoBase<T> extends
CompositeType<T> {
return true;
}
+ public boolean isCaseClass() {
+ return false;
+ }
+
@Override
public int getArity() {
return types.length;
http://git-wip-us.apache.org/repos/asf/flink/blob/05bff229/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
--
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 2c80433..0c8049d 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -228,4 +228,6 @@ abstract class CaseClassTypeInfo[T <: Product](
override def toString = clazz.getName + "(" + fieldNames.zip(types).map {
case (n, t) => n + ": " + t}
.mkString(", ") + ")"
+
+ override def isCaseClass = true
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05bff229/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
--
diff --git
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6964a07..b065950 100644
---
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,7 +44,6 @@ import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
@@ -243,7 +242,7 @@ public class DataStream<OUT> {