[ 
https://issues.apache.org/jira/browse/ACCUMULO-3602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14391987#comment-14391987
 ] 

ASF GitHub Bot commented on ACCUMULO-3602:
------------------------------------------

Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/25#discussion_r27628573
  
    --- Diff: 
core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
 ---
    @@ -433,24 +435,76 @@ public void initialize(InputSplit inSplit, JobConf 
job) throws IOException {
           // but the scanner will use the table id resolved at job setup time
           InputTableConfig tableConfig = getInputTableConfig(job, 
split.getTableName());
     
    -      Boolean isOffline = split.isOffline();
    -      if (null == isOffline) {
    -        isOffline = tableConfig.isOfflineScan();
    -      }
    +      log.debug("Creating connector with user: " + principal);
    +      log.debug("Creating scanner for table: " + table);
    +      log.debug("Authorizations are: " + authorizations);
     
    -      Boolean isIsolated = split.isIsolatedScan();
    -      if (null == isIsolated) {
    -        isIsolated = tableConfig.shouldUseIsolatedScanners();
    -      }
    +      if (split instanceof 
org.apache.accumulo.core.client.mapreduce.RangeInputSplit) {
    +        org.apache.accumulo.core.client.mapreduce.RangeInputSplit 
rangeSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
     
    -      Boolean usesLocalIterators = split.usesLocalIterators();
    -      if (null == usesLocalIterators) {
    -        usesLocalIterators = tableConfig.shouldUseLocalIterators();
    -      }
    +        Boolean isOffline = rangeSplit.isOffline();
    +        if (null == isOffline) {
    +          isOffline = tableConfig.isOfflineScan();
    +        }
    +
    +        Boolean isIsolated = rangeSplit.isIsolatedScan();
    +        if (null == isIsolated) {
    +          isIsolated = tableConfig.shouldUseIsolatedScanners();
    +        }
     
    -      List<IteratorSetting> iterators = split.getIterators();
    -      if (null == iterators) {
    -        iterators = tableConfig.getIterators();
    +        Boolean usesLocalIterators = rangeSplit.usesLocalIterators();
    +        if (null == usesLocalIterators) {
    +          usesLocalIterators = tableConfig.shouldUseLocalIterators();
    +        }
    +
    +        Scanner scanner;
    +
    +        try {
    +          if (isOffline) {
    +            scanner = new OfflineScanner(instance, new 
Credentials(principal, token), split.getTableId(), authorizations);
    +          } else if (instance instanceof MockInstance) {
    +            scanner = instance.getConnector(principal, 
token).createScanner(split.getTableName(), authorizations);
    +          } else {
    +            ClientConfiguration clientConf = getClientConfiguration(job);
    +            ClientContext context = new ClientContext(instance, new 
Credentials(principal, token), clientConf);
    +            scanner = new ScannerImpl(context, split.getTableId(), 
authorizations);
    +          }
    +          if (isIsolated) {
    +            log.info("Creating isolated scanner");
    +            scanner = new IsolatedScanner(scanner);
    +          }
    +          if (usesLocalIterators) {
    +            log.info("Using local iterators");
    +            scanner = new ClientSideIteratorScanner(scanner);
    +          }
    +          setupIterators(job, scanner, split.getTableName(), split);
    +        } catch (Exception e) {
    +          throw new IOException(e);
    +        }
    +
    +        scanner.setRange(rangeSplit.getRange());
    +
    +        // do this last after setting all scanner options
    +        scannerIterator = scanner.iterator();
    --- End diff --
    
    Duplicated call to `scanner.iterator()`. Looks like you do it down below 
too?


> BatchScanner optimization for AccumuloInputFormat
> -------------------------------------------------
>
>                 Key: ACCUMULO-3602
>                 URL: https://issues.apache.org/jira/browse/ACCUMULO-3602
>             Project: Accumulo
>          Issue Type: Improvement
>          Components: client
>    Affects Versions: 1.6.1, 1.6.2
>            Reporter: Eugene Cheipesh
>            Assignee: Eugene Cheipesh
>              Labels: performance
>             Fix For: 1.7.0
>
>
> Currently {{AccumuloInputFormat}} produces a split for reach {{Range}} 
> specified in the configuration. Some table indexing schemes, for instance 
> z-order geospacial index, produce large number of small ranges resulting in 
> large number of splits. This is specifically a concern when using 
> {{AccumuloInputFormat}} as a source for Spark RDD where each Split is mapped 
> to an RDD partition.
> Large number of small RDD partitions leads to poor parallism on read and high 
> overhead on processing. A desirable alternative is to group ranges by tablet 
> into a single split and use {{BatchScanner}} to produce the records. Grouping 
> by tablets is useful because it represents Accumulos attempt to distributed 
> stored records and can be influance by the user through table splits.
> The grouping functionality already exists in the internal {{TabletLocator}} 
> class. 
> Current proposal is to modify {{AbstractInputFormat}} such that it generates 
> either {{RangeInputSplit}} or {{MultiRangeInputSplit}} based on a new setting 
> in {{InputConfigurator}}.  {{AccumuloInputFormat}} would then be able to 
> inspect the type of the split and instantiate an appropriate reader.
> The functinality of {{TabletLocator}} should be exposed as a public API in 
> 1.7 as it is useful for optimizations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to