This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 10030da  [CARBONDATA-3548] Renamed index_handler property to spatial_index and indexColumn to spatialColumn
10030da is described below

commit 10030da58633e48413e7fbe71deb98d0a5ac6a17
Author: Venu Reddy <k.venureddy2...@gmail.com>
AuthorDate: Thu May 7 00:22:04 2020 +0530

    [CARBONDATA-3548] Renamed index_handler property to spatial_index and indexColumn to spatialColumn
    
    Why is this PR needed?
    The current code base has many types of indexes. To avoid confusion and be
    more specific, the index_handler property is renamed to spatial_index, and
    isIndexColumn/setIndexColumn are renamed to isSpatialColumn/setSpatialColumn
    respectively.
    
    What changes were proposed in this PR?
    Changed the index_handler property to spatial_index and renamed
    isIndexColumn/setIndexColumn to isSpatialColumn/setSpatialColumn respectively.
    Updated the documentation accordingly.
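
    For example, with this change a create table statement (taken from the
    updated docs/spatial-index-guide.md below) looks like:

        create table source_index(id BIGINT, latitude long, longitude long)
        stored by 'carbondata' TBLPROPERTIES (
        'SPATIAL_INDEX'='mygeohash',
        'SPATIAL_INDEX.mygeohash.type'='geohash',
        'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
        ...)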
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3750
---
 .../core/constants/CarbonCommonConstants.java      | 17 ++---
 .../ThriftWrapperSchemaConverterImpl.java          |  4 +-
 .../core/metadata/schema/table/CarbonTable.java    |  2 +-
 .../metadata/schema/table/column/CarbonColumn.java |  8 +--
 .../metadata/schema/table/column/ColumnSchema.java | 34 ++++++---
 .../core/util/AbstractDataFileFooterConverter.java |  2 +-
 .../apache/carbondata/core/util/CarbonUtil.java    |  2 +-
 .../apache/carbondata/core/util/CustomIndex.java   | 26 +++----
 .../ThriftWrapperSchemaConverterImplTest.java      |  3 +-
 docs/spatial-index-guide.md                        | 46 ++++++------
 format/src/main/thrift/schema.thrift               |  4 +-
 .../org/apache/carbondata/geo/GeoConstants.java    | 29 ++++++++
 .../geo/{GeoHashImpl.java => GeoHashIndex.java}    | 68 +++++++++---------
 .../org/apache/carbondata/geo/QuadTreeCls.java     |  2 +-
 .../geo/scan/expression/PolygonExpression.java     | 14 ++--
 .../org/apache/carbondata/spark/util/Util.java     |  2 +-
 .../scala/org/apache/carbondata/geo/GeoUtils.scala | 23 +++---
 .../spark/load/DataLoadProcessorStepOnSpark.scala  |  4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           |  4 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala    | 45 ++++++------
 .../carbondata/spark/util/CarbonSparkUtil.scala    |  2 +-
 .../spark/sql/catalyst/CarbonParserUtil.scala      | 81 +++++++++++-----------
 .../command/carbonTableSchemaCommon.scala          |  4 +-
 .../management/CarbonInsertIntoCommand.scala       |  4 +-
 .../command/management/CommonLoadUtils.scala       |  2 +-
 .../schema/CarbonAlterTableAddColumnCommand.scala  |  2 +-
 ...nAlterTableColRenameDataTypeChangeCommand.scala |  4 +-
 .../schema/CarbonAlterTableDropColumnCommand.scala |  4 +-
 .../table/CarbonDescribeFormattedCommand.scala     | 14 ++--
 .../datasources/SparkCarbonTableFormat.scala       | 12 ++--
 .../org/apache/spark/sql/hive/CarbonRelation.scala |  5 +-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |  4 +-
 .../org/apache/spark/util/AlterTableUtil.scala     | 30 ++++----
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 64 ++++++++---------
 .../loading/CarbonDataLoadConfiguration.java       | 24 +++----
 .../processing/loading/DataLoadProcessBuilder.java |  2 +-
 .../converter/impl/FieldEncoderFactory.java        |  4 +-
 .../loading/converter/impl/RowConverterImpl.java   | 10 +--
 ...pl.java => SpatialIndexFieldConverterImpl.java} | 10 +--
 .../processing/loading/model/CarbonLoadModel.java  | 24 +++----
 .../loading/parser/impl/RowParserImpl.java         |  4 +-
 .../InputProcessorStepWithNoConverterImpl.java     |  8 +--
 .../processing/util/CarbonDataProcessorUtil.java   |  4 +-
 43 files changed, 355 insertions(+), 306 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ccdbb5a..43965a2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -460,14 +460,15 @@ public final class CarbonCommonConstants {
   public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
 
   /**
-   * Index handler table property. It allows user to create a new sort column 
from the set of
-   * existing schema columns. And can generate value for the new column after 
parsing each row
-   * through custom handler.
-   */
-  public static final String INDEX_HANDLER = "index_handler";
-
-  // GeoHash index handler type
-  public static final String GEOHASH = "geohash";
+   * Spatial index table property. It allows the user to create a new index column
+   * implicitly from the set of existing table schema columns (specified with the
+   * sourcecolumns sub-property). The newly created column is implicitly treated as
+   * a sort column. The row value for the new column is generated from the
+   * corresponding row values of its sourcecolumns during the data load process.
+   * CarbonCore provides an abstract class
+   * {@link org.apache.carbondata.core.util.CustomIndex} so that different types of
+   * index implementations adhere to the contracts and still have their customized
+   * behavior.
+   */
+  public static final String SPATIAL_INDEX = "spatial_index";
 
   public static final String SORT_COLUMNS = "sort_columns";
   public static final String SORT_SCOPE = "sort_scope";
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 8cfd48f..406e30d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -208,7 +208,7 @@ public class ThriftWrapperSchemaConverterImpl implements 
SchemaConverter {
     thriftColumnSchema.setInvisible(wrapperColumnSchema.isInvisible());
     
thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId());
     
thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal());
-    thriftColumnSchema.setIndexColumn(wrapperColumnSchema.isIndexColumn());
+    thriftColumnSchema.setSpatialColumn(wrapperColumnSchema.isSpatialColumn());
     if (wrapperColumnSchema.isSortColumn()) {
       Map<String, String> properties = 
wrapperColumnSchema.getColumnProperties();
       if (null == properties) {
@@ -509,7 +509,7 @@ public class ThriftWrapperSchemaConverterImpl implements 
SchemaConverter {
     wrapperColumnSchema.setInvisible(externalColumnSchema.isInvisible());
     
wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId());
     
wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
-    wrapperColumnSchema.setIndexColumn(externalColumnSchema.isIndexColumn());
+    
wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     wrapperColumnSchema.setSortColumn(false);
     Map<String, String> properties = 
externalColumnSchema.getColumnProperties();
     if (properties != null) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 11e1c53..c678c73 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -301,7 +301,7 @@ public class CarbonTable implements Serializable, Writable {
   private void fillCreateOrderColumn() {
     List<CarbonColumn> columns = new ArrayList<CarbonColumn>();
     for (CarbonDimension dimension : visibleDimensions) {
-      if (!dimension.getColumnSchema().isIndexColumn()) {
+      if (!dimension.getColumnSchema().isSpatialColumn()) {
         columns.add(dimension);
       }
     }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index 862148f..b095cb2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -167,11 +167,11 @@ public class CarbonColumn implements Serializable {
   }
 
   /**
-   * Checks if it is index column
-   * @return Returns True if the column is an index column. Otherwise returns 
false.
+   * Checks if it is a spatial index column
+   * @return Returns true if the column is a spatial index column. Otherwise
+   * returns false.
    */
-  public boolean isIndexColumn() {
-    return columnSchema.isIndexColumn();
+  public boolean isSpatialColumn() {
+    return columnSchema.isSpatialColumn();
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index b21add4..c43b76f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -121,7 +121,10 @@ public class ColumnSchema implements Serializable, 
Writable, Cloneable {
 
   private boolean isSortColumn = false;
 
-  private boolean indexColumn = false;
+  /**
+   *  Whether it is a spatial index column
+   */
+  private boolean spatialColumn = false;
 
   /**
    * aggregate function used in pre aggregate table
@@ -535,7 +538,7 @@ public class ColumnSchema implements Serializable, 
Writable, Cloneable {
       }
     }
     out.writeBoolean(isLocalDictColumn);
-    out.writeBoolean(indexColumn);
+    out.writeBoolean(spatialColumn);
   }
 
   @Override
@@ -585,7 +588,7 @@ public class ColumnSchema implements Serializable, 
Writable, Cloneable {
       }
     }
     this.isLocalDictColumn = in.readBoolean();
-    this.indexColumn = in.readBoolean();
+    this.spatialColumn = in.readBoolean();
   }
 
   /**
@@ -597,14 +600,6 @@ public class ColumnSchema implements Serializable, 
Writable, Cloneable {
         .contains(".val") || this.getColumnName().contains(".");
   }
 
-  public boolean isIndexColumn() {
-    return indexColumn;
-  }
-
-  public void setIndexColumn(boolean indexColumn) {
-    this.indexColumn = indexColumn;
-  }
-
   public ColumnSchema clone() {
     try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos)) {
@@ -617,4 +612,21 @@ public class ColumnSchema implements Serializable, 
Writable, Cloneable {
       throw new RuntimeException("Error occur while cloning ColumnSchema", e);
     }
   }
+
+  /**
+   * Checks whether it is a spatial index column.
+   * @return Returns true if the column is a spatial index column. Otherwise
+   * returns false.
+   */
+  public boolean isSpatialColumn() {
+    return spatialColumn;
+  }
+
+  /**
+   * Sets the column spatial index property: true indicates that the column is a
+   * spatial index column; false indicates that it is not.
+   * @param spatialColumn true or false
+   */
+  public void setSpatialColumn(boolean spatialColumn) {
+    this.spatialColumn = spatialColumn;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index e49aacc..5e505b3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -308,7 +308,7 @@ public abstract class AbstractDataFileFooterConverter {
         wrapperColumnSchema.setSortColumn(true);
       }
     }
-    wrapperColumnSchema.setIndexColumn(externalColumnSchema.isIndexColumn());
+    
wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     
wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
     List<org.apache.carbondata.format.ParentColumnTableRelation> 
parentColumnTableRelation =
         externalColumnSchema.getParentColumnTableRelations();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 57bb093..55864aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1989,7 +1989,7 @@ public final class CarbonUtil {
     wrapperColumnSchema.setScale(externalColumnSchema.getScale());
     
wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
     
wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
-    wrapperColumnSchema.setIndexColumn(externalColumnSchema.isIndexColumn());
+    
wrapperColumnSchema.setSpatialColumn(externalColumnSchema.isSpatialColumn());
     Map<String, String> properties = 
externalColumnSchema.getColumnProperties();
     if (properties != null) {
       if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java 
b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
index d570e1a..36d0a79 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
@@ -23,23 +23,25 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Abstract class for custom index handler. When index_handler property is 
configured on table, a
+ * Abstract class for a custom index implementation. When the index property is
+ * configured on a table, a
  * new column is created within carbon layer from the set of schema columns in 
the table.
- * A custom implementation need to be provided to extract the sub-properties 
of index handler such
- * as type, source columns etc, generate the value for the new column from the 
source column values,
- * query processor to handle the custom UDF filter queries based on source 
columns.
- * This class is an abstract for the custom implementation.
+ * An index implementation class must extend this class and provide concrete
+ * implementations for the following abstract methods:
+ * 1. Init method to extract and store the sub-properties of the index property,
+ *    such as index type, sourcecolumns, etc.
+ * 2. Generate method to generate the row value for the index column from the
+ *    corresponding row values of its source columns.
+ * 3. Query method to process the custom UDF filter queries based on the source
+ *    columns.
  * @param <ReturnType>
  */
 public abstract class CustomIndex<ReturnType> implements Serializable {
-  public static final String CUSTOM_INDEX_DEFAULT_IMPL = 
"org.apache.carbondata.geo.GeoHashImpl";
   /**
-   * Initialize the custom index handler instance.
-   * @param handlerName
+   * Initialize the custom index instance.
+   * @param indexName
    * @param properties
    * @throws Exception
    */
-  public abstract void init(String handlerName, Map<String, String> 
properties) throws Exception;
+  public abstract void init(String indexName, Map<String, String> properties) 
throws Exception;
 
   /**
    * Generates the custom index column value from the given source columns.
@@ -50,7 +52,7 @@ public abstract class CustomIndex<ReturnType> implements 
Serializable {
   public abstract String generate(List<?> columns) throws Exception;
 
   /**
-   * Query processor for custom index handler.
+   * Query processor for custom index.
    * @param query
    * @return Returns list of ranges to be fetched
    * @throws Exception
@@ -58,7 +60,7 @@ public abstract class CustomIndex<ReturnType> implements 
Serializable {
   public abstract ReturnType query(String query) throws Exception;
 
   /**
-   * Deserializes and returns the custom handler instance
+   * Deserializes and returns the custom index instance
    * @param serializedInstance
    * @return
    * @throws IOException
@@ -69,7 +71,7 @@ public abstract class CustomIndex<ReturnType> implements 
Serializable {
   }
 
   /**
-   * Serializes the custom handler instance
+   * Serializes the custom index instance
    * @param instance
    * @return
    * @throws IOException
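
For reference, a minimal sketch of an implementation against this renamed contract (the GridZIndex class, its package, its "gridz" type, and the Z-order math are hypothetical illustrations; only the init/generate/query signatures come from CustomIndex above):

```java
package org.example.geo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.util.CustomIndex;

// Hypothetical custom spatial index; illustrates the renamed contract only.
public class GridZIndex extends CustomIndex<List<Long[]>> {

  @Override
  public void init(String indexName, Map<String, String> properties) throws Exception {
    // Sub-properties are looked up as spatial_index.<indexName>.<subProperty>.
    String type = properties.get("spatial_index." + indexName + ".type");
    if (!"gridz".equalsIgnoreCase(type)) {
      throw new IllegalArgumentException("unexpected index type: " + type);
    }
  }

  @Override
  public String generate(List<?> sources) throws Exception {
    // Derive the implicit index column value from the source column row values.
    long longitude = Long.parseLong(String.valueOf(sources.get(0)));
    long latitude = Long.parseLong(String.valueOf(sources.get(1)));
    return String.valueOf(interleave(longitude, latitude));
  }

  @Override
  public List<Long[]> query(String polygon) throws Exception {
    // Translate a Polygon UDF filter into [min, max] index value ranges;
    // a real implementation would derive these from the polygon.
    List<Long[]> ranges = new ArrayList<>();
    ranges.add(new Long[] {0L, Long.MAX_VALUE});
    return ranges;
  }

  // Toy Z-order interleaving of two coordinates (illustrative only).
  private static long interleave(long x, long y) {
    long z = 0;
    for (int i = 0; i < 32; i++) {
      z |= ((x >> i) & 1L) << (2 * i);
      z |= ((y >> i) & 1L) << (2 * i + 1);
    }
    return z;
  }
}
```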
diff --git 
a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
 
b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 70e8645..fadc2ad 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -158,7 +158,8 @@ public class ThriftWrapperSchemaConverterImplTest {
         return thriftColumnSchema;
       }
 
-      @Mock public org.apache.carbondata.format.ColumnSchema 
setIndexColumn(boolean indexColumn) {
+      @Mock public org.apache.carbondata.format.ColumnSchema setSpatialColumn(
+          boolean spatialColumn) {
         return thriftColumnSchema;
       }
 
diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index a9010a0..f97ee38 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -23,15 +23,15 @@
 
 # How does CarbonData implement spatial index
 
-There are many open source implementations for spatial indexing and to process 
spatial queries. CarbonData implements a different way of spatial index. Its 
core idea is to use the raster data. Raster is made up of matrix of cells 
organized into rows and columns(called a grid). Each cell represents a 
coordinate. And the index for that coodrinate is generated using longitude and 
latitude, like the [Z order curve](https://en.wikipedia.org/wiki/Z-order_curve).
+There are many open source implementations for spatial indexing and for processing 
spatial queries. CarbonData implements spatial indexing in a different way. Its 
core idea is to use raster data. A raster is made up of a matrix of cells 
organized into rows and columns (called a grid). Each cell represents a 
coordinate. The index for the coordinate is generated using longitude and 
latitude, like the [Z order curve](https://en.wikipedia.org/wiki/Z-order_curve).
 
 CarbonData rasterizes the user data during data load into segments. A set of 
latitude and longitude represents a grid range. The size of the grid can be 
configured. Hence, the coordinates loaded are often discrete and not continuous.
 
-Below figure shows the relationship between the grid and the points residing 
in it. Black point represents the center point of the grid, and the red points 
are the coordinates at the arbitrary positions inside the grid. The red points 
can be replaced by the center point of the grid to indicate that the points 
lies within the grid. During data load, CarbonData generates an index for 
coordinate according to row and column of the grid(in the raster) where that 
coordinate lies. These indices [...]
+The figure below shows the relationship between the grid and the points residing 
in it. The black point represents the center point of the grid, and the red points 
are the coordinates at arbitrary positions inside the grid. The red points 
can be replaced by the center point of the grid to indicate that the points 
lie within the grid. During data load, CarbonData generates an index for the 
coordinate according to the row and column of the grid (in the raster) where that 
coordinate lies. These ind [...]
 
 ![File Directory Structure](../docs/images/spatial-index-1.png?raw=true)
 
-Carbon supports Polygon User Defined Function(UDF) as filter condition in the 
query to return all the data points lying within it. Polygon UDF takes multiple 
points(i.e., pair of longitude and latitude) separated by a comma. Longitude 
and latitude in the pair are separated by a space. And the first and last 
points in the polygon must be form a closed loop. CarbonData builds a quad tree 
using this polygon and spatial region information passed while creating a 
table. The nodes in the quad  [...]
+Carbon supports the Polygon User Defined Function (UDF) as a filter condition in 
the query to return all the data points lying within it. The Polygon UDF takes 
multiple points (i.e., pairs of longitude and latitude) separated by commas. 
Longitude and latitude in a pair are separated by a space. The first and last 
points in the polygon must be the same to form a closed loop. CarbonData builds a 
quad tree using this polygon and the spatial region information passed while 
creating a table. The nodes in the q [...]
 The main reasons for faster query response are as follows:
 * Data is sorted based on the index values.
 * Polygon UDF filter is pushed down from engine to the carbon layer such that 
CarbonData scans only matched blocklets avoiding full scan.
@@ -51,16 +51,16 @@ Create table with spatial index table properties
 
 ```
 create table source_index(id BIGINT, latitude long, longitude long) stored by 
'carbondata' TBLPROPERTIES (
-'INDEX_HANDLER'='mygeohash',  
-'INDEX_HANDLER.mygeohash.type'='geohash',   
-'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',   
-'INDEX_HANDLER.mygeohash.originLatitude'='19.832277',   
-'INDEX_HANDLER.mygeohash.gridSize'='50',   
-'INDEX_HANDLER.mygeohash.minLongitude'='1.811865',   
-'INDEX_HANDLER.mygeohash.maxLongitude'='2.782233',   
-'INDEX_HANDLER.mygeohash.minLatitude'='19.832277',   
-'INDEX_HANDLER.mygeohash.maxLatitude'='20.225281',   
-'INDEX_HANDLER.mygeohash.conversionRatio'='1000000');
+'SPATIAL_INDEX'='mygeohash',
+'SPATIAL_INDEX.mygeohash.type'='geohash',
+'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+'SPATIAL_INDEX.mygeohash.originLatitude'='19.832277',
+'SPATIAL_INDEX.mygeohash.gridSize'='50',
+'SPATIAL_INDEX.mygeohash.minLongitude'='1.811865',
+'SPATIAL_INDEX.mygeohash.maxLongitude'='2.782233',
+'SPATIAL_INDEX.mygeohash.minLatitude'='19.832277',
+'SPATIAL_INDEX.mygeohash.maxLatitude'='20.225281',
+'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000');
 ```
 Note: `mygeohash` in the above example represents the index name.
 
@@ -68,16 +68,16 @@ Note: `mygeohash` in the above example represent the index 
name.
 
 |Name|Description|
 
|-----------------------------------|-----------------------------------------------------------------------------------------|
-| INDEX_HANDLER | Used to configure Index name. This name is appended to 
`INDEX_HANDLER` in the subsequent sub-property configurations. `xxx` in the 
below sub-properties refer to index name.|
-| INDEX_HANDLER.xxx.type | Type of algorithm for processing spatial data. 
Currently, supports only 'geohash'.|
-| INDEX_HANDLER.xxx.sourcecolumns | longitude and latitude column names as in 
the table. These columns are used to generate index value for each row.|
-| INDEX_HANDLER.xxx.gridSize | Grid size of raster data in metres. Currently, 
spatial index supports raster data.|
-| INDEX_HANDLER.xxx.minLongitude | Minimum longitude of the gridded 
rectangular area.|
-| INDEX_HANDLER.xxx.maxLongitude | Maximum longitude of the gridded 
rectangular area.|
-| INDEX_HANDLER.xxx.minLatitude | Minimum latitude of the gridded rectangular 
area.|
-| INDEX_HANDLER.xxx.maxLatitude | Maximum latitude of the gridded rectangular 
area.|
-| INDEX_HANDLER.xxx.conversionRatio | Conversion factor. It allows user to 
translate longitude and latitude to long. For example, if the data to load is 
longitude = 13.123456, latitude = 101.12356. User can configure conversion 
ratio sub-property value as 1000000, and change data to load as longitude = 
13123456 and latitude = 10112356. Operations on long is much faster compared to 
floating-point numbers.|
-| INDEX_HANDLER.xxx.class | Optional user custom implementation class. Value 
is fully qualified class name.|
+| SPATIAL_INDEX | Used to configure the spatial index name. This name is appended to `SPATIAL_INDEX` in the subsequent sub-property configurations. `xxx` in the sub-properties below refers to the index name.|
+| SPATIAL_INDEX.xxx.type | Type of algorithm for processing spatial data. Currently, only 'geohash' is supported.|
+| SPATIAL_INDEX.xxx.sourcecolumns | Longitude and latitude column names as in the table. These columns are used to generate the index value for each row.|
+| SPATIAL_INDEX.xxx.gridSize | Grid size of the raster data in metres. Currently, the spatial index supports raster data.|
+| SPATIAL_INDEX.xxx.minLongitude | Minimum longitude of the gridded rectangular area.|
+| SPATIAL_INDEX.xxx.maxLongitude | Maximum longitude of the gridded rectangular area.|
+| SPATIAL_INDEX.xxx.minLatitude | Minimum latitude of the gridded rectangular area.|
+| SPATIAL_INDEX.xxx.maxLatitude | Maximum latitude of the gridded rectangular area.|
+| SPATIAL_INDEX.xxx.conversionRatio | Conversion factor. It allows the user to translate longitude and latitude to long values. For example, if the data to load is longitude = 13.123456 and latitude = 101.12356, the user can configure the conversion ratio sub-property value as 1000000 and change the data to load as longitude = 13123456 and latitude = 10112356. Operations on long values are much faster compared to floating-point numbers.|
+| SPATIAL_INDEX.xxx.class | Optional user custom implementation class. The value is a fully qualified class name.|
 
 
 ### Select Query
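
A usage sketch for this section (assuming the IN_POLYGON UDF that exposes the Polygon filter described above; the coordinates are illustrative, with the first and last points repeated to close the loop):

```
select * from source_index
where IN_POLYGON('16.321011 4.123503,16.137676 5.947911,16.560993 5.935276,16.321011 4.123503')
```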
diff --git a/format/src/main/thrift/schema.thrift 
b/format/src/main/thrift/schema.thrift
index 9e7f248..b7fabef 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -135,9 +135,9 @@ struct ColumnSchema{
        17: optional list<ParentColumnTableRelation> parentColumnTableRelations;
 
   /**
-   * To specify if it is an index column. Its Default value is false
+   * To specify if it is a spatial index column. Its default value is false
         */
-       18: optional bool indexColumn;
+       18: optional bool spatialColumn;
 }
 
 /**
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java 
b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
new file mode 100644
index 0000000..d9641cf
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo;
+
+/**
+ * Geo Constants
+ */
+public class GeoConstants {
+  private GeoConstants() {
+  }
+
+  // GeoHash type Spatial Index
+  public static final String GEOHASH = "geohash";
+}
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashImpl.java 
b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
similarity index 88%
rename from geo/src/main/java/org/apache/carbondata/geo/GeoHashImpl.java
rename to geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
index a2fa7ce..b361e03 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashImpl.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
@@ -31,18 +31,18 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /**
- * GeoHash custom implementation.
+ * GeoHash Type Spatial Index Custom Implementation.
  * This class extends {@link CustomIndex}. It provides methods to
- * 1. Extracts the sub-properties of geohash type index handler such as type, 
source columns,
+ * 1. Extracts the sub-properties of geohash type spatial index such as type, 
source columns,
  * grid size, origin, min and max longitude and latitude of data. Validates 
and stores them in
  * instance.
  * 2. Generates column value from the longitude and latitude column values.
  * 3. Query processor to handle the custom UDF filter queries based on 
longitude and latitude
  * columns.
  */
-public class GeoHashImpl extends CustomIndex<List<Long[]>> {
+public class GeoHashIndex extends CustomIndex<List<Long[]>> {
   private static final Logger LOGGER =
-      LogServiceFactory.getLogService(GeoHashImpl.class.getName());
+      LogServiceFactory.getLogService(GeoHashIndex.class.getName());
 
   // conversion factor of angle to radian
   private static final double CONVERT_FACTOR = 180.0;
@@ -86,54 +86,54 @@ public class GeoHashImpl extends CustomIndex<List<Long[]>> {
 
 
   /**
-   * Initialize the geohash index handler instance.
+   * Initialize the geohash spatial index instance.
   * The properties are like this:
-   * TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
-   * 'INDEX_HANDLER.mygeohash.type'='geohash',
-   * 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
-   * 'INDEX_HANDLER.mygeohash.gridSize'=''
-   * 'INDEX_HANDLER.mygeohash.minLongitude'=''
-   * 'INDEX_HANDLER.mygeohash.maxLongitude'=''
-   * 'INDEX_HANDLER.mygeohash.minLatitude'=''
-   * 'INDEX_HANDLER.mygeohash.maxLatitude'=''
-   * 'INDEX_HANDLER.mygeohash.orilatitude''')
-   * @param handlerName the class name of generating algorithm
+   * TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+   * 'SPATIAL_INDEX.mygeohash.type'='geohash',
+   * 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+   * 'SPATIAL_INDEX.mygeohash.gridSize'=''
+   * 'SPATIAL_INDEX.mygeohash.minLongitude'=''
+   * 'SPATIAL_INDEX.mygeohash.maxLongitude'=''
+   * 'SPATIAL_INDEX.mygeohash.minLatitude'=''
+   * 'SPATIAL_INDEX.mygeohash.maxLatitude'=''
+   * 'SPATIAL_INDEX.mygeohash.orilatitude''')
+   * @param indexName index name. A column with this name is implicitly created.
   * @param properties input properties, please check the description above
    * @throws Exception
    */
   @Override
-  public void init(String handlerName, Map<String, String> properties) throws 
Exception {
-    String options = properties.get(CarbonCommonConstants.INDEX_HANDLER);
+  public void init(String indexName, Map<String, String> properties) throws 
Exception {
+    String options = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     if (StringUtils.isEmpty(options)) {
       throw new MalformedCarbonCommandException(
-              String.format("%s property is invalid.", 
CarbonCommonConstants.INDEX_HANDLER));
+              String.format("%s property is invalid.", 
CarbonCommonConstants.SPATIAL_INDEX));
     }
     options = options.toLowerCase();
-    if (!options.contains(handlerName.toLowerCase())) {
+    if (!options.contains(indexName.toLowerCase())) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. %s is not present.",
-                      CarbonCommonConstants.INDEX_HANDLER, handlerName));
+                      CarbonCommonConstants.SPATIAL_INDEX, indexName));
     }
-    String commonKey = CarbonCommonConstants.INDEX_HANDLER + 
CarbonCommonConstants.POINT +
-            handlerName + CarbonCommonConstants.POINT;
+    String commonKey = CarbonCommonConstants.SPATIAL_INDEX + 
CarbonCommonConstants.POINT + indexName
+        + CarbonCommonConstants.POINT;
     String TYPE = commonKey + "type";
     String type = properties.get(TYPE);
-    if (!CarbonCommonConstants.GEOHASH.equalsIgnoreCase(type)) {
+    if (!GeoConstants.GEOHASH.equalsIgnoreCase(type)) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. %s property must be %s 
for this class.",
-                      CarbonCommonConstants.INDEX_HANDLER, TYPE, 
CarbonCommonConstants.GEOHASH));
+                      CarbonCommonConstants.SPATIAL_INDEX, TYPE, 
GeoConstants.GEOHASH));
     }
     String SOURCE_COLUMNS = commonKey + "sourcecolumns";
     String sourceColumnsOption = properties.get(SOURCE_COLUMNS);
     if (StringUtils.isEmpty(sourceColumnsOption)) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. Must specify %s 
property.",
-                      CarbonCommonConstants.INDEX_HANDLER, SOURCE_COLUMNS));
+                      CarbonCommonConstants.SPATIAL_INDEX, SOURCE_COLUMNS));
     }
     if (sourceColumnsOption.split(",").length != 2) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. %s property must have 2 
columns.",
-                      CarbonCommonConstants.INDEX_HANDLER, SOURCE_COLUMNS));
+                      CarbonCommonConstants.SPATIAL_INDEX, SOURCE_COLUMNS));
     }
     String SOURCE_COLUMN_TYPES = commonKey + "sourcecolumntypes";
     String sourceDataTypes = properties.get(SOURCE_COLUMN_TYPES);
@@ -142,7 +142,7 @@ public class GeoHashImpl extends CustomIndex<List<Long[]>> {
       if (!"bigint".equalsIgnoreCase(srcdataType)) {
         throw new MalformedCarbonCommandException(
                 String.format("%s property is invalid. %s datatypes must be 
long.",
-                        CarbonCommonConstants.INDEX_HANDLER, SOURCE_COLUMNS));
+                        CarbonCommonConstants.SPATIAL_INDEX, SOURCE_COLUMNS));
       }
     }
     // Set the generated column data type as long
@@ -153,7 +153,7 @@ public class GeoHashImpl extends CustomIndex<List<Long[]>> {
     if (StringUtils.isEmpty(originLatitude)) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. Must specify %s 
property.",
-                      CarbonCommonConstants.INDEX_HANDLER, ORIGIN_LATITUDE));
+                      CarbonCommonConstants.SPATIAL_INDEX, ORIGIN_LATITUDE));
     }
     String MIN_LONGITUDE = commonKey + "minlongitude";
     String MAX_LONGITUDE = commonKey + "maxlongitude";
@@ -166,36 +166,36 @@ public class GeoHashImpl extends 
CustomIndex<List<Long[]>> {
     if (StringUtils.isEmpty(minLongitude)) {
       throw new MalformedCarbonCommandException(
           String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.INDEX_HANDLER, MIN_LONGITUDE));
+              CarbonCommonConstants.SPATIAL_INDEX, MIN_LONGITUDE));
     }
     if (StringUtils.isEmpty(minLatitude)) {
       throw new MalformedCarbonCommandException(
           String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.INDEX_HANDLER, MIN_LATITUDE));
+              CarbonCommonConstants.SPATIAL_INDEX, MIN_LATITUDE));
     }
     if (StringUtils.isEmpty(maxLongitude)) {
       throw new MalformedCarbonCommandException(
           String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.INDEX_HANDLER, MAX_LONGITUDE));
+              CarbonCommonConstants.SPATIAL_INDEX, MAX_LONGITUDE));
     }
     if (StringUtils.isEmpty(maxLatitude)) {
       throw new MalformedCarbonCommandException(
           String.format("%s property is invalid. Must specify %s property.",
-              CarbonCommonConstants.INDEX_HANDLER, MAX_LATITUDE));
+              CarbonCommonConstants.SPATIAL_INDEX, MAX_LATITUDE));
     }
     String GRID_SIZE = commonKey + "gridsize";
     String gridSize = properties.get(GRID_SIZE);
     if (StringUtils.isEmpty(gridSize)) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. %s property must be 
specified.",
-                      CarbonCommonConstants.INDEX_HANDLER, GRID_SIZE));
+                      CarbonCommonConstants.SPATIAL_INDEX, GRID_SIZE));
     }
     String CONVERSION_RATIO = commonKey + "conversionratio";
     String conversionRatio = properties.get(CONVERSION_RATIO);
     if (StringUtils.isEmpty(conversionRatio)) {
       throw new MalformedCarbonCommandException(
               String.format("%s property is invalid. %s property must be 
specified.",
-                      CarbonCommonConstants.INDEX_HANDLER, CONVERSION_RATIO));
+                      CarbonCommonConstants.SPATIAL_INDEX, CONVERSION_RATIO));
     }
 
     // Fill the values to the instance fields
diff --git a/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java 
b/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java
index fdd5c8f..86dfea6 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/QuadTreeCls.java
@@ -402,7 +402,7 @@ class GridData {
  */
 class QuadNode {
   private static final Logger LOGGER =
-      LogServiceFactory.getLogService(GeoHashImpl.class.getName());
+      LogServiceFactory.getLogService(QuadNode.class.getName());
   // The range Z order of region hashid represented by quadtree is a 
continuous range
   private QuadRect rect;
   // Grid data, actually representing hashid
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index 5f27b9a..ee9971a 100644
--- 
a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -43,15 +43,15 @@ import org.apache.carbondata.core.util.CustomIndex;
 @InterfaceAudience.Internal
 public class PolygonExpression extends UnknownExpression implements 
ConditionalExpression {
   private String polygon;
-  private CustomIndex<List<Long[]>> handler;
+  private CustomIndex<List<Long[]>> instance;
   private List<Long[]> ranges = new ArrayList<Long[]>();
   private ColumnExpression column;
   private ExpressionResult trueExpRes;
   private ExpressionResult falseExpRes;
 
-  public PolygonExpression(String polygon, String columnName, CustomIndex 
handler) {
+  public PolygonExpression(String polygon, String columnName, CustomIndex 
indexInstance) {
     this.polygon = polygon;
-    this.handler = handler;
+    this.instance = indexInstance;
     this.column = new ColumnExpression(columnName, DataTypes.LONG);
     this.trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
     this.falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
@@ -61,7 +61,7 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
     // Validate the ranges
     for (Long[] range : ranges) {
       if (range.length != 2) {
-        throw new RuntimeException("Handler query must return list of ranges 
with each range "
+        throw new RuntimeException("Query processor must return list of ranges 
with each range "
             + "containing minimum and maximum values");
       }
     }
@@ -72,7 +72,7 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
    */
   private void processExpression() {
     try {
-      ranges = handler.query(polygon);
+      ranges = instance.query(polygon);
       validate(ranges);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -138,13 +138,13 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
 
   private void writeObject(ObjectOutputStream out) throws IOException {
     out.writeObject(polygon);
-    out.writeObject(handler);
+    out.writeObject(instance);
     out.writeObject(column);
   }
 
   private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
     polygon = (String) in.readObject();
-    handler = (CustomIndex<List<Long[]>>) in.readObject();
+    instance = (CustomIndex<List<Long[]>>) in.readObject();
     column = (ColumnExpression) in.readObject();
     ranges = new ArrayList<Long[]>();
     trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java 
b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
index 4dc1277..0fce43a 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -111,7 +111,7 @@ public class Util {
     List<ColumnSchema> columns = 
table.getTableInfo().getFactTable().getListOfColumns();
     List<ColumnSchema> validColumnSchema = new ArrayList<>();
     for (ColumnSchema column : columns) {
-      if (!column.isInvisible() && !column.isIndexColumn() && 
!column.isComplexColumn()) {
+      if (!column.isInvisible() && !column.isSpatialColumn() && 
!column.isComplexColumn()) {
         validColumnSchema.add(column);
       }
     }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala
index 7c9edd0..2032b8e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtils.scala
@@ -23,27 +23,28 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CustomIndex
+import org.apache.carbondata.geo.GeoConstants
 
 object GeoUtils {
   def getGeoHashHandler(tableProperties: mutable.Map[String, String])
                         : (String, CustomIndex[_]) = {
-    val indexProperty = 
tableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+    val indexProperty = 
tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexProperty.isEmpty || indexProperty.get.trim.isEmpty) {
       CarbonException.analysisException(
-        s"Table do not have ${CarbonCommonConstants.INDEX_HANDLER} property " +
-        s"with ${CarbonCommonConstants.GEOHASH} type handler")
+        s"Table do not have ${CarbonCommonConstants.SPATIAL_INDEX} property " +
+        s"with ${GeoConstants.GEOHASH} type")
     }
-    val handler = indexProperty.get.split(",").map(_.trim).filter(handler =>
-      CarbonCommonConstants.GEOHASH.equalsIgnoreCase(
-        
tableProperties.getOrElse(s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.type",
 "")))
-      .map(handler => (handler,
-        
tableProperties.get(s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.instance")))
+    val handler = indexProperty.get.split(",").map(_.trim).filter(indexName =>
+      GeoConstants.GEOHASH.equalsIgnoreCase(
+        
tableProperties.getOrElse(s"${CarbonCommonConstants.SPATIAL_INDEX}.$indexName.type",
 "")))
+      .map(indexName => (indexName,
+        
tableProperties.get(s"${CarbonCommonConstants.SPATIAL_INDEX}.$indexName.instance")))
     if (handler.isEmpty || handler.length != 1 || handler(0)._1.isEmpty
       || handler(0)._2.isEmpty) {
       CarbonException.analysisException(
-        s"Table do not have ${CarbonCommonConstants.INDEX_HANDLER} property " +
-        s"with ${CarbonCommonConstants.GEOHASH} type handler")
+        s"Table do not have ${CarbonCommonConstants.SPATIAL_INDEX} property " +
+        s"with ${GeoConstants.GEOHASH} type index")
     }
-    (handler(0) _1, CustomIndex.getCustomInstance(handler(0)._2.get))
+    (handler(0)._1, CustomIndex.getCustomInstance(handler(0)._2.get))
   }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f013e3a..40b8ad0 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -211,10 +211,10 @@ object DataLoadProcessorStepOnSpark {
         row = rowConverter.convert(row)
         if (row != null) {
           // In case of partition, after Input processor and converter steps, 
all the rows are given
-          // to hive to create partition folders. As hive is unaware of 
non-schema index columns,
+          // to hive to create partition folders. As hive is unaware of 
non-schema columns,
           // should discard those columns from rows and return.
           val schemaColumnValues = row.getData.zipWithIndex.collect {
-            case (data, index) if 
!conf.getDataFields()(index).getColumn.isIndexColumn =>
+            case (data, index) if 
!conf.getDataFields()(index).getColumn.isSpatialColumn =>
               data
           }
           row.setData(schemaColumnValues)
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cf1c07c..ad8d982 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -375,13 +375,13 @@ object CarbonDataRDDFactory {
               .getFactTable
               .getListOfColumns
               .asScala
-              .filterNot(col => col.isInvisible || col.isIndexColumn || 
col.isComplexColumn)
+              .filterNot(col => col.isInvisible || col.isSpatialColumn || 
col.isComplexColumn)
             val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
               colSchema,
               scanResultRdd.get,
               isGlobalSortPartition = false)
             if (isSortTable && 
sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) &&
-                !carbonLoadModel.isIndexColumnsPresent) {
+                !carbonLoadModel.isNonSchemaColumnsPresent) {
               
DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
                 .sparkSession,
                 convertedRdd,
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 2cb2e92..0433d37 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -707,40 +707,39 @@ object CarbonScalaUtil {
   }
 
   /**
-   * The method adds the index handler to sort columns if it is not already 
present as sort column
-   * @param handler Index handler name
-   * @param sourceColumns Source columns of index handler
+   * The method inserts the given column into the sort columns
+   * @param column Name of the column to be inserted into the sort columns
+   * @param insertBefore Columns before which the given column should be inserted
    * @param tableProperties Table properties
    */
-  def addIndexHandlerToSortColumns(handler: String, sourceColumns: 
Array[String],
+  def insertColumnToSortColumns(column: String, insertBefore: Array[String],
       tableProperties: mutable.Map[String, String]): Unit = {
-    // Add handler into sort columns
+    // Insert the column into sort columns
     val sortKey = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
-    var sortColumnsString = handler
-    // If sort columns are not configured, simply use handler as a sort column.
+    var sortColumnsString = column
+    // If sort columns are not configured, simply use column as a sort column.
     if (sortKey.isDefined && !sortKey.get.isEmpty) {
       sortColumnsString = sortKey.get
       val sortColumns = sortColumnsString.split(",").map(_.trim)
-      // If sort columns already contains handler, use sort columns as is.
-      if (!sortColumns.contains(handler)) {
-        // If sort columns do not contain handler as one of the sort column 
then check if
-        // any of handler's source columns are present as sort columns. If so, 
insert handler
-        // into sort columns such that it is just before its source columns. 
Thus, sorting of
-        // data happens w.r.t handler before any of its source columns.
-        val sourceIndex = new Array[Int](sourceColumns.length)
-        sourceColumns.zipWithIndex.foreach {
-          case (source, index) => sourceIndex(index) = 
sortColumns.indexWhere(_.equals(source))
+      // If the sort columns already contain the column, use the sort columns as is.
+      if (!sortColumns.contains(column)) {
+        // If the sort columns do not contain this column, then check whether any
+        // of the insertBefore columns are present as sort columns. If so, insert
+        // this column into the sort columns just before them. Thus, data is sorted
+        // w.r.t. this column before any of the insertBefore columns.
+        val columnsIndex = new Array[Int](insertBefore.length)
+        insertBefore.zipWithIndex.foreach {
+          case (colName, index) => columnsIndex(index) = 
sortColumns.indexWhere(_.equals(colName))
         }
-        val posIdx = sourceIndex.filter(_ >= 0)
+        val posIdx = columnsIndex.filter(_ >= 0)
         if (posIdx.nonEmpty) {
-          // Found index of first source column in the sort columns. Insert 
handler just
-          // before it.
-          sortColumnsString = (sortColumns.slice(0, posIdx.min) ++ 
Array(handler) ++
+          // Found the index of the first insertBefore column in the sort columns.
+          // Insert this column just before it.
+          sortColumnsString = (sortColumns.slice(0, posIdx.min) ++ 
Array(column) ++
                                sortColumns.slice(posIdx.min, 
sortColumns.length)).mkString(",")
         } else {
-          // None of the source columns of handler are not present as sort 
columns. Just append
-          // handler to existing sort columns.
-          sortColumnsString = sortColumnsString + s",$handler"
+          // None of the insertBefore columns are present as sort columns. Just
+          // append the column to the existing sort columns.
+          sortColumnsString = sortColumnsString + s",$column"
         }
       }
     }
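
For example (illustrative values): with SORT_COLUMNS = 'id,longitude,latitude' and a spatial column mygeohash whose insertBefore columns are longitude and latitude, the resulting sort columns string is 'id,mygeohash,longitude,latitude'; if neither insertBefore column is a sort column, mygeohash is simply appended at the end.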
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 24cc323..b30f104 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -74,7 +74,7 @@ object CarbonSparkUtil {
     val columnSchemas: mutable.Buffer[ColumnSchema] = 
carbonTable.getTableInfo.getFactTable.
       getListOfColumns.asScala
       .filter(cSchema => !cSchema.isInvisible && cSchema.getSchemaOrdinal != 
-1 &&
-                         !cSchema.isIndexColumn).sortWith(_.getSchemaOrdinal < 
_.getSchemaOrdinal)
+                         !cSchema.isSpatialColumn).sortWith(_.getSchemaOrdinal 
< _.getSchemaOrdinal)
     val columnList = columnSchemas.toList.asJava
     carbonRelation.dimensionsAttr.foreach(attr => {
       val carbonColumn = carbonTable.getColumnByName(attr.name)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 6b92ffa..135a143 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -40,6 +40,7 @@ import 
org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CustomIndex}
+import org.apache.carbondata.geo.GeoConstants
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, 
DataTypeConverterUtil}
 
@@ -90,45 +91,45 @@ object CarbonParserUtil {
   }
 
   /**
-   * The method parses, validates and processes the index_handler property.
+   * The method parses, validates and processes the spatial_index property.
    * @param tableProperties Table properties
    * @param tableFields Sequence of table fields
    * @return <Seq[Field]> Sequence of index fields to add to table fields
    */
-  private def processIndexProperty(tableProperties: mutable.Map[String, String],
+  private def processSpatialIndexProperty(tableProperties: mutable.Map[String, String],
       tableFields: Seq[Field]): Seq[Field] = {
-    val option = tableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+    val option = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
     val fields = ListBuffer[Field]()
     if (option.isDefined) {
       if (option.get.trim.isEmpty) {
         throw new MalformedCarbonCommandException(
-          s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+          s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
           s"Option value is empty.")
       }
-      option.get.split(",").map(_.trim).foreach { handler =>
-        // Validate target column name
-        if (tableFields.exists(_.column.equalsIgnoreCase(handler))) {
+      option.get.split(",").map(_.trim).foreach { indexName =>
+        // Validate index column name
+        if (tableFields.exists(_.column.equalsIgnoreCase(indexName))) {
           throw new MalformedCarbonCommandException(
-            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
-            s"handler: $handler must not match with any other column name in the table")
+            s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
+            s"index: $indexName must not match with any other column name in the table")
         }
-        val TYPE = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.type"
-        val SOURCE_COLUMNS = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.sourcecolumns"
+        val TYPE = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.type"
+        val SOURCE_COLUMNS = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.sourcecolumns"
         val SOURCE_COLUMN_TYPES
-        = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.sourcecolumntypes"
-        val HANDLER_CLASS = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.class"
-        val HANDLER_INSTANCE = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.instance"
+        = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.sourcecolumntypes"
+        val SPATIAL_INDEX_CLASS = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.class"
+        val SPATIAL_INDEX_INSTANCE = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.instance"
 
-        val handlerType = tableProperties.get(TYPE)
-        if (handlerType.isEmpty || handlerType.get.trim.isEmpty) {
+        val spatialIndexType = tableProperties.get(TYPE)
+        if (spatialIndexType.isEmpty || spatialIndexType.get.trim.isEmpty) {
           throw new MalformedCarbonCommandException(
-            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
             s"$TYPE property must be specified.")
         }
         val sourceColumnsOption = tableProperties.get(SOURCE_COLUMNS)
         if (sourceColumnsOption.isEmpty || sourceColumnsOption.get.trim.isEmpty) {
           throw new MalformedCarbonCommandException(
-            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
             s"$SOURCE_COLUMNS property must be specified.")
         }
         val sourcesWithoutSpaces = sourceColumnsOption.get.replaceAll("\\s", "")
@@ -136,7 +137,7 @@ object CarbonParserUtil {
         val sources = sourcesWithoutSpaces.split(",")
         if (sources.distinct.length != sources.size) {
           throw new MalformedCarbonCommandException(
-            s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+            s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
             s"$SOURCE_COLUMNS property cannot have duplicate columns.")
         }
         val sourceTypes = StringBuilder.newBuilder
@@ -145,44 +146,44 @@ object CarbonParserUtil {
             case Some(field) => sourceTypes.append(field.dataType.get).append(",")
             case None =>
               throw new MalformedCarbonCommandException(
-                s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
+                s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
                 s"Source column: $column in property " +
                 s"$SOURCE_COLUMNS must be a column in the table.")
           }
         }
         tableProperties.put(SOURCE_COLUMNS, sourcesWithoutSpaces)
         tableProperties.put(SOURCE_COLUMN_TYPES, sourceTypes.dropRight(1).toString())
-        val handlerClass = tableProperties.get(HANDLER_CLASS)
-        val handlerClassName: String = handlerClass match {
+        val spatialIndexClass = tableProperties.get(SPATIAL_INDEX_CLASS)
+        val spatialIndexClassName: String = spatialIndexClass match {
           case Some(className) => className.trim
           case None =>
-            // use handler type to find the default implementation
-            if (handlerType.get.trim.equalsIgnoreCase(CarbonCommonConstants.GEOHASH)) {
+            // use spatial index type to find the default implementation
+            if (spatialIndexType.get.trim.equalsIgnoreCase(GeoConstants.GEOHASH)) {
               // Use GeoHash default implementation
-              val className = CustomIndex.CUSTOM_INDEX_DEFAULT_IMPL
-              tableProperties.put(HANDLER_CLASS, className)
+              val className = "org.apache.carbondata.geo.GeoHashIndex"
+              tableProperties.put(SPATIAL_INDEX_CLASS, className)
               className
             } else {
               throw new MalformedCarbonCommandException(
-                s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property is invalid. " +
-                s"Unsupported value: ${ handlerType.get } specified for property $TYPE.")
+                s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property is invalid. " +
+                s"Unsupported value: ${ spatialIndexType.get } specified for property $TYPE.")
             }
         }
         try {
-          val handlerClass: Class[_] = java.lang.Class.forName(handlerClassName)
-          val instance = handlerClass.newInstance().asInstanceOf[CustomIndex[_]]
-          instance.init(handler, tableProperties.asJava)
-          tableProperties.put(HANDLER_INSTANCE, CustomIndex.getCustomInstance(instance))
+          val spatialIndexClass : Class[_] = java.lang.Class.forName(spatialIndexClassName)
+          val instance = spatialIndexClass.newInstance().asInstanceOf[CustomIndex[_]]
+          instance.init(indexName, tableProperties.asJava)
+          tableProperties.put(SPATIAL_INDEX_INSTANCE, CustomIndex.getCustomInstance(instance))
         } catch {
           case ex@(_: ClassNotFoundException | _: InstantiationError | _: IllegalAccessException |
                    _: ClassCastException) =>
-            val err = s"Carbon ${ CarbonCommonConstants.INDEX_HANDLER } property process failed. "
+            val err = s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property process failed. "
             LOGGER.error(err, ex)
             throw new MalformedCarbonCommandException(err, ex)
         }
-        // Add index handler as a sort column if it is not already present in it.
-        CarbonScalaUtil.addIndexHandlerToSortColumns(handler, sources, tableProperties)
-        fields += Field(handler, Some("BigInt"), Some(handler), Some(null), index = true)
+        // Insert spatial column as a sort column if it is not already present in it.
+        CarbonScalaUtil.insertColumnToSortColumns(indexName, sources, tableProperties)
+        fields += Field(indexName, Some("BigInt"), Some(indexName), Some(null), spatialIndex = true)
       }
     }
     fields
@@ -210,8 +211,8 @@ object CarbonParserUtil {
       isAlterFlow: Boolean = false,
       tableComment: Option[String] = None): TableModel = {
 
-    // Process index handler property
-    val indexFields = processIndexProperty(tableProperties, fields)
+    // Process spatial index property
+    val indexFields = processSpatialIndexProperty(tableProperties, fields)
     val allFields = fields ++ indexFields
 
     // do not allow below key words as column name
@@ -336,7 +337,7 @@ object CarbonParserUtil {
 
     if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
       // validate the column_meta_cache option
-      val tableColumns = dims.view.filterNot(_.index).map(x => x.name.get) ++
+      val tableColumns = dims.view.filterNot(_.spatialIndex).map(x => x.name.get) ++
                          msrs.map(x => x.name.get)
       CommonUtil.validateColumnMetaCacheFields(
         dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
@@ -1102,7 +1103,7 @@ object CarbonParserUtil {
           field.precision, field.scale, field.rawSchema, field.columnComment)
       case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
         field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
-        field.columnComment, field.index)
+        field.columnComment, field.spatialIndex)
       case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
         field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
         field.columnComment)
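
    For reference, each entry in the renamed property fans out into sub-keys of the
    form spatial_index.<name>.type, .sourcecolumns, .sourcecolumntypes, .class and
    .instance. A standalone Scala sketch (not CarbonData source; the lower-case
    value "spatial_index" for CarbonCommonConstants.SPATIAL_INDEX is assumed,
    mirroring the old "index_handler" value) of how the keys are derived and how
    the duplicate-source check behaves:

        import scala.collection.mutable

        object SpatialIndexKeySketch {
          // Assumed value of CarbonCommonConstants.SPATIAL_INDEX.
          val SPATIAL_INDEX = "spatial_index"

          def main(args: Array[String]): Unit = {
            val tableProperties = mutable.Map(
              SPATIAL_INDEX -> "mygeohash",
              s"$SPATIAL_INDEX.mygeohash.type" -> "geohash",
              s"$SPATIAL_INDEX.mygeohash.sourcecolumns" -> "longitude, latitude")
            tableProperties(SPATIAL_INDEX).split(",").map(_.trim).foreach { indexName =>
              val TYPE = s"$SPATIAL_INDEX.$indexName.type"
              val SOURCE_COLUMNS = s"$SPATIAL_INDEX.$indexName.sourcecolumns"
              // Same duplicate check as the patch: source columns must be distinct.
              val sources = tableProperties(SOURCE_COLUMNS).replaceAll("\\s", "").split(",")
              require(sources.distinct.length == sources.length,
                s"$SOURCE_COLUMNS property cannot have duplicate columns.")
              println(s"$indexName: type=${tableProperties(TYPE)} sources=${sources.mkString(",")}")
            }
          }
        }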
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 2ede202..751a6c3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -69,7 +69,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
     var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "",
-    var columnComment: String = "", var index: Boolean = false) {
+    var columnComment: String = "", var spatialIndex: Boolean = false) {
   override def equals(o: Any) : Boolean = o match {
     case that: Field =>
       that.column.equalsIgnoreCase(this.column)
@@ -648,7 +648,7 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setScale(field.scale)
     columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
-    columnSchema.setIndexColumn(field.index)
+    columnSchema.setSpatialColumn(field.spatialIndex)
     if (isVarcharColumn(colName)) {
       columnSchema.setDataType(DataTypes.VARCHAR)
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 5fb19ae..8dfad76 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -496,8 +496,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     }
     columnSchema.foreach {
       col =>
-        if (col.isIndexColumn) {
-          carbonLoadModel.setIndexColumnsPresent(true)
+        if (col.isSpatialColumn) {
+          carbonLoadModel.setNonSchemaColumnsPresent(true)
         } else {
           var skipPartitionColumn = false
           if (partitionColumnNames != null &&
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 6f16f30..ce699d6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -848,7 +848,7 @@ object CommonLoadUtils {
       // input data from csv files. Convert to logical plan
       val allCols = new ArrayBuffer[String]()
       // get only the visible dimensions from table
-      allCols ++= table.getVisibleDimensions.asScala.filterNot(_.isIndexColumn).map(_.getColName)
+      allCols ++= table.getVisibleDimensions.asScala.filterNot(_.isSpatialColumn).map(_.getColName)
       allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
       StructType(
         allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index f8df2a8..75c21ae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -64,7 +64,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       }
       val alterColumns =
         (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols).map(_.column)
-      AlterTableUtil.validateForIndexHandlerName(carbonTable, alterColumns)
+      AlterTableUtil.validateSpatialIndexColumn(carbonTable, alterColumns)
       val operationContext = new OperationContext
       val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
         alterTableAddColumnsModel)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index acfd386..ae80a12 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -127,8 +127,8 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
         throw new MalformedCarbonCommandException(
           "alter table column rename is not supported for index indexSchema")
       }
-      // Do not allow index handler's source columns to be changed.
-      AlterTableUtil.validateForIndexHandlerSources(carbonTable,
+      // Do not allow spatial index source columns to be changed.
+      AlterTableUtil.validateSpatialIndexSources(carbonTable,
         List(alterTableColRenameAndDataTypeChangeModel.columnName))
       val operationContext = new OperationContext
       operationContext.setProperty("childTableColumnRename", childTableColumnRename)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 004f53a..fd7c147 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -62,8 +62,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         throw new MalformedCarbonCommandException(
           "alter table drop column is not supported for index indexSchema")
       }
-      // Do not allow index handler's source columns to be dropped.
-      AlterTableUtil.validateForIndexHandlerSources(carbonTable, alterTableDropColumnModel.columns)
+      // Do not allow spatial index source columns to be dropped.
+      AlterTableUtil.validateSpatialIndexSources(carbonTable, alterTableDropColumnModel.columns)
       val partitionInfo = carbonTable.getPartitionInfo()
       val tableColumns = carbonTable.getCreateOrderColumn().asScala
       if (partitionInfo != null) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 85b6f72..9969942 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -56,24 +56,24 @@ private[sql] case class CarbonDescribeFormattedCommand(
 
     val carbonTable = relation.carbonTable
     val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
-    // Append index handler columns
-    val indexes = tblProps.get(CarbonCommonConstants.INDEX_HANDLER)
+    // Append spatial index columns
+    val indexes = tblProps.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexes.isDefined) {
       results ++= Seq(
         ("", "", ""),
-        ("## Custom Index Information", "", "")
+        ("## Spatial Index Information", "", "")
       )
       val indexList = indexes.get.split(",").map(_.trim)
       indexList.zip(Stream from 1).foreach {
         case(index, count) =>
           results ++= Seq(
-            ("Type", tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.type"), ""),
-            ("Class", tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.class"), ""),
+            ("Type", tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.type"), ""),
+            ("Class", tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.class"), ""),
             ("Column Name", index, ""),
             ("Column Data Type",
-              tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.datatype"), ""),
+              tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.datatype"), ""),
             ("Sources Columns",
-              tblProps(s"${ CarbonCommonConstants.INDEX_HANDLER }.$index.sourcecolumns"), ""))
+              tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.sourcecolumns"), ""))
           if (indexList.length != count) {
             results ++= Seq(("", "", ""))
           }
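
    For a table created with the GeoTest DDL further below, the renamed section
    of DESCRIBE FORMATTED would read roughly as follows (illustrative values,
    derived from the row labels above and the generated BigInt column):

        ## Spatial Index Information
        Type              geohash
        Class             org.apache.carbondata.geo.GeoHashIndex
        Column Name       mygeohash
        Column Data Type  bigint
        Sources Columns   longitude,latitude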
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 0c8ab14..f1f0b80 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -104,18 +104,18 @@ with Serializable {
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
         carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-    // If index handler property is configured, set flag to indicate index columns are present.
+    // If spatial index property is configured, set flag to indicate spatial columns are present.
     // So that InputProcessorStepWithNoConverterImpl can generate the values for those columns,
     // convert them and then apply sort/write steps.
-    val handler =
-    table.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
-    if (handler != null) {
+    val spatialIndex =
+    table.getTableInfo.getFactTable.getTableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+    if (spatialIndex != null) {
       val sortScope = optionsFinal.get("sort_scope")
       if (sortScope.equalsIgnoreCase(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) {
-        // Index handler non-schema column must be sorted
+        // Spatial Index non-schema column must be sorted
         optionsFinal.put("sort_scope", "LOCAL_SORT")
       }
-      model.setNonSchemaColumnsPresent(true)
+      model.setNonSchemaColumnsPresent(true)
     }
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
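
    The defaulting above only applies when the user has not chosen a scope
    explicitly. A standalone Scala sketch of the rule, assuming the shipped
    default (NO_SORT) is what LOAD_SORT_SCOPE_DEFAULT resolves to:

        // Sketch only: promote the scope so the generated spatial column gets sorted.
        def resolveSortScope(userScope: String, hasSpatialIndex: Boolean): String =
          if (hasSpatialIndex && userScope.equalsIgnoreCase("NO_SORT")) "LOCAL_SORT"
          else userScope

        // e.g. resolveSortScope("NO_SORT", hasSpatialIndex = true) == "LOCAL_SORT"
        // while an explicit "GLOBAL_SORT" is left untouched.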
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index a8e9344..02dc50f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -49,7 +49,10 @@ case class CarbonRelation(
   }
 
   val dimensionsAttr: Seq[AttributeReference] = {
-    val sett = new LinkedHashSet(carbonTable.getVisibleDimensions.asScala.filterNot(_.isIndexColumn)
+    val sett = new LinkedHashSet(carbonTable
+      .getVisibleDimensions
+      .asScala
+      .filterNot(_.isSpatialColumn)
       .asJava)
     sett.asScala.toSeq.map(dim => {
       val dimval = carbonTable.getDimensionByName(dim.getColName)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index c7c74de..4929a4c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -145,8 +145,8 @@ object CarbonFilters {
         case TextMatchLimit(queryString, maxDoc) =>
           Some(new MatchExpression(queryString, Try(maxDoc.toInt).getOrElse(Integer.MAX_VALUE)))
         case InPolygon(queryString) =>
-          val (columnName, handler) = GeoUtils.getGeoHashHandler(tableProperties)
-          Some(new CarbonPolygonExpression(queryString, columnName, handler))
+          val (columnName, instance) = GeoUtils.getGeoHashHandler(tableProperties)
+          Some(new CarbonPolygonExpression(queryString, columnName, instance))
         case _ => None
       }
     }
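
    This is the query-time plumbing behind the polygon filter in the geo module.
    A typical query against the GeoTest table below would look like this (the
    coordinate values are purely illustrative; the InPolygon extractor above is
    what matches the filter UDF):

        sql("select * from geotable where IN_POLYGON(" +
          "'116.321011 40.123503, 116.137676 39.947911, " +
          "116.560993 39.935276, 116.321011 40.123503')")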
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 3106a38..a77ab19 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -623,7 +623,7 @@ object AlterTableUtil {
     if (propertiesMap.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
       val schemaList: util.List[ColumnSchema] = CarbonUtil
         .getColumnSchemaList(carbonTable.getVisibleDimensions.asScala
-          .filterNot(_.getColumnSchema.isIndexColumn).asJava, carbonTable.getVisibleMeasures)
+          .filterNot(_.getColumnSchema.isSpatialColumn).asJava, carbonTable.getVisibleMeasures)
       val tableColumns: Seq[String] = schemaList.asScala
         .map(columnSchema => columnSchema.getColumnName)
       CommonUtil
@@ -689,13 +689,13 @@
                                                ): Unit = {
     CommonUtil.validateSortScope(propertiesMap)
     CommonUtil.validateSortColumns(carbonTable, propertiesMap)
-    val indexProp = tblPropertiesMap.get(CarbonCommonConstants.INDEX_HANDLER)
+    val indexProp = tblPropertiesMap.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexProp.isDefined) {
-      indexProp.get.split(",").map(_.trim).foreach { handler =>
-        val SOURCE_COLUMNS = s"${ CarbonCommonConstants.INDEX_HANDLER }.$handler.sourcecolumns"
+      indexProp.get.split(",").map(_.trim).foreach { indexName =>
+        val SOURCE_COLUMNS = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.sourcecolumns"
         val sourceColumns = tblPropertiesMap(SOURCE_COLUMNS).split(",").map(_.trim)
-        // Add index handler as a sort column if it is not already present in it.
-        CarbonScalaUtil.addIndexHandlerToSortColumns(handler, sourceColumns, propertiesMap)
+        // Add spatial index column as a sort column if it is not already present in it.
+        CarbonScalaUtil.insertColumnToSortColumns(indexName, sourceColumns, propertiesMap)
       }
     }
     // match SORT_SCOPE and SORT_COLUMNS
@@ -1058,31 +1058,31 @@
     }
   }
 
-  def validateForIndexHandlerName(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
-    // Do not allow columns to be added with index handler name
+  def validateSpatialIndexColumn(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
+    // Do not allow columns to be added with spatial index column name
     val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
-    val indexProperty = properties.get(CarbonCommonConstants.INDEX_HANDLER)
+    val indexProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexProperty.isDefined) {
       indexProperty.get.split(",").map(_.trim).foreach(element =>
         if (alterColumns.contains(element)) {
           throw new MalformedCarbonCommandException(s"Column: $element is not allowed. " +
-            s"This column is present in ${CarbonCommonConstants.INDEX_HANDLER} table property.")
+            s"This column is present in ${CarbonCommonConstants.SPATIAL_INDEX} table property.")
         })
       }
   }
 
-  def validateForIndexHandlerSources(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
-    // Do not allow index handler source columns to be altered
+  def validateSpatialIndexSources(carbonTable: CarbonTable, alterColumns: Seq[String]): Unit = {
+    // Do not allow spatial index source columns to be altered
     val properties = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
-    val indexProperty = properties.get(CarbonCommonConstants.INDEX_HANDLER)
+    val indexProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX)
     if (indexProperty.isDefined) {
       indexProperty.get.split(",").map(_.trim).foreach { element =>
         val srcColumns
-        = properties.get(CarbonCommonConstants.INDEX_HANDLER + s".$element.sourcecolumns")
+        = properties.get(CarbonCommonConstants.SPATIAL_INDEX + s".$element.sourcecolumns")
         val common = alterColumns.intersect(srcColumns.get.split(",").map(_.trim))
         if (common.nonEmpty) {
           throw new MalformedCarbonCommandException(s"Columns present in " +
-            s"${CarbonCommonConstants.INDEX_HANDLER} table property cannot be altered.")
+            s"${CarbonCommonConstants.SPATIAL_INDEX} table property cannot be altered.")
         }
       }
     }
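
    Both the create-table and alter-table paths funnel into the renamed
    CarbonScalaUtil.insertColumnToSortColumns helper. A standalone Scala sketch
    of the intended behaviour (only "add if absent" is stated in the comments
    above; the prepend ordering here is an assumption):

        def insertColumnToSortColumnsSketch(indexName: String,
            sortColumns: Option[String]): String = sortColumns match {
          // Already listed: leave the user's ordering untouched.
          case Some(cols) if cols.split(",").map(_.trim.toLowerCase)
            .contains(indexName.toLowerCase) => cols
          // Configured without the spatial column: put it first (assumed order).
          case Some(cols) if cols.trim.nonEmpty => s"$indexName,$cols"
          // No sort columns configured: the spatial column becomes the only one.
          case _ => indexName
        }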
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 57778a8..0c72e5c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -14,18 +14,18 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     drop()
   }
 
-  test("Invalid geo index handler property") {
-    // Handler name must not match with table column name.  Fails to create table.
+  test("Invalid spatial index property") {
+    // Index name must not match with table column name.  Fails to create table.
     var exception = intercept[MalformedCarbonCommandException](sql(
       s"""
          | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
         | COMMENT "This is a malformed table"
          | STORED AS carbondata
-         | TBLPROPERTIES ('INDEX_HANDLER'='longitude')
+         | TBLPROPERTIES ('SPATIAL_INDEX'='longitude')
       """.stripMargin))
 
     assert(exception.getMessage.contains(
-      "handler: longitude must not match with any other column name in the table"))
+      "index: longitude must not match with any other column name in the table"))
 
     // Type property is not configured. Fails to create table.
     exception = intercept[MalformedCarbonCommandException](sql(
@@ -33,11 +33,11 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
          | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
          | COMMENT "This is a malformed table"
          | STORED AS carbondata
-         | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash')
+         | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash')
       """.stripMargin))
 
     assert(exception.getMessage.contains(
-      s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.type property must be specified"))
+      s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.type property must be specified"))
 
     // Source columns are not configured. Fails to create table.
     exception = intercept[MalformedCarbonCommandException](sql(
@@ -45,11 +45,11 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
          | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
          | COMMENT "This is a malformed table"
          | STORED AS carbondata
-         | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', 'INDEX_HANDLER.mygeohash.type'='geohash')
+         | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash', 'SPATIAL_INDEX.mygeohash.type'='geohash')
       """.stripMargin))
 
     assert(exception.getMessage.contains(
-      s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.sourcecolumns property must be " +
+      s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.sourcecolumns property must be " +
       s"specified."))
 
     // Source columns must be present in the table. Fails to create table.
     exception = intercept[MalformedCarbonCommandException](sql(
@@ -58,20 +58,20 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
          | CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
          | COMMENT "This is a malformed table"
          | STORED AS carbondata
-         | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', 'INDEX_HANDLER.mygeohash.type'='geohash',
-         | 'INDEX_HANDLER.mygeohash.sourcecolumns'='unknown1, unknown2')
+         | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash', 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='unknown1, unknown2')
       """.stripMargin))
 
     assert(exception.getMessage.contains(
       s"Source column: unknown1 in property " +
-      s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.sourcecolumns must be a column in the " +
+      s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.sourcecolumns must be a column in the " +
       s"table."))
   }
 
   test("test geo table create and load and check describe formatted") {
     createTable()
     loadData()
-    // Test if index handler column is added as a sort column
+    // Test if spatial index column is added as a sort column
    val descTable = sql(s"describe formatted $table1").collect
     descTable.find(_.get(0).toString.contains("Sort Scope")) match {
       case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
@@ -141,16 +141,16 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
            | longitude LONG,
            | latitude LONG) COMMENT "This is a GeoTable" PARTITIONED BY (timevalue BIGINT)
            | STORED AS carbondata
-           | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
-           | 'INDEX_HANDLER.mygeohash.type'='geohash',
-           | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
-           | 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
-           | 'INDEX_HANDLER.mygeohash.gridSize'='50',
-           | 'INDEX_HANDLER.mygeohash.minLongitude'='115.811865',
-           | 'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233',
-           | 'INDEX_HANDLER.mygeohash.minLatitude'='39.832277',
-           | 'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281',
-           | 'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
+           | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+           | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+           | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+           | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+           | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+           | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+           | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+           | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
     loadData()
     checkAnswer(
@@ -183,16 +183,16 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
            | longitude LONG,
            | latitude LONG) COMMENT "This is a GeoTable"
            | STORED AS carbondata
-           | TBLPROPERTIES ($customProperties 'INDEX_HANDLER'='mygeohash',
-           | 'INDEX_HANDLER.mygeohash.type'='geohash',
-           | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
-           | 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
-           | 'INDEX_HANDLER.mygeohash.gridSize'='50',
-           | 'INDEX_HANDLER.mygeohash.minLongitude'='115.811865',
-           | 'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233',
-           | 'INDEX_HANDLER.mygeohash.minLatitude'='39.832277',
-           | 'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281',
-           | 'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
+           | TBLPROPERTIES ($customProperties 'SPATIAL_INDEX'='mygeohash',
+           | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+           | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+           | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+           | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+           | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+           | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+           | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 9d30331..b44d722 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -101,11 +101,11 @@ public class CarbonDataLoadConfiguration {
   private DataLoadMetrics metrics;
 
   /**
-   * Whether index columns are present. This flag should be set only when all the schema
-   * columns are already converted. Now, just need to generate and convert index columns present in
-   * data fields.
+   * Whether non-schema columns are present. This flag should be set only when all the schema
+   * columns are already converted. Now, just need to generate and convert non-schema columns
+   * present in data fields.
    */
-  private boolean isIndexColumnsPresent;
+  private boolean nonSchemaColumnsPresent;
 
   private boolean skipParsers = false;
 
@@ -386,14 +386,6 @@ public class CarbonDataLoadConfiguration {
     this.metrics = metrics;
   }
 
-  public boolean isIndexColumnsPresent() {
-    return isIndexColumnsPresent;
-  }
-
-  public void setIndexColumnsPresent(boolean indexColumnsPresent) {
-    isIndexColumnsPresent = indexColumnsPresent;
-  }
-
   public String getBucketHashMethod() {
     return bucketHashMethod;
   }
@@ -401,4 +393,12 @@ public class CarbonDataLoadConfiguration {
   public void setBucketHashMethod(String bucketHashMethod) {
     this.bucketHashMethod = bucketHashMethod;
   }
+
+  public boolean isNonSchemaColumnsPresent() {
+    return nonSchemaColumnsPresent;
+  }
+
+  public void setNonSchemaColumnsPresent(boolean nonSchemaColumnsPresent) {
+    this.nonSchemaColumnsPresent = nonSchemaColumnsPresent;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 100f772..7935a5a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -198,7 +198,7 @@ public final class DataLoadProcessBuilder {
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
-    configuration.setIndexColumnsPresent(loadModel.isIndexColumnsPresent());
+    configuration.setNonSchemaColumnsPresent(loadModel.isNonSchemaColumnsPresent());
     List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
     if (loadMetadataDetails != null) {
       for (LoadMetadataDetails detail : loadMetadataDetails) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index b9c2038..5a82e7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -72,8 +72,8 @@ public class FieldEncoderFactory {
       boolean isConvertToBinary, String binaryDecoder, CarbonDataLoadConfiguration configuration) {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimension()) {
-      if (dataField.getColumn().isIndexColumn()) {
-        return new IndexFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord,
+      if (dataField.getColumn().isSpatialColumn()) {
+        return new SpatialIndexFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord,
             configuration);
       } else if (dataField.getColumn().getDataType() == DataTypes.DATE &&
           !dataField.getColumn().isComplex()) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index e036adf..2b6657c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -90,7 +90,7 @@ public class RowConverterImpl implements RowConverter {
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
               configuration);
-      if (fields[i].getColumn().isIndexColumn()) {
+      if (fields[i].getColumn().isSpatialColumn()) {
         nonSchemaFieldConverterList.add(fieldConverter);
       } else {
         fieldConverterList.add(fieldConverter);
@@ -108,9 +108,9 @@ public class RowConverterImpl implements RowConverter {
     logHolder.setLogged(false);
     logHolder.clear();
     for (int i = 0; i < fieldConverters.length; i++) {
-      if (configuration.isIndexColumnsPresent() && !fieldConverters[i].getDataField().getColumn()
-          .isIndexColumn()) {
-        // Skip the conversion for schema columns if the conversion is required only for index
+      if (configuration.isNonSchemaColumnsPresent() && !fieldConverters[i].getDataField()
+          .getColumn().isSpatialColumn()) {
+        // Skip the conversion for schema columns if the conversion is required only for non-schema
         // columns
         continue;
       }
@@ -162,7 +162,7 @@ public class RowConverterImpl implements RowConverter {
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER),
               configuration);
-      if (fields[i].getColumn().isIndexColumn()) {
+      if (fields[i].getColumn().isSpatialColumn()) {
         nonSchemaFieldConverterList.add(fieldConverter);
       } else {
         fieldConverterList.add(fieldConverter);
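
    The converter list is split so that a second pass can run over non-schema
    (spatial) columns alone, once all schema columns are already converted. A
    standalone Scala sketch of the skip condition in the loop above:

        final case class ColumnRef(name: String, isSpatial: Boolean)

        // Mirrors the loop above: when only non-schema columns still need
        // conversion, already-converted schema columns are passed over.
        def columnsToConvert(cols: Seq[ColumnRef], nonSchemaOnly: Boolean): Seq[ColumnRef] =
          cols.filter(c => !nonSchemaOnly || c.isSpatial)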
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/IndexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java
similarity index 91%
rename from processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/IndexFieldConverterImpl.java
rename to processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java
index 062c251..6f05930 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/IndexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/SpatialIndexFieldConverterImpl.java
@@ -34,16 +34,16 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep
 import org.apache.log4j.Logger;
 
 /**
- * Converter for Index handler columns
+ * Converter for spatial index columns
  */
-public class IndexFieldConverterImpl extends MeasureFieldConverterImpl {
+public class SpatialIndexFieldConverterImpl extends MeasureFieldConverterImpl {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
   private int index;
   private int[] sourceIndexes;
   CustomIndex instance;
 
-  public IndexFieldConverterImpl(DataField dataField, String nullFormat, int index,
+  public SpatialIndexFieldConverterImpl(DataField dataField, String nullFormat, int index,
       boolean isEmptyBadRecord, CarbonDataLoadConfiguration configuration) {
     super(dataField, nullFormat, index, isEmptyBadRecord);
     this.index = index;
@@ -52,14 +52,14 @@ public class IndexFieldConverterImpl extends MeasureFieldConverterImpl {
             .getTableProperties();
     try {
       instance = CustomIndex.getCustomInstance(properties.get(
-          CarbonCommonConstants.INDEX_HANDLER + "." + dataField.getColumn().getColName()
+          CarbonCommonConstants.SPATIAL_INDEX + "." + dataField.getColumn().getColName()
               + ".instance"));
     } catch (IOException e) {
       LOGGER.error("Failed to get the custom instance", e);
       throw new RuntimeException(e);
     }
     String sourceColumns = properties.get(
-        CarbonCommonConstants.INDEX_HANDLER + "." + dataField.getColumn().getColName()
+        CarbonCommonConstants.SPATIAL_INDEX + "." + dataField.getColumn().getColName()
             + ".sourcecolumns");
     String[] sources = sourceColumns.split(",");
     sourceIndexes = new int[sources.length];
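
    At load time the converter therefore resolves two of the renamed keys per
    spatial column; for a column named "mygeohash" the lookups amount to the
    following (assuming the "spatial_index" constant value; the .instance entry
    holds the CustomIndex serialized at create-table time, whose exact encoding
    is internal):

        val instanceKey = "spatial_index.mygeohash.instance"      // serialized CustomIndex
        val sourcesKey  = "spatial_index.mygeohash.sourcecolumns" // e.g. "longitude,latitude"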
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 9d8d792..050b538 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -167,11 +167,11 @@ public class CarbonLoadModel implements Serializable {
   private boolean isLoadWithoutConverterStep;
 
   /**
-   * Whether index columns are present. This flag should be set only when all the schema
-   * columns are already converted. Now, just need to generate and convert index columns present in
-   * data fields.
+   * Whether non-schema columns are present. This flag should be set only when all the schema
+   * columns are already converted. Now, just need to generate and convert non-schema columns
+   * present in data fields.
    */
-  private boolean isIndexColumnsPresent;
+  private boolean nonSchemaColumnsPresent;
 
   /**
    * for insert into flow, schema is already re-arranged. No need to re-arrange the data
@@ -889,14 +889,6 @@ public class CarbonLoadModel implements Serializable {
     this.metrics = metrics;
   }
 
-  public boolean isIndexColumnsPresent() {
-    return isIndexColumnsPresent;
-  }
-
-  public void setIndexColumnsPresent(boolean indexColumnsPresent) {
-    isIndexColumnsPresent = indexColumnsPresent;
-  }
-
   public boolean isLoadWithoutConverterWithoutReArrangeStep() {
     return isLoadWithoutConverterWithoutReArrangeStep;
   }
@@ -905,4 +897,12 @@ public class CarbonLoadModel implements Serializable {
       boolean loadWithoutConverterWithoutReArrangeStep) {
     isLoadWithoutConverterWithoutReArrangeStep = loadWithoutConverterWithoutReArrangeStep;
   }
+
+  public boolean isNonSchemaColumnsPresent() {
+    return nonSchemaColumnsPresent;
+  }
+
+  public void setNonSchemaColumnsPresent(boolean nonSchemaColumnsPresent) {
+    this.nonSchemaColumnsPresent = nonSchemaColumnsPresent;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 12c78eb..45d62c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -74,8 +74,8 @@ public class RowParserImpl implements RowParser {
     inputMapping = new int[input.length];
     int k = 0;
     for (int i = 0; i < fields.length; i++) {
-      if (fields[i].getColumn().isIndexColumn()) {
-        // Index handler columns are non-schema fields. They are not present in the header. So set
+      if (fields[i].getColumn().isSpatialColumn()) {
+        // Index columns are non-schema fields. They are not present in the header. So set
         // the input mapping as -1 for the field and continue
         input[k] = fields[i];
         inputMapping[k] = -1;
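
    A standalone Scala sketch of the header mapping above: spatial columns have
    no position in the CSV header, so they map to -1 and their values are
    generated later by the converter step (simplified; the real loop also
    tracks the running header offset per non-spatial field):

        // fields: (columnName, isSpatialColumn), in schema order.
        def buildInputMapping(fields: Seq[(String, Boolean)]): Array[Int] = {
          var headerPos = -1
          fields.map { case (_, isSpatial) =>
            if (isSpatial) -1                      // not present in the CSV header
            else { headerPos += 1; headerPos }     // next header position
          }.toArray
        }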
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 04d8669..21a4bcc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -332,7 +332,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
         while (internalHasNext() && count < batchSize) {
           CarbonRow carbonRow =
               new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields));
-          if (configuration.isIndexColumnsPresent()) {
+          if (configuration.isNonSchemaColumnsPresent()) {
             carbonRow = converter.convert(carbonRow);
           }
           if (isBucketColumnEnabled) {
@@ -346,7 +346,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
         while (internalHasNext() && count < batchSize) {
           CarbonRow carbonRow = new CarbonRow(
               convertToNoDictionaryToBytesWithoutReArrange(currentIterator.next(), dataFields));
-          if (configuration.isIndexColumnsPresent()) {
+          if (configuration.isNonSchemaColumnsPresent()) {
             carbonRow = converter.convert(carbonRow);
           }
           if (isBucketColumnEnabled) {
@@ -367,7 +367,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
       Object[] newData = new Object[dataFields.length];
       for (int i = 0; i < dataFields.length; i++) {
-        if (dataFields[i].getColumn().isIndexColumn()) {
+        if (dataFields[i].getColumn().isSpatialColumn()) {
           continue;
         }
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
@@ -412,7 +412,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       Object[] newData = new Object[dataFields.length];
       // now dictionary is removed, no need of no dictionary mapping
       for (int i = 0, index = 0; i < dataFields.length; i++) {
-        if (dataFields[i].getColumn().isIndexColumn()) {
+        if (dataFields[i].getColumn().isSpatialColumn()) {
          continue;
         }
         if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index a3fbb0d..d8a22a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -334,8 +334,8 @@ public final class CarbonDataProcessorUtil {
     List<CarbonDimension> dimensions =
         schema.getCarbonTable().getVisibleDimensions();
     for (CarbonDimension dimension : dimensions) {
-      if (!dimension.isIndexColumn()) {
-        // skip the non-schema index column
+      if (!dimension.isSpatialColumn()) {
+        // skip the non-schema column
        columnNames.add(dimension.getColName());
       }
     }
