Added: 
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java
URL: 
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java?view=auto&rev=467075
==============================================================================
--- 
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java
 (added)
+++ 
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java
 Mon Oct 23 11:38:50 2006
@@ -0,0 +1,373 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl.io;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * Copied from JDK 1.4 and slightly modified
+ *
+ * A piped input stream should be connected
+ * to a piped output stream; the piped  input
+ * stream then provides whatever data bytes
+ * are written to the piped output  stream.
+ * Typically, data is read from a <code>PipedInputStream</code>
+ * object by one thread  and data is written
+ * to the corresponding <code>PipedOutputStream</code>
+ * by some  other thread. Attempting to use
+ * both objects from a single thread is not
+ * recommended, as it may deadlock the thread.
+ * The piped input stream contains a buffer,
+ * decoupling read operations from write operations,
+ * within limits.
+ *
+ * @author James Gosling
+ * @version 1.32, 01/23/03
+ * @see java.io.PipedOutputStream
+ * @since JDK1.0
+ */
+public class PipedInputStream extends InputStream {
+    boolean closedByWriter = false;
+    boolean closedByReader = false;
+    boolean connected = false;
+
+    /* REMIND: identification of the read and write sides needs to be
+        more sophisticated.  Either using thread groups (but what about
+        pipes within a thread?) or using finalization (but it may be a
+        long time until the next GC). */
+    Thread readSide;
+    Thread writeSide;
+
+    /**
+     * The size of the pipe's circular input buffer.
+     *
+     * @since JDK1.1
+     */
+    protected static final int PIPE_SIZE = 1024;
+
+    /**
+     * The circular buffer into which incoming data is placed.
+     *
+     * @since JDK1.1
+     */
+    protected byte buffer[] = new byte[PIPE_SIZE];
+
+    /**
+     * The index of the position in the circular buffer at which the
+     * next byte of data will be stored when received from the connected
+     * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
+     * <code>in==out</code> implies the buffer is full
+     *
+     * @since JDK1.1
+     */
+    protected int in = -1;
+
+    /**
+     * The index of the position in the circular buffer at which the next
+     * byte of data will be read by this piped input stream.
+     *
+     * @since JDK1.1
+     */
+    protected int out = 0;
+
+    /**
+     * Creates a <code>PipedInputStream</code> so
+     * that it is connected to the piped output
+     * stream <code>src</code>. Data bytes written
+     * to <code>src</code> will then be  available
+     * as input from this stream.
+     *
+     * @param src the stream to connect to.
+     * @throws java.io.IOException if an I/O error occurs.
+     */
+    public PipedInputStream(PipedOutputStream src) throws IOException {
+        connect(src);
+    }
+
+    /**
+     * Creates a <code>PipedInputStream</code> so
+     * that it is not  yet connected. It must be
+     * connected to a <code>PipedOutputStream</code>
+     * before being used.
+     *
+     * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
+     * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
+     */
+    public PipedInputStream() {
+    }
+
+    /**
+     * Causes this piped input stream to be connected
+     * to the piped  output stream <code>src</code>.
+     * If this object is already connected to some
+     * other piped output  stream, an <code>IOException</code>
+     * is thrown.
+     * <p/>
+     * If <code>src</code> is an
+     * unconnected piped output stream and <code>snk</code>
+     * is an unconnected piped input stream, they
+     * may be connected by either the call:
+     * <p/>
+     * <pre><code>snk.connect(src)</code> </pre>
+     * <p/>
+     * or the call:
+     * <p/>
+     * <pre><code>src.connect(snk)</code> </pre>
+     * <p/>
+     * The two
+     * calls have the same effect.
+     *
+     * @param src The piped output stream to connect to.
+     * @throws IOException if an I/O error occurs.
+     */
+    public void connect(PipedOutputStream src) throws IOException {
+        src.connect(this);
+    }
+
+    /**
+     * Receives a byte of data.
+     *
+     * @param b the byte being received
+     * @return bytes received (i.e. 1) or -1 if buffer is full
+     * @throws IOException If the pipe is broken.
+     * @since JDK1.1
+     */
+    protected synchronized int receive(int b) throws IOException {
+        if (!connected) {
+            throw new IOException("Pipe not connected");
+        } else if (closedByWriter || closedByReader) {
+            throw new IOException("Pipe closed");
+        } else if (readSide != null && !readSide.isAlive()) {
+            throw new IOException("Read end dead");
+        }
+
+        if (in == out) {
+            // buffer full
+            return -1;
+        }
+
+        writeSide = Thread.currentThread();
+        while (in == out) {
+            if ((readSide != null) && !readSide.isAlive()) {
+                throw new IOException("Pipe broken");
+            }
+            /* full: kick any waiting readers */
+            notifyAll();
+            try {
+                wait(100);
+            } catch (InterruptedException ex) {
+                throw new java.io.InterruptedIOException();
+            }
+        }
+        if (in < 0) {
+            in = 0;
+            out = 0;
+        }
+        buffer[in++] = (byte) (b & 0xFF);
+        if (in >= buffer.length) {
+            in = 0;
+        }
+        return 1;
+    }
+
+    /**
+     * Receives data into an array of bytes.  This method will
+     * block until some input is available.
+     *
+     * @param b   the buffer into which the data is received
+     * @param off the start offset of the data
+     * @param len the maximum number of bytes received
+     * @return the actual number of bytes received, -1 is
+     *         returned when the buffer is full
+     * @throws IOException If an I/O error has occurred.
+     */
+    synchronized int receive(byte b[], int off, int len) throws IOException {
+        int read = 0;
+        while (--len >= 0) {
+            if (receive(b[off++]) == -1) {
+                return read;
+            } else {
+                read++;
+            }
+        }
+        return -1; // should never happen
+    }
+
+    /**
+     * Notifies all waiting threads that the last byte of data has been
+     * received.
+     */
+    synchronized void receivedLast() {
+        closedByWriter = true;
+        notifyAll();
+    }
+
+    /**
+     * Reads the next byte of data from this piped input stream. The
+     * value byte is returned as an <code>int</code> in the range
+     * <code>0</code> to <code>255</code>. If no byte is available
+     * because the end of the stream has been reached, the value
+     * <code>-1</code> is returned. This method blocks until input data
+     * is available, the end of the stream is detected, or an exception
+     * is thrown.
+     * If a thread was providing data bytes
+     * to the connected piped output stream, but
+     * the  thread is no longer alive, then an
+     * <code>IOException</code> is thrown.
+     *
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     * @throws IOException if the pipe is broken.
+     */
+    public synchronized int read() throws IOException {
+        if (!connected) {
+            throw new IOException("Pipe not connected");
+        } else if (closedByReader) {
+            throw new IOException("Pipe closed");
+        } else if (writeSide != null && !writeSide.isAlive()
+            && !closedByWriter && (in < 0)) {
+            throw new IOException("Write end dead");
+        }
+
+        readSide = Thread.currentThread();
+        int trials = 2;
+        while (in < 0) {
+            if (closedByWriter) {
+                /* closed by writer, return EOF */
+                return -1;
+            }
+            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 
0)) {
+                throw new IOException("Pipe broken");
+            }
+            /* might be a writer waiting */
+            notifyAll();
+            try {
+                wait(100);
+            } catch (InterruptedException ex) {
+                throw new java.io.InterruptedIOException();
+            }
+        }
+        int ret = buffer[out++] & 0xFF;
+        if (out >= buffer.length) {
+            out = 0;
+        }
+        if (in == out) {
+            /* now empty */
+            in = -1;
+        }
+        return ret;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes of data from this piped input
+     * stream into an array of bytes. Less than <code>len</code> bytes
+     * will be read if the end of the data stream is reached. This method
+     * blocks until at least one byte of input is available.
+     * If a thread was providing data bytes
+     * to the connected piped output stream, but
+     * the  thread is no longer alive, then an
+     * <code>IOException</code> is thrown.
+     *
+     * @param b   the buffer into which the data is read.
+     * @param off the start offset of the data.
+     * @param len the maximum number of bytes read.
+     * @return the total number of bytes read into the buffer, or
+     *         <code>-1</code> if there is no more data because the end of
+     *         the stream has been reached.
+     * @throws IOException if an I/O error occurs.
+     */
+    public synchronized int read(byte b[], int off, int len) throws 
IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        /* possibly wait on the first character */
+        int c = read();
+        if (c < 0) {
+            return -1;
+        }
+        b[off] = (byte) c;
+        int rlen = 1;
+        while ((in >= 0) && (--len > 0)) {
+            b[off + rlen] = buffer[out++];
+            rlen++;
+            if (out >= buffer.length) {
+                out = 0;
+            }
+            if (in == out) {
+                /* now empty */
+                in = -1;
+            }
+        }
+        return rlen;
+    }
+
+    /**
+     * Returns the number of bytes that can be read from this input
+     * stream without blocking. This method overrides the 
<code>available</code>
+     * method of the parent class.
+     *
+     * @return the number of bytes that can be read from this input stream
+     *         without blocking.
+     * @throws IOException if an I/O error occurs.
+     * @since JDK1.0.2
+     */
+    public synchronized int available() throws IOException {
+        if (in < 0)
+            return 0;
+        else if (in == out)
+            return buffer.length;
+        else if (in > out)
+            return in - out;
+        else
+            return in + buffer.length - out;
+    }
+
+    /**
+     * Return the number of bytes that can be written into the internal buffer
+     * of this input stream
+     * @return the number of bytes which can be written
+     */
+    public synchronized int availableForWrite() {
+        if (in < 0) {
+            // buffer emptry
+            return buffer.length;
+        } else if (in == out) {
+            // buffer full
+            return 0;
+        } else {
+            // out < in since if as soon as out == in, in is resetted
+            return buffer.length - in;            
+        }
+    }
+
+    /**
+     * Closes this piped input stream and releases any system resources
+     * associated with the stream.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    public void close() throws IOException {
+        in = -1;
+        closedByReader = true;
+    }
+}

Added: 
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java
URL: 
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java?view=auto&rev=467075
==============================================================================
--- 
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java
 (added)
+++ 
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java
 Mon Oct 23 11:38:50 2006
@@ -0,0 +1,181 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.axis2.transport.niohttp.impl.io;
+
+import java.io.*;
+
+/**
+ * Copied from JDK 1.4 and slightly modified
+ *
+ * A piped output stream can be connected to a piped input stream
+ * to create a communications pipe. The piped output stream is the
+ * sending end of the pipe. Typically, data is written to a
+ * <code>PipedOutputStream</code> object by one thread and data is
+ * read from the connected <code>PipedInputStream</code> by some
+ * other thread. Attempting to use both objects from a single thread
+ * is not recommended as it may deadlock the thread.
+ *
+ * @author James Gosling
+ * @version 1.25, 01/23/03
+ * @see java.io.PipedInputStream
+ * @since JDK1.0
+ */
+public class PipedOutputStream extends OutputStream {
+
+    /* REMIND: identification of the read and write sides needs to be
+        more sophisticated.  Either using thread groups (but what about
+        pipes within a thread?) or using finalization (but it may be a
+        long time until the next GC). */
+    private PipedInputStream sink;
+
+    /**
+     * Creates a piped output stream connected to the specified piped
+     * input stream. Data bytes written to this stream will then be
+     * available as input from <code>snk</code>.
+     *
+     * @param snk The piped input stream to connect to.
+     * @throws IOException if an I/O error occurs.
+     */
+    public PipedOutputStream(PipedInputStream snk) throws IOException {
+        connect(snk);
+    }
+
+    /**
+     * Creates a piped output stream that is not yet connected to a
+     * piped input stream. It must be connected to a piped input stream,
+     * either by the receiver or the sender, before being used.
+     *
+     * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
+     * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
+     */
+    public PipedOutputStream() {
+    }
+
+    /**
+     * Connects this piped output stream to a receiver. If this object
+     * is already connected to some other piped input stream, an
+     * <code>IOException</code> is thrown.
+     * <p/>
+     * If <code>snk</code> is an unconnected piped input stream and
+     * <code>src</code> is an unconnected piped output stream, they may
+     * be connected by either the call:
+     * <blockquote><pre>
+     * src.connect(snk)</pre></blockquote>
+     * or the call:
+     * <blockquote><pre>
+     * snk.connect(src)</pre></blockquote>
+     * The two calls have the same effect.
+     *
+     * @param snk the piped input stream to connect to.
+     * @throws IOException if an I/O error occurs.
+     */
+    public synchronized void connect(PipedInputStream snk) throws IOException {
+        if (snk == null) {
+            throw new NullPointerException();
+        } else if (sink != null || snk.connected) {
+            throw new IOException("Already connected");
+        }
+        sink = snk;
+        snk.in = -1;
+        snk.out = 0;
+        snk.connected = true;
+    }
+
+    /**
+     * Writes the specified <code>byte</code> to the piped output stream.
+     * If a thread was reading data bytes from the connected piped input
+     * stream, but the thread is no longer alive, then an
+     * <code>IOException</code> is thrown.
+     * <p/>
+     * Implements the <code>write</code> method of <code>OutputStream</code>.
+     *
+     * @param b the <code>byte</code> to be written.
+     * @throws IOException if an I/O error occurs or if
+     *         the internal buffer of the connected Piped input stream is full
+     */
+    public void write(int b) throws IOException {
+        if (sink == null) {
+            throw new IOException("Pipe not connected");
+        }
+        if (sink.availableForWrite() == 0) {
+            throw new IOException("Internal buffer of the Piped input stream 
is full");
+        }
+        sink.receive(b);
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array
+     * starting at offset <code>off</code> to this piped output stream.
+     * If a thread was reading data bytes from the connected piped input
+     * stream, but the thread is no longer alive, then an
+     * <code>IOException</code> is thrown.
+     *
+     * @param b   the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     * @throws IOException if an I/O error occurs.
+     */
+    public void write(byte b[], int off, int len) throws IOException {
+        if (sink == null) {
+            throw new IOException("Pipe not connected");
+        } else if (b == null) {
+            throw new NullPointerException();
+        } else if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return;
+        }
+        sink.receive(b, off, len);
+    }
+
+    /**
+     * Flushes this output stream and forces any buffered output bytes
+     * to be written out.
+     * This will notify any readers that bytes are waiting in the pipe.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    public synchronized void flush() throws IOException {
+        if (sink != null) {
+            synchronized (sink) {
+                sink.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Closes this piped output stream and releases any system resources
+     * associated with this stream. This stream may no longer be used for
+     * writing bytes.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    public void close() throws IOException {
+        if (sink != null) {
+            sink.receivedLast();
+        }
+    }
+
+    /**
+     * Returns the number of bytes that can be written to this Piped output 
stream
+     * without blocking.
+     * @return number of bytes that can be written without blocking
+     */
+    public int availableForWrite() {
+        return sink.availableForWrite();
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to