Author: bryanduxbury
Date: Mon Sep 27 17:12:36 2010
New Revision: 1001820

URL: http://svn.apache.org/viewvc?rev=1001820&view=rev
Log:
THRIFT-917. java: THsHaServer should not accept an ExecutorService without 
catching RejectedExecutionException

This patch catches RejectedExecutionException from requestInvoke and closes the 
client connection when that occurs.

Patch: Ed Ceaser

Modified:
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java?rev=1001820&r1=1001819&r2=1001820&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java Mon Sep 27 17:12:36 2010
@@ -22,6 +22,7 @@ package org.apache.thrift.server;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -31,12 +32,16 @@ import org.apache.thrift.protocol.TBinar
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
  * Like TNonblockingServer, it relies on the use of TFramedTransport.
  */
 public class THsHaServer extends TNonblockingServer {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(THsHaServer.class.getName());
 
   // This wraps all the functionality of queueing and thread pool management
   // for the passing of Invocations from the Selector to workers.
@@ -285,8 +290,14 @@ public class THsHaServer extends TNonblo
    * invoker service instead of immediately invoking. The thread pool takes care of the rest.
    */
   @Override
-  protected void requestInvoke(FrameBuffer frameBuffer) {
-    invoker.execute(new Invocation(frameBuffer));
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    try {
+      invoker.execute(new Invocation(frameBuffer));
+      return true;
+    } catch (RejectedExecutionException rx) {
+      LOGGER.warn("ExecutorService rejected execution!", rx);
+      return false;
+    }
   }
 
   /**

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java?rev=1001820&r1=1001819&r2=1001820&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java Mon Sep 27 17:12:36 2010
@@ -254,8 +254,9 @@ public class TNonblockingServer extends 
    * Perform an invocation. This method could behave several different ways
    * - invoke immediately inline, queue for separate execution, etc.
    */
-  protected void requestInvoke(FrameBuffer frameBuffer) {
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
     frameBuffer.invoke();
+    return true;
   }
 
   /**
@@ -420,13 +421,16 @@ public class TNonblockingServer extends 
      */
     private void handleRead(SelectionKey key) {
       FrameBuffer buffer = (FrameBuffer)key.attachment();
-      if (buffer.read()) {
-        // if the buffer's frame read is complete, invoke the method.
-        if (buffer.isFrameFullyRead()) {
-          requestInvoke(buffer);
-        }
-      } else {
+      if (!buffer.read()) {
         cleanupSelectionkey(key);
+        return;
+      }
+
+      // if the buffer's frame read is complete, invoke the method.
+      if (buffer.isFrameFullyRead()) {
+        if (!requestInvoke(buffer)) {
+          cleanupSelectionkey(key);
+        }
       }
     }
 


Reply via email to