Repository: cassandra
Updated Branches:
  refs/heads/trunk 504d3c515 -> 060c7961d


Throw EOFException if we run out of chunks in compressed file

Patch by marcuse; reviewed by yukim for CASSANDRA-7664.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76adf0e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76adf0e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76adf0e1

Branch: refs/heads/trunk
Commit: 76adf0e12ed91ee7c75164872202bff29a2ad7f4
Parents: ecf1bae
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Aug 19 11:45:57 2014 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Aug 20 08:21:43 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../compress/CompressedInputStream.java         | 17 +++++++++++++--
 .../compress/CompressedInputStreamTest.java     | 23 +++++++++++++++-----
 3 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/76adf0e1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 304d9bf..c8f7591 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.10
+ * Throw EOFException if we run out of chunks in compressed datafile
+   (CASSANDRA-7664)
  * Throw InvalidRequestException when queries contain relations on entire
    collection columns (CASSANDRA-7506)
  * Fix PRSI handling of CQL3 row markers for row cleanup (CASSANDRA-7787)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76adf0e1/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 698c2fe..ef019c2 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -58,6 +58,8 @@ public class CompressedInputStream extends InputStream
     // raw checksum bytes
     private final byte[] checksumBytes = new byte[4];
 
+    private static final byte[] POISON_PILL = new byte[0];
+
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
@@ -83,7 +85,10 @@ public class CompressedInputStream extends InputStream
         {
             try
             {
-                decompress(dataBuffer.take());
+                byte[] compressedWithCRC = dataBuffer.take();
+                if (compressedWithCRC == POISON_PILL)
+                    throw new EOFException("No chunk available");
+                decompress(compressedWithCRC);
             }
             catch (InterruptedException e)
             {
@@ -162,7 +167,15 @@ public class CompressedInputStream extends InputStream
 
                 int bufferRead = 0;
                 while (bufferRead < readLength)
-                    bufferRead += source.read(compressedWithCRC, bufferRead, 
readLength - bufferRead);
+                {
+                    int r = source.read(compressedWithCRC, bufferRead, 
readLength - bufferRead);
+                    if (r < 0)
+                    {
+                        dataBuffer.put(POISON_PILL);
+                        return; // throw exception where we consume dataBuffer
+                    }
+                    bufferRead += r;
+                }
                 dataBuffer.put(compressedWithCRC);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76adf0e1/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
 
b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 027c84c..532b506 100644
--- 
a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.streaming.compress;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.RandomAccessFile;
 import java.util.*;
@@ -42,18 +43,23 @@ public class CompressedInputStreamTest
     @Test
     public void testCompressedRead() throws Exception
     {
-        testCompressedReadWith(new long[]{0L});
-        testCompressedReadWith(new long[]{1L});
-        testCompressedReadWith(new long[]{100L});
+        testCompressedReadWith(new long[]{0L}, false);
+        testCompressedReadWith(new long[]{1L}, false);
+        testCompressedReadWith(new long[]{100L}, false);
 
-        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L});
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
     }
 
+    @Test(expected = EOFException.class)
+    public void testTruncatedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+    }
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */
-    private void testCompressedReadWith(long[] valuesToCheck) throws Exception
+    private void testCompressedReadWith(long[] valuesToCheck, boolean 
testTruncate) throws Exception
     {
         assert valuesToCheck != null && valuesToCheck.length > 0;
 
@@ -95,6 +101,13 @@ public class CompressedInputStreamTest
         }
         f.close();
 
+        if (testTruncate)
+        {
+            byte [] actuallyRead = new byte[50];
+            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
+            toRead = actuallyRead;
+        }
+
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new 
ByteArrayInputStream(toRead), info, true);

Reply via email to