GEODE-3170: Closed socket doesn't result in an infinite loop. This closes #633

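* A null result from Protobuf deserialization is now handled by throwing an EOFException.
* An IOException while receiving a message now causes GenericProtocolServerConnection to close.
* Added JUnit tests covering both cases (GenericProtocolServerConnectionTest, ProtobufStreamProcessorTest).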

Signed-off-by: Hitesh Khamesra <hkames...@pivotal.io>
Signed-off-by: Galen O'Sullivan <gosulli...@pivotal.io>


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9905794e
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9905794e
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9905794e

Branch: refs/heads/feature/GEM-1483
Commit: 9905794eb0412f881c9e373852b463ef3f132ea3
Parents: 4fddda9
Author: Galen O'Sullivan <gosulli...@pivotal.io>
Authored: Thu Jul 13 11:26:09 2017 -0700
Committer: Udo Kohlmeyer <ukohlme...@pivotal.io>
Committed: Fri Jul 14 14:23:41 2017 -0700

----------------------------------------------------------------------
 .../GenericProtocolServerConnection.java        | 17 +-----
 .../cache/tier/sockets/ServerConnection.java    |  2 +-
 .../GenericProtocolServerConnectionTest.java    | 62 ++++++++++++++++++++
 .../sockets/ServerConnectionFactoryTest.java    |  1 +
 .../InvalidProtocolMessageException.java        |  2 +-
 .../protobuf/ProtobufStreamProcessor.java       |  4 ++
 .../protobuf/ProtobufStreamProcessorTest.java   | 41 +++++++++++++
 7 files changed, 112 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index a2e7305..b155d7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -26,27 +26,15 @@ import java.io.OutputStream;
 import java.net.Socket;
 
 /**
- * Holds the socket and protocol handler for the new client protocol. TODO: Currently unimplemented
- * due the the protocol not being there.
+ * Holds the socket and protocol handler for the new client protocol.
  */
 public class GenericProtocolServerConnection extends ServerConnection {
   // The new protocol lives in a separate module and gets loaded when this class is instantiated.
-  // TODO implement this.
   private final ClientProtocolMessageHandler messageHandler;
 
   /**
    * Creates a new <code>GenericProtocolServerConnection</code> that processes messages received
    * from an edge client over a given <code>Socket</code>.
-   *
-   * @param s
-   * @param c
-   * @param helper
-   * @param stats
-   * @param hsTimeout
-   * @param socketBufferSize
-   * @param communicationModeStr
-   * @param communicationMode
-   * @param acceptor
    */
   public GenericProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
       CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
@@ -68,9 +56,8 @@ public class GenericProtocolServerConnection extends ServerConnection {
       messageHandler.receiveMessage(inputStream, outputStream, this.getCache());
     } catch (IOException e) {
       logger.warn(e);
-      // TODO?
+      this.setFlagProcessMessagesAsFalse(); // TODO: better shutdown.
     }
-    return;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 8704dad..870d0ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -694,7 +694,7 @@ public abstract class ServerConnection implements Runnable {
   private boolean clientDisconnectedCleanly = false;
   private Throwable clientDisconnectedException;
   private int failureCount = 0;
-  private boolean processMessages = true;
+  protected boolean processMessages = true;
 
   protected void doHandshake() {
     // hitesh:to create new connection handshake

http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
new file mode 100644
index 0000000..3bfcd8b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+@Category(UnitTest.class)
+public class GenericProtocolServerConnectionTest {
+  @Test
+  public void testProcessFlag() throws IOException {
+    try {
+      System.setProperty("geode.feature-protobuf-protocol", "true");
+      ServerConnection serverConnection = IOExceptionThrowingServerConnection();
+      Assert.assertTrue(serverConnection.processMessages);
+      serverConnection.doOneMessage();
+      Assert.assertTrue(!serverConnection.processMessages);
+    } finally {
+      System.clearProperty("geode.feature-protobuf-protocol");
+    }
+  }
+
+  private static ServerConnection IOExceptionThrowingServerConnection() throws IOException {
+    Socket socketMock = mock(Socket.class);
+    when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
+
+    ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+    doThrow(new IOException()).when(clientProtocolMock).receiveMessage(any(), any(), any());
+
+    return new GenericProtocolServerConnection(socketMock, mock(InternalCache.class),
+        mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+        Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL, mock(AcceptorImpl.class), clientProtocolMock,
+        mock(SecurityService.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 4e994cd..b3c3e32 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
+import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;

http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-protobuf/src/main/java/org/apache/geode/protocol/exception/InvalidProtocolMessageException.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/exception/InvalidProtocolMessageException.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/exception/InvalidProtocolMessageException.java
index dae03a1..8903b8a 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/exception/InvalidProtocolMessageException.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/exception/InvalidProtocolMessageException.java
@@ -15,7 +15,7 @@
 package org.apache.geode.protocol.exception;
 
 /**
- * Indicates that a message didn't properly follow it's protocol specification.
+ * Indicates that a message didn't properly follow its protocol specification.
  */
 public class InvalidProtocolMessageException extends Exception {
   public InvalidProtocolMessageException(String message) {

http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
index ef4affa..980495a 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
@@ -26,6 +26,7 @@ import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -75,6 +76,9 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   public void processOneMessage(InputStream inputStream, OutputStream outputStream, Cache cache)
       throws InvalidProtocolMessageException, OperationHandlerNotRegisteredException, IOException {
     ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream);
+    if (message == null) {
+      throw new EOFException("Tried to deserialize protobuf message at EOF");
+    }
 
     ClientProtocol.Request request = message.getRequest();
     ClientProtocol.Response response = protobufOpsProcessor.process(request, cache);

http://git-wip-us.apache.org/repos/asf/geode/blob/9905794e/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
new file mode 100644
index 0000000..ba13fb3
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.protobuf;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+@Category(UnitTest.class)
+public class ProtobufStreamProcessorTest {
+  @Test(expected = EOFException.class)
+  public void receiveMessage() throws Exception {
+    InputStream inputStream = new ByteArrayInputStream(new byte[0]);
+    OutputStream outputStream = new ByteArrayOutputStream(2);
+
+    ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
+    InternalCache mockInternalCache = mock(InternalCache.class);
+    protobufStreamProcessor.receiveMessage(inputStream, outputStream, mockInternalCache);
+  }
+}

Reply via email to