http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index ca15722..1bdc591 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@ -117,7 +117,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest } assert f.exists(); - RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)); + RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)).build(); assertEquals(dataPre.length + rawPost.length, reader.length()); byte[] result = new byte[(int)reader.length()];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 793348a..c7f3c36 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.MmappedRegions; import org.apache.cassandra.io.util.MmappedSegmentedFile; import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.schema.CachingParams; @@ -133,7 +134,7 @@ public class SSTableReaderTest @Test public void testSpannedIndexPositions() throws IOException { - MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments + MmappedRegions.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java index b875a6a..4a72281 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java @@ -542,14 +542,12 @@ public class BufferedDataOutputStreamTest ndosp.flush(); - @SuppressWarnings("resource") - ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0); - + DataInputBuffer in = new DataInputBuffer(generated.toByteArray()); assertEquals(expectedSize, generated.toByteArray().length); for (long v : testValues) { - assertEquals(v, bbdi.readVInt()); + assertEquals(v, in.readVInt()); } } @@ -574,13 +572,11 @@ public class BufferedDataOutputStreamTest ndosp.flush(); - @SuppressWarnings("resource") - ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0); - + DataInputBuffer in = new DataInputBuffer(generated.toByteArray()); assertEquals(expectedSize, generated.toByteArray().length); for (long v : testValues) - assertEquals(v, bbdi.readUnsignedVInt()); + assertEquals(v, in.readUnsignedVInt()); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index e051c00..364ea71 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -25,9 +25,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.util.Arrays; -import java.util.concurrent.Callable; import static org.apache.cassandra.Util.expectEOF; import static org.apache.cassandra.Util.expectException; @@ -96,7 +94,7 @@ public class BufferedRandomAccessFileTest // test readBytes(int) method r.seek(0); - ByteBuffer fileContent = r.readBytes((int) w.length()); + ByteBuffer fileContent = ByteBufferUtil.read(r, (int) w.length()); assertEquals(fileContent.limit(), w.length()); assert ByteBufferUtil.string(fileContent).equals("Hello" + new String(bigData)); @@ -204,25 +202,19 @@ public class BufferedRandomAccessFileTest final ChannelProxy channel = new ChannelProxy(w.getPath()); final RandomAccessReader r = RandomAccessReader.open(channel); - ByteBuffer content = r.readBytes((int) r.length()); + ByteBuffer content = ByteBufferUtil.read(r, (int) r.length()); // after reading whole file we should be at EOF assertEquals(0, ByteBufferUtil.compare(content, data)); assert r.bytesRemaining() == 0 && r.isEOF(); r.seek(0); - content = r.readBytes(10); // reading first 10 bytes + content = ByteBufferUtil.read(r, 10); // reading first 10 bytes assertEquals(ByteBufferUtil.compare(content, "cccccccccc".getBytes()), 0); assertEquals(r.bytesRemaining(), r.length() - content.limit()); // trying to read more than file has right now - expectEOF(new Callable<Object>() - { - public Object call() throws IOException - { - return r.readBytes((int) r.length() + 10); - } - }); + expectEOF(() -> ByteBufferUtil.read(r, (int) r.length() + 10)); w.finish(); r.close(); @@ -249,23 +241,9 @@ public class BufferedRandomAccessFileTest assertEquals(file.bytesRemaining(), file.length() - 20); // trying to seek past the end of the file should produce EOFException - expectException(new Callable<Object>() - { - public Object call() - { - file.seek(file.length() + 30); - return null; - } - }, IllegalArgumentException.class); + expectException(() -> { file.seek(file.length() + 30); return null; }, IllegalArgumentException.class); - expectException(new Callable<Object>() - { - public Object call() throws IOException - { - file.seek(-1); - return null; - } - }, IllegalArgumentException.class); // throws IllegalArgumentException + expectException(() -> { file.seek(-1); return null; }, IllegalArgumentException.class); // throws IllegalArgumentException file.close(); channel.close(); @@ -352,16 +330,11 @@ public class BufferedRandomAccessFileTest { File file1 = writeTemporaryFile(new byte[16]); try (final ChannelProxy channel = new ChannelProxy(file1); - final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L)) + final RandomAccessReader file = new RandomAccessReader.Builder(channel) + .bufferSize(bufferSize) + .build()) { - expectEOF(new Callable<Object>() - { - public Object call() throws IOException - { - file.readFully(target, offset, 17); - return null; - } - }); + expectEOF(() -> { file.readFully(target, offset, 17); return null; }); } } @@ -370,15 +343,11 @@ public class BufferedRandomAccessFileTest { File file1 = writeTemporaryFile(new byte[16]); try (final ChannelProxy channel = new ChannelProxy(file1); - final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L)) + final RandomAccessReader file = new RandomAccessReader.Builder(channel).bufferSize(bufferSize).build()) { - expectEOF(new Callable<Object>() - { - public Object call() throws IOException - { - while (true) - file.readFully(target, 0, n); - } + expectEOF(() -> { + while (true) + file.readFully(target, 0, n); }); } } @@ -459,30 +428,17 @@ public class BufferedRandomAccessFileTest r.close(); // closing to test read after close - expectException(new Callable<Object>() - { - public Object call() - { - return r.read(); - } - }, AssertionError.class); + expectException(() -> r.read(), NullPointerException.class); //Used to throw ClosedChannelException, but now that it extends BDOSP it just NPEs on the buffer //Writing to a BufferedOutputStream that is closed generates no error //Going to allow the NPE to throw to catch as a bug any use after close. Notably it won't throw NPE for a //write of a 0 length, but that is kind of a corner case - expectException(new Callable<Object>() - { - public Object call() throws IOException - { - w.write(generateByteArray(1)); - return null; - } - }, NullPointerException.class); + expectException(() -> { w.write(generateByteArray(1)); return null; }, NullPointerException.class); try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath()))) { - ByteBuffer contents = copy.readBytes((int) copy.length()); + ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length()); assertEquals(contents.limit(), data.length); assertEquals(ByteBufferUtil.compare(contents, data), 0); @@ -526,7 +482,7 @@ public class BufferedRandomAccessFileTest channel.close(); } - @Test (expected = AssertionError.class) + @Test(expected = AssertionError.class) public void testAssertionErrorWhenBytesPastMarkIsNegative() throws IOException { try (SequentialWriter w = createTempFile("brafAssertionErrorWhenBytesPastMarkIsNegative")) @@ -565,14 +521,7 @@ public class BufferedRandomAccessFileTest assertTrue(copy.bytesRemaining() == 0 && copy.isEOF()); // can't seek past the end of the file for read-only files - expectException(new Callable<Object>() - { - public Object call() - { - copy.seek(copy.length() + 1); - return null; - } - }, IllegalArgumentException.class); + expectException(() -> { copy.seek(copy.length() + 1); return null; }, IllegalArgumentException.class); copy.seek(0); copy.skipBytes(5); @@ -582,7 +531,7 @@ public class BufferedRandomAccessFileTest assertTrue(!copy.isEOF()); copy.seek(0); - ByteBuffer contents = copy.readBytes((int) copy.length()); + ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length()); assertEquals(contents.limit(), copy.length()); assertTrue(ByteBufferUtil.compare(contents, data) == 0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java new file mode 100644 index 0000000..57428af --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.Test; + +import static org.junit.Assert.*; +import org.apache.cassandra.io.util.ChecksummedRandomAccessReader; +import org.apache.cassandra.io.util.ChecksummedSequentialWriter; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SequentialWriter; + +public class ChecksummedRandomAccessReaderTest +{ + @Test + public void readFully() throws IOException + { + final File data = File.createTempFile("testReadFully", "data"); + final File crc = File.createTempFile("testReadFully", "crc"); + + final byte[] expected = new byte[70 * 1024]; // bit more than crc chunk size, so we can test rebuffering. + ThreadLocalRandom.current().nextBytes(expected); + + SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); + writer.write(expected); + writer.finish(); + + assert data.exists(); + + RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build(); + byte[] b = new byte[expected.length]; + reader.readFully(b); + + assertArrayEquals(expected, b); + + assertTrue(reader.isEOF()); + + reader.close(); + } + + @Test + public void seek() throws IOException + { + final File data = File.createTempFile("testSeek", "data"); + final File crc = File.createTempFile("testSeek", "crc"); + + final byte[] dataBytes = new byte[70 * 1024]; // bit more than crc chunk size + ThreadLocalRandom.current().nextBytes(dataBytes); + + SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); + writer.write(dataBytes); + writer.finish(); + + assert data.exists(); + + RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build(); + + final int seekPosition = 66000; + reader.seek(seekPosition); + + byte[] b = new byte[dataBytes.length - seekPosition]; + reader.readFully(b); + + byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length); + + assertArrayEquals(expected, b); + + assertTrue(reader.isEOF()); + + reader.close(); + } + + @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class) + public void corruptionDetection() throws IOException + { + final File data = File.createTempFile("corruptionDetection", "data"); + final File crc = File.createTempFile("corruptionDetection", "crc"); + + final byte[] expected = new byte[5 * 1024]; + Arrays.fill(expected, (byte) 0); + + SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); + writer.write(expected); + writer.finish(); + + assert data.exists(); + + // simulate corruption of file + try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw")) + { + dataFile.seek(1024); + dataFile.write((byte) 5); + } + + RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build(); + byte[] b = new byte[expected.length]; + reader.readFully(b); + + assertArrayEquals(expected, b); + + assertTrue(reader.isEOF()); + + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java new file mode 100644 index 0000000..fcee9b7 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +import com.google.common.primitives.Ints; +import org.junit.Test; + +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FileSegmentInputStreamTest +{ + private ByteBuffer allocateBuffer(int size) + { + ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size)); + long seed = System.nanoTime(); + //seed = 365238103404423L; + System.out.println("Seed " + seed); + + new Random(seed).nextBytes(ret.array()); + return ret; + } + + @Test + public void testRead() throws IOException + { + testRead(0, 4096, 1024); + testRead(1024, 4096, 1024); + testRead(4096, 4096, 1024); + } + + private void testRead(int offset, int size, int checkInterval) throws IOException + { + final ByteBuffer buffer = allocateBuffer(size); + final String path = buffer.toString(); + + FileSegmentInputStream reader = new FileSegmentInputStream(buffer.duplicate(), path, offset); + assertEquals(path, reader.getPath()); + + for (int i = offset; i < (size + offset); i += checkInterval) + { + reader.seek(i); + assertFalse(reader.isEOF()); + assertEquals(i, reader.getFilePointer()); + + buffer.position(i - offset); + + int remaining = buffer.remaining(); + assertEquals(remaining, reader.bytesRemaining()); + byte[] expected = new byte[buffer.remaining()]; + buffer.get(expected); + assertTrue(Arrays.equals(expected, ByteBufferUtil.read(reader, remaining).array())); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + assertEquals(buffer.capacity() + offset, reader.getFilePointer()); + } + + reader.close(); + reader.close(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testMarkNotSupported() throws Exception + { + FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0); + assertFalse(reader.markSupported()); + assertEquals(0, reader.bytesPastMark(null)); + reader.mark(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testResetNotSupported() throws Exception + { + FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0); + reader.reset(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testSeekNegative() throws Exception + { + FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0); + reader.seek(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testSeekBeforeOffset() throws Exception + { + FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024); + reader.seek(1023); + } + + @Test(expected = IllegalArgumentException.class) + public void testSeekPastLength() throws Exception + { + FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024); + reader.seek(2049); + } + + @Test(expected = EOFException.class) + public void testReadBytesTooMany() throws Exception + { + FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024); + ByteBufferUtil.read(reader, 2049); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/MemoryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/MemoryTest.java b/test/unit/org/apache/cassandra/io/util/MemoryTest.java index 9be69ac..81dee7e 100644 --- a/test/unit/org/apache/cassandra/io/util/MemoryTest.java +++ b/test/unit/org/apache/cassandra/io/util/MemoryTest.java @@ -18,8 +18,11 @@ */ package org.apache.cassandra.io.util; +import java.io.EOFException; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Arrays; import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; @@ -27,6 +30,10 @@ import org.junit.Test; import junit.framework.Assert; import org.apache.cassandra.utils.memory.MemoryUtil; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class MemoryTest { @@ -45,6 +52,36 @@ public class MemoryTest memory.close(); } + @Test + public void testInputStream() throws IOException + { + byte[] bytes = new byte[4096]; + ThreadLocalRandom.current().nextBytes(bytes); + final Memory memory = Memory.allocate(bytes.length); + memory.setBytes(0, bytes, 0, bytes.length); + + try(MemoryInputStream stream = new MemoryInputStream(memory, 1024)) + { + byte[] bb = new byte[bytes.length]; + assertEquals(bytes.length, stream.available()); + + stream.readFully(bb); + assertEquals(0, stream.available()); + + assertTrue(Arrays.equals(bytes, bb)); + + try + { + stream.readInt(); + fail("Expected EOF exception"); + } + catch (EOFException e) + { + //pass + } + } + } + private static void test(ByteBuffer canon, Memory memory) { ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java new file mode 100644 index 0000000..9df3fed --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import com.google.common.primitives.Ints; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ClusteringComparator; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.utils.ChecksumType; + +import static junit.framework.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class MmappedRegionsTest +{ + private static final Logger logger = LoggerFactory.getLogger(MmappedRegionsTest.class); + + private static ByteBuffer allocateBuffer(int size) + { + ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size)); + long seed = System.nanoTime(); + //seed = 365238103404423L; + logger.info("Seed {}", seed); + + new Random(seed).nextBytes(ret.array()); + return ret; + } + + private static File writeFile(String fileName, ByteBuffer buffer) throws IOException + { + File ret = File.createTempFile(fileName, "1"); + ret.deleteOnExit(); + + try (SequentialWriter writer = SequentialWriter.open(ret)) + { + writer.write(buffer); + writer.finish(); + } + + assert ret.exists(); + assert ret.length() >= buffer.capacity(); + return ret; + + } + + @Test + public void testEmpty() throws Exception + { + ByteBuffer buffer = allocateBuffer(1024); + try(ChannelProxy channel = new ChannelProxy(writeFile("testEmpty", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + assertTrue(regions.isEmpty()); + assertTrue(regions.isValid(channel)); + } + } + + @Test + public void testTwoSegments() throws Exception + { + ByteBuffer buffer = allocateBuffer(2048); + try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(1024); + for (int i = 0; i < 1024; i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + assertEquals(0, region.bottom()); + assertEquals(1024, region.top()); + } + + regions.extend(2048); + for (int i = 0; i < 2048; i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + if (i < 1024) + { + assertEquals(0, region.bottom()); + assertEquals(1024, region.top()); + } + else + { + assertEquals(1024, region.bottom()); + assertEquals(2048, region.top()); + } + } + } + } + + @Test + public void testSmallSegmentSize() throws Exception + { + int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE; + MmappedRegions.MAX_SEGMENT_SIZE = 1024; + + ByteBuffer buffer = allocateBuffer(4096); + try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(1024); + regions.extend(2048); + regions.extend(4096); + + final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE; + for (int i = 0; i < buffer.capacity(); i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + assertEquals(SIZE * (i / SIZE), region.bottom()); + assertEquals(SIZE + (SIZE * (i / SIZE)), region.top()); + } + } + finally + { + MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE; + } + } + + @Test + public void testAllocRegions() throws Exception + { + int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE; + MmappedRegions.MAX_SEGMENT_SIZE = 1024; + + ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3); + + try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(buffer.capacity()); + + final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE; + for (int i = 0; i < buffer.capacity(); i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + assertEquals(SIZE * (i / SIZE), region.bottom()); + assertEquals(SIZE + (SIZE * (i / SIZE)), region.top()); + } + } + finally + { + MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE; + } + } + + @Test + public void testCopy() throws Exception + { + ByteBuffer buffer = allocateBuffer(128 * 1024); + + MmappedRegions snapshot; + ChannelProxy channelCopy; + + try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer)); + MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4)) + { + // create 3 more segments, one per quater capacity + regions.extend(buffer.capacity() / 2); + regions.extend(3 * buffer.capacity() / 4); + regions.extend(buffer.capacity()); + + // make a snapshot + snapshot = regions.sharedCopy(); + + // keep the channel open + channelCopy = channel.sharedCopy(); + } + + assertFalse(snapshot.isCleanedUp()); + + final int SIZE = buffer.capacity() / 4; + for (int i = 0; i < buffer.capacity(); i++) + { + MmappedRegions.Region region = snapshot.floor(i); + assertNotNull(region); + assertEquals(SIZE * (i / SIZE), region.bottom()); + assertEquals(SIZE + (SIZE * (i / SIZE)), region.top()); + + // check we can access the buffer + assertNotNull(region.buffer.duplicate().getInt()); + } + + assertNull(snapshot.close(null)); + assertNull(channelCopy.close(null)); + assertTrue(snapshot.isCleanedUp()); + } + + @Test(expected = AssertionError.class) + public void testCopyCannotExtend() throws Exception + { + ByteBuffer buffer = allocateBuffer(128 * 1024); + + MmappedRegions snapshot; + ChannelProxy channelCopy; + + try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(buffer.capacity() / 2); + + // make a snapshot + snapshot = regions.sharedCopy(); + + // keep the channel open + channelCopy = channel.sharedCopy(); + } + + try + { + snapshot.extend(buffer.capacity()); + } + finally + { + assertNull(snapshot.close(null)); + assertNull(channelCopy.close(null)); + } + } + + @Test + public void testExtendOutOfOrder() throws Exception + { + ByteBuffer buffer = allocateBuffer(4096); + try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(4096); + regions.extend(1024); + regions.extend(2048); + + for (int i = 0; i < buffer.capacity(); i++) + { + MmappedRegions.Region region = regions.floor(i); + assertNotNull(region); + assertEquals(0, region.bottom()); + assertEquals(4096, region.top()); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeExtend() throws Exception + { + ByteBuffer buffer = allocateBuffer(1024); + try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer)); + MmappedRegions regions = MmappedRegions.empty(channel)) + { + regions.extend(-1); + } + } + + @Test + public void testMapForCompressionMetadata() throws Exception + { + int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE; + MmappedRegions.MAX_SEGMENT_SIZE = 1024; + + ByteBuffer buffer = allocateBuffer(128 * 1024); + File f = File.createTempFile("testMapForCompressionMetadata", "1"); + f.deleteOnExit(); + + File cf = File.createTempFile(f.getName() + ".metadata", "1"); + cf.deleteOnExit(); + + MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)) + .replayPosition(null); + try(SequentialWriter writer = new CompressedSequentialWriter(f, + cf.getAbsolutePath(), + CompressionParams.snappy(), + sstableMetadataCollector)) + { + writer.write(buffer); + writer.finish(); + } + + CompressionMetadata metadata = new CompressionMetadata(cf.getAbsolutePath(), f.length(), ChecksumType.CRC32); + try(ChannelProxy channel = new ChannelProxy(f); + MmappedRegions regions = MmappedRegions.map(channel, metadata)) + { + + assertFalse(regions.isEmpty()); + int i = 0; + while(i < buffer.capacity()) + { + CompressionMetadata.Chunk chunk = metadata.chunkFor(i); + + MmappedRegions.Region region = regions.floor(chunk.offset); + assertNotNull(region); + + ByteBuffer compressedChunk = region.buffer.duplicate(); + assertNotNull(compressedChunk); + assertEquals(chunk.length + 4, compressedChunk.capacity()); + + assertEquals(chunk.offset, region.bottom()); + assertEquals(chunk.offset + chunk.length + 4, region.top()); + + i += metadata.chunkLength(); + } + } + finally + { + MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE; + metadata.close(); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testIllegalArgForMap1() throws Exception + { + ByteBuffer buffer = allocateBuffer(1024); + try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer)); + MmappedRegions regions = MmappedRegions.map(channel, 0)) + { + assertTrue(regions.isEmpty()); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testIllegalArgForMap2() throws Exception + { + ByteBuffer buffer = allocateBuffer(1024); + try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer)); + MmappedRegions regions = MmappedRegions.map(channel, -1L)) + { + assertTrue(regions.isEmpty()); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testIllegalArgForMap3() throws Exception + { + ByteBuffer buffer = allocateBuffer(1024); + try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap3", buffer)); + MmappedRegions regions = MmappedRegions.map(channel, null)) + { + assertTrue(regions.isEmpty()); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java index 3aad7e9..3ebbc67 100644 --- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java @@ -14,7 +14,6 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.Random; -import org.apache.cassandra.io.util.NIODataInputStream; import org.junit.Test; import com.google.common.base.Charsets; @@ -180,17 +179,10 @@ public class NIODataInputStreamTest } @SuppressWarnings("resource") - @Test(expected = IllegalArgumentException.class) - public void testTooSmallBufferSize() throws Exception - { - new NIODataInputStream(new FakeChannel(), 4); - } - - @SuppressWarnings("resource") @Test(expected = NullPointerException.class) public void testNullRBC() throws Exception { - new NIODataInputStream(null, 8); + new NIODataInputStream(null, 9); } @SuppressWarnings("resource") @@ -769,7 +761,7 @@ public class NIODataInputStreamTest out.writeUnsignedVInt(value); buf.position(ii); - NIODataInputStream in = new DataInputBuffer(buf, false); + RebufferingInputStream in = new DataInputBuffer(buf, false); assertEquals(value, in.readUnsignedVInt()); } @@ -792,7 +784,7 @@ public class NIODataInputStreamTest out.writeUnsignedVInt(value); buf.position(0); - NIODataInputStream in = new DataInputBuffer(buf, false); + RebufferingInputStream in = new DataInputBuffer(buf, false); assertEquals(value, in.readUnsignedVInt()); @@ -831,7 +823,7 @@ public class NIODataInputStreamTest truncated.put(buf); truncated.flip(); - NIODataInputStream in = new DataInputBuffer(truncated, false); + RebufferingInputStream in = new DataInputBuffer(truncated, false); boolean threw = false; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java new file mode 100644 index 0000000..f0d4383 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java @@ -0,0 +1,483 @@ +package org.apache.cassandra.io.util; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class RandomAccessReaderTest +{ + private static final Logger logger = LoggerFactory.getLogger(RandomAccessReaderTest.class); + + private static final class Parameters + { + public final long fileLength; + public final int bufferSize; + + public BufferType bufferType; + public int maxSegmentSize; + public boolean mmappedRegions; + public byte[] expected; + + Parameters(long fileLength, int bufferSize) + { + this.fileLength = fileLength; + this.bufferSize = bufferSize; + this.bufferType = BufferType.OFF_HEAP; + this.maxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE; + this.mmappedRegions = false; + this.expected = "The quick brown fox jumps over the lazy dog".getBytes(FileUtils.CHARSET); + } + + public Parameters mmappedRegions(boolean mmappedRegions) + { + this.mmappedRegions = mmappedRegions; + return this; + } + + public Parameters bufferType(BufferType bufferType) + { + this.bufferType = bufferType; + return this; + } + + public Parameters maxSegmentSize(int maxSegmentSize) + { + this.maxSegmentSize = maxSegmentSize; + return this; + } + + public Parameters expected(byte[] expected) + { + this.expected = expected; + return this; + } + } + + @Test + public void testBufferedOffHeap() throws IOException + { + testReadFully(new Parameters(8192, 4096).bufferType(BufferType.OFF_HEAP)); + } + + @Test + public void testBufferedOnHeap() throws IOException + { + testReadFully(new Parameters(8192, 4096).bufferType(BufferType.ON_HEAP)); + } + + @Test + public void testBigBufferSize() throws IOException + { + testReadFully(new Parameters(8192, 65536).bufferType(BufferType.ON_HEAP)); + } + + @Test + public void testTinyBufferSize() throws IOException + { + testReadFully(new Parameters(8192, 16).bufferType(BufferType.ON_HEAP)); + } + + @Test + public void testOneSegment() throws IOException + { + testReadFully(new Parameters(8192, 4096).mmappedRegions(true)); + } + + @Test + public void testMultipleSegments() throws IOException + { + testReadFully(new Parameters(8192, 4096).mmappedRegions(true).maxSegmentSize(1024)); + } + + @Test + public void testVeryLarge() throws IOException + { + final long SIZE = 1L << 32; // 2GB + Parameters params = new Parameters(SIZE, 1 << 20); // 1MB + + try(ChannelProxy channel = new ChannelProxy("abc", new FakeFileChannel(SIZE))) + { + RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel) + .bufferType(params.bufferType) + .bufferSize(params.bufferSize); + + try(RandomAccessReader reader = builder.build()) + { + assertEquals(channel.size(), reader.length()); + assertEquals(channel.size(), reader.bytesRemaining()); + assertEquals(Integer.MAX_VALUE, reader.available()); + + assertEquals(channel.size(), reader.skip(channel.size())); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + } + } + + /** A fake file channel that simply increments the position and doesn't + * actually read anything. We use it to simulate very large files, > 2G. + */ + private static final class FakeFileChannel extends FileChannel + { + private final long size; + private long position; + + FakeFileChannel(long size) + { + this.size = size; + } + + public int read(ByteBuffer dst) + { + int ret = dst.remaining(); + position += ret; + dst.position(dst.limit()); + return ret; + } + + public long read(ByteBuffer[] dsts, int offset, int length) + { + throw new UnsupportedOperationException(); + } + + public int write(ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + public long write(ByteBuffer[] srcs, int offset, int length) + { + throw new UnsupportedOperationException(); + } + + public long position() + { + return position; + } + + public FileChannel position(long newPosition) + { + position = newPosition; + return this; + } + + public long size() + { + return size; + } + + public FileChannel truncate(long size) + { + throw new UnsupportedOperationException(); + } + + public void force(boolean metaData) + { + throw new UnsupportedOperationException(); + } + + public long transferTo(long position, long count, WritableByteChannel target) + { + throw new UnsupportedOperationException(); + } + + public long transferFrom(ReadableByteChannel src, long position, long count) + { + throw new UnsupportedOperationException(); + } + + public int read(ByteBuffer dst, long position) + { + int ret = dst.remaining(); + this.position = position + ret; + dst.position(dst.limit()); + return ret; + } + + public int write(ByteBuffer src, long position) + { + throw new UnsupportedOperationException(); + } + + public MappedByteBuffer map(MapMode mode, long position, long size) + { + throw new UnsupportedOperationException(); + } + + public FileLock lock(long position, long size, boolean shared) + { + throw new UnsupportedOperationException(); + } + + public FileLock tryLock(long position, long size, boolean shared) + { + throw new UnsupportedOperationException(); + } + + protected void implCloseChannel() + { + + } + } + + private static File writeFile(Parameters params) throws IOException + { + final File f = File.createTempFile("testReadFully", "1"); + f.deleteOnExit(); + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + long numWritten = 0; + while (numWritten < params.fileLength) + { + writer.write(params.expected); + numWritten += params.expected.length; + } + + writer.finish(); + } + + assert f.exists(); + assert f.length() >= params.fileLength; + return f; + } + + private static void testReadFully(Parameters params) throws IOException + { + final File f = writeFile(params); + try(ChannelProxy channel = new ChannelProxy(f)) + { + RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel) + .bufferType(params.bufferType) + .bufferSize(params.bufferSize); + if (params.mmappedRegions) + builder.regions(MmappedRegions.map(channel, f.length())); + + try(RandomAccessReader reader = builder.build()) + { + assertEquals(f.getAbsolutePath(), reader.getPath()); + assertEquals(f.length(), reader.length()); + assertEquals(f.length(), reader.bytesRemaining()); + assertEquals(Math.min(Integer.MAX_VALUE, f.length()), reader.available()); + + byte[] b = new byte[params.expected.length]; + long numRead = 0; + while (numRead < params.fileLength) + { + reader.readFully(b); + assertTrue(Arrays.equals(params.expected, b)); + numRead += b.length; + } + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + + if (builder.regions != null) + assertNull(builder.regions.close(null)); + } + } + + @Test + public void testReadBytes() throws IOException + { + File f = File.createTempFile("testReadBytes", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + writer.write(expected.getBytes()); + writer.finish(); + } + + assert f.exists(); + + try(ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = new RandomAccessReader.Builder(channel).build()) + { + assertEquals(f.getAbsolutePath(), reader.getPath()); + assertEquals(expected.length(), reader.length()); + + ByteBuffer b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + } + } + + @Test + public void testReset() throws IOException + { + File f = File.createTempFile("testMark", "1"); + final String expected = "The quick brown fox jumps over the lazy dog"; + final int numIterations = 10; + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + for (int i = 0; i < numIterations; i++) + writer.write(expected.getBytes()); + writer.finish(); + } + + assert f.exists(); + + try(ChannelProxy channel = new ChannelProxy(f); + RandomAccessReader reader = new RandomAccessReader.Builder(channel).build()) + { + assertEquals(expected.length() * numIterations, reader.length()); + + ByteBuffer b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + + assertFalse(reader.isEOF()); + assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining()); + + FileMark mark = reader.mark(); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + + for (int i = 0; i < (numIterations - 1); i++) + { + b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + assertTrue(reader.isEOF()); + assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark()); + assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark)); + + reader.reset(mark); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + assertFalse(reader.isEOF()); + for (int i = 0; i < (numIterations - 1); i++) + { + b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + + reader.reset(); + assertEquals(0, reader.bytesPastMark()); + assertEquals(0, reader.bytesPastMark(mark)); + assertFalse(reader.isEOF()); + for (int i = 0; i < (numIterations - 1); i++) + { + b = ByteBufferUtil.read(reader, expected.length()); + assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); + } + + assertTrue(reader.isEOF()); + } + } + + @Test + public void testSeekSingleThread() throws IOException, InterruptedException + { + testSeek(1); + } + + @Test + public void testSeekMultipleThreads() throws IOException, InterruptedException + { + testSeek(10); + } + + private static void testSeek(int numThreads) throws IOException, InterruptedException + { + final File f = File.createTempFile("testMark", "1"); + final byte[] expected = new byte[1 << 16]; + + long seed = System.nanoTime(); + //seed = 365238103404423L; + logger.info("Seed {}", seed); + Random r = new Random(seed); + r.nextBytes(expected); + + try(SequentialWriter writer = SequentialWriter.open(f)) + { + writer.write(expected); + writer.finish(); + } + + assert f.exists(); + + try(final ChannelProxy channel = new ChannelProxy(f)) + { + final Runnable worker = () -> + { + try(RandomAccessReader reader = new RandomAccessReader.Builder(channel).build()) + { + assertEquals(expected.length, reader.length()); + + ByteBuffer b = ByteBufferUtil.read(reader, expected.length); + assertTrue(Arrays.equals(expected, b.array())); + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + + reader.seek(0); + b = ByteBufferUtil.read(reader, expected.length); + assertTrue(Arrays.equals(expected, b.array())); + assertTrue(reader.isEOF()); + assertEquals(0, reader.bytesRemaining()); + + for (int i = 0; i < 10; i++) + { + int pos = r.nextInt(expected.length); + reader.seek(pos); + assertEquals(pos, reader.getPosition()); + + ByteBuffer buf = ByteBuffer.wrap(expected, pos, expected.length - pos) + .order(ByteOrder.BIG_ENDIAN); + + while (reader.bytesRemaining() > 4) + assertEquals(buf.getInt(), reader.readInt()); + } + + reader.close(); + } + catch (Exception ex) + { + ex.printStackTrace(); + fail(ex.getMessage()); + } + }; + + if (numThreads == 1) + { + worker.run(); + } + else + { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) + executor.submit(worker); + + executor.shutdown(); + executor.awaitTermination(1, TimeUnit.MINUTES); + } + } + } +}