Author: kturner
Date: Tue May 29 15:50:07 2012
New Revision: 1343790

URL: http://svn.apache.org/viewvc?rev=1343790&view=rev
Log:
ACCUMULO-580 roll back changes to add batch size to batch scanner

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java Tue May 29 15:50:07 2012 @@ -25,6 +25,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.impl.ScannerOptions; import org.apache.accumulo.core.client.mock.IteratorAdapter; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -52,7 +53,9 @@ import org.apache.hadoop.io.Text; * the source scanner (which will execute server side) and to the client side scanner (which will execute client side). 
*/ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner { + private int size; private int timeOut; + private Range range; private boolean isolated = false; @@ -131,9 +134,9 @@ public class ClientSideIteratorScanner e */ public ClientSideIteratorScanner(Scanner scanner) { smi = new ScannerTranslator(scanner); - setRange(scanner.getRange()); - setBatchSize(scanner.getBatchSize()); - setTimeOut(scanner.getTimeOut()); + this.range = new Range((Key) null, (Key) null); + this.size = Constants.SCAN_BATCH_SIZE; + this.timeOut = Integer.MAX_VALUE; } /** @@ -147,7 +150,7 @@ public class ClientSideIteratorScanner e @Override public Iterator<Entry<Key,Value>> iterator() { - smi.scanner.setBatchSize(getBatchSize()); + smi.scanner.setBatchSize(size); smi.scanner.setTimeOut(timeOut); if (isolated) smi.scanner.enableIsolation(); @@ -225,6 +228,16 @@ public class ClientSideIteratorScanner e } @Override + public void setBatchSize(int size) { + this.size = size; + } + + @Override + public int getBatchSize() { + return size; + } + + @Override public void enableIsolation() { this.isolated = true; } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java Tue May 29 15:50:07 2012 @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map.Entry; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.impl.IsolationException; import org.apache.accumulo.core.client.impl.ScannerOptions; import 
org.apache.accumulo.core.data.ByteSequence; @@ -209,6 +210,7 @@ public class IsolatedScanner extends Sca private Scanner scanner; private Range range; private int timeOut; + private int batchSize; private RowBufferFactory bufferFactory; public IsolatedScanner(Scanner scanner) { @@ -217,15 +219,15 @@ public class IsolatedScanner extends Sca public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) { this.scanner = scanner; - setRange(scanner.getRange()); - setBatchSize(scanner.getBatchSize()); - setTimeOut(scanner.getTimeOut()); + this.range = new Range(); + this.timeOut = Integer.MAX_VALUE; + this.batchSize = Constants.SCAN_BATCH_SIZE; this.bufferFactory = bufferFactory; } @Override public Iterator<Entry<Key,Value>> iterator() { - return new RowBufferingIterator(scanner, this, range, timeOut, getBatchSize(), bufferFactory); + return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, bufferFactory); } @Override @@ -250,6 +252,16 @@ public class IsolatedScanner extends Sca } @Override + public void setBatchSize(int size) { + this.batchSize = size; + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override public void enableIsolation() { // aye aye captain, already done sir } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java Tue May 29 15:50:07 2012 @@ -58,6 +58,21 @@ public interface Scanner extends Scanner public Range getRange(); /** + * Sets the number of Key/Value pairs that will be fetched at a time from a tablet server. 
+ * + * @param size + * the number of Key/Value pairs to fetch per call to Accumulo + */ + public void setBatchSize(int size); + + /** + * Returns the batch size (number of Key/Value pairs) that will be fetched at a time from a tablet server. + * + * @return the batch size configured for this scanner + */ + public int getBatchSize(); + + /** * Enables row isolation. Writes that occur to a row after a scan of that row has begun will not be seen if this option is enabled. */ public void enableIsolation(); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java Tue May 29 15:50:07 2012 @@ -92,21 +92,6 @@ public interface ScannerBase extends Ite public void clearScanIterators(); /** - * Sets the number of Key/Value pairs that will be fetched at a time from a tablet server. - * - * @param size - * the number of Key/Value pairs to fetch per call to Accumulo - */ - public void setBatchSize(int size); - - /** - * Returns the batch size (number of Key/Value pairs) that will be fetched at a time from a tablet server. - * - * @return the batch size configured for this scanner - */ - public int getBatchSize(); - - /** * Returns an iterator over an accumulo table. This iterator uses the options that are currently set for its lifetime, so setting options will have no effect * on existing iterators. 
* Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Tue May 29 15:50:07 2012 @@ -104,7 +104,7 @@ class OfflineIterator implements Iterato return new MultiIterator(allIters, false); } } - + private SortedKeyValueIterator<Key,Value> iter; private Range range; private KeyExtent currentExtent; @@ -114,7 +114,7 @@ class OfflineIterator implements Iterato private Instance instance; private ScannerOptions options; private ArrayList<SortedKeyValueIterator<Key,Value>> readers; - + /** * @param offlineScanner * @param instance @@ -130,7 +130,7 @@ class OfflineIterator implements Iterato if (this.options.fetchedColumns.size() > 0) { range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); } - + this.tableId = table.toString(); this.authorizations = authorizations; this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>(); @@ -141,12 +141,12 @@ class OfflineIterator implements Iterato while (iter != null && !iter.hasTop()) nextTablet(); - + } catch (Exception e) { throw new RuntimeException(e); } } - + @Override public boolean hasNext() { return iter != null && iter.hasTop(); @@ -158,7 +158,7 @@ class OfflineIterator implements Iterato byte[] v = iter.getTopValue().get(); // copy just like tablet server does, do this before calling next KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length)); - + iter.next(); while (iter != null && !iter.hasTop()) @@ -195,19 +195,19 @@ class OfflineIterator implements Iterato iter = 
null; return; } - + if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) { iter = null; return; } - + nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false); } - + List<String> relFiles = new ArrayList<String>(); Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles); - + while (eloc.getSecond() != null) { if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { Tables.clearCache(instance); @@ -226,7 +226,7 @@ class OfflineIterator implements Iterato if (!extent.getTableId().toString().equals(tableId)) { throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent); } - + if (currentExtent != null && !extent.isPreviousExtent(currentExtent)) throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent); @@ -259,7 +259,7 @@ class OfflineIterator implements Iterato while (row.hasNext()) { Entry<Key,Value> entry = row.next(); Key key = entry.getKey(); - + if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) { relFiles.add(key.getColumnQualifier().toString()); } @@ -272,11 +272,11 @@ class OfflineIterator implements Iterato if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) { extent = new KeyExtent(key.getRow(), entry.getValue()); } - + } return new Pair<KeyExtent,String>(extent, location); } - + /** * @param absFiles * @return @@ -299,7 +299,7 @@ class OfflineIterator implements Iterato } readers.clear(); - + // TODO need to close files for (String file : absFiles) { FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null); @@ -326,7 +326,7 @@ class OfflineIterator implements Iterato return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList, options.serverSideIteratorOptions, iterEnv, false)); } - + @Override public void remove() { throw new 
UnsupportedOperationException(); @@ -339,6 +339,7 @@ class OfflineIterator implements Iterato */ public class OfflineScanner extends ScannerOptions implements Scanner { + private int batchSize; private int timeOut; private Range range; @@ -353,12 +354,13 @@ public class OfflineScanner extends Scan this.credentials = credentials; this.tableId = new Text(tableId); this.range = new Range((Key) null, (Key) null); - + this.authorizations = authorizations; + this.batchSize = Constants.SCAN_BATCH_SIZE; this.timeOut = Integer.MAX_VALUE; } - + @Override public void setTimeOut(int timeOut) { this.timeOut = timeOut; @@ -380,6 +382,16 @@ public class OfflineScanner extends Scan } @Override + public void setBatchSize(int size) { + this.batchSize = size; + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override public void enableIsolation() { } @@ -393,5 +405,5 @@ public class OfflineScanner extends Scan public Iterator<Entry<Key,Value>> iterator() { return new OfflineIterator(this, instance, credentials, authorizations, tableId, range); } - + } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java Tue May 29 15:50:07 2012 @@ -30,6 +30,7 @@ package org.apache.accumulo.core.client. 
import java.util.Iterator; import java.util.Map.Entry; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; @@ -52,6 +53,7 @@ public class ScannerImpl extends Scanner private Authorizations authorizations; private Text table; + private int size; private int timeOut; private Range range; @@ -65,6 +67,7 @@ public class ScannerImpl extends Scanner this.range = new Range((Key) null, (Key) null); this.authorizations = authorizations; + this.size = Constants.SCAN_BATCH_SIZE; this.timeOut = Integer.MAX_VALUE; } @@ -95,12 +98,25 @@ public class ScannerImpl extends Scanner return range; } + @Override + public synchronized void setBatchSize(int size) { + if (size > 0) + this.size = size; + else + throw new IllegalArgumentException("size must be greater than zero"); + } + + @Override + public synchronized int getBatchSize() { + return size; + } + /** * Returns an iterator over an accumulo table. This iterator uses the options that are currently set on the scanner for its lifetime. So setting options on a * Scanner object will have no effect on existing iterators. 
*/ public synchronized Iterator<Entry<Key,Value>> iterator() { - return new ScannerIterator(instance, credentials, table, authorizations, range, getBatchSize(), timeOut, this, isolated); + return new ScannerIterator(instance, credentials, table, authorizations, range, size, timeOut, this, isolated); } @Override Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java Tue May 29 15:50:07 2012 @@ -27,7 +27,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.data.Column; @@ -47,8 +46,6 @@ public class ScannerOptions implements S private String regexIterName = null; - private int size = Constants.SCAN_BATCH_SIZE; - protected ScannerOptions() {} public ScannerOptions(ScannerOptions so) { @@ -102,7 +99,7 @@ public class ScannerOptions implements S serverSideIteratorOptions.remove(iteratorName); } - + /** * Override any existing options on the given named iterator */ @@ -181,19 +178,6 @@ public class ScannerOptions implements S } @Override - public synchronized void setBatchSize(int size) { - if (size > 0) - this.size = size; - else - throw new IllegalArgumentException("size must be greater than zero"); - } - - @Override - public synchronized int getBatchSize() { - return size; - } - - @Override public Iterator<Entry<Key,Value>> iterator() { throw new 
UnsupportedOperationException(); } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Tue May 29 15:50:07 2012 @@ -69,6 +69,7 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; + public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> { private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class); @@ -81,7 +82,7 @@ public class TabletServerBatchReaderIter private final ExecutorService queryThreadPool; private final ScannerOptions options; - private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue; + private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(1000); private Entry<Key,Value> nextEntry = null; private Object nextLock = new Object(); @@ -130,7 +131,6 @@ public class TabletServerBatchReaderIter this.numThreads = numThreads; this.queryThreadPool = queryThreadPool; this.options = new ScannerOptions(scannerOptions); - this.resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(this.options.getBatchSize()); if (options.fetchedColumns.size() > 0) { ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size()); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java Tue May 29 15:50:07 2012 @@ -32,6 +32,7 @@ import org.apache.accumulo.core.security public class MockScanner extends MockScannerBase implements Scanner { int timeOut = 0; + int batchSize = 0; Range range = new Range(); MockScanner(MockTable table, Authorizations auths) { @@ -59,6 +60,16 @@ public class MockScanner extends MockSca } @Override + public void setBatchSize(int size) { + this.batchSize = size; + } + + @Override + public int getBatchSize() { + return this.batchSize; + } + + @Override public void enableIsolation() {} @Override