buildbot success on flink-docs-release-0.10

2016-06-14 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/238

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: lares_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot failure on flink-docs-release-1.0

2016-06-14 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-1.0 while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.0/builds/113

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: tethys_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.0' 
triggered this build
Build Source Stamp: [branch release-1.0] HEAD
Blamelist: 

BUILD FAILED: failed compile

Sincerely,
 -The Buildbot





buildbot failure on flink-docs-master

2016-06-14 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-master while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/361

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: tethys_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

BUILD FAILED: failed compile

Sincerely,
 -The Buildbot





flink git commit: [FLINK-3323] [docs] Add documentation for NiFi connector.

2016-06-14 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master d353895ba -> 8227b0f75


[FLINK-3323] [docs] Add documentation for NiFi connector.

This closes #2099


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8227b0f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8227b0f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8227b0f7

Branch: refs/heads/master
Commit: 8227b0f7555b47e7eef268a81962908660212c3f
Parents: d353895
Author: smarthi 
Authored: Mon May 30 09:04:25 2016 -0400
Committer: Fabian Hueske 
Committed: Tue Jun 14 23:49:23 2016 +0200

--
 docs/apis/streaming/connectors/elasticsearch.md |   2 +-
 .../apis/streaming/connectors/elasticsearch2.md |   2 +-
 docs/apis/streaming/connectors/index.md |   3 +-
 docs/apis/streaming/connectors/nifi.md  | 128 +++
 4 files changed, 132 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8227b0f7/docs/apis/streaming/connectors/elasticsearch.md
--
diff --git a/docs/apis/streaming/connectors/elasticsearch.md 
b/docs/apis/streaming/connectors/elasticsearch.md
index cf996fb..93b2bf6 100644
--- a/docs/apis/streaming/connectors/elasticsearch.md
+++ b/docs/apis/streaming/connectors/elasticsearch.md
@@ -180,4 +180,4 @@ text.addSink(new ElasticsearchSink(config, transports, new 
IndexRequestBuilder[S
 The difference is that we now need to provide a list of Elasticsearch Nodes
 to which the sink should connect using a `TransportClient`.
 
-More about information about Elasticsearch can be found 
[here](https://elastic.co).
+More information about Elasticsearch can be found [here](https://elastic.co).
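
For context, the hunk header above references a sink definition along these lines. This is a sketch assuming the 1.x connector's Map-based config, TransportAddress list, and IndexRequestBuilder callback; names such as "my-cluster-name" and "my-index" are placeholders:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

public class ElasticsearchSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> text = env.fromElements("foo", "bar");

		Map<String, String> config = new HashMap<>();
		config.put("cluster.name", "my-cluster-name");   // must match the Elasticsearch cluster name
		config.put("bulk.flush.max.actions", "1");       // flush after every element (demo only)

		// Addresses of the Elasticsearch nodes the TransportClient should connect to.
		List<TransportAddress> transports = new ArrayList<>();
		transports.add(new InetSocketTransportAddress("127.0.0.1", 9300));

		text.addSink(new ElasticsearchSink<String>(config, transports, new IndexRequestBuilder<String>() {
			@Override
			public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
				Map<String, Object> json = new HashMap<>();
				json.put("data", element);
				return Requests.indexRequest().index("my-index").type("my-type").source(json);
			}
		}));

		env.execute("Elasticsearch sink sketch");
	}
}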

http://git-wip-us.apache.org/repos/asf/flink/blob/8227b0f7/docs/apis/streaming/connectors/elasticsearch2.md
--
diff --git a/docs/apis/streaming/connectors/elasticsearch2.md 
b/docs/apis/streaming/connectors/elasticsearch2.md
index 7146285..36d0920 100644
--- a/docs/apis/streaming/connectors/elasticsearch2.md
+++ b/docs/apis/streaming/connectors/elasticsearch2.md
@@ -141,4 +141,4 @@ This will buffer elements and Action Requests before 
sending to the cluster. The
 This now provides a list of Elasticsearch Nodes
 to which the sink should connect via a `TransportClient`.
 
-More about information about Elasticsearch can be found 
[here](https://elastic.co).
+More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/8227b0f7/docs/apis/streaming/connectors/index.md
--
diff --git a/docs/apis/streaming/connectors/index.md 
b/docs/apis/streaming/connectors/index.md
index 20d6e41..85a07a1 100644
--- a/docs/apis/streaming/connectors/index.md
+++ b/docs/apis/streaming/connectors/index.md
@@ -4,7 +4,7 @@ title: "Streaming Connectors"
 # Sub-level navigation
 sub-nav-group: streaming
 sub-nav-id: connectors
-sub-nav-pos: 6
+sub-nav-pos: 8
 sub-nav-title: Connectors
 ---
 
+
+This connector provides a Source and Sink that can read from and write to 
+[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Apache NiFi
+
+Instructions for setting up an Apache NiFi cluster can be found
+[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
+
+#### Apache NiFi Source
+
+The connector provides a Source for reading data from Apache NiFi to Apache Flink.
+
+The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
+
+- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteConfig and a
+  default wait time of 1000 ms.
+
+- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
+  SiteToSiteConfig and the specified wait time (in milliseconds).
+
+Example:
+
+
+
+{% highlight java %}
+SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+    .url("http://localhost:8080/nifi")
+    .portName("Data for Flink")
+    .requestBatchCount(5)
+    .buildConfig();
+
+SourceFunction 

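The quoted example is cut off by the archive. Below is a self-contained sketch of how such a configuration is typically used with the connector's source; the package paths and the NiFiSource/NiFiDataPacket continuation are assumptions, not quoted text from the commit:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

public class NiFiSourceSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Site-to-Site client configuration for the NiFi output port named "Data for Flink".
		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
				.url("http://localhost:8080/nifi")
				.portName("Data for Flink")
				.requestBatchCount(5)
				.buildConfig();

		// The source pulls NiFiDataPackets from NiFi and emits them into the stream.
		SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
		DataStream<NiFiDataPacket> stream = env.addSource(nifiSource);

		stream.print();
		env.execute("NiFi source sketch");
	}
}
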
[4/4] flink git commit: [FLINK-3896] Allow a StreamTask to be Externally Cancelled

2016-06-14 Thread aljoscha
[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom operators
can make their containing task fail when needed.
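
For illustration, a hypothetical operator sketch that uses the new hook; only failExternally(Throwable) comes from this commit, and the wiring through getContainingTask() is an assumption:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class ExternallyFailingOperator extends AbstractStreamOperator<String>
		implements OneInputStreamOperator<String, String> {

	@Override
	public void processElement(StreamRecord<String> element) throws Exception {
		// Normal processing path: simply forward the element.
		output.collect(element);
	}

	@Override
	public void processWatermark(Watermark mark) throws Exception {
		output.emitWatermark(mark);
	}

	// Invoked from a background thread, e.g. the error callback of an async client.
	private void onAsyncError(Throwable cause) {
		// failExternally() marks the containing task as FAILED and aborts the running
		// invokable asynchronously; the call itself never blocks.
		getContainingTask().failExternally(cause);
	}
}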


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc19486c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc19486c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc19486c

Branch: refs/heads/master
Commit: bc19486ccfc4164d6abd9c712db8e92a350c5a85
Parents: fdf4360
Author: kl0u 
Authored: Tue May 10 18:56:58 2016 +0200
Committer: Aljoscha Krettek 
Committed: Tue Jun 14 18:11:22 2016 +0200

--
 .../apache/flink/runtime/execution/Environment.java  | 11 +++
 .../runtime/taskmanager/RuntimeEnvironment.java  |  9 +
 .../org/apache/flink/runtime/taskmanager/Task.java   |  4 ++--
 .../operators/testutils/DummyEnvironment.java|  5 +
 .../runtime/operators/testutils/MockEnvironment.java |  5 +
 .../flink/streaming/runtime/tasks/StreamTask.java| 15 ++-
 .../runtime/tasks/StreamMockEnvironment.java |  5 +
 7 files changed, 51 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 121936c..9f779ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -166,6 +166,17 @@ public interface Environment {
 */
void acknowledgeCheckpoint(long checkpointId, StateHandle state);
 
+   /**
+    * Marks task execution failed for an external reason (a reason other than the task code itself
+    * throwing an exception). If the task is already in a terminal state
+    * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
+    * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+    * starts an asynchronous thread that aborts that code.
+    *
+    * This method never blocks.
+    */
+   void failExternally(Throwable cause);
+
// 

//  Fields relevant to the I/O system. Should go into Task
// 


http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 1f93a0d..80c5fbc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -78,6 +78,8 @@ public class RuntimeEnvironment implements Environment {
private final TaskManagerRuntimeInfo taskManagerInfo;
private final TaskMetricGroup metrics;
 
+   private final Task containingTask;
+
// 

 
public RuntimeEnvironment(
@@ -99,6 +101,7 @@ public class RuntimeEnvironment implements Environment {
InputGate[] inputGates,
ActorGateway jobManager,
TaskManagerRuntimeInfo taskManagerInfo,
+   Task containingTask,
TaskMetricGroup metrics) {
 
this.jobId = checkNotNull(jobId);
@@ -119,6 +122,7 @@ public class RuntimeEnvironment implements Environment {
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
this.taskManagerInfo = checkNotNull(taskManagerInfo);
+   this.containingTask = containingTask;
this.metrics = metrics;
}
 
@@ -262,4 +266,9 @@ public class RuntimeEnvironment implements Environment {
 
jobManager.tell(message);
}
+
+   @Override
+   public void failExternally(Throwable cause) {
+   this.containingTask.failExternally(cause);
+   }
 }


[3/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

2016-06-14 Thread aljoscha
[FLINK-2314] Make Streaming File Sources Persistent

This commit is a combination of several commits/changes. It combines
changes to the file input formats and the streaming file read operator
and integrates them into the API.

These are the messages of the other two commits:

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.
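
A minimal sketch of the interface shape described above; getCurrentState() is named in the commit message, while the generic parameters and the reopen(...) method name are assumptions:

import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.core.io.InputSplit;

// Assumed shape only: describes a format whose read position can be checkpointed and restored.
public interface CheckpointableInputFormat<S extends InputSplit, T extends Serializable> {

	/** Returns where the reader currently is in the underlying source, e.g. an offset within the split. */
	T getCurrentState() throws IOException;

	/** Re-opens the given split and resumes reading from the previously checkpointed position. */
	void reopen(S split, T state) throws IOException;
}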

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 monitoring a
directory and assigning input splits to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to have
fault tolerant file sources and exactly once guarantees.

This does not replace the old API calls. This
will be done in a future commit.
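
For illustration only, a toy sketch of the described split-assignment idea using invented names and plain JDK I/O; it is not the commit's ContinuousFileMonitoringFunction/ContinuousFileReaderOperator code:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class MonitorAndReadSketch {

	// Parallelism-1 "monitor": lists the directory and turns each file into a unit of work (a split).
	static List<Path> discoverSplits(Path dir) throws IOException {
		List<Path> splits = new ArrayList<>();
		try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
			for (Path file : stream) {
				splits.add(file);
			}
		}
		return splits;
	}

	// Parallel "reader": consumes one assigned split at a time and emits its records.
	static void readSplit(Path split) throws IOException {
		for (String line : Files.readAllLines(split)) {
			System.out.println(line);
		}
	}

	public static void main(String[] args) throws IOException {
		for (Path split : discoverSplits(Paths.get("/tmp/input"))) {
			readSplit(split);   // in Flink, these calls would run on separate, parallel reader tasks
		}
	}
}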


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d353895b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d353895b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d353895b

Branch: refs/heads/master
Commit: d353895ba512a5e30fb08a25643fd93f085e8456
Parents: bc19486
Author: kl0u 
Authored: Sun Apr 10 16:56:42 2016 +0200
Committer: Aljoscha Krettek 
Committed: Tue Jun 14 18:11:22 2016 +0200

--
 .../flink/api/java/io/AvroInputFormat.java  |  85 +++-
 .../io/avro/AvroSplittableInputFormatTest.java  |  95 +++-
 .../flink/api/common/io/BinaryInputFormat.java  | 121 +++--
 .../apache/flink/api/common/io/BlockInfo.java   |   5 +
 .../common/io/CheckpointableInputFormat.java|  61 +++
 .../api/common/io/DelimitedInputFormat.java | 114 +++--
 .../api/common/io/EnumerateNestedFilesTest.java |   2 +-
 .../api/common/io/FileInputFormatTest.java  |   9 +-
 .../api/common/io/SequentialFormatTestBase.java |  15 +-
 flink-fs-tests/pom.xml  |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java | 300 +
 .../hdfstests/ContinuousFileMonitoringTest.java | 447 +++
 .../flink/api/java/io/CsvInputFormatTest.java   | 134 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../api/datastream/DataStreamSource.java|   5 +
 .../environment/StreamExecutionEnvironment.java | 256 +--
 .../ContinuousFileMonitoringFunction.java   | 328 ++
 .../source/ContinuousFileReaderOperator.java| 390 
 .../source/FileMonitoringFunction.java  |   3 +-
 .../api/functions/source/FilePathFilter.java|  66 +++
 .../functions/source/FileProcessingMode.java|  31 ++
 .../api/functions/source/FileReadFunction.java  |   3 +-
 .../functions/source/FileSourceFunction.java| 148 --
 .../api/functions/source/InputFormatSource.java | 148 ++
 .../api/graph/StreamGraphGenerator.java |   6 +-
 .../api/operators/OutputTypeConfigurable.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +
 .../api/scala/StreamExecutionEnvironment.scala  |  74 ++-
 ...ontinuousFileProcessingCheckpointITCase.java | 327 ++
 .../StreamFaultToleranceTestBase.java   |   8 +-
 32 files changed, 2889 insertions(+), 317 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
--
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 605ce69..a920275 100644
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -22,13 +22,15 @@ package org.apache.flink.api.java.io;
 import java.io.IOException;
 
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import 

[1/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

2016-06-14 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master fdf436099 -> d353895ba


http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 530bae9..67c05e5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -23,8 +23,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -89,7 +91,7 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
 * followed by the checks in {@link #postSubmit}.
 */
@Test
-   public void runCheckpointedProgram() {
+   public void runCheckpointedProgram() throws Exception {
try {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", 
cluster.getLeaderRPCPort());
@@ -99,13 +101,13 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
 
testProgram(env);
 
-   env.execute();
+   TestUtils.tryExecute(env, "Fault Tolerance Test");
 
postSubmit();
}
catch (Exception e) {
e.printStackTrace();
-   fail(e.getMessage());
+   Assert.fail(e.getMessage());
}
}
 



[2/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

2016-06-14 Thread aljoscha
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ae4758f..1cd052c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -51,14 +51,18 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -875,24 +879,34 @@ public abstract class StreamExecutionEnvironment {
}
 
/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
-	 * read with the system's default character set.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The file will be read with the system's default character set.
+	 *
+	 * NOTES ON CHECKPOINTING: The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 *		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
-		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		TextInputFormat format = new TextInputFormat(new Path(filePath));
-		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-
-		return createInput(format, typeInfo, "Read Text File Source");
+		return readTextFile(filePath, "UTF-8");
 	}
 
/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link
-	 * java.nio.charset.Charset} with the given name will be used to read the files.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
+	 * NOTES ON CHECKPOINTING: The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.

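A minimal usage sketch of the readTextFile entry point touched by the hunk above; the path is a placeholder, and after this change the one-argument variant simply delegates to the UTF-8 overload:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Reads the file line by line; equivalent to readTextFile(path, "UTF-8") after this change.
		DataStream<String> lines = env.readTextFile("file:///some/local/file");

		lines.print();
		env.execute("readTextFile sketch");
	}
}
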
[2/2] flink git commit: [FLINK-3971] [tableAPI] Fix handling of null values in aggregations.

2016-06-14 Thread fhueske
[FLINK-3971] [tableAPI] Fix handling of null values in aggregations.

This closes #2049


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf43609
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf43609
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf43609

Branch: refs/heads/master
Commit: fdf43609977c807deb3bb81bf1095efb721fe688
Parents: e0b9e8d
Author: gallenvara 
Authored: Mon May 30 18:30:07 2016 +0800
Committer: Fabian Hueske 
Committed: Tue Jun 14 15:05:32 2016 +0200

--
 .../table/runtime/aggregate/AvgAggregate.scala  | 60 
 .../table/runtime/aggregate/MaxAggregate.scala  | 42 ++
 .../table/runtime/aggregate/MinAggregate.scala  | 44 +++---
 .../table/runtime/aggregate/SumAggregate.scala  | 12 +++-
 .../runtime/aggregate/AvgAggregateTest.scala| 11 +++-
 .../runtime/aggregate/CountAggregateTest.scala  |  5 +-
 .../runtime/aggregate/MaxAggregateTest.scala| 25 +++-
 .../runtime/aggregate/MinAggregateTest.scala| 25 +++-
 .../runtime/aggregate/SumAggregateTest.scala| 14 -
 9 files changed, 165 insertions(+), 73 deletions(-)
--
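
The fix follows a single pattern, visible in the Scala diff below: evaluate() now delegates to a doEvaluate() that returns null when no rows were aggregated. A simplified Java illustration of that pattern (hypothetical classes, not the commit's code, which operates on Row buffers):

// Simplified illustration of the null-handling pattern applied to the Avg aggregates.
abstract class IntegralAvgSketch<T> {

	/** Computes the average, or null if the group was empty. */
	abstract Object doEvaluate(long sum, long count);

	@SuppressWarnings("unchecked")
	T evaluate(long sum, long count) {
		return (T) doEvaluate(sum, count);
	}
}

class IntAvgSketch extends IntegralAvgSketch<Integer> {
	@Override
	Object doEvaluate(long sum, long count) {
		// An empty group produces null instead of failing with a division by zero.
		return count == 0L ? null : (int) (sum / count);
	}
}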


http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index 8cf181a..e724648 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -59,11 +59,17 @@ abstract class IntegralAvgAggregate[T] extends 
AvgAggregate[T] {
 buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, 
bufferCount))
   }
 
+  override def evaluate(buffer : Row): T = {
+doEvaluate(buffer).asInstanceOf[T]
+  }
+
   override def intermediateDataType = Array(
 BasicTypeInfo.LONG_TYPE_INFO,
 BasicTypeInfo.LONG_TYPE_INFO)
 
   def doPrepare(value: Any, partial: Row): Unit
+
+  def doEvaluate(buffer: Row): Any
 }
 
 class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
@@ -73,10 +79,14 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
 partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Byte = {
+  override def doEvaluate(buffer: Row): Any = {
 val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
 val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
-(bufferSum / bufferCount).toByte
+if (bufferCount == 0L) {
+  null
+} else {
+  (bufferSum / bufferCount).toByte
+}
   }
 }
 
@@ -88,10 +98,14 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] 
{
 partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Short = {
+  override def doEvaluate(buffer: Row): Any = {
 val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
 val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
-(bufferSum / bufferCount).toShort
+if (bufferCount == 0L) {
+  null
+} else {
+  (bufferSum / bufferCount).toShort
+}
   }
 }
 
@@ -103,10 +117,14 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
 partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Int = {
+  override def doEvaluate(buffer: Row): Any = {
 val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
 val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
-(bufferSum / bufferCount).toInt
+if (bufferCount == 0L) {
+  null
+} else {
+  (bufferSum / bufferCount).toInt
+}
   }
 }
 
@@ -145,10 +163,14 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] 
{
 buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, 
bufferCount))
   }
 
-  override def evaluate(buffer: Row): Long = {
+  override def doEvaluate(buffer: Row): Any = {
 val bufferSum = 
buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
 val bufferCount = 
buffer.productElement(partialCountIndex).asInstanceOf[Long]
-bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+if (bufferCount == 0L) {
+  null
+} else {
+  bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+}
   }
 }
 
@@ -178,11 +200,17 @@ abstract class