Repository: carbondata
Updated Branches:
  refs/heads/master 269f4c378 -> 6e58418eb


[CARBONDATA-3062] Fix Compatibility issue with cache_level as blocklet

In case of hybrid store we can have block as well as blocklet schema.
Scenario:
When there is a hybrid store in which a few loads are from the legacy store, which does 
not contain the blocklet information — hence they will, by
default, have cache_level as BLOCK — and a few loads are from the latest store, which contains 
the BLOCKLET information and has cache_level BLOCKLET. For these
types of scenarios we need to have separate task and footer schemas. For all 
loads, with or without blocklet info, there will not be any additional cost
of maintaining 2 variables.

This closes #2883


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6e58418e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6e58418e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6e58418e

Branch: refs/heads/master
Commit: 6e58418eb15effbf60290d2e1b8ff06f8613d714
Parents: 269f4c3
Author: Indhumathi27 <indhumathi...@gmail.com>
Authored: Tue Oct 30 21:38:56 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Fri Nov 2 10:54:49 2018 +0530

----------------------------------------------------------------------
 .../block/SegmentPropertiesAndSchemaHolder.java | 82 +++++++++++++-------
 .../indexstore/blockletindex/BlockDataMap.java  |  2 +-
 .../blockletindex/BlockletDataMap.java          |  2 +-
 ...ithColumnMetCacheAndCacheLevelProperty.scala |  2 +-
 .../merger/RowResultMergerProcessor.java        |  6 +-
 5 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index cc6341b..1b7e1f8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -284,11 +284,17 @@ public class SegmentPropertiesAndSchemaHolder {
     private int[] columnCardinality;
     private SegmentProperties segmentProperties;
     private List<CarbonColumn> minMaxCacheColumns;
-    private CarbonRowSchema[] taskSummarySchema;
-    // same variable can be used for block and blocklet schema because at any 
given cache_level
-    // with either block or blocklet and whenever cache_level is changed the 
cache and its
-    // corresponding segmentProperties is flushed
-    private CarbonRowSchema[] fileFooterEntrySchema;
+    // in case of hybrid store we can have block as well as blocklet schema
+    // Scenario: When there is a hybrid store in which few loads are from 
legacy store which do
+    // not contain the blocklet information and hence they will be, by default 
have cache_level as
+    // BLOCK and few loads with latest store which contain the BLOCKLET 
information and have
+    // cache_level BLOCKLET. For these type of scenarios we need to have 
separate task and footer
+    // schemas. For all loads with/without blocklet info there will not be any 
additional cost
+    // of maintaining 2 variables
+    private CarbonRowSchema[] taskSummarySchemaForBlock;
+    private CarbonRowSchema[] taskSummarySchemaForBlocklet;
+    private CarbonRowSchema[] fileFooterEntrySchemaForBlock;
+    private CarbonRowSchema[] fileFooterEntrySchemaForBlocklet;
 
     public SegmentPropertiesWrapper(CarbonTable carbonTable,
         List<ColumnSchema> columnsInTable, int[] columnCardinality) {
@@ -314,8 +320,10 @@ public class SegmentPropertiesAndSchemaHolder {
       if (null != minMaxCacheColumns) {
         minMaxCacheColumns.clear();
       }
-      taskSummarySchema = null;
-      fileFooterEntrySchema = null;
+      taskSummarySchemaForBlock = null;
+      taskSummarySchemaForBlocklet = null;
+      fileFooterEntrySchemaForBlock = null;
+      fileFooterEntrySchemaForBlocklet = null;
     }
 
     @Override public boolean equals(Object obj) {
@@ -350,48 +358,62 @@ public class SegmentPropertiesAndSchemaHolder {
       return columnCardinality;
     }
 
-    public CarbonRowSchema[] getTaskSummarySchema(boolean storeBlockletCount,
+    public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean 
storeBlockletCount,
         boolean filePathToBeStored) throws MemoryException {
-      if (null == taskSummarySchema) {
+      if (null == taskSummarySchemaForBlock) {
         synchronized (taskSchemaLock) {
-          if (null == taskSummarySchema) {
-            taskSummarySchema = SchemaGenerator
+          if (null == taskSummarySchemaForBlock) {
+            taskSummarySchemaForBlock = SchemaGenerator
                 .createTaskSummarySchema(segmentProperties, 
minMaxCacheColumns, storeBlockletCount,
                     filePathToBeStored);
           }
         }
       }
-      return taskSummarySchema;
+      return taskSummarySchemaForBlock;
+    }
+
+    public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean 
storeBlockletCount,
+        boolean filePathToBeStored) throws MemoryException {
+      if (null == taskSummarySchemaForBlocklet) {
+        synchronized (taskSchemaLock) {
+          if (null == taskSummarySchemaForBlocklet) {
+            taskSummarySchemaForBlocklet = SchemaGenerator
+                .createTaskSummarySchema(segmentProperties, 
minMaxCacheColumns, storeBlockletCount,
+                    filePathToBeStored);
+          }
+        }
+      }
+      return taskSummarySchemaForBlocklet;
     }
 
     public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
-      return getOrCreateFileFooterEntrySchema(true);
+      if (null == fileFooterEntrySchemaForBlock) {
+        synchronized (fileFooterSchemaLock) {
+          if (null == fileFooterEntrySchemaForBlock) {
+            fileFooterEntrySchemaForBlock =
+                SchemaGenerator.createBlockSchema(segmentProperties, 
minMaxCacheColumns);
+          }
+        }
+      }
+      return fileFooterEntrySchemaForBlock;
     }
 
     public CarbonRowSchema[] getBlockletFileFooterEntrySchema() {
-      return getOrCreateFileFooterEntrySchema(false);
+      if (null == fileFooterEntrySchemaForBlocklet) {
+        synchronized (fileFooterSchemaLock) {
+          if (null == fileFooterEntrySchemaForBlocklet) {
+            fileFooterEntrySchemaForBlocklet =
+                SchemaGenerator.createBlockletSchema(segmentProperties, 
minMaxCacheColumns);
+          }
+        }
+      }
+      return fileFooterEntrySchemaForBlocklet;
     }
 
     public List<CarbonColumn> getMinMaxCacheColumns() {
       return minMaxCacheColumns;
     }
 
-    private CarbonRowSchema[] getOrCreateFileFooterEntrySchema(boolean 
isCacheLevelBlock) {
-      if (null == fileFooterEntrySchema) {
-        synchronized (fileFooterSchemaLock) {
-          if (null == fileFooterEntrySchema) {
-            if (isCacheLevelBlock) {
-              fileFooterEntrySchema =
-                  SchemaGenerator.createBlockSchema(segmentProperties, 
minMaxCacheColumns);
-            } else {
-              fileFooterEntrySchema =
-                  SchemaGenerator.createBlockletSchema(segmentProperties, 
minMaxCacheColumns);
-            }
-          }
-        }
-      }
-      return fileFooterEntrySchema;
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 3ab5923..67405f4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -1006,7 +1006,7 @@ public class BlockDataMap extends CoarseGrainDataMap
         SegmentPropertiesAndSchemaHolder.getInstance()
             .getSegmentPropertiesWrapper(segmentPropertiesIndex);
     try {
-      return segmentPropertiesWrapper.getTaskSummarySchema(true, 
isFilePathStored);
+      return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, 
isFilePathStored);
     } catch (MemoryException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 242fc9e..390e92f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -87,7 +87,7 @@ public class BlockletDataMap extends BlockDataMap implements 
Serializable {
         SegmentPropertiesAndSchemaHolder.getInstance()
             .getSegmentPropertiesWrapper(segmentPropertiesIndex);
     try {
-      return segmentPropertiesWrapper.getTaskSummarySchema(false, 
isFilePathStored);
+      return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false, 
isFilePathStored);
     } catch (MemoryException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 10a3be8..1c54c48 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -92,7 +92,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty 
extends QueryTest with Be
       expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = {
     val index = 
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
-      
.getSegmentPropertiesWrapper(index).getTaskSummarySchema(storeBlockletCount, 
false)
+      
.getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount,
 false)
     val minSchemas = 
summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
       .getChildSchemas
     minSchemas.length == expectedLength

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 6475ba8..83e630b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -30,9 +30,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
@@ -74,7 +72,6 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
     this.loadModel = loadModel;
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
 
-    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     String carbonStoreLocation;
     if (partitionSpec != null) {
       carbonStoreLocation =
@@ -86,7 +83,8 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
               loadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = 
CarbonFactDataHandlerModel
-        .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, 
tableName,
+        .getCarbonFactDataHandlerModel(loadModel,
+            loadModel.getCarbonDataLoadSchema().getCarbonTable(), segProp, 
tableName,
             tempStoreLocation, carbonStoreLocation);
     setDataFileAttributesInModel(loadModel, compactionType, 
carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);

Reply via email to