keith-turner closed pull request #324: ACCUMULO-4744 Fixed RFile API scanner bug
URL: https://github.com/apache/accumulo/pull/324
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff after the merge, so the full diff is reproduced below:

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 4dfba68850..bc0df8253f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -265,7 +265,8 @@ public SamplerConfiguration getSamplerConfiguration() {
       List<SortedKeyValueIterator<Key,Value>> readers = new 
ArrayList<>(sources.length);
       for (int i = 0; i < sources.length; i++) {
         FSDataInputStream inputStream = (FSDataInputStream) 
sources[i].getInputStream();
-        readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, 
sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
+
+        readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + 
i, inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, 
indexCache,
             AccumuloConfiguration.getDefaultConfiguration())));
       }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 4fa66341f2..3ecb5cafc7 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -147,7 +147,7 @@ public long getStartPos() throws IOException {
   public static class Reader implements BlockFileReader {
     private final RateLimiter readLimiter;
     private BCFile.Reader _bc;
-    private String fileName = "not_available";
+    private final String fileName;
     private BlockCache _dCache = null;
     private BlockCache _iCache = null;
     private InputStream fin = null;
@@ -251,16 +251,18 @@ public Reader(FileSystem fs, Path dataFile, Configuration 
conf, BlockCache data,
       this.readLimiter = readLimiter;
     }
 
-    public <InputStreamType extends InputStream & Seekable> 
Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, 
BlockCache index,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
+    public <InputStreamType extends InputStream & Seekable> Reader(String 
cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
+        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws 
IOException {
+      this.fileName = cacheId;
       this._dCache = data;
       this._iCache = index;
       this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
 
-    public <InputStreamType extends InputStream & Seekable> 
Reader(InputStreamType fsin, long len, Configuration conf,
+    public <InputStreamType extends InputStream & Seekable> Reader(String 
cacheId, InputStreamType fsin, long len, Configuration conf,
         AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.fileName = cacheId;
       this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java 
b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 4993810b3a..8748d8c57b 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -623,4 +624,27 @@ private Reader getReader(LocalFileSystem localFs, String 
testFile) throws IOExce
         
.withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
     return reader;
   }
+
+  @Test
+  public void testMultipleFilesAndCache() throws Exception {
+    SortedMap<Key,Value> testData = createTestData(100, 10, 10);
+    List<String> files = Arrays.asList(createTmpTestFile(), 
createTmpTestFile(), createTmpTestFile());
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+
+    for (int i = 0; i < files.size(); i++) {
+      try (RFileWriter writer = 
RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build()) {
+        for (Entry<Key,Value> entry : testData.entrySet()) {
+          if (entry.getKey().hashCode() % files.size() == i) {
+            writer.append(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+    }
+
+    Scanner scanner = RFile.newScanner().from(files.toArray(new 
String[files.size()])).withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000)
+        .build();
+    Assert.assertEquals(testData, toMap(scanner));
+    scanner.close();
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 08889c0e79..7d947eba1e 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -76,7 +76,7 @@ private void runTest(int maxBlockSize, int num) throws 
IOException {
     byte[] data = baos.toByteArray();
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in = new FSDataInputStream(bais);
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, 
data.length, CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source1", 
in, data.length, CachedConfiguration.getInstance(), aconf);
 
     Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
     BlockRead rootIn = _cbr.getMetaBlock("root");
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index fc43ef1092..5967f85f04 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -267,7 +267,8 @@ public void openReader(boolean cfsi) throws IOException {
       LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
       LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
 
-      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, 
fileLength, conf, dataCache, indexCache, 
AccumuloConfiguration.getDefaultConfiguration());
+      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", 
in, fileLength, conf, dataCache, indexCache,
+          AccumuloConfiguration.getDefaultConfiguration());
       reader = new RFile.Reader(_cbr);
       if (cfsi)
         iter = new ColumnFamilySkippingIterator(reader);
@@ -1624,7 +1625,7 @@ private void runVersionTest(int version) throws 
IOException {
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
     FSDataInputStream in2 = new FSDataInputStream(bais);
     AccumuloConfiguration aconf = 
AccumuloConfiguration.getDefaultConfiguration();
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, 
data.length, CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", 
in2, data.length, CachedConfiguration.getInstance(), aconf);
     Reader reader = new RFile.Reader(_cbr);
     checkIndex(reader);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to