This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 41dbe2e  GEODE-5747: Handling SocketException consistently (#2504)
41dbe2e is described below

commit 41dbe2e5804939df0b33ae47e57da50f6c7d9152
Author: Ryan McMahon <rmcma...@pivotal.io>
AuthorDate: Tue Oct 2 12:48:11 2018 -0700

    GEODE-5747: Handling SocketException consistently (#2504)
---
 .../internal/tcpserver/TcpServerJUnitTest.java     | 73 ++++++++++++++++++++--
 .../distributed/internal/tcpserver/TcpServer.java  |  3 +-
 .../geode/internal/InternalDataSerializer.java     | 44 ++++---------
 .../internal/InternalDataSerializerJUnitTest.java  | 31 +++++++++
 4 files changed, 111 insertions(+), 40 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
index 71a1eba..4f83560 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
@@ -18,12 +18,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
+import java.net.SocketException;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +40,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.DataSerializable;
 import org.apache.geode.cache.GemFireCache;
@@ -79,18 +87,20 @@ public class TcpServerJUnitTest {
     EchoHandler handler = new EchoHandler();
     start(handler);
 
+    TcpClient tcpClient = new TcpClient();
+
     TestObject test = new TestObject();
     test.id = 5;
     TestObject result =
-        (TestObject) new TcpClient().requestToServer(localhost, port, test, 60 * 1000);
+        (TestObject) tcpClient.requestToServer(localhost, port, test, 60 * 1000);
     assertEquals(test.id, result.id);
 
-    String[] info = new TcpClient().getInfo(localhost, port);
+    String[] info = tcpClient.getInfo(localhost, port);
     assertNotNull(info);
     assertTrue(info.length > 1);
 
     try {
-      new TcpClient().stop(localhost, port);
+      tcpClient.stop(localhost, port);
     } catch (ConnectException ignore) {
       // must not be running
     }
@@ -109,12 +119,14 @@ public class TcpServerJUnitTest {
     DelayHandler handler = new DelayHandler(latch);
     start(handler);
 
+    TcpClient tcpClient = new TcpClient();
+
     final AtomicBoolean done = new AtomicBoolean();
     Thread delayedThread = new Thread() {
       public void run() {
         Boolean delay = Boolean.valueOf(true);
         try {
-          new TcpClient().requestToServer(localhost, port, delay, 60 * 1000);
+          tcpClient.requestToServer(localhost, port, delay, 60 * 1000);
         } catch (IOException e) {
           e.printStackTrace();
         } catch (ClassNotFoundException e) {
@@ -127,7 +139,7 @@ public class TcpServerJUnitTest {
     try {
       Thread.sleep(500);
       assertFalse(done.get());
-      new TcpClient().requestToServer(localhost, port, Boolean.valueOf(false), 60 * 1000);
+      tcpClient.requestToServer(localhost, port, Boolean.valueOf(false), 60 * 1000);
       assertFalse(done.get());
 
       latch.countDown();
@@ -138,7 +150,7 @@ public class TcpServerJUnitTest {
       delayedThread.join(60 * 1000);
       assertTrue(!delayedThread.isAlive()); // GemStoneAddition
       try {
-        new TcpClient().stop(localhost, port);
+        tcpClient.stop(localhost, port);
       } catch (ConnectException ignore) {
         // must not be running
       }
@@ -146,6 +158,55 @@ public class TcpServerJUnitTest {
     }
   }
 
+  @Test
+  public void testNewConnectionsAcceptedAfterSocketException() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    // Initially mock the handler to throw a SocketException. We want to verify that the server
+    // can recover and serve new client requests after a SocketException is thrown.
+    TcpHandler mockTcpHandler = mock(TcpHandler.class);
+    doThrow(SocketException.class).when(mockTcpHandler).processRequest(any(Object.class));
+    start(mockTcpHandler);
+
+    TcpClient tcpClient = new TcpClient();
+
+    // Due to the mocked handler, an EOFException will be thrown on the client. This is expected,
+    // so we just catch it.
+    try {
+      tcpClient.requestToServer(localhost, port, new TestObject(), 60 * 1000);
+    } catch (EOFException eofEx) {
+    }
+
+    // Change the mock handler behavior to echo the request back
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return invocation.getArgument(0);
+      }
+    }).when(mockTcpHandler).processRequest(any(Object.class));
+
+    // Perform another request and validate that it was served successfully
+    TestObject test = new TestObject();
+    test.id = 5;
+    TestObject result =
+        (TestObject) tcpClient.requestToServer(localhost, port, test, 60 * 1000);
+
+    assertEquals(test.id, result.id);
+    String[] info = tcpClient.getInfo(localhost, port);
+    assertNotNull(info);
+    assertTrue(info.length > 1);
+
+    try {
+      tcpClient.stop(localhost, port);
+    } catch (ConnectException ignore) {
+      // must not be running
+    }
+    server.join(60 * 1000);
+    assertFalse(server.isAlive());
+
+    assertEquals(5, stats.started.get());
+    assertEquals(5, stats.ended.get());
+  }
+
   private static class TestObject implements DataSerializable {
     int id;
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 266e6a1..eaffc03 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.net.URL;
 import java.util.Date;
 import java.util.HashMap;
@@ -386,7 +387,7 @@ public class TcpServer {
         } else {
           rejectUnknownProtocolConnection(socket, firstByte);
         }
-      } catch (EOFException ignore) {
+      } catch (EOFException | SocketException ignore) {
         // client went away - ignore
       } catch (CancelException ignore) {
         // ignore
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 90921b9..dfa5f07 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -2480,6 +2480,13 @@ public abstract class InternalDataSerializer extends DataSerializer {
         } else {
           ((DataSerializable) ds).fromData(in);
         }
+
+        if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
+          logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read {} {}",
+              ds instanceof DataSerializableFixedID ? "DataSerializableFixedID"
+                  : "DataSerializable",
+              ds);
+        }
       }
     } catch (EOFException | ClassNotFoundException | CacheClosedException | SocketException ex) {
       // client went away - ignore
@@ -2499,16 +2506,11 @@ public abstract class InternalDataSerializer extends DataSerializer {
       Constructor init = c.getConstructor(new Class[0]);
       init.setAccessible(true);
       Object o = init.newInstance(new Object[0]);
-      Assert.assertTrue(o instanceof DataSerializable);
-      invokeFromData(o, in);
 
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read DataSerializable {}", o);
-      }
+      invokeFromData(o, in);
 
       return o;
-
-    } catch (EOFException ex) {
+    } catch (EOFException | SocketException ex) {
       // client went away - ignore
       throw ex;
     } catch (Exception ex) {
@@ -2519,30 +2521,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     }
   }
 
-  private static Object readDataSerializableFixedID(final DataInput in)
-      throws IOException, ClassNotFoundException {
-    Class c = readClass(in);
-    try {
-      Constructor init = c.getConstructor(new Class[0]);
-      init.setAccessible(true);
-      Object o = init.newInstance(new Object[0]);
-
-      invokeFromData(o, in);
-
-      if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
-        logger.trace(LogMarker.SERIALIZER_VERBOSE, "Read DataSerializableFixedID {}", o);
-      }
-
-      return o;
-
-    } catch (Exception ex) {
-      throw new SerializationException(
-          LocalizedStrings.DataSerializer_COULD_NOT_CREATE_AN_INSTANCE_OF_0
-              .toLocalizedString(c.getName()),
-          ex);
-    }
-  }
-
   /**
    * Get the {@link Version} of the peer or disk store that created this {@link DataInput}.
    */
@@ -2643,7 +2621,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       case DS_FIXED_ID_SHORT:
         return DSFIDFactory.create(in.readShort(), in);
       case DS_NO_FIXED_ID:
-        return readDataSerializableFixedID(in);
+        return readDataSerializable(in);
       case DS_FIXED_ID_INT:
         return DSFIDFactory.create(in.readInt(), in);
       default:
@@ -2784,7 +2762,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
       case DS_FIXED_ID_INT:
         return DSFIDFactory.create(in.readInt(), in);
       case DS_NO_FIXED_ID:
-        return readDataSerializableFixedID(in);
+        return readDataSerializable(in);
       case NULL:
         return null;
       case NULL_STRING:
diff --git a/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java
index 2ef8e73..f787141 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/InternalDataSerializerJUnitTest.java
@@ -17,10 +17,12 @@ package org.apache.geode.internal;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.net.SocketException;
 import java.util.Properties;
@@ -70,6 +72,20 @@ public class InternalDataSerializerJUnitTest {
         .isInstanceOf(SocketException.class);
   }
 
+  @Test
+  public void testBasicReadObject_SocketExceptionReThrown()
+      throws IOException, ClassNotFoundException {
+    DataInput in = mock(DataInput.class);
+    doReturn(DSCODE.DS_NO_FIXED_ID.toByte()).doReturn(DSCODE.CLASS.toByte())
+        .doReturn(DSCODE.STRING.toByte()).when(in).readByte();
+    doReturn(
+        "org.apache.geode.internal.InternalDataSerializerJUnitTest$SocketExceptionThrowingDataSerializable"
+            .when(in).readUTF();
+
+    assertThatThrownBy(() -> InternalDataSerializer.basicReadObject(in))
+        .isInstanceOf(SocketException.class);
+  }
+
   class TestFunction implements Function {
     @Override
     public void execute(FunctionContext context) {
@@ -79,4 +95,19 @@ public class InternalDataSerializerJUnitTest {
 
   class TestPdxSerializerObject implements PdxSerializerObject {
   }
+
+  // Class must be static in order to call the constructor via reflection in the serializer
+  public static class SocketExceptionThrowingDataSerializable implements DataSerializable {
+    public SocketExceptionThrowingDataSerializable() {}
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      // Not needed for test
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      throw new SocketException();
+    }
+  }
 }

Reply via email to