[ https://issues.apache.org/jira/browse/ACCUMULO-3602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14392011#comment-14392011 ]
ASF GitHub Bot commented on ACCUMULO-3602:
------------------------------------------
Github user joshelser commented on a diff in the pull request:
https://github.com/apache/accumulo/pull/25#discussion_r27628943
--- Diff: core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputSplit.java ---
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Base64;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.log4j.Level;
+
+/**
+ * Abstracts over configurations common to all InputSplits. Specifically, it leaves out methods
+ * related to the number of ranges and locations per InputSplit, as those vary by implementation.
+ *
+ * @see RangeInputSplit
+ * @see BatchInputSplit
+ */
+public abstract class AccumuloInputSplit extends InputSplit implements Writable {
+ protected String[] locations;
+ protected String tableId, tableName, instanceName, zooKeepers, principal;
+ protected TokenSource tokenSource;
+ protected String tokenFile;
+ protected AuthenticationToken token;
+ protected Boolean mockInstance;
+ protected Authorizations auths;
+ protected Set<Pair<Text,Text>> fetchedColumns;
+ protected List<IteratorSetting> iterators;
+ protected Level level;
+
+ public abstract float getProgress(Key currentKey);
+
+ public AccumuloInputSplit() {
+ locations = new String[0];
+ tableName = "";
+ tableId = "";
+ }
+
+ public AccumuloInputSplit(AccumuloInputSplit split) throws IOException {
+ this.setLocations(split.getLocations());
+ this.setTableName(split.getTableName());
+ this.setTableId(split.getTableId());
+ }
+
+  protected AccumuloInputSplit(String table, String tableId, String[] locations) {
+ setLocations(locations);
+ this.tableName = table;
+ this.tableId = tableId;
+ }
+
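+  // Copies the first numBytes of seq into a new array, prefixing a zero byte so the result
+  // always parses as a non-negative BigInteger; positions past the end of seq are zero-filled.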
+ private static byte[] extractBytes(ByteSequence seq, int numBytes) {
+ byte[] bytes = new byte[numBytes + 1];
+ bytes[0] = 0;
+ for (int i = 0; i < numBytes; i++) {
+ if (i >= seq.length())
+ bytes[i + 1] = 0;
+ else
+ bytes[i + 1] = seq.byteAt(i);
+ }
+ return bytes;
+ }
+
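+  // Example: for start "a" (0x61), end "c" (0x63), and position "b" (0x62), maxDepth is 1;
+  // the zero-padded bytes parse as BigIntegers 97, 99, and 98, so the returned progress is
+  // (98 - 97) / (99 - 97) = 0.5.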
+  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
+    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
+    BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
+    BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
+    BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
+    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
+  }
+
+  public long getRangeLength(Range range) throws IOException {
+    Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
+    Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
+    int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
+ long diff = 0;
+
+ byte[] start = startRow.getBytes();
+ byte[] stop = stopRow.getBytes();
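+    // Accumulate the byte-wise XOR of the two rows (at most the first 7 bytes) into a
+    // big-endian value: a coarse estimate of the distance between the start and stop rows.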
+ for (int i = 0; i < maxCommon; ++i) {
+ diff |= 0xff & (start[i] ^ stop[i]);
+ diff <<= Byte.SIZE;
+ }
+
+ if (startRow.getLength() != stopRow.getLength())
+ diff |= 0xff;
+
+ return diff + 1;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return Arrays.copyOf(locations, locations.length);
+ }
+
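+  // The read order below must mirror write(DataOutput) exactly; every optional field is
+  // preceded by a boolean presence flag.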
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ tableName = in.readUTF();
+ tableId = in.readUTF();
+ int numLocs = in.readInt();
+ locations = new String[numLocs];
+ for (int i = 0; i < numLocs; ++i)
+ locations[i] = in.readUTF();
+
+ if (in.readBoolean()) {
+ mockInstance = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ int numColumns = in.readInt();
+ List<String> columns = new ArrayList<String>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ columns.add(in.readUTF());
+ }
+
+      fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
+ }
+
+ if (in.readBoolean()) {
+ String strAuths = in.readUTF();
+ auths = new Authorizations(strAuths.getBytes(UTF_8));
+ }
+
+ if (in.readBoolean()) {
+ principal = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ int ordinal = in.readInt();
+ this.tokenSource = TokenSource.values()[ordinal];
+
+ switch (this.tokenSource) {
+ case INLINE:
+ String tokenClass = in.readUTF();
+ byte[] base64TokenBytes = in.readUTF().getBytes(UTF_8);
+ byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
+
+          this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes);
+ break;
+
+ case FILE:
+ this.tokenFile = in.readUTF();
+
+ break;
+ default:
+ throw new IOException("Cannot parse unknown TokenSource
ordinal");
+ }
+ }
+
+ if (in.readBoolean()) {
+ instanceName = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ zooKeepers = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ int numIterators = in.readInt();
+ iterators = new ArrayList<IteratorSetting>(numIterators);
+ for (int i = 0; i < numIterators; i++) {
+ iterators.add(new IteratorSetting(in));
+ }
+ }
+
+ if (in.readBoolean()) {
+ level = Level.toLevel(in.readInt());
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(tableName);
+ out.writeUTF(tableId);
+ out.writeInt(locations.length);
+ for (int i = 0; i < locations.length; ++i)
+ out.writeUTF(locations[i]);
+
+ out.writeBoolean(null != mockInstance);
+ if (null != mockInstance) {
+ out.writeBoolean(mockInstance);
+ }
+
+ out.writeBoolean(null != fetchedColumns);
+ if (null != fetchedColumns) {
+ String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
+ out.writeInt(cols.length);
+ for (String col : cols) {
+ out.writeUTF(col);
+ }
+ }
+
+ out.writeBoolean(null != auths);
+ if (null != auths) {
+ out.writeUTF(auths.serialize());
+ }
+
+ out.writeBoolean(null != principal);
+ if (null != principal) {
+ out.writeUTF(principal);
+ }
+
+ out.writeBoolean(null != tokenSource);
+ if (null != tokenSource) {
+ out.writeInt(tokenSource.ordinal());
+
+ if (null != token && null != tokenFile) {
+ throw new IOException("Cannot use both inline AuthenticationToken
and file-based AuthenticationToken");
+ } else if (null != token) {
+ out.writeUTF(token.getClass().getCanonicalName());
+        out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
+ } else {
+ out.writeUTF(tokenFile);
+ }
+ }
+
+ out.writeBoolean(null != instanceName);
+ if (null != instanceName) {
+ out.writeUTF(instanceName);
+ }
+
+ out.writeBoolean(null != zooKeepers);
+ if (null != zooKeepers) {
+ out.writeUTF(zooKeepers);
+ }
+
+ out.writeBoolean(null != iterators);
+ if (null != iterators) {
+ out.writeInt(iterators.size());
+ for (IteratorSetting iterator : iterators) {
+ iterator.write(out);
+ }
+ }
+
+ out.writeBoolean(null != level);
+ if (null != level) {
+ out.writeInt(level.toInt());
+ }
+ }
+
+ /**
+ * Use {@link #getTableName}
+ */
+ @Deprecated
+ public String getTable() {
+ return getTableName();
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Use {@link #setTableName}
+ */
+ @Deprecated
+ public void setTable(String table) {
+ setTableName(table);
+ }
+
+ public void setTableName(String table) {
+ this.tableName = table;
+ }
+
+ public void setTableId(String tableId) {
+ this.tableId = tableId;
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ /**
+ * @see #getInstance(ClientConfiguration)
+ */
+ @Deprecated
--- End diff ---
Thanks for catching this already!
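For context, a minimal sketch of the {{Writable}} round-trip that the {{write}}/{{readFields}} pair above implements, assuming the existing {{RangeInputSplit}} subclass and its 1.6-era four-argument constructor (not part of the patch itself):
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
import org.apache.accumulo.core.data.Range;

// Serialize a split the way the MapReduce framework would ship it to a worker.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RangeInputSplit split = new RangeInputSplit("mytable", "1", new Range("a", "z"), new String[] {"tserver1"});
split.write(new DataOutputStream(baos));

// Reconstitute it on the other side; the field order read back matches write() exactly.
RangeInputSplit copy = new RangeInputSplit();
copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
{code}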
> BatchScanner optimization for AccumuloInputFormat
> -------------------------------------------------
>
> Key: ACCUMULO-3602
> URL: https://issues.apache.org/jira/browse/ACCUMULO-3602
> Project: Accumulo
> Issue Type: Improvement
> Components: client
> Affects Versions: 1.6.1, 1.6.2
> Reporter: Eugene Cheipesh
> Assignee: Eugene Cheipesh
> Labels: performance
> Fix For: 1.7.0
>
>
> Currently {{AccumuloInputFormat}} produces a split for each {{Range}}
> specified in the configuration. Some table indexing schemes, for instance a
> z-order geospatial index, produce a large number of small ranges, resulting in
> a large number of splits. This is specifically a concern when using
> {{AccumuloInputFormat}} as a source for a Spark RDD, where each split is mapped
> to an RDD partition.
> A large number of small RDD partitions leads to poor parallelism on read and
> high overhead in processing. A desirable alternative is to group ranges by
> tablet into a single split and use a {{BatchScanner}} to produce the records.
> Grouping by tablet is useful because it reflects Accumulo's attempt to
> distribute stored records, and it can be influenced by the user through table
> splits.
> The grouping functionality already exists in the internal {{TabletLocator}}
> class.
> The current proposal is to modify {{AbstractInputFormat}} so that it generates
> either a {{RangeInputSplit}} or a {{MultiRangeInputSplit}}, based on a new
> setting in {{InputConfigurator}}. {{AccumuloInputFormat}} would then inspect
> the type of the split and instantiate an appropriate reader.
> The functionality of {{TabletLocator}} should be exposed as a public API in
> 1.7, as it is useful for such optimizations.
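A minimal sketch of the Spark interaction motivating this (editorial example, not from the issue; {{sc}} is an existing {{JavaSparkContext}}, {{manySmallRanges}} is a placeholder, and connector/instance setup is omitted):
{code:java}
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;

Job job = Job.getInstance();
AccumuloInputFormat.setInputTableName(job, "geo_index");
AccumuloInputFormat.setRanges(job, manySmallRanges); // e.g. thousands of z-order query ranges

JavaPairRDD<Key,Value> rdd = sc.newAPIHadoopRDD(job.getConfiguration(),
    AccumuloInputFormat.class, Key.class, Value.class);
// One RDD partition per InputSplit: today that means one per Range, hence the proposal
// to group ranges by tablet into a single split backed by a BatchScanner.
System.out.println(rdd.partitions().size());
{code}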
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)