milleruntime commented on a change in pull request #743: New MapReduce API URL: https://github.com/apache/accumulo/pull/743#discussion_r230439334
########## File path: hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputInfo.java ########## @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.hadoop.mapreduce; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.hadoopImpl.mapreduce.InputInfoImpl; +import org.apache.hadoop.mapreduce.Job; + +/** + * Object containing all the information needed for the Map Reduce job. This object is passed to the + * different Input Format types see {@link AccumuloInputFormat#setInfo(Job, InputInfo)}. 
It uses a + * fluent API: + * + * <pre> + * InputInfo.builder().clientInfo(info).table(name).scanAuths(auths).build(); + * + * InputInfo.builder().clientProperties(props).table(name).scanAuths(auths).addIterator(cfg) + * .setExecutionHints(hints).build(); + * </pre> + * + * @since 2.0 + */ +public interface InputInfo { + /** + * @return the table name set using InputInfo.builder()...table(name) + */ + String getTableName(); + + /** + * @return the client info set using InputInfo.builder().clientInfo(info) + */ + ClientInfo getClientInfo(); + + /** + * @return the client properties set using InputInfo.builder().clientProperties(props) + */ + Properties getClientProperties(); + + /** + * @return the scan authorizations set using InputInfo.builder()...scanAuths(auths) + */ + Authorizations getScanAuths(); + + /** + * @return the context set using InputInfo.builder()...classLoaderContext(context) + */ + Optional<String> getContext(); + + /** + * @return the Ranges set using InputInfo.builder()...ranges(ranges) + */ + Collection<Range> getRanges(); + + /** + * @return the ColumnFamily,ColumnQualifier Pairs set using + * InputInfo.builder()...fetchColumns(cfcqPairs) + */ + Collection<Pair<byte[],byte[]>> getFetchColumns(); + + /** + * @return the collection of IteratorSettings set using InputInfo.builder()...addIterator(cfg) + */ + Collection<IteratorSetting> getIterators(); + + /** + * @return the SamplerConfiguration set using InputInfo.builder()...setSamplerConfiguration(cfg) + */ + Optional<SamplerConfiguration> getSamplerConfig(); + + /** + * @return the Execution Hints set using InputInfo.builder()...setExecutionHints(hints) + */ + Map<String,String> getExecutionHints(); + + /** + * @return boolean if auto adjusting ranges or not + */ + boolean isAutoAdjustRanges(); + + /** + * @return boolean if using scan isolation or not + */ + boolean isScanIsolation(); + + /** + * @return boolean if using local iterators or not + */ + boolean isLocalIterators(); + + /** + * 
@return boolean if using offline scan or not + */ + boolean isOfflineScan(); + + /** + * @return boolean if using batch scanner or not + */ + boolean isBatchScan(); + + /** + * Builder starting point for map reduce input format information. + */ + static InputInfoBuilder.ClientParams builder() { + return new InputInfoImpl.InputInfoBuilderImpl(); + } + + /** + * Required build values to be set. + */ + interface InputInfoBuilder { + interface ClientParams { + /** + * Set the connection information needed to communicate with Accumulo in this job. + * + * @param clientInfo + * Accumulo connection information + */ + TableParams clientInfo(ClientInfo clientInfo); + + /** + * Set the connection information needed to communicate with Accumulo in this job. + * + * @param clientProps + * Accumulo connection information + */ + TableParams clientProperties(Properties clientProps); + } + + interface TableParams { + /** + * Sets the name of the input table, over which this job will scan. + * + * @param tableName + * the table to use when the tablename is null in the write call + */ + AuthsParams table(String tableName); + } + + interface AuthsParams { + /** + * Sets the {@link Authorizations} used to scan. Must be a subset of the user's + * authorizations. 
If none present use {@link Authorizations#EMPTY} + * + * @param auths + * the user's authorizations + */ + InputFormatOptions scanAuths(Authorizations auths); + } + + interface BatchScanOptions { + /** + * @return newly created {@link InputInfo} + */ + InputInfo build(); + } + + interface NonBatchScanOptions { + /** + * @see InputFormatOptions#scanIsolation() + */ + NonBatchScanOptions scanIsolation(); + + /** + * @see InputFormatOptions#localIterators() + */ + NonBatchScanOptions localIterators(); + + /** + * @see InputFormatOptions#offlineScan() + */ + NonBatchScanOptions offlineScan(); + + /** + * @return newly created {@link InputInfo} + */ + InputInfo build(); + + } + + /** + * Optional values to set using fluent API + */ + interface InputFormatOptions { + /** + * Sets the name of the classloader context on this scanner + * + * @param context + * name of the classloader context + */ + InputFormatOptions classLoaderContext(String context); + + /** + * Sets the input ranges to scan for the single input table associated with this job. + * + * @param ranges + * the ranges that will be mapped over + * @see TableOperations#splitRangeByTablets(String, Range, int) + */ + InputFormatOptions ranges(Collection<Range> ranges); + + /** + * Restricts the columns that will be mapped over for this job for the default input table. + * + * @param cfcqPairs + * a pair of byte[] objects corresponding to column family and column qualifier. If + * the column qualifier is null, the entire column family is selected. An empty set + * is the default and is equivalent to scanning the all columns. + */ + InputFormatOptions fetchColumns(Collection<Pair<byte[],byte[]>> cfcqPairs); Review comment: What do you think of changing this to just "fetchColumn(byte[] colFam, byte[] colQual)" like the one we have in ScannerBase but byte[] instead of Text? Or we could use IteratorSetting.Column class from o.a.a.core.client and have just one parameter. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
