Repository: carbondata
Updated Branches:
  refs/heads/master 0e1d37e64 -> 27b178d98


[CARBONDATA-1359] Unable to use carbondata on hive

This closes #1231


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/27b178d9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/27b178d9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/27b178d9

Branch: refs/heads/master
Commit: 27b178d98777a019c0439192bf967e24acde358c
Parents: 0e1d37e
Author: shivangi <shivangi.gu...@knoldus.in>
Authored: Fri Aug 4 13:25:02 2017 +0530
Committer: chenliang613 <chenliang...@apache.org>
Committed: Thu Aug 17 19:00:47 2017 +0800

----------------------------------------------------------------------
 .../hive/MapredCarbonInputFormat.java           | 59 ++++++++++----------
 1 file changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/27b178d9/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
 
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 5190846..273536a 100644
--- 
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ 
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -52,32 +52,6 @@ public class MapredCarbonInputFormat extends 
CarbonInputFormat<ArrayWritable>
     implements InputFormat<Void, ArrayWritable>, 
CombineHiveInputFormat.AvoidSplitCombination {
   private static final String CARBON_TABLE = 
"mapreduce.input.carboninputformat.table";
 
-  @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) 
throws IOException {
-    org.apache.hadoop.mapreduce.JobContext jobContext = 
Job.getInstance(jobConf);
-    List<org.apache.hadoop.mapreduce.InputSplit> splitList = 
super.getSplits(jobContext);
-    InputSplit[] splits = new InputSplit[splitList.size()];
-    CarbonInputSplit split;
-    for (int i = 0; i < splitList.size(); i++) {
-      split = (CarbonInputSplit) splitList.get(i);
-      splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), 
split.getPath(), split.getStart(),
-          split.getLength(), split.getLocations(), 
split.getNumberOfBlocklets(), split.getVersion(),
-          split.getBlockStorageIdMap());
-    }
-    return splits;
-  }
-
-  @Override
-  public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit 
inputSplit, JobConf jobConf,
-      Reporter reporter) throws IOException {
-    String path = null;
-    if (inputSplit instanceof CarbonHiveInputSplit) {
-      path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
-    }
-    QueryModel queryModel = getQueryModel(jobConf, path);
-    CarbonReadSupport<ArrayWritable> readSupport = new 
CarbonDictionaryDecodeReadSupport<>();
-    return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, 
jobConf);
-  }
-
   /**
    * this method will read the schema from the physical file and populate into 
CARBON_TABLE
    *
@@ -94,7 +68,7 @@ public class MapredCarbonInputFormat extends 
CarbonInputFormat<ArrayWritable>
     } else {
       if (paths != null) {
         for (String inputPath : inputPaths) {
-          if (paths.startsWith(inputPath)) {
+          if (paths.startsWith(inputPath.replace("file:", ""))) {
             validInputPath = inputPath;
             break;
           }
@@ -106,6 +80,7 @@ public class MapredCarbonInputFormat extends 
CarbonInputFormat<ArrayWritable>
     // read the schema file to get the absoluteTableIdentifier having the 
correct table id
     // persisted in the schema
     CarbonTable carbonTable = 
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    configuration.set(CARBON_TABLE, 
ObjectSerializationUtil.convertObjectToString(carbonTable));
     setTableInfo(configuration, carbonTable.getTableInfo());
   }
 
@@ -117,6 +92,32 @@ public class MapredCarbonInputFormat extends 
CarbonInputFormat<ArrayWritable>
     return (CarbonTable) 
ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
+  @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) 
throws IOException {
+    org.apache.hadoop.mapreduce.JobContext jobContext = 
Job.getInstance(jobConf);
+    List<org.apache.hadoop.mapreduce.InputSplit> splitList = 
super.getSplits(jobContext);
+    InputSplit[] splits = new InputSplit[splitList.size()];
+    CarbonInputSplit split;
+    for (int i = 0; i < splitList.size(); i++) {
+      split = (CarbonInputSplit) splitList.get(i);
+      splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), 
split.getPath(), split.getStart(),
+          split.getLength(), split.getLocations(), 
split.getNumberOfBlocklets(), split.getVersion(),
+          split.getBlockStorageIdMap());
+    }
+    return splits;
+  }
+
+  @Override
+  public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit 
inputSplit, JobConf jobConf,
+      Reporter reporter) throws IOException {
+    String path = null;
+    if (inputSplit instanceof CarbonHiveInputSplit) {
+      path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
+    }
+    QueryModel queryModel = getQueryModel(jobConf, path);
+    CarbonReadSupport<ArrayWritable> readSupport = new 
CarbonDictionaryDecodeReadSupport<>();
+    return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, 
jobConf);
+  }
+
   private QueryModel getQueryModel(Configuration configuration, String path) 
throws IOException {
     CarbonTable carbonTable = getCarbonTable(configuration, path);
     // getting the table absoluteTableIdentifier from the carbonTable
@@ -128,8 +129,8 @@ public class MapredCarbonInputFormat extends 
CarbonInputFormat<ArrayWritable>
     String projection = getProjection(configuration, carbonTable,
         identifier.getCarbonTableIdentifier().getTableName());
     CarbonQueryPlan queryPlan = 
CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
-    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, 
carbonTable,
-        new DataTypeConverterImpl());
+    QueryModel queryModel =
+        QueryModel.createModel(identifier, queryPlan, carbonTable, new 
DataTypeConverterImpl());
     // set the filter to the query model in order to filter blocklet before 
scan
     Expression filter = getFilterPredicates(configuration);
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);

Reply via email to