PHOENIX-3486 RoundRobinResultIterator doesn't work correctly because of setting Scan's cache size inappropriately in PhoenixInputFormat
Signed-off-by: Sergey Soldatov <s...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/275421bc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/275421bc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/275421bc

Branch: refs/heads/master
Commit: 275421bcd6f20dd748a9b2e63df13426832547ba
Parents: 130f29d
Author: Jeongdae Kim <kjd9...@gmail.com>
Authored: Mon Nov 21 11:03:19 2016 +0900
Committer: Sergey Soldatov <s...@apache.org>
Committed: Sun Feb 19 17:14:31 2017 -0800

----------------------------------------------------------------------
 .../hive/mapreduce/PhoenixInputFormat.java | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/275421bc/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index 7eab317..3a94655 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -142,11 +143,8 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
                 .newJobContext(new Job(jobConf)));
         boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
 
-        int scanCacheSize = jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize);
-        }
+        setScanCacheSize(jobConf);
 
         // Adding Localization
         HConnection connection = HConnectionManager.createConnection(jobConf);
@@ -166,10 +164,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
 
         if (splitByStats) {
             for (Scan aScan : scans) {
-                if (scanCacheSize > 0) {
-                    aScan.setCaching(scanCacheSize);
-                }
-
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
                             .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
@@ -183,12 +177,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
                 psplits.add(inputSplit);
             }
         } else {
-            if (scanCacheSize > 0) {
-                for (Scan aScan : scans) {
-                    aScan.setCaching(scanCacheSize);
-                }
-            }
-
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
                         .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
@@ -216,6 +204,17 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
         return psplits;
     }
 
+    private void setScanCacheSize(JobConf jobConf) {
+        int scanCacheSize = jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1);
+        if (scanCacheSize > 0) {
+            jobConf.setInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, scanCacheSize);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize);
+        }
+    }
+
     @Override
     public RecordReader<WritableComparable, T> getRecordReader(InputSplit split, JobConf job,
                                                                Reporter reporter) throws
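
Context for the change: the removed code called aScan.setCaching(scanCacheSize) on each
split's Scan, while Phoenix's RoundRobinResultIterator presumably sizes its fetch rounds
from the client-level scanner caching setting, so the per-Scan value and the client value
could disagree. The patch instead writes the Hive-side setting into the JobConf under
HConstants.HBASE_CLIENT_SCANNER_CACHING ("hbase.client.scanner.caching"), which every
scanner and iterator built from that configuration then shares. Below is a minimal,
self-contained sketch of that pattern using plain Hadoop Configuration; the class name,
the main() driver, and the literal key "hbase.scan.cache" (standing in for
PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE) are illustrative assumptions, not
Phoenix code.

    // Sketch only: mirrors the shape of setScanCacheSize() in the patch above.
    import org.apache.hadoop.conf.Configuration;

    public class ScanCacheConfigSketch {

        // Assumed stand-in for PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE.
        static final String SCAN_CACHE_KEY = "hbase.scan.cache";
        // Actual value of HConstants.HBASE_CLIENT_SCANNER_CACHING in HBase.
        static final String SCANNER_CACHING_KEY = "hbase.client.scanner.caching";

        // Copy the storage-handler setting into the client-wide scanner caching
        // key so every scanner opened from this configuration uses it; the
        // default of -1 means "not set" and leaves the configuration untouched.
        static void setScanCacheSize(Configuration conf) {
            int scanCacheSize = conf.getInt(SCAN_CACHE_KEY, -1);
            if (scanCacheSize > 0) {
                conf.setInt(SCANNER_CACHING_KEY, scanCacheSize);
            }
        }

        public static void main(String[] args) {
            Configuration conf = new Configuration(false);
            conf.setInt(SCAN_CACHE_KEY, 1000);
            setScanCacheSize(conf);
            // Prints 1000: client-side scanner caching now matches the Hive-side
            // request instead of being set on individual Scan objects.
            System.out.println(conf.getInt(SCANNER_CACHING_KEY, -1));
        }
    }

If HBASE_SCAN_CACHE is indeed "hbase.scan.cache", a Hive session should be able to steer
both code paths with a single setting (e.g. SET hbase.scan.cache=1000;), since plain
scans and the round-robin iterator now read the same configuration key.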