Repository: carbondata
Updated Branches:
  refs/heads/master 0e1d37e64 -> 27b178d98
[CARBONDATA-1359] Unable to use carbondata on hive

This closes #1231


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/27b178d9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/27b178d9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/27b178d9

Branch: refs/heads/master
Commit: 27b178d98777a019c0439192bf967e24acde358c
Parents: 0e1d37e
Author: shivangi <shivangi.gu...@knoldus.in>
Authored: Fri Aug 4 13:25:02 2017 +0530
Committer: chenliang613 <chenliang...@apache.org>
Committed: Thu Aug 17 19:00:47 2017 +0800

----------------------------------------------------------------------
 .../hive/MapredCarbonInputFormat.java | 59 ++++++++++----------
 1 file changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/27b178d9/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 5190846..273536a 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -52,32 +52,6 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
   private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
 
-  @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
-    org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
-    List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
-    InputSplit[] splits = new InputSplit[splitList.size()];
-    CarbonInputSplit split;
-    for (int i = 0; i < splitList.size(); i++) {
-      split = (CarbonInputSplit) splitList.get(i);
-      splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(), split.getStart(),
-          split.getLength(), split.getLocations(), split.getNumberOfBlocklets(), split.getVersion(),
-          split.getBlockStorageIdMap());
-    }
-    return splits;
-  }
-
-  @Override
-  public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
-      Reporter reporter) throws IOException {
-    String path = null;
-    if (inputSplit instanceof CarbonHiveInputSplit) {
-      path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
-    }
-    QueryModel queryModel = getQueryModel(jobConf, path);
-    CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
-    return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
-  }
-
   /**
    * this method will read the schema from the physical file and populate into CARBON_TABLE
    *
@@ -94,7 +68,7 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     } else {
       if (paths != null) {
         for (String inputPath : inputPaths) {
-          if (paths.startsWith(inputPath)) {
+          if (paths.startsWith(inputPath.replace("file:", ""))) {
             validInputPath = inputPath;
             break;
           }
@@ -106,6 +80,7 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     // read the schema file to get the absoluteTableIdentifier having the correct table id
     // persisted in the schema
     CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
     setTableInfo(configuration, carbonTable.getTableInfo());
   }
 
@@ -117,6 +92,32 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
+  @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+    org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
+    List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
+    InputSplit[] splits = new InputSplit[splitList.size()];
+    CarbonInputSplit split;
+    for (int i = 0; i < splitList.size(); i++) {
+      split = (CarbonInputSplit) splitList.get(i);
+      splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(), split.getStart(),
+          split.getLength(), split.getLocations(), split.getNumberOfBlocklets(), split.getVersion(),
+          split.getBlockStorageIdMap());
+    }
+    return splits;
+  }
+
+  @Override
+  public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
+      Reporter reporter) throws IOException {
+    String path = null;
+    if (inputSplit instanceof CarbonHiveInputSplit) {
+      path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
+    }
+    QueryModel queryModel = getQueryModel(jobConf, path);
+    CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
+    return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
+  }
+
   private QueryModel getQueryModel(Configuration configuration, String path) throws IOException {
     CarbonTable carbonTable = getCarbonTable(configuration, path);
     // getting the table absoluteTableIdentifier from the carbonTable
@@ -128,8 +129,8 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     String projection = getProjection(configuration, carbonTable,
         identifier.getCarbonTableIdentifier().getTableName());
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable,
-        new DataTypeConverterImpl());
+    QueryModel queryModel =
+        QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl());
     // set the filter to the query model in order to filter blocklet before scan
     Expression filter = getFilterPredicates(configuration);
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
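
----------------------------------------------------------------------

Note on the fix (editor's commentary, not part of the commit): most of the diff is a
pure relocation of getSplits and getRecordReader within the class; the functional
changes are the two one-liners. The first one fixes the prefix check that selects
validInputPath. Hive supplies the configured input path with its URI scheme
(e.g. "file:/..."), while the path being matched arrives as a plain local path, so the
old startsWith comparison could never match for local-file locations. A minimal
standalone Java sketch of the mismatch, using hypothetical paths and a hypothetical
class name:

  public final class PathPrefixCheck {
    public static void main(String[] args) {
      // The path to match arrives without a URI scheme...
      String splitPath = "/tmp/carbon/store/default/hive_table/Fact/Part0";
      // ...while the configured input path keeps its "file:" scheme.
      String inputPath = "file:/tmp/carbon/store/default/hive_table";

      // Before the fix: the prefix test always fails for local-file locations.
      System.out.println(splitPath.startsWith(inputPath));                      // false
      // After the fix: stripping the scheme lets the prefix test succeed.
      System.out.println(splitPath.startsWith(inputPath.replace("file:", ""))); // true
    }
  }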
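The second functional change stores the CarbonTable in the job configuration: after the
schema is read from the store, the table is serialized under the CARBON_TABLE key so
that the later getCarbonTable lookup can deserialize it rather than finding the key
unset. The sketch below shows the same store-an-object-in-a-Configuration pattern using
plain JDK serialization and base64 as a stand-in for CarbonData's
ObjectSerializationUtil (an assumption; the utility's actual encoding is its own
implementation detail):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.Serializable;
  import java.util.Base64;
  import org.apache.hadoop.conf.Configuration;

  public final class ConfObjectCache {
    // Serialize any Serializable object into a string fit for a conf value.
    static String toConfString(Serializable obj) throws IOException {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(obj);
      }
      return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // Decode the conf value back into the original object.
    static Object fromConfString(String s) throws IOException, ClassNotFoundException {
      ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(s)));
      return ois.readObject();
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Stand-in for the deserialized CarbonTable; any Serializable object works here.
      conf.set("mapreduce.input.carboninputformat.table", toConfString("carbonTable"));
      System.out.println(fromConfString(conf.get("mapreduce.input.carboninputformat.table")));
    }
  }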