This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 7d176b6 Make CachableBlockFile builder require nonNull for cacheId (#2160) 7d176b6 is described below commit 7d176b60fc6a2ce28fcc3cfdc9a3b68684c76f87 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Jul 23 11:27:55 2021 -0400 Make CachableBlockFile builder require nonNull for cacheId (#2160) --- .../org/apache/accumulo/core/client/rfile/RFileScanner.java | 6 +++--- .../accumulo/core/client/rfile/RFileSummariesRetriever.java | 6 +++--- .../core/file/blockfile/impl/CachableBlockFile.java | 10 +++------- .../org/apache/accumulo/core/summary/SummaryReader.java | 8 ++++---- .../accumulo/core/file/rfile/MultiLevelIndexTest.java | 8 ++++---- .../java/org/apache/accumulo/core/file/rfile/RFileTest.java | 13 +++++++------ 6 files changed, 24 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 3413aec..bb4356d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -347,9 +347,9 @@ class RFileScanner extends ScannerOptions implements Scanner { for (int i = 0; i < sources.length; i++) { // TODO may have been a bug with multiple files and caching in older version... 
FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream(); - CachableBuilder cb = new CachableBuilder().cacheId("source-" + i).input(inputStream) - .length(sources[i].getLength()).conf(opts.in.getConf()).cacheProvider(cacheProvider) - .cryptoService(cryptoService); + CachableBuilder cb = + new CachableBuilder().input(inputStream, "source-" + i).length(sources[i].getLength()) + .conf(opts.in.getConf()).cacheProvider(cacheProvider).cryptoService(cryptoService); readers.add(new RFile.Reader(cb)); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java index 8dbb7a1..9341f50 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java @@ -93,9 +93,9 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions try { SummaryCollection all = new SummaryCollection(); CryptoService cservice = CryptoServiceFactory.newInstance(acuconf, ClassloaderType.JAVA); - for (RFileSource source : sources) { - SummaryReader fileSummary = SummaryReader.load(in.getFileSystem().getConf(), - source.getInputStream(), source.getLength(), summarySelector, factory, cservice); + for (int i = 0; i < sources.length; i++) { + SummaryReader fileSummary = SummaryReader.load(in.getFileSystem().getConf(), sources[i], + "source-" + i, summarySelector, factory, cservice); SummaryCollection sc = fileSummary .getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow))); all.merge(sc, factory); diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 2b15fe3..b4c24e2 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -77,11 +77,6 @@ public class CachableBlockFile { Configuration hadoopConf = null; CryptoService cryptoService = null; - public CachableBuilder cacheId(String id) { - this.cacheId = id; - return this; - } - public CachableBuilder conf(Configuration hadoopConf) { this.hadoopConf = hadoopConf; return this; @@ -94,7 +89,8 @@ public class CachableBlockFile { return this; } - public CachableBuilder input(InputStream is) { + public CachableBuilder input(InputStream is, String cacheId) { + this.cacheId = cacheId; this.inputSupplier = () -> is; return this; } @@ -359,7 +355,7 @@ public class CachableBlockFile { } public Reader(CachableBuilder b) { - this.cacheId = b.cacheId; + this.cacheId = Objects.requireNonNull(b.cacheId); this.inputSupplier = b.inputSupplier; this.lengthSupplier = b.lengthSupplier; this.fileLenCache = b.fileLenCache; diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index ce18259..341b6d9 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -21,7 +21,6 @@ package org.apache.accumulo.core.summary; import java.io.DataInputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.PrintStream; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -30,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.function.Predicate; +import org.apache.accumulo.core.client.rfile.RFileSource; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import 
org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; @@ -174,11 +174,11 @@ public class SummaryReader { return fileSummaries; } - public static SummaryReader load(Configuration conf, InputStream inputStream, long length, + public static SummaryReader load(Configuration conf, RFileSource source, String cacheId, Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory, CryptoService cryptoService) throws IOException { - CachableBuilder cb = new CachableBuilder().input(inputStream).length(length).conf(conf) - .cryptoService(cryptoService); + CachableBuilder cb = new CachableBuilder().input(source.getInputStream(), cacheId) + .length(source.getLength()).conf(conf).cryptoService(cryptoService); return load(new CachableBlockFile.Reader(cb), summarySelector, factory); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java index 6eaabc3..fb2cdd0 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.file.rfile; +import static org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType.JAVA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -29,7 +30,6 @@ import java.util.Random; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoServiceFactory; -import org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; @@ -66,7 +66,7 @@ public class MultiLevelIndexTest { 
ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", hadoopConf, - CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA)); + CryptoServiceFactory.newInstance(aconf, JAVA)); BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize)); @@ -86,8 +86,8 @@ public class MultiLevelIndexTest { byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in = new FSDataInputStream(bais); - CachableBuilder cb = new CachableBuilder().input(in).length(data.length).conf(hadoopConf) - .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA)); + CachableBuilder cb = new CachableBuilder().input(in, "source-1").length(data.length) + .conf(hadoopConf).cryptoService(CryptoServiceFactory.newInstance(aconf, JAVA)); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(cb); Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 66572ed..ed62565 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -309,8 +309,8 @@ public class RFileTest { LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA); - CachableBuilder cb = new CachableBuilder().cacheId("source-1").input(in).length(fileLength) - .conf(conf).cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService( + CachableBuilder cb = new CachableBuilder().input(in, "source-1").length(fileLength).conf(conf) + .cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService( 
CryptoServiceFactory.newInstance(accumuloConfiguration, ClassloaderType.JAVA)); reader = new RFile.Reader(cb); if (cfsi) @@ -1746,10 +1746,11 @@ public class RFileTest { aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); BlockCacheManager manager = BlockCacheManagerFactory.getInstance(aconf); manager.start(new BlockCacheConfiguration(aconf)); - CachableBuilder cb = new CachableBuilder().input(in2).length(data.length).conf(hadoopConf) - .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA)) - .cacheProvider(new BasicCacheProvider(manager.getBlockCache(CacheType.INDEX), - manager.getBlockCache(CacheType.DATA))); + CachableBuilder cb = + new CachableBuilder().input(in2, "cache-1").length(data.length).conf(hadoopConf) + .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA)) + .cacheProvider(new BasicCacheProvider(manager.getBlockCache(CacheType.INDEX), + manager.getBlockCache(CacheType.DATA))); Reader reader = new RFile.Reader(cb); checkIndex(reader);