keith-turner closed pull request #324: ACCUMULO-4744 Fixed RFile API scanner bug URL: https://github.com/apache/accumulo/pull/324
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one made from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 4dfba68850..bc0df8253f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -265,7 +265,8 @@ public SamplerConfiguration getSamplerConfiguration() { List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length); for (int i = 0; i < sources.length; i++) { FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream(); - readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache, + + readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration()))); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 4fa66341f2..3ecb5cafc7 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -147,7 +147,7 @@ public long getStartPos() throws IOException { public static class Reader implements BlockFileReader { private final RateLimiter readLimiter; private BCFile.Reader _bc; - private String fileName = "not_available"; + private final String fileName; private BlockCache _dCache = null; private BlockCache 
_iCache = null; private InputStream fin = null; @@ -251,16 +251,18 @@ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, this.readLimiter = readLimiter; } - public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index, - AccumuloConfiguration accumuloConfiguration) throws IOException { + public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data, + BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException { + this.fileName = cacheId; this._dCache = data; this._iCache = index; this.readLimiter = null; init(fsin, len, conf, accumuloConfiguration); } - public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, + public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { + this.fileName = cacheId; this.readLimiter = null; init(fsin, len, conf, accumuloConfiguration); } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java index 4993810b3a..8748d8c57b 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; @@ -623,4 +624,27 @@ private Reader getReader(LocalFileSystem localFs, String testFile) throws IOExce .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build(); return reader; } + + @Test + public void 
testMultipleFilesAndCache() throws Exception { + SortedMap<Key,Value> testData = createTestData(100, 10, 10); + List<String> files = Arrays.asList(createTmpTestFile(), createTmpTestFile(), createTmpTestFile()); + + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + for (int i = 0; i < files.size(); i++) { + try (RFileWriter writer = RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build()) { + for (Entry<Key,Value> entry : testData.entrySet()) { + if (entry.getKey().hashCode() % files.size() == i) { + writer.append(entry.getKey(), entry.getValue()); + } + } + } + } + + Scanner scanner = RFile.newScanner().from(files.toArray(new String[files.size()])).withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000) + .build(); + Assert.assertEquals(testData, toMap(scanner)); + scanner.close(); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java index 08889c0e79..7d947eba1e 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java @@ -76,7 +76,7 @@ private void runTest(int maxBlockSize, int num) throws IOException { byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in = new FSDataInputStream(bais); - CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance(), aconf); + CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source1", in, data.length, CachedConfiguration.getInstance(), aconf); Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8); BlockRead rootIn = _cbr.getMetaBlock("root"); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 
fc43ef1092..5967f85f04 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -267,7 +267,8 @@ public void openReader(boolean cfsi) throws IOException { LruBlockCache indexCache = new LruBlockCache(100000000, 100000); LruBlockCache dataCache = new LruBlockCache(100000000, 100000); - CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration()); + CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength, conf, dataCache, indexCache, + AccumuloConfiguration.getDefaultConfiguration()); reader = new RFile.Reader(_cbr); if (cfsi) iter = new ColumnFamilySkippingIterator(reader); @@ -1624,7 +1625,7 @@ private void runVersionTest(int version) throws IOException { SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration(); - CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length, CachedConfiguration.getInstance(), aconf); + CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in2, data.length, CachedConfiguration.getInstance(), aconf); Reader reader = new RFile.Reader(_cbr); checkIndex(reader); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services