ibessonov commented on a change in pull request #102:
URL: https://github.com/apache/ignite-3/pull/102#discussion_r630787386



##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -82,12 +85,34 @@ public NettyClient(
         if (clientFuture != null)
             throw new IgniteInternalException("Attempted to start an already 
started NettyClient");
 
-        clientFuture = 
NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
-            .thenApply(ch -> {
-                clientCloseFuture = 
NettyUtils.toCompletableFuture(ch.closeFuture());
-                channel = ch;
-
-                return new NettySender(channel, serializationRegistry);
+        clientFuture = new CompletableFuture<>();
+
+        NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
+            .whenComplete((channel, throwable) -> {
+                synchronized (this) {
+                    if (throwable == null) {
+                        CompletableFuture<Void> closeFuture = 
NettyUtils.toCompletableFuture(channel.closeFuture());
+
+                        if (stopped) {
+                            // Close channel in case if client has been 
stopped prior to this moment.
+                            channel.close();
+
+                            // Wait for channel to close and then cancel the 
client future.
+                            closeFuture.whenComplete((unused, ignored) -> {
+                                clientFuture.cancel(true);

Review comment:
       Out of curiosity I read documentation of "cancel" method. Do we really 
need it? Especially with "true" parameter? What's the point if we're the ones 
that execute the task? So many questions.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -153,35 +161,37 @@ public NettyServer(
              */
             .childOption(ChannelOption.SO_KEEPALIVE, true);
 
-        serverCloseFuture = CompletableFuture.allOf(
-            NettyUtils.toCompletableFuture(bossGroup.terminationFuture()),
-            NettyUtils.toCompletableFuture(workerGroup.terminationFuture())
-        );
-
         serverStartFuture = new CompletableFuture<>();
 
         NettyUtils.toChannelCompletableFuture(bootstrap.bind(port))
-            .thenAccept(ch -> {
-                CompletableFuture<Void> channelCloseFuture = 
NettyUtils.toCompletableFuture(ch.closeFuture())
-                    // Shutdown event loops on server stop.
-                    .whenComplete((v, err) -> shutdownEventLoopGroups());
+            .whenComplete((channel, err) -> {
+                synchronized (this) {

Review comment:
       Why "synchronized"?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -90,14 +101,37 @@ public void testServerFailedToStart() throws Exception {
     public void testServerChannelClosedAbruptly() throws Exception {
         var channel = new EmbeddedServerChannel();
 
-        NettyServer server = getServer(channel, true);
+        server = getServer(channel.newSucceededFuture(), true);
 
         channel.close();
 
         assertTrue(server.getBossGroup().isShuttingDown());
         assertTrue(server.getWorkerGroup().isShuttingDown());
     }
 
+    /**
+     * Tests a scenario where a server is stopped before a server socket is 
successfully bound.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerStoppedBeforeStarted() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        var future = channel.newPromise();

Review comment:
       Please use explicit type

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -82,12 +85,34 @@ public NettyClient(
         if (clientFuture != null)
             throw new IgniteInternalException("Attempted to start an already 
started NettyClient");
 
-        clientFuture = 
NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
-            .thenApply(ch -> {
-                clientCloseFuture = 
NettyUtils.toCompletableFuture(ch.closeFuture());
-                channel = ch;
-
-                return new NettySender(channel, serializationRegistry);
+        clientFuture = new CompletableFuture<>();
+
+        NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
+            .whenComplete((channel, throwable) -> {
+                synchronized (this) {

Review comment:
       Why "synchronized"?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
##########
@@ -23,26 +23,38 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Tests for {@link NettyClient}.
  */
 public class NettyClientTest {
+    /** Client. */
+    private NettyClient client;
+
     /** */
     private final SocketAddress address = 
InetSocketAddress.createUnresolved("", 0);
 
+    /** */
+    @AfterEach
+    void tearDown() {

Review comment:
       Why not public?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -35,6 +37,15 @@
  * Tests for {@link NettyServer}.
  */
 public class NettyServerTest {
+    /** Server. */
+    private NettyServer server;
+
+    /** */
+    @AfterEach
+    final void tearDown() {

Review comment:
       Why not public?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
##########
@@ -111,6 +124,36 @@ public void testConnectionClose() throws Exception {
         assertFalse(tuple.client.failedToConnect());
     }
 
+    /**
+     * Tests a scenario where a connection is established successfully after a 
client has been stopped.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStoppedBeforeStarted() throws Exception {
+        var channel = new EmbeddedChannel();
+
+        var future = channel.newPromise();
+
+        ClientAndSender tuple = createClientAndSenderFromChannelFuture(future);
+
+        tuple.client.stop();
+
+        future.setSuccess(null);
+
+        client = tuple.client;
+
+        try {
+            tuple.sender.get(3, TimeUnit.SECONDS);
+            fail();
+        }
+        catch (CancellationException ignored) {

Review comment:
       assertThrows?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to