Author: bodewig
Date: Thu Nov 6 06:33:43 2008
New Revision: 711860
URL: http://svn.apache.org/viewvc?rev=711860&view=rev
Log:
emulate async I/O when processing output of forked processes in order to deal
with the case where a child of the forked process outlives its parent. PR
5003. Based on a patch by Adam Sotona.
Modified:
ant/core/trunk/CONTRIBUTORS
ant/core/trunk/WHATSNEW
ant/core/trunk/contributors.xml
ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java
ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java
Modified: ant/core/trunk/CONTRIBUTORS
URL:
http://svn.apache.org/viewvc/ant/core/trunk/CONTRIBUTORS?rev=711860&r1=711859&r2=711860&view=diff
==============================================================================
Binary files - no diff available.
Modified: ant/core/trunk/WHATSNEW
URL:
http://svn.apache.org/viewvc/ant/core/trunk/WHATSNEW?rev=711860&r1=711859&r2=711860&view=diff
==============================================================================
--- ant/core/trunk/WHATSNEW (original)
+++ ant/core/trunk/WHATSNEW Thu Nov 6 06:33:43 2008
@@ -113,6 +113,14 @@
http://ant.apache.org/antlibs/dotnet/index.html
instead.
+ * the logic of closing streams connected to forked processes (read
+ the input and output of <exec> and friends) has been changed to
+ deal with cases where child processes of the forked processes live
+ longer than their parents and keep Ant from exiting.
+ It is unlikely but possible that the changed logic breaks stream
+ handling on certain Java VMs.
+ Bugzilla issue 5003.
+
Fixed bugs:
-----------
Modified: ant/core/trunk/contributors.xml
URL:
http://svn.apache.org/viewvc/ant/core/trunk/contributors.xml?rev=711860&r1=711859&r2=711860&view=diff
==============================================================================
--- ant/core/trunk/contributors.xml (original)
+++ ant/core/trunk/contributors.xml Thu Nov 6 06:33:43 2008
@@ -39,6 +39,10 @@
<last>Bryzak</last>
</name>
<name>
+ <first>Adam</first>
+ <last>Sotona</last>
+ </name>
+ <name>
<first>Aleksandr</first>
<last>Ishutin</last>
</name>
Modified:
ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java
URL:
http://svn.apache.org/viewvc/ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java?rev=711860&r1=711859&r2=711860&view=diff
==============================================================================
---
ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java
(original)
+++
ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java
Thu Nov 6 06:33:43 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.tools.ant.taskdefs.condition.Os;
/**
* Copies standard output and error of subprocesses to standard output and
@@ -129,16 +130,8 @@
* Stop pumping the streams.
*/
public void stop() {
- try {
- outputThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- try {
- errorThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
+ finish(outputThread);
+ finish(errorThread);
if (inputPump != null) {
inputPump.stop();
@@ -156,6 +149,35 @@
}
}
+ private static final long JOIN_TIMEOUT = 500;
+
+ /**
+ * Waits for a thread to finish while trying to make it finish
+ * quicker by stopping the pumper (if the thread is a {@link
+ * ThreadWithPumper ThreadWithPumper} instance) or interrupting
+ * the thread.
+ *
+ * @since Ant 1.8.0
+ */
+ protected final void finish(Thread t) {
+ try {
+ t.join(JOIN_TIMEOUT);
+ StreamPumper s = null;
+ if (t instanceof ThreadWithPumper) {
+ s = ((ThreadWithPumper) t).getPumper();
+ }
+ if (s != null && !s.isFinished()) {
+ s.stop();
+ }
+ while ((s == null || !s.isFinished()) && t.isAlive()) {
+ t.interrupt();
+ t.join(JOIN_TIMEOUT);
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
/**
* Get the error stream.
* @return <code>OutputStream</code>.
@@ -207,12 +229,16 @@
* @param is the input stream to copy from.
* @param os the output stream to copy to.
* @param closeWhenExhausted if true close the inputstream.
- * @return a thread object that does the pumping.
+ * @return a thread object that does the pumping, subclasses
+ * should return an instance of {@link ThreadWithPumper
+ * ThreadWithPumper}.
*/
protected Thread createPump(InputStream is, OutputStream os,
boolean closeWhenExhausted) {
final Thread result
- = new Thread(new StreamPumper(is, os, closeWhenExhausted));
+ = new ThreadWithPumper(new StreamPumper(is, os,
+ closeWhenExhausted,
+ Os.isFamily("windows")));
result.setDaemon(true);
return result;
}
@@ -224,9 +250,25 @@
*/
/*protected*/ StreamPumper createInputPump(InputStream is, OutputStream os,
boolean closeWhenExhausted) {
- StreamPumper pumper = new StreamPumper(is, os, closeWhenExhausted);
+ StreamPumper pumper = new StreamPumper(is, os, closeWhenExhausted,
+ false);
pumper.setAutoflush(true);
return pumper;
}
+ /**
+ * Specialized subclass that allows access to the running StreamPumper.
+ *
+ * @since Ant 1.8.0
+ */
+ protected static class ThreadWithPumper extends Thread {
+ private final StreamPumper pumper;
+ public ThreadWithPumper(StreamPumper p) {
+ super(p);
+ pumper = p;
+ }
+ protected StreamPumper getPumper() {
+ return pumper;
+ }
+ }
}
Modified:
ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java
URL:
http://svn.apache.org/viewvc/ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java?rev=711860&r1=711859&r2=711860&view=diff
==============================================================================
--- ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java
(original)
+++ ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java Thu
Nov 6 06:33:43 2008
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.tools.ant.util.FileUtils;
/**
* Copies all data from an input stream to an output stream.
@@ -30,15 +31,16 @@
private static final int SMALL_BUFFER_SIZE = 128;
- private InputStream is;
- private OutputStream os;
+ private final InputStream is;
+ private final OutputStream os;
private volatile boolean finish;
private volatile boolean finished;
- private boolean closeWhenExhausted;
+ private final boolean closeWhenExhausted;
private boolean autoflush = false;
private Exception exception = null;
private int bufferSize = SMALL_BUFFER_SIZE;
private boolean started = false;
+ private final boolean useAvailable;
/**
* Create a new StreamPumper.
@@ -49,9 +51,40 @@
* the input is exhausted.
*/
public StreamPumper(InputStream is, OutputStream os, boolean
closeWhenExhausted) {
+ this(is, os, closeWhenExhausted, false);
+ }
+
+
+ /**
+ * Create a new StreamPumper.
+ *
+ * <p><b>Note:</b> If you set useAvailable to true, you must
+ * explicitly invoke {@link #stop stop} or interrupt the
+ * corresponding Thread when you are done or the run method will
+ * never finish on some JVMs (namely those where available returns
+ * 0 on a closed stream). Setting it to true may also impact
+ * performance negatively. This flag should only be set to true
+ * if you intend to stop the pumper before the input stream gets
+ * closed.</p>
+ *
+ * @param is input stream to read data from
+ * @param os output stream to write data to.
+ * @param closeWhenExhausted if true, the output stream will be closed when
+ * the input is exhausted.
+ * @param useAvailable whether the pumper should use {@link
+ * java.io.InputStream#available available} to determine
+ * whether input is ready, thus trying to emulate
+ * non-blocking behavior.
+ *
+ * @since Ant 1.8.0
+ */
+ public StreamPumper(InputStream is, OutputStream os,
+ boolean closeWhenExhausted,
+ boolean useAvailable) {
this.is = is;
this.os = os;
this.closeWhenExhausted = closeWhenExhausted;
+ this.useAvailable = useAvailable;
}
/**
@@ -90,8 +123,14 @@
int length;
try {
while (true) {
+ waitForInput(is);
+
+ if (finish || Thread.interrupted()) {
+ break;
+ }
+
length = is.read(buf);
- if ((length <= 0) || finish) {
+ if (length <= 0 || finish || Thread.interrupted()) {
break;
}
os.write(buf, 0, length);
@@ -100,17 +139,15 @@
}
}
os.flush();
+ } catch (InterruptedException ie) {
+ // likely PumpStreamHandler trying to stop us
} catch (Exception e) {
synchronized (this) {
exception = e;
}
} finally {
if (closeWhenExhausted) {
- try {
- os.close();
- } catch (IOException e) {
- // ignore
- }
+ FileUtils.close(os);
}
finished = true;
synchronized (this) {
@@ -177,4 +214,22 @@
finish = true;
notifyAll();
}
+
+ private static final long POLL_INTERVAL = 100;
+
+ private void waitForInput(InputStream is)
+ throws IOException, InterruptedException {
+ if (useAvailable) {
+ while (!finish && is.available() == 0) {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ synchronized (this) {
+ this.wait(POLL_INTERVAL);
+ }
+ }
+ }
+ }
+
}