[CARBONDATA-2690][CarbonStore] implement RESTful API: create table, load data and select

This PR adds:
1. Basic framework
Rewrites the carbon store's Master, Worker and Scheduler code in Java.

2. RESTful API
Supports creating a table using the file meta store.
Supports loading data into a table on a single worker.
Supports selecting data with a filter (a hedged client sketch follows below).
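
For reviewers, a minimal client sketch of the intended create/load/select flow. This is not the committed API surface: the host/port, endpoint paths and JSON field names are assumptions loosely inferred from the new view classes (CreateTableRequest, LoadRequest, SelectRequest) and HorizonController, so treat them as illustrative only.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HorizonClientSketch {
  // POST a JSON body to a Horizon endpoint and return the HTTP status code.
  static int post(String endpoint, String json) throws Exception {
    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8080" + endpoint).openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(json.getBytes(StandardCharsets.UTF_8));
    }
    return conn.getResponseCode();
  }

  public static void main(String[] args) throws Exception {
    // hypothetical payloads; field names are assumptions, not the committed schema
    post("/table/create", "{\"databaseName\":\"default\",\"tableName\":\"t1\"}");
    post("/table/load", "{\"databaseName\":\"default\",\"tableName\":\"t1\",\"inputPath\":\"/tmp/data1.csv\"}");
    post("/table/select", "{\"databaseName\":\"default\",\"tableName\":\"t1\",\"filter\":\"age > 10\"}");
  }
}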

This closes #2440


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4437920a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4437920a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4437920a

Branch: refs/heads/carbonstore
Commit: 4437920a3174921d6397c4f596ff941e2cd0faa0
Parents: d9b40bf
Author: QiangCai <qiang...@qq.com>
Authored: Tue Jul 3 20:21:18 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Jul 18 10:10:15 2018 +0800

----------------------------------------------------------------------
 .../schema/table/CarbonTableBuilder.java        |    9 +-
 .../schema/table/TableSchemaBuilder.java        |   27 +-
 .../expression/conditional/ListExpression.java  |    8 +-
 .../conditional/NotEqualsExpression.java        |    7 +-
 .../scan/expression/logical/OrExpression.java   |    2 +-
 .../expression/logical/RangeExpression.java     |    2 +-
 .../carbondata/core/scan/model/QueryModel.java  |    3 +-
 dev/findbugs-exclude.xml                        |    4 +
 dev/javastyle-suppressions.xml                  |    2 +
 .../carbondata/hadoop/CarbonRecordReader.java   |    1 +
 .../hadoop/util/CarbonInputFormatUtil.java      |   20 +-
 .../preaggregate/TestPreAggregateDrop.scala     |    3 +-
 .../TestPreAggregateTableSelection.scala        |    8 +-
 .../carbondata/store/SparkCarbonStore.scala     |   40 +-
 pom.xml                                         |    6 +
 .../loading/model/CarbonLoadModelBuilder.java   |   16 +-
 .../processing/util/CarbonLoaderUtil.java       |    2 +-
 store/conf/log4j.properties                     |   10 +
 store/conf/store.conf                           |   10 +
 store/core/pom.xml                              |    8 +-
 .../apache/carbondata/store/conf/StoreConf.java |  185 +++
 .../exception/ExecutionTimeoutException.java    |   22 +
 .../store/exception/StoreException.java         |   29 +
 .../store/exception/WorkerTooBusyException.java |   26 +
 .../apache/carbondata/store/master/Master.java  |  522 ++++++++
 .../carbondata/store/rpc/QueryService.java      |   33 -
 .../carbondata/store/rpc/RegistryService.java   |    4 +-
 .../carbondata/store/rpc/ServiceFactory.java    |    4 +-
 .../carbondata/store/rpc/StoreService.java      |   40 +
 .../store/rpc/impl/IndexedRecordReader.java     |   22 +
 .../store/rpc/impl/QueryServiceImpl.java        |   56 -
 .../store/rpc/impl/RegistryServiceImpl.java     |    4 +-
 .../store/rpc/impl/RequestHandler.java          |  105 +-
 .../store/rpc/impl/StoreServiceImpl.java        |   78 ++
 .../store/rpc/model/BaseResponse.java           |   69 ++
 .../store/rpc/model/LoadDataRequest.java        |   60 +
 .../store/rpc/model/QueryResponse.java          |   21 +-
 .../store/rpc/model/RegisterWorkerRequest.java  |    4 +
 .../carbondata/store/scheduler/Schedulable.java |   74 ++
 .../carbondata/store/scheduler/Scheduler.java   |  136 +++
 .../apache/carbondata/store/util/StoreUtil.java |  132 +++
 .../apache/carbondata/store/worker/Worker.java  |  166 +++
 .../org/apache/carbondata/store/Master.scala    |  283 -----
 .../org/apache/carbondata/store/Scheduler.scala |  147 ---
 .../org/apache/carbondata/store/Worker.scala    |  113 --
 .../carbondata/store/SchedulerSuite.scala       |  155 ---
 store/horizon/pom.xml                           |   95 ++
 store/horizon/src/main/anltr/Expression.g4      |  163 +++
 .../horizon/antlr/ANTLRNoCaseStringStream.java  |   38 +
 .../carbondata/horizon/antlr/FilterVisitor.java |  215 ++++
 .../horizon/antlr/gen/Expression.tokens         |   51 +
 .../antlr/gen/ExpressionBaseVisitor.java        |  168 +++
 .../horizon/antlr/gen/ExpressionLexer.java      |  228 ++++
 .../horizon/antlr/gen/ExpressionLexer.tokens    |   51 +
 .../horizon/antlr/gen/ExpressionParser.java     | 1117 ++++++++++++++++++
 .../horizon/antlr/gen/ExpressionVisitor.java    |  162 +++
 .../horizon/rest/controller/Horizon.java        |   36 +
 .../rest/controller/HorizonController.java      |   92 ++
 .../rest/model/descriptor/LoadDescriptor.java   |   81 ++
 .../rest/model/descriptor/SelectDescriptor.java |   88 ++
 .../rest/model/descriptor/TableDescriptor.java  |   90 ++
 .../rest/model/validate/RequestValidator.java   |   70 ++
 .../rest/model/view/CreateTableRequest.java     |  174 +++
 .../horizon/rest/model/view/FieldRequest.java   |  114 ++
 .../horizon/rest/model/view/LoadRequest.java    |  132 +++
 .../horizon/rest/model/view/SelectRequest.java  |  130 ++
 .../horizon/rest/model/view/SelectResponse.java |   49 +
 .../horizon/rest/service/HorizonService.java    |  162 +++
 .../carbondata/horizon/FilterParseTest.java     |  161 +++
 .../apache/carbondata/horizon/HorizonTest.java  |  153 +++
 store/horizon/src/test/resources/data1.csv      |   11 +
 .../carbondata/sdk/file/CSVCarbonWriter.java    |    4 +-
 .../sdk/file/CarbonWriterBuilder.java           |    2 +-
 .../org/apache/carbondata/sdk/file/Schema.java  |   67 ++
 74 files changed, 5717 insertions(+), 865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
index e1d2162..cd27cf1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
@@ -59,12 +59,11 @@ public class CarbonTableBuilder {
     return this;
   }
 
-  public CarbonTable build() {
+  public TableInfo buildTableInfo() {
     Objects.requireNonNull(tablePath, "tablePath should not be null");
     Objects.requireNonNull(tableSchema, "tableSchema should not be null");
     Objects.requireNonNull(isTransactionalTable, "Transactional Table should 
not be null");
 
-
     TableInfo tableInfo = new TableInfo();
     tableInfo.setDatabaseName(databaseName);
     tableInfo.setTableUniqueName(databaseName + "_" + tableName);
@@ -73,6 +72,10 @@ public class CarbonTableBuilder {
     tableInfo.setTransactionalTable(isTransactionalTable);
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0));
-    return CarbonTable.buildFromTableInfo(tableInfo);
+    return tableInfo;
+  }
+
+  public CarbonTable build() {
+    return CarbonTable.buildFromTableInfo(buildTableInfo());
   }
 }
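
Note on the hunk above: buildTableInfo() is exposed so callers (e.g. the file meta store path) can persist the raw TableInfo without constructing a CarbonTable, while build() keeps its old behavior and now delegates. A hedged sketch, with the builder setter names assumed from the fields buildTableInfo() reads:

// inside a method; carbondata imports assumed
CarbonTableBuilder builder = CarbonTable.builder()
    .databaseName("default")
    .tableName("t1")
    .tablePath("/tmp/carbon.store/default/t1")  // placeholder path
    .isTransactionalTable(true)
    .tableSchema(schema);                       // a TableSchema built elsewhere
TableInfo info = builder.buildTableInfo();      // new: serializable metadata only
CarbonTable table = builder.build();            // unchanged: wraps buildTableInfo()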

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 4780e60..6f575a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -54,6 +54,8 @@ public class TableSchemaBuilder {
 
   private List<ColumnSchema> measures = new LinkedList<>();
 
+  private Map<String, String> tableProperties;
+
   private int blockSize;
 
   private int blockletSize;
@@ -62,6 +64,14 @@ public class TableSchemaBuilder {
   private boolean isLocalDictionaryEnabled;
   private String localDictionaryThreshold;
 
+  public TableSchemaBuilder properties(Map<String, String> tableProperties) {
+    if (tableProperties == null) {
+      throw new IllegalArgumentException("tableProperties should not be null");
+    }
+    this.tableProperties = tableProperties;
+    return this;
+  }
+
   public TableSchemaBuilder blockSize(int blockSize) {
     if (blockSize <= 0) {
       throw new IllegalArgumentException("blockSize should be greater than 0");
@@ -111,22 +121,25 @@ public class TableSchemaBuilder {
     allColumns.addAll(measures);
     schema.setListOfColumns(allColumns);
 
-    Map<String, String> property = new HashMap<>();
+    if (tableProperties == null) {
+      tableProperties = new HashMap<>();
+    }
     if (blockSize > 0) {
-      property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, 
String.valueOf(blockSize));
+      tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, 
String.valueOf(blockSize));
     }
     if (blockletSize > 0) {
-      property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, 
String.valueOf(blockletSize));
+      tableProperties.put(
+          CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, 
String.valueOf(blockletSize));
     }
 
     // Adding local dictionary, applicable only for String(dictionary exclude)
     if (isLocalDictionaryEnabled) {
-      property.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
           String.valueOf(isLocalDictionaryEnabled));
       String localdictionaryThreshold = 
localDictionaryThreshold.equalsIgnoreCase("0") ?
           CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT :
           localDictionaryThreshold;
-      property.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, 
localdictionaryThreshold);
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, 
localdictionaryThreshold);
       for (int index = 0; index < allColumns.size(); index++) {
         ColumnSchema colSchema = allColumns.get(index);
         if (colSchema.getDataType() == DataTypes.STRING
@@ -136,8 +149,8 @@ public class TableSchemaBuilder {
         }
       }
     }
-    if (property.size() != 0) {
-      schema.setTableProperties(property);
+    if (tableProperties.size() != 0) {
+      schema.setTableProperties(tableProperties);
     }
     return schema;
   }
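
Note on the hunk above: caller-supplied table properties now live in the same map the builder uses for block size, blocklet size and local-dictionary settings, so neither side overwrites the other. A hedged sketch (the property key is illustrative):

// inside a method; carbondata imports assumed
Map<String, String> props = new HashMap<>();
props.put("sort_columns", "name");   // illustrative user property
TableSchemaBuilder builder = new TableSchemaBuilder()
    .properties(props)
    .blockSize(256);                 // merged into the same map as TABLE_BLOCKSIZE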

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
index 32b5028..a686543 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
@@ -69,8 +69,14 @@ public class ListExpression extends Expression {
   public String getStatement() {
     StringBuffer value = new StringBuffer();
     value.append("(");
+    boolean start = false;
     for (Expression expr : children) {
-      value.append(expr.getString()).append(";");
+      if (start) {
+        value.append(", ");
+      } else {
+        start = true;
+      }
+      value.append(expr.getStatement());
     }
     value.append(')');
 
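Note on the hunk above: getStatement() now renders children comma-separated and recurses via getStatement() rather than getString(), so a list such as ('a','b') prints as (a, b) instead of the previous form with a stray trailing ';' before the closing parenthesis.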

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index eae8019..438337c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -100,6 +100,11 @@ public class NotEqualsExpression extends 
BinaryConditionalExpression {
 
   @Override
   public String getStatement() {
-    return left.getStatement() + " <> " + right.getStatement();
+    if (isNotNull) {
+      return left.getStatement() + " is not " + right.getStatement();
+    } else {
+      return left.getStatement() + " <> " + right.getStatement();
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
index 148081a..5b7dda0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
@@ -59,6 +59,6 @@ public class OrExpression extends BinaryLogicalExpression {
 
   @Override
   public String getStatement() {
-    return "(" + left.getString() + " or " + right.getString() + ")";
+    return "(" + left.getStatement() + " or " + right.getStatement() + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
index b49b6f9..f411722 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
@@ -65,7 +65,7 @@ public class RangeExpression extends 
BinaryConditionalExpression {
 
   @Override
   public String getStatement() {
-    return left.getStatement() + " between " + right.getStatement();
+    return left.getStatement() + " and " + right.getStatement();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index b15ce02..72666e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -376,6 +376,7 @@ public class QueryModel {
     return String.format("scan on table %s.%s, %d projection columns with 
filter (%s)",
         table.getDatabaseName(), table.getTableName(),
         projection.getDimensions().size() + projection.getMeasures().size(),
-        filterExpressionResolverTree.getFilterExpression().toString());
+        filterExpressionResolverTree == null ? "" :
+            filterExpressionResolverTree.getFilterExpression().toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/dev/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index 63b6bd5..e3f0523 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -21,6 +21,10 @@
   </Match>
 
   <Match>
+    <Package name="org.apache.carbondata.horizon.antlr.gen"/>
+  </Match>
+
+  <Match>
     <Source name="~.*\.scala" />
   </Match>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/dev/javastyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/dev/javastyle-suppressions.xml b/dev/javastyle-suppressions.xml
index 9780dcb..bb4e6f1 100644
--- a/dev/javastyle-suppressions.xml
+++ b/dev/javastyle-suppressions.xml
@@ -32,4 +32,6 @@
               
files="org/apache/carbondata/processing/newflow/sort/unsafe/sort/TimSort.java"/>
     <suppress checks=".*"
               files="org/apache/carbondata/core/memory/HeapMemoryAllocator"/>
+    <suppress checks=".*"
+              files="org/apache/carbondata/horizon/antlr/gen"/>
 </suppressions>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 6c796b5..2e3d7aa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -129,6 +129,7 @@ public class CarbonRecordReader<T> extends 
AbstractRecordReader<T> {
       DataMapStoreManager.getInstance().clearDataMaps(
           queryModel.getTable().getAbsoluteTableIdentifier());
     }
+
     // close read support
     readSupport.close();
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index af7397b..3076cd7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -92,6 +92,19 @@ public class CarbonInputFormatUtil {
       Expression filterExpression,
       List<PartitionSpec> partitionNames,
       DataMapJob dataMapJob) throws IOException, InvalidConfigurationException 
{
+    return createCarbonTableInputFormat(job, carbonTable, projectionColumns, 
filterExpression,
+        partitionNames, dataMapJob, false);
+  }
+
+
+  public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
+      Job job,
+      CarbonTable carbonTable,
+      String[] projectionColumns,
+      Expression filterExpression,
+      List<PartitionSpec> partitionNames,
+      DataMapJob dataMapJob,
+      boolean isSearchMode) throws IOException, InvalidConfigurationException {
     Configuration conf = job.getConfiguration();
     CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
     CarbonInputFormat.setDatabaseName(conf, 
carbonTable.getTableInfo().getDatabaseName());
@@ -103,7 +116,7 @@ public class CarbonInputFormatUtil {
         .setTransactionalTable(conf, 
carbonTable.getTableInfo().isTransactionalTable());
     CarbonProjection columnProjection = new 
CarbonProjection(projectionColumns);
     return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
-        filterExpression, columnProjection, dataMapJob);
+        filterExpression, columnProjection, dataMapJob, isSearchMode);
   }
 
   private static <V> CarbonTableInputFormat<V> createInputFormat(
@@ -111,7 +124,8 @@ public class CarbonInputFormatUtil {
       AbsoluteTableIdentifier identifier,
       Expression filterExpression,
       CarbonProjection columnProjection,
-      DataMapJob dataMapJob) throws InvalidConfigurationException, IOException 
{
+      DataMapJob dataMapJob,
+      boolean isSearchMode) throws InvalidConfigurationException, IOException {
     CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>();
     CarbonInputFormat.setTablePath(
         conf,
@@ -121,7 +135,7 @@ public class CarbonInputFormatUtil {
     CarbonInputFormat.setColumnProjection(conf, columnProjection);
     if (dataMapJob != null) {
       DataMapUtil.setDataMapJob(conf, dataMapJob);
-    } else {
+    } else if (!isSearchMode) {
       setDataMapJobIfConfigured(conf);
     }
     // when validate segments is disabled in thread local update it to 
CarbonTableInputFormat
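
Note on the hunk above: the new overload threads isSearchMode down to createInputFormat so that, when no explicit DataMapJob is supplied, search mode skips setDataMapJobIfConfigured entirely. A hedged call-site sketch (variables and projection names are placeholders):

// inside a method; carbondata and hadoop imports assumed
Job job = Job.getInstance(new Configuration());
CarbonTableInputFormat<Object> format =
    CarbonInputFormatUtil.createCarbonTableInputFormat(
        job, carbonTable,
        new String[] {"name", "age"},  // projection columns (placeholders)
        filterExpression,              // filter built elsewhere
        null,                          // no partition specs
        null,                          // no DataMapJob
        true);                         // isSearchMode: skip the default DataMap job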

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index f73a587..2a9fdcd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -91,8 +91,7 @@ class TestPreAggregateDrop extends QueryTest with 
BeforeAndAfterAll {
     sql(
       "create datamap preagg2 on table maintable using 'preaggregate' as 
select" +
       " a,sum(c) from maintable group by a")
-    sql("drop table if exists maintable")
-    checkExistence(sql("show tables"), false, "maintable_preagg1", 
"maintable", "maintable_preagg2")
+    checkExistence(sql("show tables").select("database", "tableName"), false, 
"defaultmaintable_preagg1", "defaultmaintable", "defaultmaintable_preagg2")
   }
 
   test("drop datamap with 'if exists' when datamap not exists") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index ebb2491..9e4a0d0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
-import java.io.File
-
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
@@ -416,7 +414,7 @@ class TestPreAggregateTableSelection extends SparkQueryTest 
with BeforeAndAfterA
         | - agg1 (preaggregate)
         |Table Scan on maintable_agg1
         | - total blocklets: 1
-        | - filter: (maintable_name <> null and maintable_name = a)
+        | - filter: (maintable_name is not null and maintable_name = a)
         | - pruned by Main DataMap
         |    - skipped blocklets: 1
         |""".stripMargin)(rows(0).getString(0))
@@ -432,14 +430,14 @@ class TestPreAggregateTableSelection extends 
SparkQueryTest with BeforeAndAfterA
       """
         |Table Scan on maintable
         | - total blocklets: 1
-        | - filter: ((id <> null and id < 3) and name <> null)
+        | - filter: ((id is not null and id < 3) and name is not null)
         | - pruned by Main DataMap
         |    - skipped blocklets: 0""".stripMargin))
     assert(rows(0).getString(0).contains(
       """
         |Table Scan on maintableavg
         | - total blocklets: 1
-        | - filter: name <> null
+        | - filter: name is not null
         | - pruned by Main DataMap
         |    - skipped blocklets: 0""".stripMargin))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index d99081d..a4124a2 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -22,7 +22,7 @@ import java.net.InetAddress
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{CarbonInputMetrics, SparkConf}
+import org.apache.spark.{CarbonInputMetrics, SparkConf, SparkEnv}
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.SparkSession
 
@@ -34,6 +34,10 @@ import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.store.conf.StoreConf
+import org.apache.carbondata.store.master.Master
+import org.apache.carbondata.store.worker.Worker
 
 /**
  * A CarbonStore implementation that uses Spark as underlying compute engine
@@ -110,7 +114,11 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
 
   def startSearchMode(): Unit = {
     LOG.info("Starting search mode master")
-    master = new Master()
+    val conf = new StoreConf()
+    conf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress)
+    conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort)
+    conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
+    master = Master.getInstance(conf)
     master.startService()
     startAllWorkers()
   }
@@ -154,7 +162,33 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
     val rows = session.sparkContext.parallelize(1 to numExecutors * 10, 
numExecutors)
       .mapPartitions { f =>
         // start worker
-        Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
+        val conf = new StoreConf()
+        conf.conf(StoreConf.WORKER_HOST, 
InetAddress.getLocalHost.getHostAddress)
+        conf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort)
+        conf.conf(StoreConf.WORKER_CORE_NUM, 2)
+        conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
+        conf.conf(StoreConf.MASTER_HOST, masterIp)
+        conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort)
+
+        var storeLocation: String = null
+        val carbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false")
+        if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+          val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (null != storeLocations && storeLocations.nonEmpty) {
+            storeLocation = storeLocations.mkString(",")
+          }
+          if (storeLocation == null) {
+            storeLocation = System.getProperty("java.io.tmpdir")
+          }
+        } else {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
+        conf.conf(StoreConf.STORE_TEMP_LOCATION, storeLocation)
+
+        val worker = new Worker(conf)
+        worker.start()
         new Iterator[Int] {
           override def hasNext: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aeb87d3..b1cf347 100644
--- a/pom.xml
+++ b/pom.xml
@@ -585,6 +585,12 @@
       </build>
     </profile>
     <profile>
+      <id>horizon</id>
+      <modules>
+        <module>store/horizon</module>
+      </modules>
+    </profile>
+    <profile>
       <id>include-all</id>
     </profile>
     <profile>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 0b60fa5..1ff7752 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -51,10 +51,20 @@ public class CarbonLoadModelBuilder {
 
   private CarbonTable table;
 
+  private String inputPath;
+
   public CarbonLoadModelBuilder(CarbonTable table) {
     this.table = table;
   }
 
+  public String getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(String inputPath) {
+    this.inputPath = inputPath;
+  }
+
   /**
    * build CarbonLoadModel for data loading
    * @param options Load options from user input
@@ -65,7 +75,9 @@ public class CarbonLoadModelBuilder {
       throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = 
LoadOption.fillOptionWithDefaultValue(options);
 
-    if (!options.containsKey("fileheader")) {
+    if (inputPath == null &&
+        !options.containsKey("header") &&
+        !options.containsKey("fileheader")) {
       List<CarbonColumn> csvHeader = 
table.getCreateOrderColumn(table.getTableName());
       String[] columns = new String[csvHeader.size()];
       for (int i = 0; i < columns.length; i++) {
@@ -74,10 +86,12 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     optionsFinal.put("bad_record_path", 
CarbonBadRecordUtil.getBadRecordsPath(options, table));
+
     CarbonLoadModel model = new CarbonLoadModel();
     model.setCarbonTransactionalTable(table.isTransactionalTable());
     model.setFactTimeStamp(UUID);
     model.setTaskNo(taskNo);
+    model.setFactFilePath(inputPath);
 
     // we have provided 'fileheader', so it hadoopConf can be null
     build(options, optionsFinal, model, null);
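
Note on the hunk above: setInputPath feeds the CSV location into the model (setFactFilePath), and header inference from the table schema is skipped when a path is present. A hedged sketch, assuming the existing build(options, timestamp, taskNo) entry point:

// inside a method; carbondata imports assumed
CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
modelBuilder.setInputPath("/tmp/data1.csv");   // becomes the fact file path
Map<String, String> options = new HashMap<>();
options.put("header", "true");                 // first CSV line is a header row
CarbonLoadModel loadModel =
    modelBuilder.build(options, System.currentTimeMillis(), "0");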

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 8a0c2b6..0ff71c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -577,7 +577,7 @@ public final class CarbonLoaderUtil {
    * @param noOfNodesInput -1 if number of nodes has to be decided
    *                       based on block location information
    * @param blockAssignmentStrategy strategy used to assign blocks
-   * @param loadMinSize the property load_min_size_inmb specified by the user
+   * @param expectedMinSizePerNode expected minimum size per node
    * @return a map that maps node to blocks
    */
   public static Map<String, List<Distributable>> nodeBlockMapping(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/store/conf/log4j.properties b/store/conf/log4j.properties
new file mode 100755
index 0000000..101ef37
--- /dev/null
+++ b/store/conf/log4j.properties
@@ -0,0 +1,10 @@
+log4j.rootLogger=INFO, C1, F1
+log4j.appender.C1=org.apache.log4j.ConsoleAppender
+log4j.appender.C1.layout=org.apache.log4j.PatternLayout
+log4j.appender.C1.layout.ConversionPattern=%d %p %m%n
+
+log4j.appender.F1=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.F1.file=${log.path}
+log4j.appender.F1.append=true
+log4j.appender.F1.layout=org.apache.log4j.PatternLayout
+log4j.appender.F1.layout.ConversionPattern=%d %p %m%n

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/conf/store.conf
----------------------------------------------------------------------
diff --git a/store/conf/store.conf b/store/conf/store.conf
new file mode 100644
index 0000000..7f18076
--- /dev/null
+++ b/store/conf/store.conf
@@ -0,0 +1,10 @@
+# worker
+carbon.worker.host=127.0.0.1
+carbon.worker.port=10021
+carbon.worker.core.num=2
+carbon.store.temp.location=/tmp/carbon.store.temp
+
+# worker and master
+carbon.master.host=127.0.0.1
+carbon.master.port=10020
+carbon.store.location=/tmp/carbon.store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/pom.xml
----------------------------------------------------------------------
diff --git a/store/core/pom.xml b/store/core/pom.xml
index 0bee84f..6b2703e 100644
--- a/store/core/pom.xml
+++ b/store/core/pom.xml
@@ -16,19 +16,14 @@
 
   <properties>
     <dev.path>${basedir}/../../dev</dev.path>
+    <spring.version>2.0.2.RELEASE</spring.version>
   </properties>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-hadoop</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-store-sdk</artifactId>
       <version>${project.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
@@ -43,7 +38,6 @@
   </dependencies>
 
   <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java
new file mode 100644
index 0000000..da2a697
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.conf;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class StoreConf implements Serializable, Writable {
+
+  public static final String SELECT_PROJECTION = "carbon.select.projection";
+  public static final String SELECT_FILTER = "carbon.select.filter";
+  public static final String SELECT_LIMIT = "carbon.select.limit";
+
+  public static final String SELECT_ID = "carbon.select.id";
+
+  public static final String WORKER_HOST = "carbon.worker.host";
+  public static final String WORKER_PORT = "carbon.worker.port";
+  public static final String WORKER_CORE_NUM = "carbon.worker.core.num";
+  public static final String MASTER_HOST = "carbon.master.host";
+  public static final String MASTER_PORT = "carbon.master.port";
+
+  public static final String STORE_TEMP_LOCATION = 
"carbon.store.temp.location";
+  public static final String STORE_LOCATION = "carbon.store.location";
+
+  private Map<String, String> conf = new HashMap<>();
+
+  public StoreConf() {
+  }
+
+  public StoreConf(String filePath) {
+    load(filePath);
+  }
+
+  public StoreConf conf(String key, String value) {
+    conf.put(key, value);
+    return this;
+  }
+
+  public StoreConf conf(String key, int value) {
+    conf.put(key, "" + value);
+    return this;
+  }
+
+  public void load(String filePath) {
+    StoreUtil.loadProperties(filePath, this);
+  }
+
+  public void conf(StoreConf conf) {
+    this.conf.putAll(conf.conf);
+  }
+
+  public Object conf(String key) {
+    return conf.get(key);
+  }
+
+  public String[] projection() {
+    return stringArrayValue(SELECT_PROJECTION);
+  }
+
+  public String filter() {
+    return stringValue(SELECT_FILTER);
+  }
+
+  public int limit() {
+    return intValue(SELECT_LIMIT);
+  }
+
+  public String masterHost() {
+    return stringValue(MASTER_HOST);
+  }
+
+  public int masterPort() {
+    return intValue(MASTER_PORT);
+  }
+
+  public String workerHost() {
+    return stringValue(WORKER_HOST);
+  }
+
+  public int workerPort() {
+    return intValue(WORKER_PORT);
+  }
+
+  public int workerCoreNum() {
+    return intValue(WORKER_CORE_NUM);
+  }
+
+  public String storeLocation() {
+    return stringValue(STORE_LOCATION);
+  }
+
+  public String[] storeTempLocation() {
+    return stringArrayValue(STORE_TEMP_LOCATION);
+  }
+
+  public String selectId() {
+    return stringValue(SELECT_ID);
+  }
+
+  public Configuration newHadoopConf() {
+    Configuration hadoopConf = FileFactory.getConfiguration();
+    for (Map.Entry<String, String> entry : conf.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key != null && value != null && key.startsWith("carbon.hadoop.")) {
+        hadoopConf.set(key.substring("carbon.hadoop.".length()), value);
+      }
+    }
+    return hadoopConf;
+  }
+
+  private String stringValue(String key) {
+    Object obj = conf.get(key);
+    if (obj == null) {
+      return null;
+    }
+    return obj.toString();
+  }
+
+  private int intValue(String key) {
+    String value = conf.get(key);
+    if (value == null) {
+      return -1;
+    }
+    return Integer.parseInt(value);
+  }
+
+  private String[] stringArrayValue(String key) {
+    String value = conf.get(key);
+    if (value == null) {
+      return null;
+    }
+    return value.split(",", -1);
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    Set<Map.Entry<String, String>> entries = conf.entrySet();
+    WritableUtils.writeVInt(out, conf.size());
+    for (Map.Entry<String, String> entry : entries) {
+      WritableUtils.writeString(out, entry.getKey());
+      WritableUtils.writeString(out, entry.getValue());
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    if (conf == null) {
+      conf = new HashMap<>();
+    }
+
+    int size = WritableUtils.readVInt(in);
+    String key, value;
+    for (int i = 0; i < size; i++) {
+      key = WritableUtils.readString(in);
+      value = WritableUtils.readString(in);
+      conf.put(key, value);
+    }
+  }
+}
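
Note on the new class above: StoreConf is both a small typed view over string key/value pairs and a Hadoop Writable, so the master can ship it to workers over RPC. A brief usage sketch mirroring how SparkCarbonStore builds it:

// inside a method; carbondata and hadoop imports assumed
StoreConf conf = new StoreConf()
    .conf(StoreConf.MASTER_HOST, "127.0.0.1")
    .conf(StoreConf.MASTER_PORT, 10020)
    .conf(StoreConf.STORE_LOCATION, "/tmp/carbon.store");
String host = conf.masterHost();   // "127.0.0.1"
int port = conf.masterPort();      // 10020, or -1 when the key is missing
// keys prefixed with "carbon.hadoop." are copied into the returned Configuration
Configuration hadoopConf = conf.newHadoopConf();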

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java b/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
new file mode 100644
index 0000000..c7a4d6b
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.exception;
+
+public class ExecutionTimeoutException extends RuntimeException {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java b/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
new file mode 100644
index 0000000..c55fa7c
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.exception;
+
+public class StoreException extends Exception {
+
+  public StoreException() {
+    super();
+  }
+
+  public StoreException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java b/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
new file mode 100644
index 0000000..b366a67
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.exception;
+
+public class WorkerTooBusyException extends RuntimeException {
+
+  public WorkerTooBusyException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java b/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
new file mode 100644
index 0000000..0a724d9
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.master;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.store.exception.ExecutionTimeoutException;
+import org.apache.carbondata.store.exception.StoreException;
+import org.apache.carbondata.store.rpc.RegistryService;
+import org.apache.carbondata.store.rpc.ServiceFactory;
+import org.apache.carbondata.store.rpc.StoreService;
+import org.apache.carbondata.store.rpc.impl.RegistryServiceImpl;
+import org.apache.carbondata.store.rpc.impl.Status;
+import org.apache.carbondata.store.rpc.model.BaseResponse;
+import org.apache.carbondata.store.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.scheduler.Schedulable;
+import org.apache.carbondata.store.scheduler.Scheduler;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Master of CarbonSearch.
+ * It provides a registry service for workers to register with,
+ * and a search API that fires RPC calls to the workers.
+ */
+
+public class Master {
+
+  private static Master instance = null;
+
+  private static LogService LOGGER = 
LogServiceFactory.getLogService(Master.class.getName());
+
+  private Map<String, SoftReference<CarbonTable>> cacheTables;
+
+  // worker host address map to EndpointRef
+  private StoreConf conf;
+  private Configuration hadoopConf;
+  private Random random = new Random();
+  private RPC.Server registryServer = null;
+  private Scheduler scheduler = new Scheduler();
+
+  private Master(StoreConf conf) {
+    cacheTables = new HashMap<>();
+    this.conf = conf;
+    this.hadoopConf = this.conf.newHadoopConf();
+  }
+
+  /**
+   * start service and listen on port passed in constructor
+   */
+  public void startService() throws IOException {
+    if (registryServer == null) {
+
+      BindException exception;
+      // in the worst case, we will try to create the service 100 times
+      int numTry = 100;
+      String host = conf.masterHost();
+      int port = conf.masterPort();
+      LOGGER.info("building registry-service on " + host + ":" + port);
+
+      RegistryService registryService = new RegistryServiceImpl(this);
+      do {
+        try {
+          registryServer = new 
RPC.Builder(hadoopConf).setBindAddress(host).setPort(port)
+              
.setProtocol(RegistryService.class).setInstance(registryService).build();
+
+          registryServer.start();
+          numTry = 0;
+          exception = null;
+        } catch (BindException e) {
+          // port is occupied, increase the port number and try again
+          exception = e;
+          LOGGER.error(e, "start registry-service failed");
+          port = port + 1;
+          numTry = numTry - 1;
+        }
+      } while (numTry > 0);
+      if (exception != null) {
+        // we have tried many times, but still failed to find an available port
+        throw exception;
+      }
+      LOGGER.info("registry-service started");
+    } else {
+      LOGGER.info("Search mode master has already started");
+    }
+  }
+
+  public void stopService() throws InterruptedException {
+    if (registryServer != null) {
+      registryServer.stop();
+      registryServer.join();
+      registryServer = null;
+    }
+  }
+
+  public void stopAllWorkers() throws IOException {
+    for (Schedulable worker : getWorkers()) {
+      try {
+        worker.service.shutdown(new ShutdownRequest("user"));
+      } catch (Throwable throwable) {
+        throw new IOException(throwable);
+      }
+      scheduler.removeWorker(worker.getAddress());
+    }
+  }
+
+  /**
+   * A new searcher is trying to register, add it to the map and connect to 
this searcher
+   */
+  public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) 
throws IOException {
+    LOGGER.info(
+        "Receive Register request from worker " + request.getHostAddress() + 
":" + request.getPort()
+            + " with " + request.getCores() + " cores");
+    String workerId = UUID.randomUUID().toString();
+    String workerAddress = request.getHostAddress();
+    int workerPort = request.getPort();
+    LOGGER.info(
+        "connecting to worker " + request.getHostAddress() + ":" + 
request.getPort() + ", workerId "
+            + workerId);
+
+    StoreService searchService = 
ServiceFactory.createStoreService(workerAddress, workerPort);
+    scheduler.addWorker(
+        new Schedulable(workerId, workerAddress, workerPort, 
request.getCores(), searchService));
+    LOGGER.info("worker " + request + " registered");
+    return new RegisterWorkerResponse(workerId);
+  }
+
+  private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> 
output, long globalLimit)
+      throws IOException {
+    // in case of RPC success, collect all rows in response message
+    if (result.getQueryId() != queryId) {
+      throw new IOException(
+          "queryId in response does not match request: " + result.getQueryId() 
+ " != " + queryId);
+    }
+    if (result.getStatus() != Status.SUCCESS.ordinal()) {
+      throw new IOException("failure in worker: " + result.getMessage());
+    }
+    int rowCount = 0;
+    Object[][] rows = result.getRows();
+    for (Object[] row : rows) {
+      output.add(new CarbonRow(row));
+      rowCount++;
+      if (rowCount >= globalLimit) {
+        break;
+      }
+    }
+    LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + 
rowCount);
+    return rowCount;
+  }
+
+  private void onFailure(Throwable e) throws IOException {
+    throw new IOException("exception in worker: " + e.getMessage());
+  }
+
+  private void onTimeout() {
+    throw new ExecutionTimeoutException();
+  }
+
+  public String getTableFolder(String database, String tableName) {
+    return conf.storeLocation() + File.separator + database + File.separator + 
tableName;
+  }
+
+  public CarbonTable getTable(String database, String tableName) throws 
StoreException {
+    String tablePath = getTableFolder(database, tableName);
+    CarbonTable carbonTable;
+    SoftReference<CarbonTable> reference = cacheTables.get(tablePath);
+    if (reference != null) {
+      carbonTable = reference.get();
+      if (carbonTable != null) {
+        return carbonTable;
+      }
+    }
+
+    try {
+      org.apache.carbondata.format.TableInfo tableInfo =
+          
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo tableInfo1 = 
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
+      tableInfo1.setTablePath(tablePath);
+      carbonTable = CarbonTable.buildFromTableInfo(tableInfo1);
+      cacheTables.put(tablePath, new SoftReference<>(carbonTable));
+      return carbonTable;
+    } catch (IOException e) {
+      String message = "Failed to get table from " + tablePath;
+      LOGGER.error(e, message);
+      throw new StoreException(message);
+    }
+  }
+
+  public boolean createTable(TableInfo tableInfo, boolean ifNotExists) throws 
IOException {
+    AbsoluteTableIdentifier identifier = 
tableInfo.getOrCreateAbsoluteTableIdentifier();
+    boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
+    if (tableExists) {
+      if (ifNotExists) {
+        return true;
+      } else {
+        throw new IOException(
+            "can't create table " + tableInfo.getDatabaseName() + "." + tableInfo.getFactTable()
+                .getTableName() + ", because it already exists");
+      }
+    }
+
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    String databaseName = tableInfo.getDatabaseName();
+    String tableName = tableInfo.getFactTable().getTableName();
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName);
+
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
+    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
+    try {
+      if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+        boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType);
+        if (!isDirCreated) {
+          throw new IOException("Failed to create the metadata directory " + 
schemaMetadataPath);
+        }
+      }
+      ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+      thriftWriter.open(FileWriteOperation.OVERWRITE);
+      thriftWriter.write(thriftTableInfo);
+      thriftWriter.close();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle create table");
+      throw e;
+    }
+  }
+
+  private void openSegment(CarbonLoadModel loadModel, boolean isOverwriteTable) throws IOException {
+    try {
+      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, isOverwriteTable);
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle load data");
+      throw e;
+    }
+  }
+
+  private void closeSegment(CarbonLoadModel loadModel) throws IOException {
+    try {
+      CarbonLoaderUtil.updateTableStatusForFailure(loadModel, "");
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to close segment");
+      throw e;
+    }
+  }
+
+  private void commitSegment(CarbonLoadModel loadModel) throws IOException {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    String segmentId = loadModel.getSegmentId();
+    String segmentFileName = SegmentFileStore
+        .writeSegmentFile(carbonTable, segmentId, String.valueOf(loadModel.getFactTimeStamp()));
+
+    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
+    String tablePath = absoluteTableIdentifier.getTablePath();
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    int retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+    int maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+    try {
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
+        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table 
status updation");
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            SegmentStatusManager.readLoadMetadata(metadataPath);
+        LoadMetadataDetails loadMetadataDetails = null;
+        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
+          // find the metadata entry for this segment and attach the segment file name
+          if (segmentId.equals(detail.getLoadName())) {
+            loadMetadataDetails = detail;
+            detail.setSegmentFile(segmentFileName);
+            break;
+          }
+        }
+        if (loadMetadataDetails == null) {
+          throw new IOException("can not find segment: " + segmentId);
+        }
+
+        CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.SUCCESS,
+            loadModel.getFactTimeStamp(), true);
+        CarbonLoaderUtil
+            .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, segmentId, carbonTable);
+
+        SegmentStatusManager
+            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+      } else {
+        LOGGER.error("Could not acquire the table status lock for table path " + tablePath);
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" 
+ tablePath);
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + tablePath + " during 
table status updation");
+      }
+    }
+  }
+
+  public boolean loadData(CarbonLoadModel loadModel, boolean isOverwrite) throws IOException {
+    Schedulable worker = scheduler.pickNexWorker();
+    try {
+      if (loadModel.getFactTimeStamp() == 0) {
+        loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime());
+      }
+      openSegment(loadModel, isOverwrite);
+      LoadDataRequest request = new LoadDataRequest(loadModel);
+      BaseResponse response = scheduler.sendRequest(worker, request);
+      if (Status.SUCCESS.ordinal() == response.getStatus()) {
+        commitSegment(loadModel);
+        return true;
+      } else {
+        closeSegment(loadModel);
+        throw new IOException(response.getMessage());
+      }
+    } finally {
+      worker.workload.decrementAndGet();
+    }
+  }
+
+  /**
+   * Execute search by firing RPC call to worker, return the result rows
+   *
+   * @param table       table to search
+   * @param columns     projection column names
+   * @param filter      filter expression
+   * @param globalLimit max number of rows required in Master
+   * @param localLimit  max number of rows required in Worker
+   * @return CarbonRow array
+   */
+  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter,
+      long globalLimit, long localLimit) throws IOException {
+    Objects.requireNonNull(table);
+    Objects.requireNonNull(columns);
+    if (globalLimit < 0 || localLimit < 0) {
+      throw new IllegalArgumentException("limit should be positive");
+    }
+
+    int queryId = random.nextInt();
+
+    List<CarbonRow> output = new ArrayList<>();
+
+    // prune data and get a mapping of worker hostname to list of blocks,
+    // then add these blocks to the QueryRequest and fire the RPC call
+    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, 
columns, filter);
+    Set<Map.Entry<String, List<Distributable>>> entries = 
nodeBlockMapping.entrySet();
+    List<Future<QueryResponse>> futures = new ArrayList<>(entries.size());
+    List<Schedulable> workers = new ArrayList<>(entries.size());
+    for (Map.Entry<String, List<Distributable>> entry : entries) {
+      CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey());
+      QueryRequest request =
+          new QueryRequest(queryId, split, table.getTableInfo(), columns, filter, localLimit);
+
+      // find an endpoint and send the request to it;
+      // the RPC is non-blocking, so we need not wait before sending to the next worker
+      Schedulable worker = scheduler.pickWorker(entry.getKey());
+      workers.add(worker);
+      futures.add(scheduler.sendRequestAsync(worker, request));
+    }
+
+    int rowCount = 0;
+    int length = futures.size();
+    for (int i = 0; i < length; i++) {
+      Future<QueryResponse> future = futures.get(i);
+      Schedulable worker = workers.get(i);
+      if (rowCount < globalLimit) {
+        // wait for worker
+        QueryResponse response = null;
+        try {
+          response = future
+              .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS);
+        } catch (ExecutionException | InterruptedException e) {
+          onFailure(e);
+        } catch (TimeoutException t) {
+          onTimeout();
+        } finally {
+          worker.workload.decrementAndGet();
+        }
+        LOGGER.info("[QueryId: " + queryId + "] receive search response from 
worker " + worker);
+        rowCount += onSuccess(queryId, response, output, globalLimit);
+      }
+    }
+    CarbonRow[] rows = new CarbonRow[output.size()];
+    return output.toArray(rows);
+  }
+
+  /**
+   * Prune data by using CarbonInputFormat.getSplits.
+   * Return a mapping of host address to list of blocks.
+   */
+  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, 
String[] columns,
+      Expression filter) throws IOException {
+    JobConf jobConf = new JobConf(new Configuration());
+    Job job = Job.getInstance(jobConf);
+    CarbonTableInputFormat format;
+    try {
+      format = CarbonInputFormatUtil
+          .createCarbonTableInputFormat(job, table, columns, filter, null, null, true);
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e);
+    }
+
+    // FG pruning is done on the reader side, so don't do it here
+    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
+    List<InputSplit> splits = format.getSplits(job);
+    List<Distributable> blockInfos = new ArrayList<>(splits.size());
+    for (InputSplit split : splits) {
+      blockInfos.add((Distributable) split);
+    }
+    return CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, getWorkerAddresses(),
+        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
+  }
+
+  /**
+   * return all registered workers
+   */
+  public List<Schedulable> getWorkers() {
+    return scheduler.getAllWorkers();
+  }
+
+  private List<String> getWorkerAddresses() {
+    return scheduler.getAllWorkerAddresses();
+  }
+
+  public static synchronized Master getInstance(StoreConf conf) {
+    if (instance == null) {
+      instance = new Master(conf);
+    }
+    return instance;
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+    if (args.length != 2) {
+      System.err.println("Usage: Master <log4j file> <properties file>");
+      return;
+    }
+
+    StoreUtil.initLog4j(args[0]);
+    StoreConf conf = new StoreConf(args[1]);
+    Master master = getInstance(conf);
+    master.stopService();
+  }
+
+}
\ No newline at end of file
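
For readers tracing the new Master API, here is a minimal driver sketch (not
part of this patch) that strings the public methods above together. The config
path, database/table names, and the buildTableInfo/buildLoadModel helpers are
hypothetical, shown only to illustrate the call order:

    // Sketch only: exercises the Master methods defined above.
    StoreConf conf = new StoreConf("store/conf/store.conf");
    Master master = Master.getInstance(conf);

    TableInfo tableInfo = buildTableInfo();        // hypothetical helper
    master.createTable(tableInfo, true);           // ifNotExists = true, safe to re-run

    CarbonLoadModel loadModel = buildLoadModel();  // hypothetical helper
    master.loadData(loadModel, false);             // loads on one worker, commits segment on success

    CarbonTable table = master.getTable("default", "t1");
    // project column c1, no filter, at most 100 rows in master and per worker
    CarbonRow[] rows = master.search(table, new String[] {"c1"}, null, 100, 100);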

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
deleted file mode 100644
index faaa746..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-import org.apache.carbondata.store.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.rpc.model.ShutdownResponse;
-
-import org.apache.hadoop.ipc.VersionedProtocol;
-
-@InterfaceAudience.Internal
-public interface QueryService extends VersionedProtocol {
-  long versionID = 1L;
-  QueryResponse query(QueryRequest request);
-  ShutdownResponse shutdown(ShutdownRequest request);
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
index 4d17686..08a0e97 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.store.rpc;
 
+import java.io.IOException;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
 import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
@@ -26,5 +28,5 @@ import org.apache.hadoop.ipc.VersionedProtocol;
 @InterfaceAudience.Internal
 public interface RegistryService extends VersionedProtocol {
   long versionID = 1L;
-  RegisterWorkerResponse registerWorker(RegisterWorkerRequest request);
+  RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException;
 }
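
From the worker side, the registration handshake implied by this interface
reduces to the sketch below; the RegisterWorkerRequest constructor arguments
and the response getter are assumptions inferred from the getters that
Master.addWorker calls (getHostAddress, getPort, getCores):

    // Sketch only: a worker registering itself with the master.
    RegistryService registry = ServiceFactory.createRegistryService(masterHost, masterPort);
    // constructor shape assumed from the request getters used in Master.addWorker
    RegisterWorkerRequest request = new RegisterWorkerRequest(workerHost, workerPort, cores);
    RegisterWorkerResponse response = registry.registerWorker(request);
    String workerId = response.getWorkerId();  // getter name assumed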

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
index a50ab8b..d9d0f3e 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
@@ -29,10 +29,10 @@ import org.apache.hadoop.ipc.RPC;
 @InterfaceAudience.Internal
 public class ServiceFactory {
 
-  public static QueryService createSearchService(String host, int port) throws IOException {
+  public static StoreService createStoreService(String host, int port) throws IOException {
     InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
     return RPC.getProxy(
-        QueryService.class, QueryService.versionID, address, new Configuration());
+        StoreService.class, StoreService.versionID, address, new Configuration());
   }
 
   public static RegistryService createRegistryService(String host, int port) throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java
new file mode 100644
index 0000000..48dec79
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.model.BaseResponse;
+import org.apache.carbondata.store.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+@InterfaceAudience.Internal
+public interface StoreService extends VersionedProtocol {
+
+  long versionID = 1L;
+
+  BaseResponse loadData(LoadDataRequest request);
+
+  QueryResponse query(QueryRequest request);
+
+  ShutdownResponse shutdown(ShutdownRequest request);
+}
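
Because StoreService extends Hadoop's VersionedProtocol, a worker can expose it
with the stock Hadoop RPC server builder. A sketch under that assumption (the
actual worker wiring is not reproduced in this excerpt; host, port, handler
count, and the implementation factory are placeholders):

    // Sketch only: serving StoreService over Hadoop RPC.
    Configuration hadoopConf = new Configuration();
    StoreService impl = createStoreServiceImpl();  // hypothetical factory; real construction not shown here
    RPC.Server server = new RPC.Builder(hadoopConf)
        .setProtocol(StoreService.class)
        .setInstance(impl)
        .setBindAddress(host)
        .setPort(port)
        .setNumHandlers(4)
        .build();
    server.start();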

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
index 2c768d1..45e2dce 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.Segment;
@@ -41,6 +43,7 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
@@ -158,4 +161,23 @@ public class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
     queryModel.setTableBlockInfos(blockToRead);
     return queryModel;
   }
+
+  @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
+    // clear dictionary cache
+    Map<String, Dictionary> columnToDictionaryMapping = 
queryModel.getColumnToDictionaryMapping();
+    if (null != columnToDictionaryMapping) {
+      for (Map.Entry<String, Dictionary> entry : 
columnToDictionaryMapping.entrySet()) {
+        CarbonUtil.clearDictionaryCache(entry.getValue());
+      }
+    }
+
+    // close read support
+    readSupport.close();
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
deleted file mode 100644
index b191331..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.impl;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.rpc.QueryService;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-import org.apache.carbondata.store.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.rpc.model.ShutdownResponse;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-
-@InterfaceAudience.Internal
-public class QueryServiceImpl implements QueryService {
-
-  @Override
-  public QueryResponse query(QueryRequest request) {
-    RequestHandler handler = new RequestHandler();
-    return handler.handleSearch(request);
-  }
-
-  @Override
-  public ShutdownResponse shutdown(ShutdownRequest request) {
-    RequestHandler handler = new RequestHandler();
-    return handler.handleShutdown(request);
-  }
-
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    return versionID;
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-      int clientMethodsHash) throws IOException {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
index 12f48ba..03f9b2c 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
@@ -20,7 +20,7 @@ package org.apache.carbondata.store.rpc.impl;
 import java.io.IOException;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.Master;
+import org.apache.carbondata.store.master.Master;
 import org.apache.carbondata.store.rpc.RegistryService;
 import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
 import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
@@ -37,7 +37,7 @@ public class RegistryServiceImpl implements RegistryService {
   }
 
   @Override
-  public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) {
+  public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException {
     return master.addWorker(request);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
index 29ee546..3b98019 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
@@ -18,10 +18,12 @@
 package org.apache.carbondata.store.rpc.impl;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -37,10 +39,29 @@ import org.apache.carbondata.core.util.CarbonTaskInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.store.rpc.model.BaseResponse;
+import org.apache.carbondata.store.rpc.model.LoadDataRequest;
 import org.apache.carbondata.store.rpc.model.QueryRequest;
 import org.apache.carbondata.store.rpc.model.QueryResponse;
 import org.apache.carbondata.store.rpc.model.ShutdownRequest;
 import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 /**
  * It handles request from master.
@@ -48,27 +69,35 @@ import org.apache.carbondata.store.rpc.model.ShutdownResponse;
 @InterfaceAudience.Internal
 class RequestHandler {
 
-  private static final LogService LOG =
+  private StoreConf conf;
+  private Configuration hadoopConf;
+
+  public RequestHandler(StoreConf conf, Configuration hadoopConf) {
+    this.conf = conf;
+    this.hadoopConf = hadoopConf;
+  }
+
+  private static final LogService LOGGER =
       LogServiceFactory.getLogService(RequestHandler.class.getName());
 
   QueryResponse handleSearch(QueryRequest request) {
     try {
-      LOG.info(String.format("[QueryId:%d] receive search request", request.getRequestId()));
+      LOGGER.info(String.format("[QueryId:%d] receive search request", request.getRequestId()));
       List<CarbonRow> rows = handleRequest(request);
-      LOG.info(String.format("[QueryId:%d] sending success response", request.getRequestId()));
+      LOGGER.info(String.format("[QueryId:%d] sending success response", request.getRequestId()));
       return createSuccessResponse(request, rows);
     } catch (IOException e) {
-      LOG.error(e);
-      LOG.info(String.format("[QueryId:%d] sending failure response", request.getRequestId()));
+      LOGGER.error(e);
+      LOGGER.info(String.format("[QueryId:%d] sending failure response", request.getRequestId()));
       return createFailureResponse(request, e);
     }
   }
 
   ShutdownResponse handleShutdown(ShutdownRequest request) {
-    LOG.info("Shutting down worker...");
+    LOGGER.info("Shutting down worker...");
     SearchModeDetailQueryExecutor.shutdownThreadPool();
     SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
-    LOG.info("Worker shut down");
+    LOGGER.info("Worker shut down");
     return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
   }
 
@@ -85,13 +114,13 @@ class RequestHandler {
     CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
     QueryModel queryModel = createQueryModel(table, request);
 
-    LOG.info(String.format("[QueryId:%d] %s, number of block: %d",
-        request.getRequestId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+    LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", request.getRequestId(),
+        queryModel.toString(), mbSplit.getAllSplits().size()));
 
     // read all rows by the reader
     List<CarbonRow> rows = new LinkedList<>();
-    try (CarbonRecordReader<CarbonRow> reader =
-        new IndexedRecordReader(request.getRequestId(), table, queryModel)) {
+    try (CarbonRecordReader<CarbonRow> reader = new 
IndexedRecordReader(request.getRequestId(),
+        table, queryModel)) {
       reader.initialize(mbSplit, null);
 
       // loop to read required number of rows.
@@ -104,22 +133,18 @@ class RequestHandler {
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
-    LOG.info(String.format("[QueryId:%d] scan completed, return %d rows",
+    LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
         request.getRequestId(), rows.size()));
     return rows;
   }
 
-
-
-  private QueryModel createQueryModel(CarbonTable table, QueryRequest request) {
     String[] projectColumns = request.getProjectColumns();
     Expression filter = null;
     if (request.getFilterExpression() != null) {
       filter = request.getFilterExpression();
     }
-    return new QueryModelBuilder(table)
-        .projectColumns(projectColumns)
-        .filterExpression(filter)
+    return new QueryModelBuilder(table).projectColumns(projectColumns).filterExpression(filter)
         .build();
   }
 
@@ -144,4 +169,50 @@ class RequestHandler {
     return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output);
   }
 
+  public BaseResponse handleLoadData(LoadDataRequest request) {
+    DataLoadExecutor executor = null;
+    try {
+      CarbonLoadModel model = request.getModel();
+
+      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+      CarbonInputFormatUtil.createJobTrackerID(new Date());
+      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
+      Configuration configuration = new Configuration(hadoopConf);
+      StoreUtil.configureCSVInputFormat(configuration, model);
+      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
+      // Set up the attempt context required to use in the output committer.
+      TaskAttemptContext hadoopAttemptContext =
+          new TaskAttemptContextImpl(configuration, taskAttemptId);
+
+      CSVInputFormat format = new CSVInputFormat();
+      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);
+
+      CarbonIterator<Object[]>[] readerIterators = new 
CSVRecordReaderIterator[splits.size()];
+      for (int index = 0; index < splits.size(); index++) {
+        readerIterators[index] = new CSVRecordReaderIterator(
+            format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
+            hadoopAttemptContext);
+      }
+
+      executor = new DataLoadExecutor();
+      executor.execute(model, conf.storeTempLocation(), readerIterators);
+
+      return new BaseResponse(Status.SUCCESS.ordinal(), "");
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle load data");
+      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
+    } catch (InterruptedException e) {
+      LOGGER.error(e, "Interrupted handle load data ");
+      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed to execute load data ");
+      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
+    } finally {
+      if (executor != null) {
+        executor.close();
+        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+      }
+    }
+  }
 }
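
The concrete StoreService implementation added elsewhere in this patch is
presumably a thin RPC adapter over this handler; conceptually it reduces to a
delegation sketch like the following, with the class name, constructor, and
field names assumed for illustration:

    // Sketch only: RPC entry points delegating to RequestHandler.
    public class StoreServiceSketch implements StoreService {
      private final RequestHandler handler;

      public StoreServiceSketch(StoreConf conf, Configuration hadoopConf) {
        this.handler = new RequestHandler(conf, hadoopConf);
      }

      @Override public BaseResponse loadData(LoadDataRequest request) {
        return handler.handleLoadData(request);
      }

      @Override public QueryResponse query(QueryRequest request) {
        return handler.handleSearch(request);
      }

      @Override public ShutdownResponse shutdown(ShutdownRequest request) {
        return handler.handleShutdown(request);
      }

      @Override public long getProtocolVersion(String protocol, long clientVersion) {
        return versionID;
      }

      @Override public ProtocolSignature getProtocolSignature(String protocol,
          long clientVersion, int clientMethodsHash) {
        return null;
      }
    }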
