This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b5b705f3b8dcafb915a3d59190a03867a27c9113 Author: Aleksey Pak <alek...@ververica.com> AuthorDate: Fri Sep 6 10:03:26 2019 +0200 [hotfix][runtime] Rename StreamTask's performDefaultAction method to processInput --- .../apache/flink/state/api/output/BoundedStreamTask.java | 2 +- .../flink/streaming/runtime/tasks/SourceStreamTask.java | 2 +- .../flink/streaming/runtime/tasks/StreamIterationHead.java | 2 +- .../apache/flink/streaming/runtime/tasks/StreamTask.java | 4 ++-- .../runtime/tasks/StreamTaskSelectiveReadingTest.java | 4 ++-- .../streaming/runtime/tasks/StreamTaskTerminationTest.java | 2 +- .../flink/streaming/runtime/tasks/StreamTaskTest.java | 14 +++++++------- .../runtime/tasks/SynchronousCheckpointITCase.java | 2 +- .../streaming/runtime/tasks/SynchronousCheckpointTest.java | 4 ++-- .../runtime/tasks/TaskCheckpointingBehaviourTest.java | 2 +- .../org/apache/flink/streaming/util/MockStreamTask.java | 2 +- .../runtime/jobmaster/JobMasterStopWithSavepointIT.java | 4 ++-- 12 files changed, 22 insertions(+), 22 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java index db663da..814f445 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java @@ -76,7 +76,7 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { if (input.hasNext()) { reuse.replace(input.next()); headOperator.setKeyContextElement1(reuse); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 5caddef..e1f7990 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -111,7 +111,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { // Against the usual contract of this method, this implementation is not step-wise but blocking instead for // compatibility reasons with the current source interface (source functions run as a loop, not in steps). sourceThread.setTaskDescription(getName()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index d25bd23..5d71adb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -66,7 +66,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> { // ------------------------------------------------------------------------ @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { StreamRecord<OUT> nextRecord = shouldWait ? dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : dataChannel.take(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 1b1cfc4..aa84ae9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -275,7 +275,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> * @param context context object for collaborative interaction between the action and the stream task. * @throws Exception on any problems in the action. */ - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { if (!inputProcessor.processInput()) { context.allActionsCompleted(); } @@ -298,7 +298,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - performDefaultAction(actionContext); + processInput(actionContext); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java index 1308796..2867f0d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -191,14 +191,14 @@ public class StreamTaskSelectiveReadingTest { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { if (!started) { synchronized (this) { this.wait(); } } - super.performDefaultAction(context); + super.processInput(context); } public void startProcessing() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 72e8a19..ec2f26d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -225,7 +225,7 @@ public class StreamTaskTerminationTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { RUN_LATCH.trigger(); // wait until we have started an asynchronous checkpoint CHECKPOINTING_LATCH.await(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d0295f1..706fb75 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -867,7 +867,7 @@ public class StreamTaskTest extends TestLogger { protected void init() throws Exception {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { context.allActionsCompleted(); } @@ -1074,7 +1074,7 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) { + protected void processInput(ActionContext context) { if (isCanceled() || inputFinished) { context.allActionsCompleted(); } @@ -1111,7 +1111,7 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { if (fail) { throw new RuntimeException(); } @@ -1199,7 +1199,7 @@ public class StreamTaskTest extends TestLogger { protected void init() {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { holder = new LockHolder(getCheckpointLock(), latch); holder.start(); latch.await(); @@ -1244,7 +1244,7 @@ public class StreamTaskTest extends TestLogger { protected void init() {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); @@ -1310,9 +1310,9 @@ public class StreamTaskTest extends TestLogger { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { syncLatch.await(); - super.performDefaultAction(context); + super.processInput(context); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index 3b4aee3..24320b8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -137,7 +137,7 @@ public class SynchronousCheckpointITCase { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { if (!isRunning) { isRunning = true; eventQueue.put(Event.TASK_IS_RUNNING); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java index 8b71423..0af985f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java @@ -171,10 +171,10 @@ public class SynchronousCheckpointTest { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { runningLatch.trigger(); execLatch.await(); - super.performDefaultAction(context); + super.processInput(context); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index e40e23d..b6a5f61 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -474,7 +474,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { public void init() {} @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { triggerCheckpointOnBarrier( new CheckpointMetaData( 11L, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java index 37e7328..a6d3093 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java @@ -81,7 +81,7 @@ public class MockStreamTask extends StreamTask { public void init() { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { context.allActionsCompleted(); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index d54ec1f..6bdb651 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -286,7 +286,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); if (taskIndex == 0) { numberOfRestarts.countDown(); @@ -343,7 +343,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { } @Override - protected void performDefaultAction(ActionContext context) throws Exception { + protected void processInput(ActionContext context) throws Exception { invokeLatch.countDown(); finishLatch.await(); context.allActionsCompleted();