Author: kturner
Date: Tue May 29 15:50:07 2012
New Revision: 1343790

URL: http://svn.apache.org/viewvc?rev=1343790&view=rev
Log:
ACCUMULO-580 roll back changes to add batch size to batch scanner

Modified:
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
 Tue May 29 15:50:07 2012
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.client.mock.IteratorAdapter;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -52,7 +53,9 @@ import org.apache.hadoop.io.Text;
  * the source scanner (which will execute server side) and to the client side 
scanner (which will execute client side).
  */
 public class ClientSideIteratorScanner extends ScannerOptions implements 
Scanner {
+  private int size;
   private int timeOut;
+  
   private Range range;
   private boolean isolated = false;
   
@@ -131,9 +134,9 @@ public class ClientSideIteratorScanner e
    */
   public ClientSideIteratorScanner(Scanner scanner) {
     smi = new ScannerTranslator(scanner);
-    setRange(scanner.getRange());
-    setBatchSize(scanner.getBatchSize());
-    setTimeOut(scanner.getTimeOut());
+    this.range = new Range((Key) null, (Key) null);
+    this.size = Constants.SCAN_BATCH_SIZE;
+    this.timeOut = Integer.MAX_VALUE;
   }
   
   /**
@@ -147,7 +150,7 @@ public class ClientSideIteratorScanner e
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    smi.scanner.setBatchSize(getBatchSize());
+    smi.scanner.setBatchSize(size);
     smi.scanner.setTimeOut(timeOut);
     if (isolated)
       smi.scanner.enableIsolation();
@@ -225,6 +228,16 @@ public class ClientSideIteratorScanner e
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.size = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return size;
+  }
+  
+  @Override
   public void enableIsolation() {
     this.isolated = true;
   }

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
 Tue May 29 15:50:07 2012
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.IsolationException;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -209,6 +210,7 @@ public class IsolatedScanner extends Sca
   private Scanner scanner;
   private Range range;
   private int timeOut;
+  private int batchSize;
   private RowBufferFactory bufferFactory;
   
   public IsolatedScanner(Scanner scanner) {
@@ -217,15 +219,15 @@ public class IsolatedScanner extends Sca
   
   public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) {
     this.scanner = scanner;
-    setRange(scanner.getRange());
-    setBatchSize(scanner.getBatchSize());
-    setTimeOut(scanner.getTimeOut());
+    this.range = new Range();
+    this.timeOut = Integer.MAX_VALUE;
+    this.batchSize = Constants.SCAN_BATCH_SIZE;
     this.bufferFactory = bufferFactory;
   }
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    return new RowBufferingIterator(scanner, this, range, timeOut, 
getBatchSize(), bufferFactory);
+    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, 
bufferFactory);
   }
   
   @Override
@@ -250,6 +252,16 @@ public class IsolatedScanner extends Sca
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+  
+  @Override
   public void enableIsolation() {
     // aye aye captain, already done sir
   }

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java 
(original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java 
Tue May 29 15:50:07 2012
@@ -58,6 +58,21 @@ public interface Scanner extends Scanner
   public Range getRange();
   
   /**
+   * Sets the number of Key/Value pairs that will be fetched at a time from a 
tablet server.
+   * 
+   * @param size
+   *          the number of Key/Value pairs to fetch per call to Accumulo
+   */
+  public void setBatchSize(int size);
+  
+  /**
+   * Returns the batch size (number of Key/Value pairs) that will be fetched 
at a time from a tablet server.
+   * 
+   * @return the batch size configured for this scanner
+   */
+  public int getBatchSize();
+  
+  /**
    * Enables row isolation. Writes that occur to a row after a scan of that 
row has begun will not be seen if this option is enabled.
    */
   public void enableIsolation();

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
 Tue May 29 15:50:07 2012
@@ -92,21 +92,6 @@ public interface ScannerBase extends Ite
   public void clearScanIterators();
   
   /**
-   * Sets the number of Key/Value pairs that will be fetched at a time from a 
tablet server.
-   * 
-   * @param size
-   *          the number of Key/Value pairs to fetch per call to Accumulo
-   */
-  public void setBatchSize(int size);
-  
-  /**
-   * Returns the batch size (number of Key/Value pairs) that will be fetched 
at a time from a tablet server.
-   * 
-   * @return the batch size configured for this scanner
-   */
-  public int getBatchSize();
-  
-  /**
    * Returns an iterator over an accumulo table. This iterator uses the 
options that are currently set for its lifetime, so setting options will have 
no effect
    * on existing iterators.
    * 

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
 Tue May 29 15:50:07 2012
@@ -104,7 +104,7 @@ class OfflineIterator implements Iterato
       return new MultiIterator(allIters, false);
     }
   }
-  
+
   private SortedKeyValueIterator<Key,Value> iter;
   private Range range;
   private KeyExtent currentExtent;
@@ -114,7 +114,7 @@ class OfflineIterator implements Iterato
   private Instance instance;
   private ScannerOptions options;
   private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
-  
+
   /**
    * @param offlineScanner
    * @param instance
@@ -130,7 +130,7 @@ class OfflineIterator implements Iterato
     if (this.options.fetchedColumns.size() > 0) {
       range = range.bound(this.options.fetchedColumns.first(), 
this.options.fetchedColumns.last());
     }
-    
+
     this.tableId = table.toString();
     this.authorizations = authorizations;
     this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
@@ -141,12 +141,12 @@ class OfflineIterator implements Iterato
       
       while (iter != null && !iter.hasTop())
         nextTablet();
-      
+
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
-  
+
   @Override
   public boolean hasNext() {
     return iter != null && iter.hasTop();
@@ -158,7 +158,7 @@ class OfflineIterator implements Iterato
       byte[] v = iter.getTopValue().get();
       // copy just like tablet server does, do this before calling next
       KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, 
v.length));
-      
+
       iter.next();
       
       while (iter != null && !iter.hasTop())
@@ -195,19 +195,19 @@ class OfflineIterator implements Iterato
         iter = null;
         return;
       }
-      
+
       if (range.afterEndKey(new 
Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
         iter = null;
         return;
       }
-      
+
       nextRange = new Range(currentExtent.getMetadataEntry(), false, null, 
false);
     }
-    
+
     List<String> relFiles = new ArrayList<String>();
     
     Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
-    
+
     while (eloc.getSecond() != null) {
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         Tables.clearCache(instance);
@@ -226,7 +226,7 @@ class OfflineIterator implements Iterato
     if (!extent.getTableId().toString().equals(tableId)) {
       throw new AccumuloException(" did not find tablets for table " + tableId 
+ " " + extent);
     }
-    
+
     if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
       throw new AccumuloException(" " + currentExtent + " is not previous 
extent " + extent);
     
@@ -259,7 +259,7 @@ class OfflineIterator implements Iterato
     while (row.hasNext()) {
       Entry<Key,Value> entry = row.next();
       Key key = entry.getKey();
-      
+
       if 
(key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
         relFiles.add(key.getColumnQualifier().toString());
       }
@@ -272,11 +272,11 @@ class OfflineIterator implements Iterato
       if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
         extent = new KeyExtent(key.getRow(), entry.getValue());
       }
-      
+
     }
     return new Pair<KeyExtent,String>(extent, location);
   }
-  
+
   /**
    * @param absFiles
    * @return
@@ -299,7 +299,7 @@ class OfflineIterator implements Iterato
     }
     
     readers.clear();
-    
+
     // TODO need to close files
     for (String file : absFiles) {
       FileSKVIterator reader = FileOperations.getInstance().openReader(file, 
false, fs, conf, acuTableConf, null, null);
@@ -326,7 +326,7 @@ class OfflineIterator implements Iterato
     return 
iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, 
visFilter, extent, acuTableConf, options.serverSideIteratorList,
         options.serverSideIteratorOptions, iterEnv, false));
   }
-  
+
   @Override
   public void remove() {
     throw new UnsupportedOperationException();
@@ -339,6 +339,7 @@ class OfflineIterator implements Iterato
  */
 public class OfflineScanner extends ScannerOptions implements Scanner {
   
+  private int batchSize;
   private int timeOut;
   private Range range;
   
@@ -353,12 +354,13 @@ public class OfflineScanner extends Scan
     this.credentials = credentials;
     this.tableId = new Text(tableId);
     this.range = new Range((Key) null, (Key) null);
-    
+
     this.authorizations = authorizations;
     
+    this.batchSize = Constants.SCAN_BATCH_SIZE;
     this.timeOut = Integer.MAX_VALUE;
   }
-  
+
   @Override
   public void setTimeOut(int timeOut) {
     this.timeOut = timeOut;
@@ -380,6 +382,16 @@ public class OfflineScanner extends Scan
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+  
+  @Override
   public void enableIsolation() {
     
   }
@@ -393,5 +405,5 @@ public class OfflineScanner extends Scan
   public Iterator<Entry<Key,Value>> iterator() {
     return new OfflineIterator(this, instance, credentials, authorizations, 
tableId, range);
   }
-  
+
 }

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
 Tue May 29 15:50:07 2012
@@ -30,6 +30,7 @@ package org.apache.accumulo.core.client.
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -52,6 +53,7 @@ public class ScannerImpl extends Scanner
   private Authorizations authorizations;
   private Text table;
   
+  private int size;
   private int timeOut;
   
   private Range range;
@@ -65,6 +67,7 @@ public class ScannerImpl extends Scanner
     this.range = new Range((Key) null, (Key) null);
     this.authorizations = authorizations;
     
+    this.size = Constants.SCAN_BATCH_SIZE;
     this.timeOut = Integer.MAX_VALUE;
   }
   
@@ -95,12 +98,25 @@ public class ScannerImpl extends Scanner
     return range;
   }
   
+  @Override
+  public synchronized void setBatchSize(int size) {
+    if (size > 0)
+      this.size = size;
+    else
+      throw new IllegalArgumentException("size must be greater than zero");
+  }
+  
+  @Override
+  public synchronized int getBatchSize() {
+    return size;
+  }
+  
   /**
    * Returns an iterator over an accumulo table. This iterator uses the 
options that are currently set on the scanner for its lifetime. So setting 
options on a
    * Scanner object will have no effect on existing iterators.
    */
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(instance, credentials, table, authorizations, 
range, getBatchSize(), timeOut, this, isolated);
+    return new ScannerIterator(instance, credentials, table, authorizations, 
range, size, timeOut, this, isolated);
   }
   
   @Override

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
 Tue May 29 15:50:07 2012
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.data.Column;
@@ -47,8 +46,6 @@ public class ScannerOptions implements S
   
   private String regexIterName = null;
   
-  private int size = Constants.SCAN_BATCH_SIZE;
-  
   protected ScannerOptions() {}
   
   public ScannerOptions(ScannerOptions so) {
@@ -102,7 +99,7 @@ public class ScannerOptions implements S
     
     serverSideIteratorOptions.remove(iteratorName);
   }
-  
+    
   /**
    * Override any existing options on the given named iterator
    */
@@ -181,19 +178,6 @@ public class ScannerOptions implements S
   }
   
   @Override
-  public synchronized void setBatchSize(int size) {
-    if (size > 0)
-      this.size = size;
-    else
-      throw new IllegalArgumentException("size must be greater than zero");
-  }
-  
-  @Override
-  public synchronized int getBatchSize() {
-    return size;
-  }
-  
-  @Override
   public Iterator<Entry<Key,Value>> iterator() {
     throw new UnsupportedOperationException();
   }

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
 Tue May 29 15:50:07 2012
@@ -69,6 +69,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+
 public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value>> {
   
   private static final Logger log = 
Logger.getLogger(TabletServerBatchReaderIterator.class);
@@ -81,7 +82,7 @@ public class TabletServerBatchReaderIter
   private final ExecutorService queryThreadPool;
   private final ScannerOptions options;
   
-  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue;
+  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue = new 
ArrayBlockingQueue<Entry<Key,Value>>(1000);
   private Entry<Key,Value> nextEntry = null;
   private Object nextLock = new Object();
   
@@ -130,7 +131,6 @@ public class TabletServerBatchReaderIter
     this.numThreads = numThreads;
     this.queryThreadPool = queryThreadPool;
     this.options = new ScannerOptions(scannerOptions);
-    this.resultsQueue = new 
ArrayBlockingQueue<Entry<Key,Value>>(this.options.getBatchSize());
     
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java?rev=1343790&r1=1343789&r2=1343790&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
 Tue May 29 15:50:07 2012
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.security
 public class MockScanner extends MockScannerBase implements Scanner {
   
   int timeOut = 0;
+  int batchSize = 0;
   Range range = new Range();
   
   MockScanner(MockTable table, Authorizations auths) {
@@ -59,6 +60,16 @@ public class MockScanner extends MockSca
   }
   
   @Override
+  public void setBatchSize(int size) {
+    this.batchSize = size;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+  
+  @Override
   public void enableIsolation() {}
   
   @Override


Reply via email to