buildbot success on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building.
Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/238
Buildbot URL: https://ci.apache.org/
Buildslave for this Build: lares_ubuntu
Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist:

Build succeeded!

Sincerely,
 -The Buildbot
buildbot failure on flink-docs-release-1.0
The Buildbot has detected a new failure on builder flink-docs-release-1.0 while building.
Full details are available at: https://ci.apache.org/builders/flink-docs-release-1.0/builds/113
Buildbot URL: https://ci.apache.org/
Buildslave for this Build: tethys_ubuntu
Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.0' triggered this build
Build Source Stamp: [branch release-1.0] HEAD
Blamelist:

BUILD FAILED: failed compile

Sincerely,
 -The Buildbot
buildbot failure on flink-docs-master
The Buildbot has detected a new failure on builder flink-docs-master while building.
Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/361
Buildbot URL: https://ci.apache.org/
Buildslave for this Build: tethys_ubuntu
Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build
Build Source Stamp: [branch master] HEAD
Blamelist:

BUILD FAILED: failed compile

Sincerely,
 -The Buildbot
flink git commit: [FLINK-3323] [docs] Add documentation for NiFi connector.
Repository: flink
Updated Branches:
  refs/heads/master d353895ba -> 8227b0f75

[FLINK-3323] [docs] Add documentation for NiFi connector.

This closes #2099

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8227b0f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8227b0f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8227b0f7

Branch: refs/heads/master
Commit: 8227b0f7555b47e7eef268a81962908660212c3f
Parents: d353895
Author: smarthi
Authored: Mon May 30 09:04:25 2016 -0400
Committer: Fabian Hueske
Committed: Tue Jun 14 23:49:23 2016 +0200

--
 docs/apis/streaming/connectors/elasticsearch.md |   2 +-
 .../apis/streaming/connectors/elasticsearch2.md |   2 +-
 docs/apis/streaming/connectors/index.md         |   3 +-
 docs/apis/streaming/connectors/nifi.md          | 128 +++
 4 files changed, 132 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/8227b0f7/docs/apis/streaming/connectors/elasticsearch.md
--
diff --git a/docs/apis/streaming/connectors/elasticsearch.md b/docs/apis/streaming/connectors/elasticsearch.md
index cf996fb..93b2bf6 100644
--- a/docs/apis/streaming/connectors/elasticsearch.md
+++ b/docs/apis/streaming/connectors/elasticsearch.md
@@ -180,4 +180,4 @@ text.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[S
 The difference is that we now need to provide a list of Elasticsearch Nodes
 to which the sink should connect using a `TransportClient`.
 
-More about information about Elasticsearch can be found [here](https://elastic.co).
+More information about Elasticsearch can be found [here](https://elastic.co).
http://git-wip-us.apache.org/repos/asf/flink/blob/8227b0f7/docs/apis/streaming/connectors/elasticsearch2.md
--
diff --git a/docs/apis/streaming/connectors/elasticsearch2.md b/docs/apis/streaming/connectors/elasticsearch2.md
index 7146285..36d0920 100644
--- a/docs/apis/streaming/connectors/elasticsearch2.md
+++ b/docs/apis/streaming/connectors/elasticsearch2.md
@@ -141,4 +141,4 @@ This will buffer elements and Action Requests before sending to the cluster.
 This now provides a list of Elasticsearch Nodes to which the sink should connect via a `TransportClient`.
 
-More about information about Elasticsearch can be found [here](https://elastic.co).
+More information about Elasticsearch can be found [here](https://elastic.co).

http://git-wip-us.apache.org/repos/asf/flink/blob/8227b0f7/docs/apis/streaming/connectors/index.md
--
diff --git a/docs/apis/streaming/connectors/index.md b/docs/apis/streaming/connectors/index.md
index 20d6e41..85a07a1 100644
--- a/docs/apis/streaming/connectors/index.md
+++ b/docs/apis/streaming/connectors/index.md
@@ -4,7 +4,7 @@ title: "Streaming Connectors"
 # Sub-level navigation
 sub-nav-group: streaming
 sub-nav-id: connectors
-sub-nav-pos: 6
+sub-nav-pos: 8
 sub-nav-title: Connectors
 ---
+
+This connector provides a Source and Sink that can read from and write to
+[Apache NiFi](https://nifi.apache.org/). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### Installing Apache NiFi
+
+Instructions for setting up an Apache NiFi cluster can be found
+[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi).
+
+#### Apache NiFi Source
+
+The connector provides a Source for reading data from Apache NiFi to Apache Flink.
+
+The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
+
+- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteConfig and a
+  default wait time of 1000 ms.
+
+- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
+  SiteToSiteConfig and the specified wait time (in milliseconds).
+
+Example:
+
+{% highlight java %}
+SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+    .url("http://localhost:8080/nifi")
+    .portName("Data for Flink")
+    .requestBatchCount(5)
+    .buildConfig();
+
+SourceFunction
[4/4] flink git commit: [FLINK-3896] Allow a StreamTask to be Externally Cancelled
[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom Operators can make their containing task fail when needed.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc19486c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc19486c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc19486c

Branch: refs/heads/master
Commit: bc19486ccfc4164d6abd9c712db8e92a350c5a85
Parents: fdf4360
Author: kl0u
Authored: Tue May 10 18:56:58 2016 +0200
Committer: Aljoscha Krettek
Committed: Tue Jun 14 18:11:22 2016 +0200

--
 .../apache/flink/runtime/execution/Environment.java  | 11 +++
 .../runtime/taskmanager/RuntimeEnvironment.java      |  9 +
 .../org/apache/flink/runtime/taskmanager/Task.java   |  4 ++--
 .../operators/testutils/DummyEnvironment.java        |  5 +
 .../runtime/operators/testutils/MockEnvironment.java |  5 +
 .../flink/streaming/runtime/tasks/StreamTask.java    | 15 ++-
 .../runtime/tasks/StreamMockEnvironment.java         |  5 +
 7 files changed, 51 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 121936c..9f779ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -166,6 +166,17 @@ public interface Environment {
 	 */
 	void acknowledgeCheckpoint(long checkpointId, StateHandle state);
 
+	/**
+	 * Marks task execution failed for an external reason (a reason other than the task code itself
+	 * throwing an exception). If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing.
+	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 *
+	 * This method never blocks.
+	 */
+	void failExternally(Throwable cause);
+
 	// 
 	// Fields relevant to the I/O system. Should go into Task
 	// 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 1f93a0d..80c5fbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -78,6 +78,8 @@ public class RuntimeEnvironment implements Environment {
 	private final TaskManagerRuntimeInfo taskManagerInfo;
 	private final TaskMetricGroup metrics;
 
+	private final Task containingTask;
+
 	// 
 
 	public RuntimeEnvironment(
@@ -99,6 +101,7 @@
 			InputGate[] inputGates,
 			ActorGateway jobManager,
 			TaskManagerRuntimeInfo taskManagerInfo,
+			Task containingTask,
 			TaskMetricGroup metrics) {
 
 		this.jobId = checkNotNull(jobId);
@@ -119,6 +122,7 @@
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
+		this.containingTask = containingTask;
 		this.metrics = metrics;
 	}
@@ -262,4 +266,9 @@
 		jobManager.tell(message);
 	}
+
+	@Override
+	public void failExternally(Throwable cause) {
+		this.containingTask.failExternally(cause);
+	}
 }
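The contract described in the javadoc above (no-op in terminal or canceling states, asynchronous abort of the invokable, never blocking) can be sketched independently of Flink. The class and method names below are illustrative, not the actual Flink implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the failExternally() contract: only a RUNNING task
// transitions to FAILED; terminal/canceling states are no-ops; the abort
// of the running code happens on a separate thread so the call never blocks.
class ExternallyFailableTask {
    enum State { RUNNING, CANCELING, CANCELED, FINISHED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile Throwable failureCause;

    /** Returns true if this call performed the RUNNING -> FAILED transition. */
    boolean failExternally(Throwable cause) {
        if (state.compareAndSet(State.RUNNING, State.FAILED)) {
            failureCause = cause;
            // Abort the invokable code asynchronously; never block the caller.
            Thread canceller = new Thread(this::abortInvokable, "task-canceller");
            canceller.setDaemon(true);
            canceller.start();
            return true;
        }
        return false; // already terminal or canceling: do nothing
    }

    State getState() { return state.get(); }
    Throwable getFailureCause() { return failureCause; }

    private void abortInvokable() {
        // In a real runtime this would interrupt the task's executing thread.
    }
}
```

The `compareAndSet` makes the terminal-state check and the state change one atomic step, so two concurrent external failures cannot both "win".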
[3/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent
[FLINK-2314] Make Streaming File Sources Persistent

This commit is a combination of several commits/changes. It combines changes to the file input formats and the streaming file read operator and integrates them into the API. These are the messages of the other two commits:

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat which describes input formats whose state is queryable, i.e. getCurrentState() returns where the reader is in the underlying source, and which can resume reading from a user-specified position. This functionality is not yet leveraged by the current readers.

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file reading sources in Flink streaming. Now there is one monitoring source with DOP 1 (degree of parallelism 1) monitoring a directory and assigning input splits to downstream readers. In addition, it makes the new features added by FLINK-3717 work together with the aforementioned entities (the monitor and the readers) in order to have fault-tolerant file sources and exactly-once guarantees. This does not replace the old API calls. That will be done in a future commit.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d353895b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d353895b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d353895b

Branch: refs/heads/master
Commit: d353895ba512a5e30fb08a25643fd93f085e8456
Parents: bc19486
Author: kl0u
Authored: Sun Apr 10 16:56:42 2016 +0200
Committer: Aljoscha Krettek
Committed: Tue Jun 14 18:11:22 2016 +0200

--
 .../flink/api/java/io/AvroInputFormat.java      |  85 +++-
 .../io/avro/AvroSplittableInputFormatTest.java  |  95 +++-
 .../flink/api/common/io/BinaryInputFormat.java  | 121 +++--
 .../apache/flink/api/common/io/BlockInfo.java   |   5 +
 .../common/io/CheckpointableInputFormat.java    |  61 +++
 .../api/common/io/DelimitedInputFormat.java     | 114 +++--
 .../api/common/io/EnumerateNestedFilesTest.java |   2 +-
 .../api/common/io/FileInputFormatTest.java      |   9 +-
 .../api/common/io/SequentialFormatTestBase.java |  15 +-
 flink-fs-tests/pom.xml                          |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java | 300 +
 .../hdfstests/ContinuousFileMonitoringTest.java | 447 +++
 .../flink/api/java/io/CsvInputFormatTest.java   | 134 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../api/datastream/DataStreamSource.java        |   5 +
 .../environment/StreamExecutionEnvironment.java | 256 +--
 .../ContinuousFileMonitoringFunction.java       | 328 ++
 .../source/ContinuousFileReaderOperator.java    | 390 
 .../source/FileMonitoringFunction.java          |   3 +-
 .../api/functions/source/FilePathFilter.java    |  66 +++
 .../functions/source/FileProcessingMode.java    |  31 ++
 .../api/functions/source/FileReadFunction.java  |   3 +-
 .../functions/source/FileSourceFunction.java    | 148 --
 .../api/functions/source/InputFormatSource.java | 148 ++
 .../api/graph/StreamGraphGenerator.java         |   6 +-
 .../api/operators/OutputTypeConfigurable.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +
 .../api/scala/StreamExecutionEnvironment.scala  |  74 ++-
 ...ontinuousFileProcessingCheckpointITCase.java | 327 ++
 .../StreamFaultToleranceTestBase.java           |   8 +-
 32 files changed, 2889 insertions(+), 317 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
--
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 605ce69..a920275 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -22,13 +22,15 @@ package org.apache.flink.api.java.io;
 
 import java.io.IOException;
 
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import
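The idea behind the CheckpointableInputFormat described in the commit message above (a reader whose state is queryable via getCurrentState() and that can resume from a saved position) can be sketched as follows. The interface and class names here are simplified illustrations, not the actual Flink API.

```java
import java.util.List;

// Illustrative sketch: a reader that exposes its position as checkpoint
// state and can be reopened at a previously saved position.
interface CheckpointableReader<T, S> {
    S getCurrentState();      // where the reader currently is in the source
    void reopen(S state);     // resume reading from a saved position
    boolean reachedEnd();
    T nextRecord();
}

// A toy "source": an in-memory list of lines; the state is simply the
// index of the next line to read.
class ListLineReader implements CheckpointableReader<String, Integer> {
    private final List<String> lines;
    private int pos; // the queryable state

    ListLineReader(List<String> lines) { this.lines = lines; }

    @Override public Integer getCurrentState() { return pos; }
    @Override public void reopen(Integer state) { pos = state; }
    @Override public boolean reachedEnd() { return pos >= lines.size(); }
    @Override public String nextRecord() { return lines.get(pos++); }
}
```

On recovery, a fault-tolerant source restores the last checkpointed position with `reopen(...)` and continues from there, which is what makes exactly-once reading of file splits possible.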
[1/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent
Repository: flink
Updated Branches:
  refs/heads/master fdf436099 -> d353895ba

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
--
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 530bae9..67c05e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -23,8 +23,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -89,7 +91,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	 * followed by the checks in {@link #postSubmit}.
 	 */
 	@Test
-	public void runCheckpointedProgram() {
+	public void runCheckpointedProgram() throws Exception {
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 					"localhost", cluster.getLeaderRPCPort());
@@ -99,13 +101,13 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 			testProgram(env);
 
-			env.execute();
+			TestUtils.tryExecute(env, "Fault Tolerance Test");
 
 			postSubmit();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
-			fail(e.getMessage());
+			Assert.fail(e.getMessage());
 		}
 	}
[2/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
--
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ae4758f..1cd052c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -51,14 +51,18 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -875,24 +879,34 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
-	 * read with the system's default character set.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The file will be read with the system's default character set.
+	 *
+	 * NOTES ON CHECKPOINTING: The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource readTextFile(String filePath) {
-		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		TextInputFormat format = new TextInputFormat(new Path(filePath));
-		TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-
-		return createInput(format, typeInfo, "Read Text File Source");
+		return readTextFile(filePath, "UTF-8");
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link
-	 * java.nio.charset.Charset} with the given name will be used to read the files.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
+	 * NOTES ON CHECKPOINTING: The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
[2/2] flink git commit: [FLINK-3971] [tableAPI] Fix handling of null values in aggregations.
[FLINK-3971] [tableAPI] Fix handling of null values in aggregations.

This closes #2049

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf43609
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf43609
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf43609

Branch: refs/heads/master
Commit: fdf43609977c807deb3bb81bf1095efb721fe688
Parents: e0b9e8d
Author: gallenvara
Authored: Mon May 30 18:30:07 2016 +0800
Committer: Fabian Hueske
Committed: Tue Jun 14 15:05:32 2016 +0200

--
 .../table/runtime/aggregate/AvgAggregate.scala  | 60 
 .../table/runtime/aggregate/MaxAggregate.scala  | 42 ++
 .../table/runtime/aggregate/MinAggregate.scala  | 44 +++---
 .../table/runtime/aggregate/SumAggregate.scala  | 12 +++-
 .../runtime/aggregate/AvgAggregateTest.scala    | 11 +++-
 .../runtime/aggregate/CountAggregateTest.scala  |  5 +-
 .../runtime/aggregate/MaxAggregateTest.scala    | 25 +++-
 .../runtime/aggregate/MinAggregateTest.scala    | 25 +++-
 .../runtime/aggregate/SumAggregateTest.scala    | 14 -
 9 files changed, 165 insertions(+), 73 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index 8cf181a..e724648 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -59,11 +59,17 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
 
+  override def evaluate(buffer: Row): T = {
+    doEvaluate(buffer).asInstanceOf[T]
+  }
+
   override def intermediateDataType = Array(
     BasicTypeInfo.LONG_TYPE_INFO,
     BasicTypeInfo.LONG_TYPE_INFO)
 
   def doPrepare(value: Any, partial: Row): Unit
+
+  def doEvaluate(buffer: Row): Any
 }
 
 class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
@@ -73,10 +79,14 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Byte = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toByte
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toByte
+    }
   }
 }
 
@@ -88,10 +98,14 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Short = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toShort
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toShort
+    }
   }
 }
 
@@ -103,10 +117,14 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Int = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toInt
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toInt
+    }
   }
 }
 
@@ -145,10 +163,14 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] {
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
 
-  override def evaluate(buffer: Row): Long = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+    if (bufferCount == 0L) {
+      null
+    } else {
+      bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+    }
   }
 }
 
@@ -178,11 +200,17 @@ abstract class
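The null-handling pattern applied throughout the diff above boils down to one rule: when no values were aggregated (count == 0), the average evaluates to null instead of dividing by zero. A minimal self-contained sketch of that rule, with illustrative names (not the Flink Table API classes):

```java
// Sketch of the fix's core idea: skip null inputs, and return null (rather
// than dividing by zero or returning a bogus 0) when nothing was aggregated.
class NullSafeIntAvg {
    private long sum;
    private long count;

    void aggregate(Integer value) {
        if (value != null) {   // null input values are simply skipped
            sum += value;
            count++;
        }
    }

    /** Returns null when no non-null value was seen; otherwise the integer average. */
    Integer evaluate() {
        return count == 0L ? null : (int) (sum / count);
    }
}
```

Returning null here matters for SQL semantics: AVG over an empty (or all-NULL) group is NULL, not zero.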