[accumulo] branch master updated: Make RFileScanner use table props throughout (#493)
This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new efdd011 Make RFileScanner use table props throughout (#493) efdd011 is described below commit efdd0114298864d8411d74031d04ef93224c4c2c Author: Mike MillerAuthorDate: Tue May 22 19:15:28 2018 -0400 Make RFileScanner use table props throughout (#493) --- .../accumulo/core/client/rfile/RFileScanner.java | 24 +- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 57bc144..6caaaf0 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment; import org.apache.accumulo.core.client.impl.ScannerOptions; import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; @@ -78,6 +79,7 @@ class RFileScanner extends ScannerOptions implements Scanner { private Opts opts; private int batchSize = 1000; private long readaheadThreshold = 3; + private AccumuloConfiguration tableConf; static class Opts { InputArgs in; @@ -179,13 +181,17 @@ class RFileScanner extends ScannerOptions implements Scanner { } this.opts = opts; +if (null != opts.tableConfig && opts.tableConfig.size() > 0) { + ConfigurationCopy tableCC = new ConfigurationCopy(DefaultConfiguration.getInstance()); + 
opts.tableConfig.forEach(tableCC::set); + this.tableConf = tableCC; +} else { + this.tableConf = DefaultConfiguration.getInstance(); +} if (opts.indexCacheSize > 0 || opts.dataCacheSize > 0) { - ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); - if (null != opts.tableConfig) { -opts.tableConfig.forEach(cc::set); - } - + ConfigurationCopy cc = tableConf instanceof ConfigurationCopy ? (ConfigurationCopy) tableConf + : new ConfigurationCopy(tableConf); try { blockCacheManager = BlockCacheManagerFactory.getClientInstance(cc); if (opts.indexCacheSize > 0) { @@ -347,9 +353,8 @@ class RFileScanner extends ScannerOptions implements Scanner { for (int i = 0; i < sources.length; i++) { // TODO may have been a bug with multiple files and caching in older version... FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream(); -readers.add(new RFile.Reader( -new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(), -opts.in.getConf(), dataCache, indexCache, DefaultConfiguration.getInstance(; +readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream, +sources[i].getLength(), opts.in.getConf(), dataCache, indexCache, tableConf))); } if (getSamplerConfiguration() != null) { @@ -377,8 +382,7 @@ class RFileScanner extends ScannerOptions implements Scanner { try { if (opts.tableConfig != null && opts.tableConfig.size() > 0) { - ConfigurationCopy conf = new ConfigurationCopy(opts.tableConfig); - iterator = IteratorUtil.loadIterators(IteratorScope.scan, iterator, null, conf, + iterator = IteratorUtil.loadIterators(IteratorScope.scan, iterator, null, tableConf, serverSideIteratorList, serverSideIteratorOptions, new IterEnv()); } else { iterator = IteratorUtil.loadIterators(iterator, serverSideIteratorList, -- To stop receiving notification emails like this one, please contact mmil...@apache.org.
[accumulo] 01/01: Merge branch '1.9'
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 5a0a63151db7fc3c54f8731257499cb08a72af24 Merge: b484841 1180d76 Author: Keith TurnerAuthorDate: Tue May 22 18:02:29 2018 -0400 Merge branch '1.9' .../apache/accumulo/core/file/FileOperations.java | 18 .../file/blockfile/impl/CachableBlockFile.java | 50 +- .../accumulo/core/file/rfile/RFileOperations.java | 5 ++- .../org/apache/accumulo/tserver/FileManager.java | 8 +++- .../tserver/TabletServerResourceManager.java | 7 ++- 5 files changed, 72 insertions(+), 16 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index a6ee3bc,5a26ad2..9c69377 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@@ -444,7 -438,12 +457,12 @@@ public abstract class FileOperations /** * (Optional) set the index cache to be used to optimize reads within the constructed reader. */ -public SubbuilderType withIndexCache(BlockCache indexCache); +SubbuilderType withIndexCache(BlockCache indexCache); + + /** + * (Optional) set the file len cache to be used to optimize reads within the constructed reader. 
+ */ -public SubbuilderType withFileLenCache(Cache fileLenCache); ++SubbuilderType withFileLenCache(Cache fileLenCache); } /** diff --cc core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index de3d88d,336ec4d..7c4f97c --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@@ -16,25 -16,29 +16,26 @@@ */ package org.apache.accumulo.core.file.blockfile.impl; -import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.lang.ref.SoftReference; -import java.util.concurrent.Callable; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; + import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.blockfile.ABlockReader; -import org.apache.accumulo.core.file.blockfile.ABlockWriter; -import org.apache.accumulo.core.file.blockfile.BlockFileReader; -import org.apache.accumulo.core.file.blockfile.BlockFileWriter; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache.Loader; import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable; import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader; -import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender; -import org.apache.accumulo.core.file.streams.PositionedOutput; +import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; 
import org.apache.accumulo.core.file.streams.RateLimitedInputStream; -import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@@ -43,9 -47,10 +44,10 @@@ import org.apache.hadoop.fs.Seekable import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import com.google.common.cache.Cache; /** - * * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks * and metadatablocks */ @@@ -60,24 -147,24 +62,25 @@@ public class CachableBlockFile } /** - * - * * Class wraps the BCFile reader. - * */ - public static class Reader implements BlockFileReader { - + public static class Reader implements Closeable { private final RateLimiter readLimiter; -private BCFile.Reader _bc; -private final String fileName; -private BlockCache _dCache = null; -private BlockCache _iCache = null; +// private BCFile.Reader _bc; +private final String cacheId; +private final BlockCache _dCache; +private final BlockCache _iCache; + private Cache fileLenCache = null; -private InputStream fin = null; -private FileSystem fs; -private Configuration conf; +private volatile InputStream fin =
[accumulo] branch master updated (b484841 -> 5a0a631)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git. from b484841 Merge branch 'master' of github.com:apache/accumulo add 0c2e7e0 fixes #467 cache file lengths add 1180d76 Use full path as key in file len cache #467 new 5a0a631 Merge branch '1.9' The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/accumulo/core/file/FileOperations.java | 18 .../file/blockfile/impl/CachableBlockFile.java | 50 +- .../accumulo/core/file/rfile/RFileOperations.java | 5 ++- .../org/apache/accumulo/tserver/FileManager.java | 8 +++- .../tserver/TabletServerResourceManager.java | 7 ++- 5 files changed, 72 insertions(+), 16 deletions(-) -- To stop receiving notification emails like this one, please contact ktur...@apache.org.
[accumulo] branch 1.9 updated: Use full path as key in file len cache #467
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/1.9 by this push: new 1180d76 Use full path as key in file len cache #467 1180d76 is described below commit 1180d76f87315f59aa6310015aeed7fb19937012 Author: Keith Turner AuthorDate: Tue May 22 17:21:28 2018 -0400 Use full path as key in file len cache #467 I had used just the file name as the file len cache key; however, I realized that file names created by older versions of Accumulo may not be unique. The full path should still be unique for older versions, because the tablet dir + file name was unique. --- .../accumulo/core/file/blockfile/impl/CachableBlockFile.java | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index a8d1e74..336ec4d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -288,10 +288,9 @@ public class CachableBlockFile { this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration); } -private static long getFileLen(Cache fileLenCache, final FileSystem fs, -final Path path) throws IOException { +private long getFileLen(final Path path) throws IOException { try { -return fileLenCache.get(path.getName(), new Callable() { +return fileLenCache.get(fileName, new Callable() { @Override public Long call() throws Exception { return fs.getFileStatus(path).getLen(); @@ -316,14 +315,14 @@ public class CachableBlockFile { if (fileLenCache != null) { try { -init(fsIn, getFileLen(fileLenCache, fs, path), conf, accumuloConfiguration); +init(fsIn, getFileLen(path), conf, accumuloConfiguration); } catch 
(Exception e) { log.debug("Failed to open {}, clearing file length cache and retrying", fileName, e); -fileLenCache.invalidate(path.getName()); +fileLenCache.invalidate(fileName); } if (_bc == null) { -init(fsIn, getFileLen(fileLenCache, fs, path), conf, accumuloConfiguration); +init(fsIn, getFileLen(path), conf, accumuloConfiguration); } } else { init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration); -- To stop receiving notification emails like this one, please contact ktur...@apache.org.
[accumulo] branch master updated: Add retry mechanism to AccumuloReloadingVFSClassLoader (#453)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new 18a9221 Add retry mechanism to AccumuloReloadingVFSClassLoader (#453) new b484841 Merge branch 'master' of github.com:apache/accumulo 18a9221 is described below commit 18a9221581c7676f748899b2a274ce04f71600f9 Author: Hannah Pellon <> AuthorDate: Mon Apr 30 18:51:10 2018 + Add retry mechanism to AccumuloReloadingVFSClassLoader (#453) * Also addresses issues mentioned in ACCUMULO-1507 --- .../vfs/AccumuloReloadingVFSClassLoader.java | 54 +- .../classloader/vfs/AccumuloVFSClassLoader.java| 17 --- .../vfs/AccumuloReloadingVFSClassLoaderTest.java | 54 ++ 3 files changed, 118 insertions(+), 7 deletions(-) diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java index 0f275c9..a084db3 100644 --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java @@ -45,10 +45,16 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC private static final Logger log = LoggerFactory.getLogger(AccumuloReloadingVFSClassLoader.class); - // set to 5 mins. The rational behind this large time is to avoid a gazillion tservers all asking + // set to 5 mins. The rationale behind this large time is to avoid a gazillion tservers all asking // the name node for info too frequently. 
private static final int DEFAULT_TIMEOUT = 5 * 60 * 1000; + private volatile long maxWaitInterval = 6; + + private volatile long maxRetries = -1; + + private volatile long sleepInterval = 5000; + private FileObject[] files; private VFSClassLoader cl; private final ReloadingClassLoader parent; @@ -79,6 +85,35 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC FileSystemManager vfs = AccumuloVFSClassLoader.generateVfs(); FileObject[] files = AccumuloVFSClassLoader.resolve(vfs, uris); + long retries = 0; + long currentSleepMillis = sleepInterval; + + if (files.length == 0) { +while (files.length == 0 && retryPermitted(retries)) { + + try { +log.debug("VFS path was empty. Waiting " + currentSleepMillis + " ms to retry"); +Thread.sleep(currentSleepMillis); + +files = AccumuloVFSClassLoader.resolve(vfs, uris); +retries++; + +currentSleepMillis = Math.min(maxWaitInterval, currentSleepMillis + sleepInterval); + + } catch (InterruptedException e) { +log.error("VFS Retry Interruped", e); +throw new RuntimeException(e); + } + +} +// There is a chance that the listener was removed from the top level directory or +// its children if they were deleted within some time window. Re-add files to be +// monitored. The Monitor will ignore files that are already/still being monitored. 
+for (FileObject file : files) { + monitor.addFile(file); +} + } + log.debug("Rebuilding dynamic classloader using files- {}", stringify(files)); VFSClassLoader cl; @@ -214,4 +249,21 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC return buf.toString(); } + // VisibleForTesting intentionally not using annotation from Guava because it adds unwanted + // dependency + void setMaxRetries(long maxRetries) { +this.maxRetries = maxRetries; + } + + long getMaxRetries() { +return maxRetries; + } + + long getMaxWaitInterval() { +return maxWaitInterval; + } + + private boolean retryPermitted(long retries) { +return (maxRetries < 0 || retries < maxRetries); + } } diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java index 7ee586c..3aa87eb 100644 --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java @@ -151,14 +151,19 @@ public class AccumuloVFSClassLoader { case IMAGINARY: // assume its a pattern String pattern = fo.getName().getBaseName(); - if (fo.getParent() != null && fo.getParent().getType() == FileType.FOLDER) { + if (fo.getParent() !=
[accumulo] branch 1.9 updated (a398f8b -> 0c2e7e0)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a change to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git. from a398f8b [maven-release-plugin] prepare for next development iteration add 0c2e7e0 fixes #467 cache file lengths No new revisions were added by this update. Summary of changes: .../apache/accumulo/core/file/FileOperations.java | 18 .../file/blockfile/impl/CachableBlockFile.java | 49 +++--- .../accumulo/core/file/rfile/RFileOperations.java | 5 ++- .../org/apache/accumulo/tserver/FileManager.java | 8 +++- .../tserver/TabletServerResourceManager.java | 7 +++- 5 files changed, 76 insertions(+), 11 deletions(-) -- To stop receiving notification emails like this one, please contact ktur...@apache.org.
[accumulo] branch master updated: #408 - Remove deprecated ClientConfiguration from MapReduce API (#489)
This is an automated email from the ASF dual-hosted git repository. mwalch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new 1cbd22d #408 - Remove deprecated ClientConfiguration from MapReduce API (#489) 1cbd22d is described below commit 1cbd22d1354f3127033c83a79302b6e2eb59f7ec Author: Mike WalchAuthorDate: Tue May 22 11:22:52 2018 -0400 #408 - Remove deprecated ClientConfiguration from MapReduce API (#489) * Removed unnecessary fields in RangeInputSplit * Updated init method in AbstractInputFormat --- .../core/client/mapred/AbstractInputFormat.java| 84 +-- .../core/client/mapred/AccumuloOutputFormat.java | 12 +- .../core/client/mapreduce/AbstractInputFormat.java | 92 +--- .../client/mapreduce/AccumuloOutputFormat.java | 11 +- .../core/client/mapreduce/RangeInputSplit.java | 155 +--- .../core/client/mapreduce/impl/SplitUtils.java | 16 +-- .../mapreduce/lib/impl/ConfiguratorBase.java | 160 - .../mapreduce/lib/impl/InputConfigurator.java | 15 +- .../lib/impl/MapReduceClientOnDefaultTable.java| 9 +- .../core/client/mapred/RangeInputSplitTest.java| 12 -- .../core/client/mapreduce/RangeInputSplitTest.java | 12 -- .../client/mapreduce/impl/BatchInputSplitTest.java | 13 -- .../mapreduce/lib/impl/ConfiguratorBaseTest.java | 65 + .../core/client/impl/ConnectionInfoImpl.java | 2 +- .../minicluster/impl/MiniAccumuloClusterImpl.java | 3 +- .../test/mapred/AccumuloInputFormatIT.java | 4 - .../apache/accumulo/test/mapred/TokenFileIT.java | 3 +- .../test/mapreduce/AccumuloInputFormatIT.java | 48 --- .../accumulo/test/mapreduce/TokenFileIT.java | 6 +- 19 files changed, 257 insertions(+), 465 deletions(-) diff --git a/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java index 49bab9a..c38566b 100644 --- 
a/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java +++ b/client/mapreduce/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java @@ -27,13 +27,13 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; @@ -49,7 +49,6 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; -import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.Credentials; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; @@ -126,10 +125,21 @@ public abstract class AbstractInputFormat implements InputFormat { * Connection information for Accumulo * @since 2.0.0 */ - public static void setConnectionInfo(JobConf job, ConnectionInfo info) - throws AccumuloSecurityException { -setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); -setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + public static void setConnectionInfo(JobConf job, ConnectionInfo info) { +ConnectionInfo inputInfo = InputConfigurator.updateToken(job.getCredentials(), info); +InputConfigurator.setConnectionInfo(CLASS, job, inputInfo); + } + + 
/** + * Retrieves {@link ConnectionInfo} from the configuration + * + * @param job + * Hadoop job instance configuration + * @return {@link ConnectionInfo} object + * @since 2.0.0 + */ + protected static ConnectionInfo getConnectionInfo(JobConf job) { +return InputConfigurator.getConnectionInfo(CLASS, job); } /** @@ -259,7 +269,8 @@ public abstract class AbstractInputFormat implements InputFormat { * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ @Deprecated - public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { + public static void setZooKeeperInstance(JobConf