Author: cutting
Date: Fri Aug 26 09:32:51 2005
New Revision: 240278

URL: http://svn.apache.org/viewcvs?rev=240278&view=rev
Log:
Change so that call timeout is based on time since last i/o rather
than time since call initiated.  This permits large values to be
passed without causing timeouts.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java?rev=240278&r1=240277&r2=240278&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java Fri Aug 26 09:32:51 2005
@@ -26,6 +26,8 @@
 import java.io.DataOutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
 
 import java.util.Hashtable;
 import java.util.logging.Logger;
@@ -60,12 +62,15 @@
     Writable param;                               // parameter
     Writable value;                               // value, null if error
     String error;                                 // error, null if value
+    long lastActivity;                            // time of last i/o
+    boolean done;                                 // true when call is done
 
     protected Call(Writable param) {
       this.param = param;
       synchronized (Client.this) {
         this.id = counter++;
       }
+      touch();
     }
 
     /** Called by the connection thread when the call is complete and the
@@ -73,6 +78,19 @@
     public synchronized void callComplete() {
         notify();                                 // notify caller
     }
+
+    /** Update lastActivity with the current time. */
+    public synchronized void touch() {
+      lastActivity = System.currentTimeMillis();
+    }
+
+    /** Record the outcome of this call (a value or an error) and mark it done. */
+    public synchronized void setResult(Writable value, String error) {
+      this.value = value;
+      this.error = error;
+      this.done = true;
+    }
+    
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -84,15 +102,34 @@
     private DataInputStream in;                   
     private DataOutputStream out;
     private Hashtable calls = new Hashtable();    // currently active calls
+    private Call readingCall;
+    private Call writingCall;
 
     public Connection(InetSocketAddress address) throws IOException {
       this.address = address;
       this.socket = new Socket(address.getAddress(), address.getPort());
       socket.setSoTimeout(timeout);
       this.in = new DataInputStream
-        (new BufferedInputStream(socket.getInputStream()));
+        (new BufferedInputStream
+         (new FilterInputStream(socket.getInputStream()) {
+             public int read(byte[] buf, int off, int len) throws IOException {
+               int value = super.read(buf, off, len);
+               if (readingCall != null) {
+                 readingCall.touch();
+               }
+               return value;
+             }
+          }));
       this.out = new DataOutputStream
-        (new BufferedOutputStream(socket.getOutputStream()));
+        (new BufferedOutputStream
+         (new FilterOutputStream(socket.getOutputStream()) {
+             public void write(byte[] buf, int o, int len) throws IOException {
+               super.write(buf, o, len);
+               if (writingCall != null) {
+                 writingCall.touch();
+               }
+             }
+           }));
       this.setDaemon(true);
       this.setName("Client connection to "
                    + address.getAddress().getHostAddress()
@@ -118,13 +155,16 @@
           if (isError) {
             UTF8 utf8 = new UTF8();
             utf8.readFields(in);                  // read error string
-            call.error = utf8.toString();
-            call.value = null;
+            call.setResult(null, utf8.toString());
           } else {
             Writable value = makeValue();
-            value.readFields(in);                 // read value
-            call.value = value;
-            call.error = null;
+            try {
+              readingCall = call;
+              value.readFields(in);                 // read value
+            } finally {
+              readingCall = null;
+            }
+            call.setResult(value, null);
           }
           call.callComplete();                   // deliver result to caller
         }
@@ -148,9 +188,14 @@
         synchronized (out) {
           if (LOG.isLoggable(Level.FINE))
             LOG.fine(getName() + " sending #" + call.id);
-          out.writeInt(call.id);
-          call.param.write(out);
-          out.flush();
+          try {
+            writingCall = call;
+            out.writeInt(call.id);
+            call.param.write(out);
+            out.flush();
+          } finally {
+            writingCall = null;
+          }
         }
         error = false;
       } finally {
@@ -237,13 +282,17 @@
     Call call = new Call(param);
     synchronized (call) {
       connection.sendParam(call);                 // send the parameter
-      try {
-        call.wait(timeout);                       // wait for the result
-      } catch (InterruptedException e) {}
+      long wait = timeout;
+      do {
+        try {
+          call.wait(wait);                       // wait for the result
+        } catch (InterruptedException e) {}
+        wait = timeout - (System.currentTimeMillis() - call.lastActivity);
+      } while (!call.done && wait > 0);
 
       if (call.error != null) {
         throw new IOException(call.error);
-      } else if (call.value == null) {
+      } else if (!call.done) {
         throw new IOException("timed out waiting for response");
       } else {
         return call.value;


Reply via email to