[FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

This closes #5697.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac077615 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac077615 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac077615 Branch: refs/heads/master Commit: ac077615d244661c99232a6b3a4b88afd9186e11 Parents: f0a6ff7 Author: zentol <ches...@apache.org> Authored: Mon Mar 12 13:21:01 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Apr 4 08:59:05 2018 +0200 ---------------------------------------------------------------------- .../LegacyScheduleOrUpdateConsumersTest.java | 168 +++++++++++++++++++ .../ScheduleOrUpdateConsumersTest.java | 34 +++- 2 files changed, 193 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ac077615/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java new file mode 100644 index 0000000..846901a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; + +import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY; + +public class LegacyScheduleOrUpdateConsumersTest extends TestLogger { + + private static final int NUMBER_OF_TMS = 2; + private static final int NUMBER_OF_SLOTS_PER_TM = 2; + private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; + + private static TestingCluster flink; + + @BeforeClass + public static void setUp() throws Exception { + flink = TestingUtils.startTestingCluster( + NUMBER_OF_SLOTS_PER_TM, + NUMBER_OF_TMS, + TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + } + + 
@AfterClass + public static void tearDown() throws Exception { + flink.stop(); + } + + /** + * Tests notifications of multiple receivers when a task produces both a pipelined and blocking + * result. + * + * <pre> + * +----------+ + * +-- pipelined -> | Receiver | + * +--------+ | +----------+ + * | Sender |-| + * +--------+ | +----------+ + * +-- blocking --> | Receiver | + * +----------+ + * </pre> + * + * The pipelined receiver gets deployed after the first buffer is available and the blocking + * one after all subtasks are finished. + */ + @Test + public void testMixedPipelinedAndBlockingResults() throws Exception { + final JobVertex sender = new JobVertex("Sender"); + sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class); + sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM); + sender.setParallelism(PARALLELISM); + + final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver"); + pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class); + pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM); + pipelinedReceiver.setParallelism(PARALLELISM); + + pipelinedReceiver.connectNewDataSetAsInput( + sender, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.PIPELINED); + + final JobVertex blockingReceiver = new JobVertex("Blocking Receiver"); + blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class); + blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM); + blockingReceiver.setParallelism(PARALLELISM); + + blockingReceiver.connectNewDataSetAsInput(sender, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING); + + SlotSharingGroup slotSharingGroup = new SlotSharingGroup( + sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID()); + + sender.setSlotSharingGroup(slotSharingGroup); + pipelinedReceiver.setSlotSharingGroup(slotSharingGroup); + 
blockingReceiver.setSlotSharingGroup(slotSharingGroup); + + final JobGraph jobGraph = new JobGraph( + "Mixed pipelined and blocking result", + sender, + pipelinedReceiver, + blockingReceiver); + + flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); + } + + // --------------------------------------------------------------------------------------------- + + public static class BinaryRoundRobinSubtaskIndexSender extends AbstractInvokable { + + public static final String CONFIG_KEY = "number-of-times-to-send"; + + public BinaryRoundRobinSubtaskIndexSender(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2); + + // The order of intermediate result creation in the job graph specifies which produced + // result partition is pipelined/blocking. + final RecordWriter<IntValue> pipelinedWriter = + new RecordWriter<>(getEnvironment().getWriter(0)); + + final RecordWriter<IntValue> blockingWriter = + new RecordWriter<>(getEnvironment().getWriter(1)); + + writers.add(pipelinedWriter); + writers.add(blockingWriter); + + final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0); + + final IntValue subtaskIndex = new IntValue( + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); + + // Produce the first intermediate result and then the second in a serial fashion. 
+ for (RecordWriter<IntValue> writer : writers) { + try { + for (int i = 0; i < numberOfTimesToSend; i++) { + writer.emit(subtaskIndex); + } + writer.flushAll(); + } + finally { + writer.clearBuffers(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ac077615/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java index f55dfe4..c743a63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java @@ -18,16 +18,20 @@ package org.apache.flink.runtime.jobmanager.scheduler; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest; -import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.testutils.category.Flip6; 
import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; @@ -36,30 +40,42 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.experimental.categories.Category; import java.util.List; import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY; +@Category(Flip6.class) public class ScheduleOrUpdateConsumersTest extends TestLogger { private static final int NUMBER_OF_TMS = 2; private static final int NUMBER_OF_SLOTS_PER_TM = 2; private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; - private static TestingCluster flink; + private static MiniCluster flink; @BeforeClass public static void setUp() throws Exception { - flink = TestingUtils.startTestingCluster( - NUMBER_OF_SLOTS_PER_TM, - NUMBER_OF_TMS, - TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + final Configuration config = new Configuration(); + config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(config) + .setNumTaskManagers(NUMBER_OF_TMS) + .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) + .build(); + + flink = new MiniCluster(miniClusterConfiguration); + + flink.start(); } @AfterClass public static void tearDown() throws Exception { - flink.stop(); + if (flink != null) { + flink.close(); + } } /** @@ -118,7 +134,7 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger { pipelinedReceiver, blockingReceiver); - flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION()); + flink.executeJobBlocking(jobGraph); } // ---------------------------------------------------------------------------------------------