Author: jukka
Date: Sun Oct 7 16:27:19 2007
New Revision: 582698
URL: http://svn.apache.org/viewvc?rev=582698&view=rev
Log:
TIKA-45 - RereadableInputStream needs to be able to read to the end of the
original stream on first rewind
- Committed patch from Keith Bennett
Modified:
incubator/tika/trunk/CHANGES.txt
incubator/tika/trunk/src/main/java/org/apache/tika/utils/RereadableInputStream.java
incubator/tika/trunk/src/test/java/org/apache/tika/RereadableInputStreamTest.java
Modified: incubator/tika/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/tika/trunk/CHANGES.txt?rev=582698&r1=582697&r2=582698&view=diff
==============================================================================
--- incubator/tika/trunk/CHANGES.txt (original)
+++ incubator/tika/trunk/CHANGES.txt Sun Oct 7 16:27:19 2007
@@ -73,3 +73,6 @@
33. TIKA-46 - Use Metadata in Parser (jukka & mattmann)
34. TIKA-48 - Merge MS Extractors and Parsers (jukka)
+
+35. TIKA-45 - RereadableInputStream needs to be able to read to
+ the end of the original stream on first rewind. (K. Bennett)
Modified: incubator/tika/trunk/src/main/java/org/apache/tika/utils/RereadableInputStream.java
URL: http://svn.apache.org/viewvc/incubator/tika/trunk/src/main/java/org/apache/tika/utils/RereadableInputStream.java?rev=582698&r1=582697&r2=582698&view=diff
==============================================================================
--- incubator/tika/trunk/src/main/java/org/apache/tika/utils/RereadableInputStream.java (original)
+++ incubator/tika/trunk/src/main/java/org/apache/tika/utils/RereadableInputStream.java Sun Oct 7 16:27:19 2007
@@ -26,30 +26,139 @@
import java.io.InputStream;
import java.io.OutputStream;
+
+/**
+ * Wraps an input stream, reading it only once, but making it available
+ * for rereading an arbitrary number of times. The stream's bytes are
+ * stored in memory up to a user-specified maximum, and then stored in a
+ * temporary file, which is deleted when this class's close() method is called.
+ */
public class RereadableInputStream extends InputStream {
+ /**
+ * The inputStream currently being used by this object to read contents;
+ * may be the original stream passed in, or a stream that reads
+ * the saved copy.
+ */
private InputStream inputStream;
+ /**
+ * Maximum number of bytes that can be stored in memory before
+ * storage will be moved to a temporary file.
+ */
private int maxBytesInMemory;
+ /**
+ * True when the original stream is being read; set to false when
+ * reading is set to use the stored data instead.
+ */
private boolean firstPass = true;
+ /**
+ * Whether or not the stream's contents are being stored in a file
+ * as opposed to memory.
+ */
private boolean bufferIsInFile;
+ /**
+ * The buffer used to store the stream's content; this storage is moved
+ * to a file when the stored data's size exceeds maxBytesInMemory.
+ */
private byte[] byteBuffer;
+ /**
+ * The total number of bytes read from the original stream so far.
+ */
private int size;
+ /**
+ * File used to store the stream's contents; is null until the stored
+ * content's size exceeds maxBytesInMemory.
+ */
private File storeFile;
+ /**
+ * OutputStream used to save the content of the input stream in a
+ * temporary file.
+ */
private OutputStream storeOutputStream;
+
+ /**
+ * Specifies whether or not to read to the end of the original stream on
+ * the first rewind. This defaults to true. If this is set to false,
+ * then when rewind() is first called, only those bytes already read
+ * from the original stream will be available from then on.
+ */
+ private boolean readToEndOfStreamOnFirstRewind = true;
+
+ // TODO: At some point it would be better to replace the current approach
+ // (specifying the flag above) with more automated behavior: the stream
+ // could keep the original stream open until EOF is reached. For example, if:
+ //
+ // the original stream is 10 bytes,
+ // only 2 bytes are read on the first pass,
+ // rewind() is called, and
+ // 5 bytes are then read,
+ //
+ // this instance would get the first 2 bytes from its store
+ // and the next 3 from the original stream, saving those additional 3
+ // bytes in the store. In this way, only the bytes that are actually
+ // requested are ever saved in the store; unneeded bytes are never read.
+ // The original stream would be closed when EOF is reached or when close()
+ // is called, whichever comes first. This approach eliminates the need
+ // for the flag, though it makes the implementation more complex.
+
+
+
+ /**
+ * Creates a rereadable input stream.
+ *
+ * @param inputStream stream containing the source of data
+ * @param maxBytesInMemory maximum number of bytes to use to store
+ * the stream's contents in memory before switching to disk; note that
+ * the instance will preallocate a byte array whose size is
+ * maxBytesInMemory. This byte array is released for garbage
+ * collection (i.e. its reference is set to null) once the content
+ * size exceeds the array's size and storage moves to a temporary file;
+ * otherwise it stays referenced until the instance itself becomes
+ * unreachable (close() does not release it).
+ */
public RereadableInputStream(InputStream inputStream, int
maxBytesInMemory) {
+ this(inputStream, maxBytesInMemory, true);
+ }
+
+ /**
+ * Creates a rereadable input stream.
+ *
+ * @param inputStream stream containing the source of data
+ * @param maxBytesInMemory maximum number of bytes to use to store
+ * the stream's contents in memory before switching to disk; note that
+ * the instance will preallocate a byte array whose size is
+ * maxBytesInMemory. This byte array is released for garbage
+ * collection (i.e. its reference is set to null) once the content
+ * size exceeds the array's size and storage moves to a temporary file;
+ * otherwise it stays referenced until the instance itself becomes
+ * unreachable (close() does not release it).
+ * @param readToEndOfStreamOnFirstRewind Specifies whether or not to
+ * read to the end of stream on first rewind. If this is set to false,
+ * then when rewind() is first called, only those bytes already read
+ * from the original stream will be available from then on.
+ */
+ public RereadableInputStream(InputStream inputStream, int maxBytesInMemory,
+ boolean readToEndOfStreamOnFirstRewind) {
this.inputStream = inputStream;
this.maxBytesInMemory = maxBytesInMemory;
byteBuffer = new byte[maxBytesInMemory];
+ this.readToEndOfStreamOnFirstRewind = readToEndOfStreamOnFirstRewind;
}
+ /**
+ * Reads a byte from the stream, saving it in the store if it is being
+ * read from the original stream. Implements the abstract
+ * InputStream.read().
+ *
+ * @return the read byte, or -1 on end of stream.
+ * @throws IOException
+ */
public int read() throws IOException {
int inputByte = inputStream.read();
if (firstPass) {
@@ -58,27 +167,20 @@
return inputByte;
}
- private void saveByte(int inputByte) throws IOException {
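+ /**
+ * "Rewinds" the stream to the beginning for rereading. On the first
+ * rewind, if readToEndOfStreamOnFirstRewind is true, the remainder of
+ * the original stream is first read into the store. The stream currently
+ * being read is then closed, and subsequent reads come from the stored copy.
+ * NOTE(review): the in-memory copy is wrapped as
+ * new ByteArrayInputStream(byteBuffer), which exposes all maxBytesInMemory
+ * bytes rather than only the first getSize() bytes. Also, when exactly
+ * maxBytesInMemory bytes were stored (no temp file created yet),
+ * size < maxBytesInMemory is false while storeFile is still null.
+ * Both need confirming and fixing.
+ * @throws IOException if reading the original stream, or closing or
+ * reopening the store, fails
+ */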
+ public void rewind() throws IOException {
- if (!bufferIsInFile) {
- boolean switchToFile = (size == (maxBytesInMemory));
- if (switchToFile) {
- storeFile = File.createTempFile("streamstore_", ".tmp");
- bufferIsInFile = true;
- storeOutputStream = new BufferedOutputStream(
- new FileOutputStream(storeFile));
- storeOutputStream.write(byteBuffer, 0, size);
- storeOutputStream.write(inputByte);
- } else {
- byteBuffer[size] = (byte) inputByte;
+ if (firstPass && readToEndOfStreamOnFirstRewind) {
+ // Force read to end of stream to fill store with any
+ // remaining bytes from original stream.
+ while(read() != -1) {
+ // empty loop
}
- } else {
- storeOutputStream.write(inputByte);
}
- ++size;
- }
- public void rewind() throws IOException {
closeStream();
if (storeOutputStream != null) {
storeOutputStream.close();
@@ -86,22 +188,72 @@
}
firstPass = false;
boolean newStreamIsInMemory = (size < maxBytesInMemory);
- inputStream = newStreamIsInMemory ? new
ByteArrayInputStream(byteBuffer)
- : new BufferedInputStream(new FileInputStream(storeFile));
+ inputStream = newStreamIsInMemory
+ ? new ByteArrayInputStream(byteBuffer)
+ : new BufferedInputStream(new FileInputStream(storeFile));
}
- public void closeStream() throws IOException {
+ /**
+ * Closes the input stream currently used for reading (may either be
+ * the original stream or a memory or file stream after the first pass).
+ *
+ * @throws IOException
+ */
+ // Previously public; does any caller need this to remain public?
+ private void closeStream() throws IOException {
if (inputStream != null) {
inputStream.close();
inputStream = null;
}
}
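+ /**
+ * Closes the input stream and removes the temporary file if one was
+ * created.
+ * NOTE(review): storeOutputStream is not closed here, so if close() is
+ * called before any rewind(), the temp file's output stream stays open
+ * (and the delete may fail on some platforms) — confirm and fix.
+ *
+ * @throws IOException
+ */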
public void close() throws IOException {
closeStream();
super.close();
if (storeFile != null) {
storeFile.delete();
}
+ }
+
+ /**
+ * Returns the number of bytes read from the original stream.
+ *
+ * @return number of bytes read
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Saves the byte read from the original stream to the store.
+ *
+ * @param inputByte byte read from original stream
+ * @throws IOException
+ */
+ private void saveByte(int inputByte) throws IOException {
+
+ if (!bufferIsInFile) {
+ boolean switchToFile = (size == (maxBytesInMemory));
+ if (switchToFile) {
+ storeFile = File.createTempFile("streamstore_", ".tmp");
+ bufferIsInFile = true;
+ storeOutputStream = new BufferedOutputStream(
+ new FileOutputStream(storeFile));
+ storeOutputStream.write(byteBuffer, 0, size);
+ storeOutputStream.write(inputByte);
+ byteBuffer = null; // release for garbage collection
+ } else {
+ byteBuffer[size] = (byte) inputByte;
+ }
+ } else {
+ storeOutputStream.write(inputByte);
+ }
+ ++size;
}
}
Modified: incubator/tika/trunk/src/test/java/org/apache/tika/RereadableInputStreamTest.java
URL: http://svn.apache.org/viewvc/incubator/tika/trunk/src/test/java/org/apache/tika/RereadableInputStreamTest.java?rev=582698&r1=582697&r2=582698&view=diff
==============================================================================
--- incubator/tika/trunk/src/test/java/org/apache/tika/RereadableInputStreamTest.java (original)
+++ incubator/tika/trunk/src/test/java/org/apache/tika/RereadableInputStreamTest.java Sun Oct 7 16:27:19 2007
@@ -37,8 +37,7 @@
public void test() throws IOException {
- File file = createTestFile();
- InputStream is = new BufferedInputStream(new FileInputStream(file));
+ InputStream is = createTestInputStream();
RereadableInputStream ris = new RereadableInputStream(is,
MEMORY_THRESHOLD);
try {
@@ -52,13 +51,54 @@
ris.rewind();
}
} finally {
- is.close();
+ // The RereadableInputStream should close the original input
+ // stream (if it hasn't already).
ris.close();
}
}
+ /**
+ * Test that the constructor's readToEndOfStreamOnFirstRewind parameter
+ * correctly determines the behavior.
+ *
+ * @throws IOException
+ */
+ public void testRewind() throws IOException {
+ doTestRewind(true);
+ doTestRewind(false);
+ }
+
+ private void doTestRewind(boolean readToEndOnRewind) throws IOException {
+
+ RereadableInputStream ris = null;
+
+ try {
+ InputStream s1 = createTestInputStream();
+ ris = new RereadableInputStream(s1, 5, readToEndOnRewind);
+ ris.read();
+ assertEquals(1, ris.getSize());
+ ris.rewind();
+ boolean moreBytesWereRead = (ris.getSize() > 1);
+ if (readToEndOnRewind) {
+ assertTrue(moreBytesWereRead);
+ } else {
+ assertFalse(moreBytesWereRead);
+ }
+ } finally {
+ if (ris != null) {
+ ris.close();
+ }
+ }
+
+ }
+
+ private InputStream createTestInputStream() throws IOException {
+ return new BufferedInputStream(new FileInputStream(createTestFile()));
+ }
+
private File createTestFile() throws IOException {
File testfile = File.createTempFile("ris_test", ".tmp");
+ testfile.deleteOnExit();
FileOutputStream fos = new FileOutputStream(testfile);
for (int i = 0; i < TEST_SIZE; i++) {
fos.write(i);