[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2386


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-10-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r83207030
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to 
write its output to
OneInputStreamOperator chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, 
streamOutputs.size() == 0);
--- End diff --

The reason why we still differentiate between those is that we don't want 
to keep the latency statistics for all parallel source instances at the 
intermediate operators.
So in the current implementation:
- Intermediate operators maintain latency statistics for each logical 
source (so not much data)
- Sinks maintain latency statistics for all parallel instances of each 
logical source (a bit more data)
The sinks maintain the additional data to allow users debugging latency 
issues with individual parallel instances (for example when a machine running a 
source has issues).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-10-13 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r83203023
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to 
write its output to
OneInputStreamOperator chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, 
streamOutputs.size() == 0);
--- End diff --

Do we need the differentiation between sinks/non-sinks anymore now that we 
decided to report the latency for all operators?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-10-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r83201419
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to 
write its output to
OneInputStreamOperator chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, 
streamOutputs.size() == 0);
--- End diff --

We would not need to set this as a value in the `StreamConfig` class, 
because its basically a check if `StreamConfig.getOutEdgesInOrder().size == 0`. 
The problem is that `getOutEdgesInOrder()` expects the user code classloader. 
The classloader is usually not available anymore in the operators.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78149051
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
 
+
+   // 

+   //  Metrics
+   // 

+
+   // --- One input stream
+   public void processLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {
+   reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   // --- Two input stream
+   public void processLatencyMarker1(LatencyMarker latencyMarker) throws 
Exception {
+   reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   public void processLatencyMarker2(LatencyMarker latencyMarker) throws 
Exception {
+   reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+
+   protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+   if(isSink) {
--- End diff --

I forgot to push my latest commit, where this is fixed ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78148192
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -125,6 +126,11 @@ public void processWatermark(Watermark mark) throws 
Exception {
}
 
@Override
+   public void processLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {
--- End diff --

Why is this necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78148223
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -322,6 +346,151 @@ public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
 
+
+   // 

+   //  Metrics
+   // 

+
+   // --- One input stream
+   public void processLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {
+   reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   // --- Two input stream
+   public void processLatencyMarker1(LatencyMarker latencyMarker) throws 
Exception {
+   reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+   public void processLatencyMarker2(LatencyMarker latencyMarker) throws 
Exception {
+   reportOrForwardLatencyMarker(latencyMarker);
+   }
+
+
+   protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+   if(isSink) {
--- End diff --

Don't all operators now keep the latency statistics? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r78147868
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
 ---
@@ -95,11 +100,17 @@ public DirectedOutput(
 
@Override
public void emitWatermark(Watermark mark) {
-   for (Output out : allOutputs) {
+   for (Output out: allOutputs) {
--- End diff --

I think the space was good here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997800
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to 
write its output to
OneInputStreamOperator chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, 
streamOutputs.size() == 0);
--- End diff --

could we not set this value in the `operatorConfig` instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997086
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
 ---
@@ -83,6 +83,7 @@ public void processWatermark2(Watermark mark) throws 
Exception {
}
}
 
+
--- End diff --

unnecessary new line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997021
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -347,4 +521,5 @@ public void close() {
output.close();
}
}
+
--- End diff --

unnecessary new line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996891
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -94,20 +102,38 @@
/** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
private transient KeyedStateBackend keyedStateBackend;
 
-   protected transient MetricGroup metrics;
+   // --- Metrics ---
+
+   /** Metric group for the operator */
+   protected MetricGroup metrics;
+
+   /** Flag indicating if this operator is a sink */
+   protected transient boolean isSink = false;
+
+   protected LatencyGauge latencyGauge;
+
 
// 

//  Life Cycle
// 

 
@Override
-   public void setup(StreamTask containingTask, StreamConfig config, 
Output output) {
+   public void setup(StreamTask containingTask, StreamConfig config, 
Output output, boolean isSink) {
this.container = containingTask;
this.config = config;
String operatorName = 
containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   this.isSink = isSink;
+   Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
+   int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+   if(historySize <= 0) {
+   LOG.warn("{} has been set to a value below 0: {}. Using 
default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
+   historySize = 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+   }
+
+   latencyGauge = new LatencyGauge(this.metrics, historySize, 
!isSink);
--- End diff --

this looks a bit odd; `latencyGauge = this.metrics.gauge(new 
LatencyGauge(historySize, isSink), "latency")` would be more consistent to how 
other metrics are registered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996545
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -94,20 +102,38 @@
/** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
private transient KeyedStateBackend keyedStateBackend;
 
-   protected transient MetricGroup metrics;
+   // --- Metrics ---
+
+   /** Metric group for the operator */
+   protected MetricGroup metrics;
+
+   /** Flag indicating if this operator is a sink */
+   protected transient boolean isSink = false;
+
+   protected LatencyGauge latencyGauge;
+
 
// 

//  Life Cycle
// 

 
@Override
-   public void setup(StreamTask containingTask, StreamConfig config, 
Output output) {
+   public void setup(StreamTask containingTask, StreamConfig config, 
Output output, boolean isSink) {
this.container = containingTask;
this.config = config;
String operatorName = 
containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   this.isSink = isSink;
+   Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
+   int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+   if(historySize <= 0) {
--- End diff --

missing space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996378
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -149,6 +154,8 @@
 
private LinkedHashSet registeredPojoTypes = new 
LinkedHashSet<>();
 
+
--- End diff --

new lines should be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996178
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local 
source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote 
source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has 
processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local 
source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote 
source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed 
(if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the 
different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. 
To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a 
special record, called a `LatencyMaker`.
+The marker contains a timestamp from the time when the record has been 
emitted at the sources.
+Latency marker can not overtake regular user records, thus if records are 
queuing up in front of an operator, 
--- End diff --

marker -> markers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996152
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local 
source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote 
source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has 
processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local 
source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote 
source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed 
(if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the 
different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. 
To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a 
special record, called a `LatencyMaker`.
--- End diff --

LatencyMaker -> LatencyMarker


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-08-18 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2386

[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring latency of 
records going through the system.

I therefore introduced a new StreamElement, called a LatencyMarker.
Similar to Watermarks, LatencyMarkers are emitted from the sources at an 
configured interval. The default value for the interval is 2000 ms. The 
emission of markers can be disabled by setting the interval to 0. 
LatencyMarkers can not "overtake" regular elements. This ensures that the 
measured latency approximates the end-to-end latency of regular stream elements.

Regular operators (excluding those participating in iterations) forward 
latency markers if they are not a sink.
Operators with many outputs randomly select one to forward the maker to. 
This ensures that every marker exists only once in the system, and that 
repartition steps are not causing an explosion in the number of transferred 
markers.
If an operator is a sink, it will maintain the last 512 latencies from each 
known source instance.
The min/max/mean/p50/p95/p99 of each known source is reported using a 
special LatencyGauge from the sink (every operator can be a sink, if it doesn't 
have any outputs).

This commit does not visualize the latency in the web interface.
Also, there is currently no mechanism to ensure that the system clocks are 
in-sync, so the latency measurements will be inaccurate if the hardware clocks 
are not correct.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3660-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2386






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---