Author: cutting
Date: Fri Aug 26 09:32:51 2005
New Revision: 240278
URL: http://svn.apache.org/viewcvs?rev=240278&view=rev
Log:
Change so that call timeout is based on time since last i/o rather
than time since call initiated. This permits large values to be
passed without causing timeouts.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java
URL:
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java?rev=240278&r1=240277&r2=240278&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ipc/Client.java Fri
Aug 26 09:32:51 2005
@@ -26,6 +26,8 @@
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
import java.util.Hashtable;
import java.util.logging.Logger;
@@ -60,12 +62,15 @@
Writable param; // parameter
Writable value; // value, null if error
String error; // error, null if value
+ long lastActivity; // time of last i/o
+ boolean done; // true when call is done
protected Call(Writable param) {
this.param = param;
synchronized (Client.this) {
this.id = counter++;
}
+ touch();
}
/** Called by the connection thread when the call is complete and the
@@ -73,6 +78,19 @@
public synchronized void callComplete() {
notify(); // notify caller
}
+
+ /** Update lastActivity with the current time. */
+ public synchronized void touch() {
+ lastActivity = System.currentTimeMillis();
+ }
+
+ /** Set the result of this call (value or error) and mark the call as done. */
+ public synchronized void setResult(Writable value, String error) {
+ this.value = value;
+ this.error = error;
+ this.done = true;
+ }
+
}
/** Thread that reads responses and notifies callers. Each connection owns a
@@ -84,15 +102,34 @@
private DataInputStream in;
private DataOutputStream out;
private Hashtable calls = new Hashtable(); // currently active calls
+ private Call readingCall;
+ private Call writingCall;
public Connection(InetSocketAddress address) throws IOException {
this.address = address;
this.socket = new Socket(address.getAddress(), address.getPort());
socket.setSoTimeout(timeout);
this.in = new DataInputStream
- (new BufferedInputStream(socket.getInputStream()));
+ (new BufferedInputStream
+ (new FilterInputStream(socket.getInputStream()) {
+ public int read(byte[] buf, int off, int len) throws IOException {
+ int value = super.read(buf, off, len);
+ if (readingCall != null) {
+ readingCall.touch();
+ }
+ return value;
+ }
+ }));
this.out = new DataOutputStream
- (new BufferedOutputStream(socket.getOutputStream()));
+ (new BufferedOutputStream
+ (new FilterOutputStream(socket.getOutputStream()) {
+ public void write(byte[] buf, int o, int len) throws IOException {
+ super.write(buf, o, len);
+ if (writingCall != null) {
+ writingCall.touch();
+ }
+ }
+ }));
this.setDaemon(true);
this.setName("Client connection to "
+ address.getAddress().getHostAddress()
@@ -118,13 +155,16 @@
if (isError) {
UTF8 utf8 = new UTF8();
utf8.readFields(in); // read error string
- call.error = utf8.toString();
- call.value = null;
+ call.setResult(null, utf8.toString());
} else {
Writable value = makeValue();
- value.readFields(in); // read value
- call.value = value;
- call.error = null;
+ try {
+ readingCall = call;
+ value.readFields(in); // read value
+ } finally {
+ readingCall = null;
+ }
+ call.setResult(value, null);
}
call.callComplete(); // deliver result to caller
}
@@ -148,9 +188,14 @@
synchronized (out) {
if (LOG.isLoggable(Level.FINE))
LOG.fine(getName() + " sending #" + call.id);
- out.writeInt(call.id);
- call.param.write(out);
- out.flush();
+ try {
+ writingCall = call;
+ out.writeInt(call.id);
+ call.param.write(out);
+ out.flush();
+ } finally {
+ writingCall = null;
+ }
}
error = false;
} finally {
@@ -237,13 +282,17 @@
Call call = new Call(param);
synchronized (call) {
connection.sendParam(call); // send the parameter
- try {
- call.wait(timeout); // wait for the result
- } catch (InterruptedException e) {}
+ long wait = timeout;
+ do {
+ try {
+ call.wait(wait); // wait for the result
+ } catch (InterruptedException e) {}
+ wait = timeout - (System.currentTimeMillis() - call.lastActivity);
+ } while (!call.done && wait > 0);
if (call.error != null) {
throw new IOException(call.error);
- } else if (call.value == null) {
+ } else if (!call.done) {
throw new IOException("timed out waiting for response");
} else {
return call.value;