Repository: ant Updated Branches: refs/heads/master de2104496 -> 00a4216d4
BZ-58451 BZ-58833 Give StreamPumper a chance to finish cleanly before interrupting its thread, to prevent truncated output Project: http://git-wip-us.apache.org/repos/asf/ant/repo Commit: http://git-wip-us.apache.org/repos/asf/ant/commit/6c860e00 Tree: http://git-wip-us.apache.org/repos/asf/ant/tree/6c860e00 Diff: http://git-wip-us.apache.org/repos/asf/ant/diff/6c860e00 Branch: refs/heads/master Commit: 6c860e0036a8742f4cc35e3e6a8e595e42804c89 Parents: d43d83e Author: Jaikiran Pai <[email protected]> Authored: Sat Dec 16 15:18:30 2017 +0530 Committer: Jaikiran Pai <[email protected]> Committed: Thu Jan 4 11:13:09 2018 +0530 ---------------------------------------------------------------------- WHATSNEW | 4 + .../taskdefs/exec/exec-with-redirector.xml | 122 ++++++++++++++++++ .../tools/ant/taskdefs/PumpStreamHandler.java | 16 ++- .../apache/tools/ant/taskdefs/StreamPumper.java | 124 ++++++++++++++----- .../ant/taskdefs/ExecStreamRedirectorTest.java | 91 ++++++++++++++ 5 files changed, 322 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ant/blob/6c860e00/WHATSNEW ---------------------------------------------------------------------- diff --git a/WHATSNEW b/WHATSNEW index 2b383da..235e723 100644 --- a/WHATSNEW +++ b/WHATSNEW @@ -23,6 +23,10 @@ Fixed bugs: suggesting the fix. Bugzilla Report 19516 + * Fixed an issue where the content redirected from output/error + streams of a process, could end up being truncated. 
+ Bugzilla Report 58833, 58451 + Other changes: -------------- http://git-wip-us.apache.org/repos/asf/ant/blob/6c860e00/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml ---------------------------------------------------------------------- diff --git a/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml b/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml new file mode 100644 index 0000000..27b680d --- /dev/null +++ b/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml @@ -0,0 +1,122 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project name="exec-redirector-test" basedir="."> + + <target name="setUp"> + <!-- This "output" property is set on the project in the Java test case (ExecStreamRedirectorTest) --> + <mkdir dir="${output}"/> + <condition property="dir.listing.command" value="ls" else="cmd.exe"> + <os family="unix"/> + </condition> + <condition property="dir.to.ls" value="/usr/bin" else="${user.dir}"> + <os family="unix"/> + </condition> + <condition property="dir.listing.command.arg" value="-l" else="dir"> + <os family="unix"/> + </condition> + <property name="dir.to.ls" value="/usr/bin"/> + </target> + + <target name="list-dir"> + <!-- Just do listing of the same directory and redirect the output to different files --> + <parallel> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls1.txt" error="${output}/ls1.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls2.txt" error="${output}/ls2.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls3.txt" error="${output}/ls3.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls4.txt" error="${output}/ls4.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls5.txt" error="${output}/ls5.err" alwayslog="true"/> + <arg 
value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls6.txt" error="${output}/ls6.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls7.txt" error="${output}/ls7.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls8.txt" error="${output}/ls8.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls9.txt" error="${output}/ls9.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls10.txt" error="${output}/ls10.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls11.txt" error="${output}/ls11.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls12.txt" error="${output}/ls12.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" 
dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls13.txt" error="${output}/ls13.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls14.txt" error="${output}/ls14.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls15.txt" error="${output}/ls15.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + <exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true"> + <redirector output="${output}/ls16.txt" error="${output}/ls16.err" alwayslog="true"/> + <arg value="${dir.listing.command.arg}"/> + <arg value="${dir.to.ls}"/> + </exec> + </parallel> + </target> + +</project> http://git-wip-us.apache.org/repos/asf/ant/blob/6c860e00/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java ---------------------------------------------------------------------- diff --git a/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java b/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java index 9a23947..31671fc 100644 --- a/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java +++ b/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java @@ -21,6 +21,7 @@ package org.apache.tools.ant.taskdefs; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.TimeUnit; import org.apache.tools.ant.util.FileUtils; @@ -183,12 +184,21 @@ public class PumpStreamHandler implements ExecuteStreamHandler { if (!t.isAlive()) { return; } - + StreamPumper.PostStopHandle postStopHandle = null; if (s != null && !s.isFinished()) { 
- s.stop(); + postStopHandle = s.stop(); + } + if (postStopHandle != null && postStopHandle.isInPostStopTasks()) { + // the stream pumper is in post stop tasks (like flushing output), which + // indicates that the stream pumper has respected the stop request and + // is cleaning up before finishing. Give it some time to finish this + // post stop activity, before trying to force interrupt the underlying thread + // of the stream pumper + postStopHandle.awaitPostStopCompletion(2, TimeUnit.SECONDS); } - t.join(JOIN_TIMEOUT); while ((s == null || !s.isFinished()) && t.isAlive()) { + // we waited for the thread/stream pumper to finish, but it hasn't yet. + // so we interrupt it t.interrupt(); t.join(JOIN_TIMEOUT); } http://git-wip-us.apache.org/repos/asf/ant/blob/6c860e00/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java ---------------------------------------------------------------------- diff --git a/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java b/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java index 676b5fb..c7afc4d 100644 --- a/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java +++ b/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java @@ -17,11 +17,13 @@ */ package org.apache.tools.ant.taskdefs; +import org.apache.tools.ant.util.FileUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; - -import org.apache.tools.ant.util.FileUtils; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * Copies all data from an input stream to an output stream. 
@@ -34,7 +36,7 @@ public class StreamPumper implements Runnable { private final InputStream is; private final OutputStream os; - private volatile boolean finish; + private volatile boolean askedToStop; private volatile boolean finished; private final boolean closeWhenExhausted; private boolean autoflush = false; @@ -42,6 +44,7 @@ public class StreamPumper implements Runnable { private int bufferSize = SMALL_BUFFER_SIZE; private boolean started = false; private final boolean useAvailable; + private PostStopHandle postStopHandle; /** * Create a new StreamPumper. @@ -58,7 +61,6 @@ public class StreamPumper implements Runnable { /** * Create a new StreamPumper. - * * <p><b>Note:</b> If you set useAvailable to true, you must * explicitly invoke {@link #stop stop} or interrupt the * corresponding Thread when you are done or the run method will @@ -120,41 +122,29 @@ public class StreamPumper implements Runnable { final byte[] buf = new byte[bufferSize]; - int length; try { - while (true) { + int length; + while (!this.askedToStop && !Thread.interrupted()) { waitForInput(is); - if (finish || Thread.interrupted()) { + if (askedToStop || Thread.interrupted()) { break; } length = is.read(buf); - if (length <= 0 || Thread.interrupted()) { + if (length < 0) { + // EOF break; } - os.write(buf, 0, length); - if (autoflush) { - os.flush(); - } - if (finish) { //NOSONAR - break; - } - } - // On completion, drain any available data (which might be the first data available for quick executions) - if (finish) { - while ((length = is.available()) > 0) { - if (Thread.interrupted()) { - break; - } - length = is.read(buf, 0, Math.min(length, buf.length)); - if (length <= 0) { - break; - } + if (length > 0) { + // we did read something, so write it out os.write(buf, 0, length); + if (autoflush) { + os.flush(); + } } } - os.flush(); + this.doPostStop(); } catch (InterruptedException ie) { // likely PumpStreamHandler trying to stop us } catch (Exception e) { @@ -166,7 +156,7 @@ public class 
StreamPumper implements Runnable { FileUtils.close(os); } finished = true; - finish = false; + askedToStop = false; synchronized (this) { notifyAll(); } @@ -206,6 +196,7 @@ public class StreamPumper implements Runnable { /** * Get the size in bytes of the read buffer. + * * @return the int size of the read buffer. */ public synchronized int getBufferSize() { @@ -225,19 +216,26 @@ public class StreamPumper implements Runnable { * Note that it may continue to block on the input stream * but it will really stop the thread as soon as it gets EOF * or any byte, and it will be marked as finished. + * @return Returns a {@link PostStopHandle} for the callers to + * know the status of post-stop activities that happen before this + * {@link StreamPumper} is actually finished * @since Ant 1.6.3 + * @since Ant 1.10.2 this method returns a {@link PostStopHandle} */ - /*package*/ synchronized void stop() { - finish = true; + /*package*/ + synchronized PostStopHandle stop() { + askedToStop = true; + postStopHandle = new PostStopHandle(); notifyAll(); + return postStopHandle; } private static final long POLL_INTERVAL = 100; private void waitForInput(InputStream is) - throws IOException, InterruptedException { + throws IOException, InterruptedException { if (useAvailable) { - while (!finish && is.available() == 0) { + while (!askedToStop && is.available() == 0) { if (Thread.interrupted()) { throw new InterruptedException(); } @@ -249,4 +247,66 @@ public class StreamPumper implements Runnable { } } + private void doPostStop() throws IOException { + try { + final byte[] buf = new byte[bufferSize]; + int length; + // We were asked to stop, the contract allows us to do any non-blocking + // final bits of reads, before actually finishing. So we try and drain any (non-blocking) available + // data. We *don't* check the thread interrupt status, anymore, once we start draining this non-blocking + // available data, to allow us to cleanly write out any available data. 
+ if (askedToStop) { + int bytesReadableWithoutBlocking; + while ((bytesReadableWithoutBlocking = is.available()) > 0) { + length = is.read(buf, 0, Math.min(bytesReadableWithoutBlocking, buf.length)); + if (length <= 0) { + break; + } + os.write(buf, 0, length); + } + } + // this can potentially be blocking, but that's OK since our post stop activity is allowed to + // cleanup/flush any data and the PostStopHandle lets the caller control how long they want + // this to go, before actually interrupting the thread + os.flush(); + } finally { + if (this.postStopHandle != null) { + this.postStopHandle.latch.countDown(); + this.postStopHandle.inPostStopTasks = false; + } + } + } + + /** + * A handle that can be used after {@link #stop()} has been invoked to check if the + * {@link StreamPumper} is in the process of doing some post-stop tasks (like flushing + * of streams), before finishing. + */ + final class PostStopHandle { + private boolean inPostStopTasks = true; + private final CountDownLatch latch = new CountDownLatch(1); + + /** + * Returns true if the {@link StreamPumper} is doing post-stop tasks (like flushing of streams). + * Else returns false. + * @return + */ + boolean isInPostStopTasks() { + return inPostStopTasks; + } + + /** + * Waits for a maximum of {@code timeout} time for the post-stop activities to complete. + * + * @param timeout The maximum amount of time to wait for the post-stop activities to complete + * @param timeUnit The unit of {@code timeout} + * @return Returns true if the post-stop activities completed within the specified {@code timeout}. 
+ * Else returns false + * @throws InterruptedException If the current thread was interrupted while waiting + */ + boolean awaitPostStopCompletion(final long timeout, final TimeUnit timeUnit) throws InterruptedException { + return this.latch.await(timeout, timeUnit); + } + } + } http://git-wip-us.apache.org/repos/asf/ant/blob/6c860e00/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java ---------------------------------------------------------------------- diff --git a/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java b/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java new file mode 100644 index 0000000..ba39fd5 --- /dev/null +++ b/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java @@ -0,0 +1,91 @@ +package org.apache.tools.ant.taskdefs; + +import org.apache.tools.ant.Project; +import org.apache.tools.ant.ProjectHelper; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@code exec} task which uses a {@code redirector} to redirect its output and error streams + */ +public class ExecStreamRedirectorTest { + + private Project project; + + @Before + public void setUp() throws Exception { + project = new Project(); + project.init(); + final File antFile = new File(System.getProperty("root"), "src/etc/testcases/taskdefs/exec/exec-with-redirector.xml"); + project.setUserProperty("ant.file", antFile.getAbsolutePath()); + final File outputDir = this.createTmpDir(); + project.setUserProperty("output", outputDir.toString()); + ProjectHelper.configureProject(project, antFile); + project.executeTarget("setUp"); + } + + /** + * Tests that the redirected streams of the exec'ed 
process aren't truncated. + * + * @throws Exception + * @see <a href="https://bz.apache.org/bugzilla/show_bug.cgi?id=58451">bz-58451</a> and + * <a href="https://bz.apache.org/bugzilla/show_bug.cgi?id=58833">bz-58833</a> for more details + */ + @Test + public void testRedirection() throws Exception { + final String dirToList = project.getProperty("dir.to.ls"); + assertNotNull("Directory to list isn't available", dirToList); + assertTrue(dirToList + " is not a directory", new File(dirToList).isDirectory()); + + project.executeTarget("list-dir"); + + // verify the redirected output + final String outputDirPath = project.getProperty("output"); + byte[] dirListingOutput = null; + for (int i = 1; i <= 16; i++) { + final File redirectedOutputFile = new File(outputDirPath, "ls" + i + ".txt"); + assertTrue(redirectedOutputFile + " is missing or not a regular file", redirectedOutputFile.isFile()); + final byte[] redirectedOutput = readAllBytes(redirectedOutputFile); + assertNotNull("No content was redirected to " + redirectedOutputFile, redirectedOutput); + assertFalse("Content in redirected file " + redirectedOutputFile + " was empty", redirectedOutput.length == 0); + if (dirListingOutput != null) { + // compare the directory listing that was redirected to these files. 
all files should have the same content + assertTrue("Redirected output in file " + redirectedOutputFile + + " doesn't match content in other redirected output file(s)", Arrays.equals(dirListingOutput, redirectedOutput)); + } + dirListingOutput = redirectedOutput; + } + } + + private File createTmpDir() { + final File tmpDir = new File(System.getProperty("java.io.tmpdir"), String.valueOf("temp-" + System.nanoTime())); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + return tmpDir; + } + + private static byte[] readAllBytes(final File file) throws IOException { + final FileInputStream fis = new FileInputStream(file); + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + final byte[] dataChunk = new byte[1024]; + int numRead = -1; + while ((numRead = fis.read(dataChunk)) > 0) { + bos.write(dataChunk, 0, numRead); + } + } finally { + fis.close(); + } + return bos.toByteArray(); + } +}
