This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5b705f3b8dcafb915a3d59190a03867a27c9113
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Fri Sep 6 10:03:26 2019 +0200

    [hotfix][runtime] Rename StreamTask's performDefaultAction method to 
processInput
---
 .../apache/flink/state/api/output/BoundedStreamTask.java   |  2 +-
 .../flink/streaming/runtime/tasks/SourceStreamTask.java    |  2 +-
 .../flink/streaming/runtime/tasks/StreamIterationHead.java |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  4 ++--
 .../runtime/tasks/StreamTaskSelectiveReadingTest.java      |  4 ++--
 .../streaming/runtime/tasks/StreamTaskTerminationTest.java |  2 +-
 .../flink/streaming/runtime/tasks/StreamTaskTest.java      | 14 +++++++-------
 .../runtime/tasks/SynchronousCheckpointITCase.java         |  2 +-
 .../streaming/runtime/tasks/SynchronousCheckpointTest.java |  4 ++--
 .../runtime/tasks/TaskCheckpointingBehaviourTest.java      |  2 +-
 .../org/apache/flink/streaming/util/MockStreamTask.java    |  2 +-
 .../runtime/jobmaster/JobMasterStopWithSavepointIT.java    |  4 ++--
 12 files changed, 22 insertions(+), 22 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index db663da..814f445 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -76,7 +76,7 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
        }
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void processInput(ActionContext context) throws Exception {
                if (input.hasNext()) {
                        reuse.replace(input.next());
                        headOperator.setKeyContextElement1(reuse);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 5caddef..e1f7990 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -111,7 +111,7 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
        }
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void processInput(ActionContext context) throws Exception {
                // Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
                // compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
                sourceThread.setTaskDescription(getName());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index d25bd23..5d71adb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -66,7 +66,7 @@ public class StreamIterationHead<OUT> extends 
OneInputStreamTask<OUT, OUT> {
        // 
------------------------------------------------------------------------
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void processInput(ActionContext context) throws Exception {
                StreamRecord<OUT> nextRecord = shouldWait ?
                        dataChannel.poll(iterationWaitTime, 
TimeUnit.MILLISECONDS) :
                        dataChannel.take();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b1cfc4..aa84ae9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -275,7 +275,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         * @param context context object for collaborative interaction between 
the action and the stream task.
         * @throws Exception on any problems in the action.
         */
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void processInput(ActionContext context) throws Exception {
                if (!inputProcessor.processInput()) {
                        context.allActionsCompleted();
                }
@@ -298,7 +298,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                }
                        }
 
-                       performDefaultAction(actionContext);
+                       processInput(actionContext);
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
index 1308796..2867f0d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
@@ -191,14 +191,14 @@ public class StreamTaskSelectiveReadingTest {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        if (!started) {
                                synchronized (this) {
                                        this.wait();
                                }
                        }
 
-                       super.performDefaultAction(context);
+                       super.processInput(context);
                }
 
                public void startProcessing() {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 72e8a19..ec2f26d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -225,7 +225,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        RUN_LATCH.trigger();
                        // wait until we have started an asynchronous checkpoint
                        CHECKPOINTING_LATCH.await();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d0295f1..706fb75 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -867,7 +867,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() throws Exception {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        context.allActionsCompleted();
                }
 
@@ -1074,7 +1074,7 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) {
+               protected void processInput(ActionContext context) {
                        if (isCanceled() || inputFinished) {
                                context.allActionsCompleted();
                        }
@@ -1111,7 +1111,7 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        if (fail) {
                                throw new RuntimeException();
                        }
@@ -1199,7 +1199,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        holder = new LockHolder(getCheckpointLock(), latch);
                        holder.start();
                        latch.await();
@@ -1244,7 +1244,7 @@ public class StreamTaskTest extends TestLogger {
                protected void init() {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        final OneShotLatch latch = new OneShotLatch();
                        final Object lock = new Object();
 
@@ -1310,9 +1310,9 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        syncLatch.await();
-                       super.performDefaultAction(context);
+                       super.processInput(context);
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 3b4aee3..24320b8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -137,7 +137,7 @@ public class SynchronousCheckpointITCase {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        if (!isRunning) {
                                isRunning = true;
                                eventQueue.put(Event.TASK_IS_RUNNING);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 8b71423..0af985f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -171,10 +171,10 @@ public class SynchronousCheckpointTest {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        runningLatch.trigger();
                        execLatch.await();
-                       super.performDefaultAction(context);
+                       super.processInput(context);
                }
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index e40e23d..b6a5f61 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -474,7 +474,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                public void init() {}
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        triggerCheckpointOnBarrier(
                                new CheckpointMetaData(
                                        11L,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 37e7328..a6d3093 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -81,7 +81,7 @@ public class MockStreamTask extends StreamTask {
        public void init() { }
 
        @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
+       protected void processInput(ActionContext context) throws Exception {
                context.allActionsCompleted();
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index d54ec1f..6bdb651 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -286,7 +286,7 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        final long taskIndex = 
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
                        if (taskIndex == 0) {
                                numberOfRestarts.countDown();
@@ -343,7 +343,7 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
                }
 
                @Override
-               protected void performDefaultAction(ActionContext context) 
throws Exception {
+               protected void processInput(ActionContext context) throws 
Exception {
                        invokeLatch.countDown();
                        finishLatch.await();
                        context.allActionsCompleted();

Reply via email to