Repository: flink
Updated Branches:
  refs/heads/master c212701d5 -> 9435370e7


[FLINK-8575][runtime] Add missing synchronization in BackPressureStatsTracker

Make triggerStackTraceSampleInternal private again and add locking to
triggerStackTraceSample.

This closes #5422.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9435370e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9435370e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9435370e

Branch: refs/heads/master
Commit: 9435370e76098f8ea3b689411c085c82a253a6d3
Parents: c85d5e3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Feb 8 12:38:53 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 8 12:40:29 2018 +0100

----------------------------------------------------------------------
 .../backpressure/BackPressureStatsTracker.java  | 54 ++++++++++----------
 .../BackPressureStatsTrackerTest.java           |  8 ++-
 2 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9435370e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
index 8c130e6..ec8a451 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -172,40 +171,39 @@ public class BackPressureStatsTracker {
         * @param vertex Operator to get the stats for.
         * @return Flag indicating whether a sample was triggered.
         */
-       @VisibleForTesting
-       boolean triggerStackTraceSampleInternal(final ExecutionJobVertex 
vertex) {
-               synchronized (lock) {
-                       if (shutDown) {
-                               return false;
-                       }
+       private boolean triggerStackTraceSampleInternal(final 
ExecutionJobVertex vertex) {
+               assert(Thread.holdsLock(lock));
 
-                       if (!pendingStats.contains(vertex) &&
-                               
!vertex.getGraph().getState().isGloballyTerminalState()) {
+               if (shutDown) {
+                       return false;
+               }
 
-                               Executor executor = 
vertex.getGraph().getFutureExecutor();
+               if (!pendingStats.contains(vertex) &&
+                       
!vertex.getGraph().getState().isGloballyTerminalState()) {
 
-                               // Only trigger if still active job
-                               if (executor != null) {
-                                       pendingStats.add(vertex);
+                       Executor executor = 
vertex.getGraph().getFutureExecutor();
 
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Triggering stack 
trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
-                                       }
+                       // Only trigger if still active job
+                       if (executor != null) {
+                               pendingStats.add(vertex);
 
-                                       CompletableFuture<StackTraceSample> 
sample = coordinator.triggerStackTraceSample(
-                                               vertex.getTaskVertices(),
-                                               numSamples,
-                                               delayBetweenSamples,
-                                               MAX_STACK_TRACE_DEPTH);
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Triggering stack trace 
sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+                               }
 
-                                       sample.handleAsync(new 
StackTraceSampleCompletionCallback(vertex), executor);
+                               CompletableFuture<StackTraceSample> sample = 
coordinator.triggerStackTraceSample(
+                                       vertex.getTaskVertices(),
+                                       numSamples,
+                                       delayBetweenSamples,
+                                       MAX_STACK_TRACE_DEPTH);
 
-                                       return true;
-                               }
-                       }
+                               sample.handleAsync(new 
StackTraceSampleCompletionCallback(vertex), executor);
 
-                       return false;
+                               return true;
+                       }
                }
+
+               return false;
        }
 
        /**
@@ -220,7 +218,9 @@ public class BackPressureStatsTracker {
         */
        @Deprecated
        public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
-               return triggerStackTraceSampleInternal(vertex);
+               synchronized (lock) {
+                       return triggerStackTraceSampleInternal(vertex);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9435370e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
index debf71d..0bbf5f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
@@ -93,18 +93,16 @@ public class BackPressureStatsTrackerTest extends 
TestLogger {
                // getOperatorBackPressureStats triggers stack trace sampling
                
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
 
-               Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
                                Matchers.eq(taskVertices),
                                Matchers.eq(numSamples),
                                Matchers.eq(delayBetweenSamples),
                                
Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
 
-               // Trigger again for pending request, should not fire
-               Assert.assertFalse("Unexpected trigger", 
tracker.triggerStackTraceSampleInternal(jobVertex));
-
+               // Request back pressure stats again. This should not trigger 
another sample request
                
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
 
-               Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
                                Matchers.eq(taskVertices),
                                Matchers.eq(numSamples),
                                Matchers.eq(delayBetweenSamples),

Reply via email to