This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0189db46316eff35d3d07f57ebcf116b1f61cea6 Author: Zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Mon May 20 18:55:59 2019 +0800 [hotfix][network] Introduce PartititonRequestClient interface for creating simple client instance in tests --- .../runtime/io/network/ConnectionManager.java | 2 - .../runtime/io/network/LocalConnectionManager.java | 2 - .../runtime/io/network/PartitionRequestClient.java | 70 ++++++++++++++++++++++ .../io/network/netty/NettyConnectionManager.java | 1 + ...lient.java => NettyPartitionRequestClient.java} | 22 +++---- .../netty/PartitionRequestClientFactory.java | 21 +++---- .../partition/consumer/RemoteInputChannel.java | 2 +- .../netty/ClientTransportErrorHandlingTest.java | 19 +++--- ...editBasedPartitionRequestClientHandlerTest.java | 5 +- ...t.java => NettyPartitionRequestClientTest.java} | 9 +-- .../network/partition/InputChannelTestUtils.java | 2 +- .../partition/consumer/RemoteInputChannelTest.java | 2 +- 12 files changed, 113 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index 75f39e9..c342750 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; - import java.io.IOException; /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java index 46ca7fc..319a9ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; - /** * A connection manager implementation to bypass setup overhead for task managers running in local * execution mode. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java new file mode 100644 index 0000000..a215700 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import java.io.IOException; + +/** + * Client to send messages or task events via network for {@link RemoteInputChannel}. + */ +public interface PartitionRequestClient { + + /** + * Requests a remote sub partition. + * + * @param partitionId The identifier of result partition to be requested. + * @param subpartitionIndex The sub partition index in the requested result partition. + * @param inputChannel The remote input channel for requesting the sub partition. + * @param delayMs The request is scheduled within a delay time. + */ + void requestSubpartition( + ResultPartitionID partitionId, + int subpartitionIndex, + RemoteInputChannel inputChannel, + int delayMs) throws IOException; + + /** + * Notifies available credits from one remote input channel. + * + * @param inputChannel The remote input channel who announces the available credits. + */ + void notifyCreditAvailable(RemoteInputChannel inputChannel); + + /** + * Sends a task event backwards to an intermediate result partition. + * + * @param partitionId The identifier of result partition. + * @param event The task event to be sent. + * @param inputChannel The remote input channel for sending this event. + */ + void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, RemoteInputChannel inputChannel) throws IOException; + + /** + * Cancels the partition request for the given remote input channel and removes + * this client from factory if it is not referenced by any other input channels. + * + * @param inputChannel The remote input channel for canceling partition and to + * be removed from network stack. + */ + void close(RemoteInputChannel inputChannel) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index 73d9b11..ef3db13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.TaskEventPublisher; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java similarity index 94% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java index 9c9deaa..4d42a3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.event.TaskEvent; -import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; @@ -47,9 +48,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>This client is shared by all remote input channels, which request a partition * from the same {@link ConnectionID}. */ -public class PartitionRequestClient { +public class NettyPartitionRequestClient implements PartitionRequestClient { - private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClient.class); + private static final Logger LOG = LoggerFactory.getLogger(NettyPartitionRequestClient.class); private final Channel tcpChannel; @@ -62,7 +63,7 @@ public class PartitionRequestClient { /** If zero, the underlying TCP channel can be safely closed. */ private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter(); - PartitionRequestClient( + NettyPartitionRequestClient( Channel tcpChannel, NetworkClientHandler clientHandler, ConnectionID connectionId, @@ -94,7 +95,8 @@ public class PartitionRequestClient { * <p>The request goes to the remote producer, for which this partition * request client instance has been created. */ - public ChannelFuture requestSubpartition( + @Override + public void requestSubpartition( final ResultPartitionID partitionId, final int subpartitionIndex, final RemoteInputChannel inputChannel, @@ -128,7 +130,6 @@ public class PartitionRequestClient { if (delayMs == 0) { ChannelFuture f = tcpChannel.writeAndFlush(request); f.addListener(listener); - return f; } else { final ChannelFuture[] f = new ChannelFuture[1]; tcpChannel.eventLoop().schedule(new Runnable() { @@ -138,19 +139,18 @@ public class PartitionRequestClient { f[0].addListener(listener); } }, delayMs, TimeUnit.MILLISECONDS); - - return f[0]; } } /** * Sends a task event backwards to an intermediate result partition producer. - * <p> - * Backwards task events flow between readers and writers and therefore + * + * <p>Backwards task events flow between readers and writers and therefore * will only work when both are running at the same time, which is only * guaranteed to be the case when both the respective producer and * consumer task run pipelined. */ + @Override public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException { checkNotClosed(); @@ -170,10 +170,12 @@ public class PartitionRequestClient { }); } + @Override public void notifyCreditAvailable(RemoteInputChannel inputChannel) { clientHandler.notifyCreditAvailable(inputChannel); } + @Override public void close(RemoteInputChannel inputChannel) throws IOException { clientHandler.removeInputChannel(inputChannel); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java index 2df094b..229121e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; @@ -33,7 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** - * Factory for {@link PartitionRequestClient} instances. + * Factory for {@link NettyPartitionRequestClient} instances. * * <p>Instances of partition requests clients are shared among several {@link RemoteInputChannel} * instances. @@ -50,19 +51,19 @@ class PartitionRequestClientFactory { /** * Atomically establishes a TCP connection to the given remote address and - * creates a {@link PartitionRequestClient} instance for this connection. + * creates a {@link NettyPartitionRequestClient} instance for this connection. */ - PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException { + NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException { Object entry; - PartitionRequestClient client = null; + NettyPartitionRequestClient client = null; while (client == null) { entry = clients.get(connectionId); if (entry != null) { // Existing channel or connecting channel - if (entry instanceof PartitionRequestClient) { - client = (PartitionRequestClient) entry; + if (entry instanceof NettyPartitionRequestClient) { + client = (NettyPartitionRequestClient) entry; } else { ConnectingChannel future = (ConnectingChannel) entry; @@ -92,7 +93,7 @@ class PartitionRequestClientFactory { clients.replace(connectionId, old, client); } else { - client = (PartitionRequestClient) old; + client = (NettyPartitionRequestClient) old; } } @@ -166,7 +167,7 @@ class PartitionRequestClientFactory { synchronized (connectLock) { try { NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class); - partitionRequestClient = new PartitionRequestClient( + partitionRequestClient = new NettyPartitionRequestClient( channel, clientHandler, connectionId, clientFactory); if (disposeRequestClient) { @@ -181,11 +182,11 @@ class PartitionRequestClientFactory { } } - private volatile PartitionRequestClient partitionRequestClient; + private volatile NettyPartitionRequestClient partitionRequestClient; private volatile Throwable error; - private PartitionRequestClient waitForChannel() throws IOException, InterruptedException { + private NettyPartitionRequestClient waitForChannel() throws IOException, InterruptedException { synchronized (connectLock) { while (error == null && partitionRequestClient == null) { connectLock.wait(2000); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 397c4fe..fabc495 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -24,13 +24,13 @@ import org.apache.flink.core.memory.MemorySegmentProvider; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.util.ExceptionUtils; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 266461b..a0689e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; @@ -56,7 +56,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.isA; @@ -113,7 +112,7 @@ public class ClientTransportErrorHandlingTest { } }); - PartitionRequestClient requestClient = new PartitionRequestClient( + PartitionRequestClient requestClient = new NettyPartitionRequestClient( ch, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); // Create input channels @@ -134,22 +133,20 @@ public class ClientTransportErrorHandlingTest { }).when(rich[1]).onError(isA(LocalTransportException.class)); // First request is successful - ChannelFuture f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[0], 0); - assertTrue(f.await().isSuccess()); + requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[0], 0); // Second request is *not* successful - f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0); - assertFalse(f.await().isSuccess()); + requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0); - // Only the second channel should be notified about the error - verify(rich[0], times(0)).onError(any(LocalTransportException.class)); - - // Wait for the notification + // Wait for the notification and it could confirm all the request operations are done if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) { fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about the channel error."); } + // Only the second channel should be notified about the error + verify(rich[0], times(0)).onError(any(LocalTransportException.class)); + shutdown(serverAndClient); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java index 92ae98d..1517dc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -244,7 +245,7 @@ public class CreditBasedPartitionRequestClientHandlerTest { public void testNotifyCreditAvailable() throws Exception { final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); - final PartitionRequestClient client = new PartitionRequestClient( + final PartitionRequestClient client = new NettyPartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2); @@ -344,7 +345,7 @@ public class CreditBasedPartitionRequestClientHandlerTest { public void testNotifyCreditAvailableAfterReleased() throws Exception { final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); - final PartitionRequestClient client = new PartitionRequestClient( + final PartitionRequestClient client = new NettyPartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java similarity index 95% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java index dcc3ad4..a119b51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest; @@ -41,9 +42,9 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; /** - * Tests for {@link PartitionRequestClient}. + * Tests for {@link NettyPartitionRequestClient}. */ -public class PartitionRequestClientTest { +public class NettyPartitionRequestClientTest { @Test public void testRetriggerPartitionRequest() throws Exception { @@ -51,7 +52,7 @@ public class PartitionRequestClientTest { final PartitionRequestClientHandler handler = new PartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); - final PartitionRequestClient client = new PartitionRequestClient( + final PartitionRequestClient client = new NettyPartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); final int numExclusiveBuffers = 2; @@ -110,7 +111,7 @@ public class PartitionRequestClientTest { public void testDoublePartitionRequest() throws Exception { final PartitionRequestClientHandler handler = new PartitionRequestClientHandler(); final EmbeddedChannel channel = new EmbeddedChannel(handler); - final PartitionRequestClient client = new PartitionRequestClient( + final PartitionRequestClient client = new NettyPartitionRequestClient( channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class)); final int numExclusiveBuffers = 2; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index ece1009..4ff472e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -22,8 +22,8 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentProvider; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index e7f0648..6c03106 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -22,11 +22,11 @@ import org.apache.flink.core.memory.MemorySegmentProvider; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.PartitionRequestClient; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;