[CARBONDATA-2705][CarbonStore] CarbonStore Java API and Implementation

Support two implementations:
1. LocalCarbonStore for usage in local mode
2. DistributedCarbonStore leveraging multiple servers (Master and Workers) via RPC

This closes #2473


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/044a995a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/044a995a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/044a995a

Branch: refs/heads/carbonstore
Commit: 044a995aa90220715daef83d6628aacea5dd5106
Parents: e4bfb57
Author: Jacky Li <jacky.li...@qq.com>
Authored: Mon Jul 9 12:23:49 2018 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Wed Jul 11 16:07:32 2018 +0800

----------------------------------------------------------------------
 .../core/datastore/impl/FileFactory.java        |   5 +
 .../schema/table/TableSchemaBuilder.java        |   3 +-
 .../detailquery/SearchModeTestCase.scala        |   3 +-
 .../carbondata/store/SparkCarbonStore.scala     | 203 --------
 .../apache/carbondata/store/WorkerManager.scala |  75 +++
 .../org/apache/spark/sql/CarbonSession.scala    |  59 ++-
 .../carbondata/store/SparkCarbonStoreTest.scala |  86 ---
 .../processing/loading/DataLoadExecutor.java    |   6 +-
 .../processing/util/CarbonLoaderUtil.java       |   2 +-
 store/core/pom.xml                              |   5 +
 .../carbondata/store/CarbonRowReadSupport.java  |  53 --
 .../apache/carbondata/store/CarbonStore.java    |  68 ---
 .../carbondata/store/LocalCarbonStore.java      | 130 -----
 .../carbondata/store/MetaCachedCarbonStore.java |  59 ---
 .../carbondata/store/api/CarbonStore.java       |  32 ++
 .../store/api/CarbonStoreFactory.java           |  93 ++++
 .../apache/carbondata/store/api/DataStore.java  |  51 ++
 .../apache/carbondata/store/api/MetaStore.java  |  50 ++
 .../apache/carbondata/store/api/SqlStore.java   |  34 ++
 .../carbondata/store/api/conf/StoreConf.java    | 191 +++++++
 .../store/api/descriptor/LoadDescriptor.java    | 114 ++++
 .../store/api/descriptor/SelectDescriptor.java  | 111 ++++
 .../store/api/descriptor/TableDescriptor.java   | 174 +++++++
 .../store/api/descriptor/TableIdentifier.java   |  37 ++
 .../exception/ExecutionTimeoutException.java    |  22 +
 .../store/api/exception/SchedulerException.java |  26 +
 .../store/api/exception/StoreException.java     |  33 ++
 .../apache/carbondata/store/conf/StoreConf.java | 185 -------
 .../exception/ExecutionTimeoutException.java    |  22 -
 .../store/exception/StoreException.java         |  29 --
 .../store/exception/WorkerTooBusyException.java |  26 -
 .../carbondata/store/impl/CarbonStoreBase.java  | 177 +++++++
 .../store/impl/DistributedCarbonStore.java      | 232 +++++++++
 .../store/impl/IndexedRecordReader.java         | 183 +++++++
 .../carbondata/store/impl/LocalCarbonStore.java | 164 ++++++
 .../carbondata/store/impl/MetaProcessor.java    | 170 ++++++
 .../store/impl/SegmentTxnManager.java           | 121 +++++
 .../apache/carbondata/store/impl/Status.java    |  28 +
 .../carbondata/store/impl/master/Master.java    | 161 ++++++
 .../store/impl/master/RegistryServiceImpl.java  |  53 ++
 .../store/impl/master/Schedulable.java          |  76 +++
 .../carbondata/store/impl/master/Scheduler.java | 137 +++++
 .../store/impl/rpc/RegistryService.java         |  32 ++
 .../store/impl/rpc/ServiceFactory.java          |  43 ++
 .../carbondata/store/impl/rpc/StoreService.java |  40 ++
 .../store/impl/rpc/model/BaseResponse.java      |  69 +++
 .../store/impl/rpc/model/LoadDataRequest.java   |  60 +++
 .../store/impl/rpc/model/QueryResponse.java     |  73 +++
 .../impl/rpc/model/RegisterWorkerRequest.java   |  73 +++
 .../impl/rpc/model/RegisterWorkerResponse.java  |  54 ++
 .../carbondata/store/impl/rpc/model/Scan.java   | 108 ++++
 .../store/impl/rpc/model/ShutdownRequest.java   |  53 ++
 .../store/impl/rpc/model/ShutdownResponse.java  |  61 +++
 .../store/impl/worker/RequestHandler.java       | 166 ++++++
 .../store/impl/worker/StoreServiceImpl.java     |  77 +++
 .../carbondata/store/impl/worker/Worker.java    | 166 ++++++
 .../apache/carbondata/store/master/Master.java  | 522 -------------------
 .../carbondata/store/rpc/RegistryService.java   |  32 --
 .../carbondata/store/rpc/ServiceFactory.java    |  43 --
 .../carbondata/store/rpc/StoreService.java      |  40 --
 .../store/rpc/impl/IndexedRecordReader.java     | 183 -------
 .../store/rpc/impl/RegistryServiceImpl.java     |  54 --
 .../store/rpc/impl/RequestHandler.java          | 218 --------
 .../carbondata/store/rpc/impl/Status.java       |  28 -
 .../store/rpc/impl/StoreServiceImpl.java        |  78 ---
 .../store/rpc/model/BaseResponse.java           |  69 ---
 .../store/rpc/model/LoadDataRequest.java        |  60 ---
 .../store/rpc/model/QueryRequest.java           | 108 ----
 .../store/rpc/model/QueryResponse.java          |  73 ---
 .../store/rpc/model/RegisterWorkerRequest.java  |  73 ---
 .../store/rpc/model/RegisterWorkerResponse.java |  54 --
 .../store/rpc/model/ShutdownRequest.java        |  53 --
 .../store/rpc/model/ShutdownResponse.java       |  61 ---
 .../carbondata/store/scheduler/Schedulable.java |  74 ---
 .../carbondata/store/scheduler/Scheduler.java   | 136 -----
 .../apache/carbondata/store/util/StoreUtil.java |   4 +-
 .../apache/carbondata/store/worker/Worker.java  | 166 ------
 .../store/DistributedCarbonStoreTest.java       | 142 +++++
 .../carbondata/store/LocalCarbonStoreTest.java  | 111 +++-
 .../org/apache/carbondata/store/TestUtil.java   | 101 +---
 store/core/src/test/resources/data1.csv         |  11 +
 store/horizon/pom.xml                           |  10 -
 store/horizon/src/main/anltr/Expression.g4      | 163 ------
 store/horizon/src/main/antlr/Expression.g4      | 163 ++++++
 .../apache/carbondata/horizon/antlr/Parser.java |  42 ++
 .../horizon/antlr/gen/ExpressionLexer.java      |   3 -
 .../horizon/antlr/gen/ExpressionParser.java     |   3 -
 .../horizon/rest/client/HorizonClient.java      |  77 +++
 .../rest/client/impl/SimpleHorizonClient.java   |  87 ++++
 .../horizon/rest/controller/Horizon.java        |   6 +-
 .../rest/controller/HorizonController.java      |  76 ++-
 .../rest/model/descriptor/LoadDescriptor.java   |  81 ---
 .../rest/model/descriptor/SelectDescriptor.java |  88 ----
 .../rest/model/descriptor/TableDescriptor.java  |  90 ----
 .../rest/model/validate/RequestValidator.java   |  15 +-
 .../rest/model/view/CreateTableRequest.java     |  13 +-
 .../rest/model/view/DropTableRequest.java       |  45 ++
 .../horizon/rest/model/view/LoadRequest.java    |   8 +-
 .../horizon/rest/model/view/Request.java        |  30 ++
 .../horizon/rest/model/view/Response.java       |  33 ++
 .../horizon/rest/model/view/SelectRequest.java  |  20 +-
 .../horizon/rest/model/view/SelectResponse.java |  18 +-
 .../horizon/rest/service/HorizonService.java    | 162 ------
 .../carbondata/horizon/FilterParseTest.java     |  18 +-
 .../apache/carbondata/horizon/HorizonTest.java  | 165 +++---
 store/horizon/src/test/resources/data1.csv      |  11 -
 .../sdk/file/CarbonWriterBuilder.java           |  11 +-
 .../org/apache/carbondata/sdk/file/Field.java   |  65 +--
 108 files changed, 4624 insertions(+), 3937 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 5c46bcf..bb5d449 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -220,6 +220,11 @@ public final class FileFactory {
       final FsPermission permission) throws IOException {
     return getCarbonFile(filePath).createNewFile(filePath, fileType, doAs, 
permission);
   }
+
+  public static boolean deleteFile(String filePath) throws IOException {
+    return deleteFile(filePath, getFileType(filePath));
+  }
+
   public static boolean deleteFile(String filePath, FileType fileType) throws 
IOException {
     return getCarbonFile(filePath).deleteFile(filePath, fileType);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index e4b072b..545e4ba 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -124,8 +124,9 @@ public class TableSchemaBuilder {
     return schema;
   }
 
-  public void setSortColumns(List<ColumnSchema> sortColumns) {
+  public TableSchemaBuilder setSortColumns(List<ColumnSchema> sortColumns) {
     this.sortColumns = sortColumns;
+    return this;
   }
 
   public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, 
boolean isSortColumn) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index af9e50f..0b9b6ba 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.detailquery
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonSession, Row, SaveMode}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, Ignore}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -28,7 +28,6 @@ import org.apache.carbondata.spark.util.DataGenerator
 /**
  * Test Suite for search mode
  */
-
 class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
 
   val numRows = 500 * 1000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
deleted file mode 100644
index a4124a2..0000000
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store
-
-import java.io.IOException
-import java.net.InetAddress
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{CarbonInputMetrics, SparkConf, SparkEnv}
-import org.apache.spark.sql.CarbonSession._
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.store.conf.StoreConf
-import org.apache.carbondata.store.master.Master
-import org.apache.carbondata.store.worker.Worker
-
-/**
- * A CarbonStore implementation that uses Spark as underlying compute engine
- * with CarbonData query optimization capability
- */
-@InterfaceAudience.Internal
-class SparkCarbonStore extends MetaCachedCarbonStore {
-  private var session: SparkSession = _
-  private var master: Master = _
-  private final val LOG = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Initialize SparkCarbonStore
-   * @param storeName store name
-   * @param storeLocation location to store data
-   */
-  def this(storeName: String, storeLocation: String) = {
-    this()
-    val sparkConf = new SparkConf(loadDefaults = true)
-    session = SparkSession.builder
-      .config(sparkConf)
-      .appName("SparkCarbonStore-" + storeName)
-      .config("spark.sql.warehouse.dir", storeLocation)
-      .getOrCreateCarbonSession()
-  }
-
-  def this(sparkSession: SparkSession) = {
-    this()
-    session = sparkSession
-  }
-
-  @throws[IOException]
-  override def scan(
-      path: String,
-      projectColumns: Array[String]): java.util.Iterator[CarbonRow] = {
-    require(path != null)
-    require(projectColumns != null)
-    scan(path, projectColumns, null)
-  }
-
-  @throws[IOException]
-  override def scan(
-      path: String,
-      projectColumns: Array[String],
-      filter: Expression): java.util.Iterator[CarbonRow] = {
-    require(path != null)
-    require(projectColumns != null)
-    val table = getTable(path)
-    val rdd = new CarbonScanRDD[CarbonRow](
-      spark = session,
-      columnProjection = new CarbonProjection(projectColumns),
-      filterExpression = filter,
-      identifier = table.getAbsoluteTableIdentifier,
-      serializedTableInfo = table.getTableInfo.serialize,
-      tableInfo = table.getTableInfo,
-      inputMetricsStats = new CarbonInputMetrics,
-      partitionNames = null,
-      dataTypeConverterClz = null,
-      readSupportClz = classOf[CarbonRowReadSupport])
-    rdd.collect
-      .iterator
-      .asJava
-  }
-
-  @throws[IOException]
-  override def sql(sqlString: String): java.util.Iterator[CarbonRow] = {
-    val df = session.sql(sqlString)
-    df.rdd
-      .map(row => new CarbonRow(row.toSeq.toArray.asInstanceOf[Array[Object]]))
-      .collect()
-      .iterator
-      .asJava
-  }
-
-  def startSearchMode(): Unit = {
-    LOG.info("Starting search mode master")
-    val conf = new StoreConf()
-    conf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress)
-    conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort)
-    conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
-    master = Master.getInstance(conf)
-    master.startService()
-    startAllWorkers()
-  }
-
-  def stopSearchMode(): Unit = {
-    if (master != null) {
-      LOG.info("Shutting down all workers...")
-      try {
-        master.stopAllWorkers()
-        LOG.info("All workers are shut down")
-      } catch {
-        case e: Exception =>
-          LOG.error(s"failed to shutdown worker: ${ e.toString }")
-      }
-      LOG.info("Stopping master...")
-      master.stopService()
-      LOG.info("Master stopped")
-      master = null
-    }
-  }
-
-  /** search mode */
-  def search(
-      table: CarbonTable,
-      projectColumns: Array[String],
-      filter: Expression,
-      globalLimit: Long,
-      localLimit: Long): java.util.Iterator[CarbonRow] = {
-    if (master == null) {
-      throw new IllegalStateException("search mode is not started")
-    }
-    master.search(table, projectColumns, filter, globalLimit, localLimit)
-      .iterator
-      .asJava
-  }
-
-  private def startAllWorkers(): Array[Int] = {
-    // TODO: how to ensure task is sent to every executor?
-    val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size
-    val masterIp = InetAddress.getLocalHost.getHostAddress
-    val rows = session.sparkContext.parallelize(1 to numExecutors * 10, 
numExecutors)
-      .mapPartitions { f =>
-        // start worker
-        val conf = new StoreConf()
-        conf.conf(StoreConf.WORKER_HOST, 
InetAddress.getLocalHost.getHostAddress)
-        conf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort)
-        conf.conf(StoreConf.WORKER_CORE_NUM, 2)
-        conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
-        conf.conf(StoreConf.MASTER_HOST, masterIp)
-        conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort)
-
-        var storeLocation: String = null
-        val carbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false")
-        if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-          val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (null != storeLocations && storeLocations.nonEmpty) {
-            storeLocation = storeLocations.mkString(",")
-          }
-          if (storeLocation == null) {
-            storeLocation = System.getProperty("java.io.tmpdir")
-          }
-        } else {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-        conf.conf(StoreConf.STORE_TEMP_LOCATION, storeLocation)
-
-        val worker = new Worker(conf)
-        worker.start()
-        new Iterator[Int] {
-          override def hasNext: Boolean = false
-
-          override def next(): Int = 1
-        }
-      }.collect()
-    LOG.info(s"Tried to start $numExecutors workers, ${master.getWorkers.size} 
" +
-             s"workers are started successfully")
-    rows
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
new file mode 100644
index 0000000..7fff2e5
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.net.InetAddress
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.store.api.CarbonStoreFactory
+import org.apache.carbondata.store.api.conf.StoreConf
+import org.apache.carbondata.store.impl.worker.Worker
+
+/**
+ * Helper that runs a Spark job to start a CarbonStore Worker inside each
+ * partition's task, using the given StoreConf (see startAllWorker)
+ */
+@InterfaceAudience.Internal
+object WorkerManager {
+  private final val LOG = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // TODO: how to ensure task is sent to every executor?
+  def startAllWorker(session: SparkSession, storeConf: StoreConf): Unit = {
+    val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size
+    LOG.info("Starting search mode master")
+    val rows = session.sparkContext.parallelize(1 to numExecutors * 10, 
numExecutors)
+      .mapPartitions { f =>
+        // start worker
+        var storeLocation: String = null
+        val carbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false")
+        if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+          val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (null != storeLocations && storeLocations.nonEmpty) {
+            storeLocation = storeLocations.mkString(",")
+          }
+          if (storeLocation == null) {
+            storeLocation = System.getProperty("java.io.tmpdir")
+          }
+        } else {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
+        storeConf.conf(StoreConf.STORE_TEMP_LOCATION, storeLocation)
+
+        val worker = new Worker(storeConf)
+        worker.start()
+        new Iterator[Int] {
+          override def hasNext: Boolean = false
+
+          override def next(): Int = 1
+        }
+      }.collect()
+    LOG.info(s"Tried to start $numExecutors workers, $rows workers started 
successfully")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index ee19503..cd939dc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql
 
 import java.io.File
+import java.net.InetAddress
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
@@ -41,7 +42,10 @@ import org.apache.carbondata.common.logging.{LogService, 
LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, 
ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.store.SparkCarbonStore
+import org.apache.carbondata.store.WorkerManager
+import org.apache.carbondata.store.api.{CarbonStore, CarbonStoreFactory}
+import org.apache.carbondata.store.api.conf.StoreConf
+import org.apache.carbondata.store.api.descriptor.{SelectDescriptor, 
TableIdentifier => CTableIdentifier}
 import org.apache.carbondata.streaming.CarbonStreamingQueryListener
 
 /**
@@ -120,7 +124,7 @@ class CarbonSession(@transient val sc: SparkContext,
     message(0).getString(0).contains(dataMapName)
   }
 
-  def isSearchModeEnabled: Boolean = carbonStore != null
+  def isSearchModeEnabled: Boolean = store != null
 
   /**
    * Run SparkSQL directly
@@ -192,24 +196,34 @@ class CarbonSession(@transient val sc: SparkContext,
     }
   }
 
-  @transient private var carbonStore: SparkCarbonStore = _
+  // store used in search mode; it is null while search mode is not enabled
+  @transient private var store: CarbonStore = _
 
   def startSearchMode(): Unit = {
-    CarbonProperties.enableSearchMode(true)
-    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
 "false")
-    if (carbonStore == null) {
-      carbonStore = new SparkCarbonStore(this)
-      carbonStore.startSearchMode()
+    if (store == null) {
+      val storeConf = new StoreConf()
+      storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
+      storeConf.conf(StoreConf.MASTER_HOST, 
InetAddress.getLocalHost.getHostAddress)
+      storeConf.conf(StoreConf.MASTER_PORT, 
CarbonProperties.getSearchMasterPort)
+      storeConf.conf(StoreConf.WORKER_HOST, 
InetAddress.getLocalHost.getHostAddress)
+      storeConf.conf(StoreConf.WORKER_PORT, 
CarbonProperties.getSearchWorkerPort)
+      storeConf.conf(StoreConf.WORKER_CORE_NUM, 2)
+
+      store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf)
+      CarbonProperties.enableSearchMode(true)
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+      WorkerManager.startAllWorker(this, storeConf)
     }
   }
 
   def stopSearchMode(): Unit = {
-    CarbonProperties.enableSearchMode(false)
-    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
 "true")
-    if (carbonStore != null) {
+    if (store != null) {
+      CarbonProperties.enableSearchMode(false)
+      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
 "true")
       try {
-        carbonStore.stopSearchMode()
-        carbonStore = null
+        CarbonStoreFactory.removeDistributedStore("GlobalStore")
+        store = null
       } catch {
         case e: RuntimeException =>
           LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -226,16 +240,21 @@ class CarbonSession(@transient val sc: SparkContext,
       relation: LogicalRelation,
       maxRows: Option[Long] = None,
       localMaxRows: Option[Long] = None): DataFrame = {
-    val rows = carbonStore.search(
-        
relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable,
-        columns.map(_.name).toArray,
-        if (expr != null) CarbonFilters.transformExpression(expr) else null,
-        maxRows.getOrElse(Long.MaxValue),
-        localMaxRows.getOrElse(Long.MaxValue))
+    val table = 
relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+    val select = new SelectDescriptor(
+      new CTableIdentifier(table.getTableName, table.getDatabaseName),
+      columns.map(_.name).toArray,
+      if (expr != null) CarbonFilters.transformExpression(expr) else null,
+      localMaxRows.getOrElse(Long.MaxValue)
+    )
+    val rows = store.select(select).iterator()
     val output = new java.util.ArrayList[Row]()
-    while (rows.hasNext) {
+    val maxRowCount = maxRows.getOrElse(Long.MaxValue)
+    var rowCount = 0
+    while (rows.hasNext && rowCount < maxRowCount) {
       val row = rows.next()
       output.add(Row.fromSeq(row.getData))
+      rowCount = rowCount + 1
     }
     createDataFrame(output, logicalPlan.schema)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
deleted file mode 100644
index d389670..0000000
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/store/SparkCarbonStoreTest.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store
-
-import org.apache.spark.sql.{CarbonEnv, Row}
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
-import org.apache.carbondata.core.scan.expression.{ColumnExpression, 
LiteralExpression}
-
-class SparkCarbonStoreTest extends QueryTest with BeforeAndAfterAll {
-
-  private var store: CarbonStore = _
-
-  override def beforeAll {
-    sql("DROP TABLE IF EXISTS t1")
-    sql("CREATE TABLE t1 (" +
-        "empno int, empname String, designation String, doj Timestamp, " +
-        "workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String," +
-        "projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp," +
-        "attendance int,utilization int,salary int)" +
-        "STORED BY 'org.apache.carbondata.format'")
-    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE t1 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
-
-    store = new SparkCarbonStore("test", storeLocation)
-  }
-
-  test("test CarbonStore.get, compare projection result") {
-    val tablePath = CarbonEnv.getCarbonTable(None, 
"t1")(sqlContext.sparkSession).getTablePath
-    val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray)
-    val sparkResult: Array[Row] = sql("select empno, empname from 
t1").collect()
-    sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) =>
-      val carbonRow = rows.next()
-      assertResult(r.get(0))(carbonRow.getData()(0))
-      assertResult(r.get(1))(carbonRow.getData()(1))
-    }
-    assert(!rows.hasNext)
-  }
-
-  test("test CarbonStore.get, compare projection and filter result") {
-    val tablePath = CarbonEnv.getCarbonTable(None, 
"t1")(sqlContext.sparkSession).getTablePath
-    val filter = new EqualToExpression(
-      new ColumnExpression("empno", DataTypes.INT),
-      new LiteralExpression(10, DataTypes.INT))
-    val rows = store.scan(s"$tablePath", Seq("empno", "empname").toArray, 
filter)
-    val sparkResult: Array[Row] = sql("select empno, empname from t1 where 
empno = 10").collect()
-    sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) =>
-      val carbonRow = rows.next()
-      assertResult(r.get(0))(carbonRow.getData()(0))
-      assertResult(r.get(1))(carbonRow.getData()(1))
-    }
-    assert(!rows.hasNext)
-  }
-
-  test("test CarbonStore.sql") {
-    val rows = store.sql("select empno, empname from t1 where empno = 10")
-    val sparkResult: Array[Row] = sql("select empno, empname from t1 where 
empno = 10").collect()
-    sparkResult.zipWithIndex.foreach { case (r: Row, i: Int) =>
-      val carbonRow = rows.next()
-      assertResult(r.get(0))(carbonRow.getData()(0))
-      assertResult(r.get(1))(carbonRow.getData()(1))
-    }
-    assert(!rows.hasNext)
-  }
-
-  override def afterAll {
-    sql("DROP TABLE IF EXISTS t1")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
index fc5c41f..65d72cc 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
@@ -39,11 +39,11 @@ public class DataLoadExecutor {
 
   private boolean isClosed;
 
-  public void execute(CarbonLoadModel loadModel, String[] storeLocation,
-      CarbonIterator<Object[]>[] inputIterators) throws Exception {
+  public void execute(CarbonLoadModel loadModel, String[] tempLocation,
+      CarbonIterator<Object[]>[] inputIterators) {
     try {
       loadProcessorStep =
-          new DataLoadProcessBuilder().build(loadModel, storeLocation, 
inputIterators);
+          new DataLoadProcessBuilder().build(loadModel, tempLocation, 
inputIterators);
       // 1. initialize
       loadProcessorStep.initialize();
       LOGGER.info("Data Loading is started for table " + 
loadModel.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 29ce2f4..0460d16 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -578,7 +578,7 @@ public final class CarbonLoaderUtil {
    *                       based on block location information
    * @param blockAssignmentStrategy strategy used to assign blocks
    * @param expectedMinSizePerNode expected minimum size per node
-   * @return a map that maps node to blocks
+   * @return a map that maps node (hostname) to blocks
    */
   public static Map<String, List<Distributable>> nodeBlockMapping(
       List<Distributable> blockInfos, int noOfNodesInput, List<String> 
activeNodes,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/pom.xml
----------------------------------------------------------------------
diff --git a/store/core/pom.xml b/store/core/pom.xml
index 6b2703e..44d5ab1 100644
--- a/store/core/pom.xml
+++ b/store/core/pom.xml
@@ -26,6 +26,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+      <version>4.7.1</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
 
b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
deleted file mode 100644
index bafbb9f..0000000
--- 
a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import 
org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-
-/**
- * ReadSupport that convert row object to CarbonRow
- */
-@InterfaceAudience.Internal
-public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> {
-  private CarbonReadSupport<Object[]> delegate;
-
-  public CarbonRowReadSupport() {
-    this.delegate = new DictionaryDecodeReadSupport<>();
-  }
-
-  @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable 
carbonTable)
-      throws IOException {
-    delegate.initialize(carbonColumns, carbonTable);
-  }
-
-  @Override public CarbonRow readRow(Object[] data) {
-    Object[] converted = delegate.readRow(data);
-    return new CarbonRow(converted);
-  }
-
-  @Override public void close() {
-    delegate.close();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
deleted file mode 100644
index c6b2fb8..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.scan.expression.Expression;
-
-/**
- * User can use {@link CarbonStore} to query data
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface CarbonStore extends Closeable {
-
-  /**
-   * Scan query on the data in the table path
-   * @param path table path
-   * @param projectColumns column names to read
-   * @return rows
-   * @throws IOException if unable to read files in table path
-   */
-  Iterator<CarbonRow> scan(
-      String path,
-      String[] projectColumns) throws IOException;
-
-  /**
-   * Scan query with filter, on the data in the table path
-   * @param path table path
-   * @param projectColumns column names to read
-   * @param filter filter condition, can be null
-   * @return rows that satisfy filter condition
-   * @throws IOException if unable to read files in table path
-   */
-  Iterator<CarbonRow> scan(
-      String path,
-      String[] projectColumns,
-      Expression filter) throws IOException;
-
-  /**
-   * SQL query, table should be created before calling this function
-   * @param sqlString SQL statement
-   * @return rows
-   * @throws IOException if unable to read files in table path
-   */
-  Iterator<CarbonRow> sql(String sqlString) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
deleted file mode 100644
index daa1447..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A CarbonStore implementation that works locally, without other compute 
framework dependency.
- * It can be used to read data in local disk.
- *
- * Note that this class is experimental, it is not intended to be used in 
production.
- */
-@InterfaceAudience.Internal
-class LocalCarbonStore extends MetaCachedCarbonStore {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
-
-  @Override
-  public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws 
IOException {
-    return scan(path, projectColumns, null);
-  }
-
-  @Override public Iterator<CarbonRow> scan(String path, String[] 
projectColumns, Expression filter)
-      throws IOException {
-    Objects.requireNonNull(path);
-    Objects.requireNonNull(projectColumns);
-
-    CarbonTable table = getTable(path);
-    if (table.isStreamingSink() || table.isHivePartitionTable()) {
-      throw new UnsupportedOperationException("streaming and partition table 
is not supported");
-    }
-    // TODO: use InputFormat to prune data and read data
-
-    final CarbonTableInputFormat format = new CarbonTableInputFormat();
-    final Job job = new Job(new Configuration());
-    CarbonInputFormat.setTableInfo(job.getConfiguration(), 
table.getTableInfo());
-    CarbonInputFormat.setTablePath(job.getConfiguration(), 
table.getTablePath());
-    CarbonInputFormat.setTableName(job.getConfiguration(), 
table.getTableName());
-    CarbonInputFormat.setDatabaseName(job.getConfiguration(), 
table.getDatabaseName());
-    CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), 
CarbonRowReadSupport.class);
-    CarbonInputFormat
-        .setColumnProjection(job.getConfiguration(), new 
CarbonProjection(projectColumns));
-    if (filter != null) {
-      CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
-    }
-
-    final List<InputSplit> splits =
-        format.getSplits(new JobContextImpl(job.getConfiguration(), new 
JobID()));
-
-    List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
-
-    List<CarbonRow> rows = new ArrayList<>();
-
-    try {
-      for (InputSplit split : splits) {
-        TaskAttemptContextImpl attempt =
-            new TaskAttemptContextImpl(job.getConfiguration(), new 
TaskAttemptID());
-        RecordReader reader = format.createRecordReader(split, attempt);
-        reader.initialize(split, attempt);
-        readers.add(reader);
-      }
-
-      for (RecordReader<Void, Object> reader : readers) {
-        while (reader.nextKeyValue()) {
-          rows.add((CarbonRow) reader.getCurrentValue());
-        }
-        try {
-          reader.close();
-        } catch (IOException e) {
-          LOGGER.error(e);
-        }
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } finally {
-      for (RecordReader<Void, Object> reader : readers) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          LOGGER.error(e);
-        }
-      }
-    }
-    return rows.iterator();
-  }
-
-  @Override
-  public Iterator<CarbonRow> sql(String sqlString) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
 
b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
deleted file mode 100644
index e43f750..0000000
--- 
a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * A CarbonStore base class that caches CarbonTable object
- */
-@InterfaceAudience.Internal
-abstract class MetaCachedCarbonStore implements CarbonStore {
-
-  // mapping of table path to CarbonTable object
-  private Map<String, CarbonTable> cache = new HashMap<>();
-
-  CarbonTable getTable(String path) throws IOException {
-    if (cache.containsKey(path)) {
-      return cache.get(path);
-    }
-    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .readSchemaFile(CarbonTablePath.getSchemaFilePath(path));
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    TableInfo tableInfo1 = 
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
-    tableInfo1.setTablePath(path);
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1);
-    cache.put(path, table);
-    return table;
-  }
-
-  @Override
-  public void close() throws IOException {
-    cache.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
new file mode 100644
index 0000000..3525389
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api;
+
+import java.io.Closeable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * Public Interface of CarbonStore
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface CarbonStore extends MetaStore, DataStore, Closeable {
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
new file mode 100644
index 0000000..76ef450
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+public class CarbonStoreFactory {
+  private static Map<String, CarbonStore> distributedStores = new 
ConcurrentHashMap<>();
+  private static Map<String, CarbonStore> localStores = new 
ConcurrentHashMap<>();
+
+  private CarbonStoreFactory() {
+  }
+
+  public static CarbonStore getDistributedStore(String storeName, StoreConf 
storeConf)
+      throws StoreException {
+    if (distributedStores.containsKey(storeName)) {
+      return distributedStores.get(storeName);
+    }
+
+    // create a new instance
+    try {
+      String className = 
"org.apache.carbondata.store.impl.DistributedCarbonStore";
+      CarbonStore store = createCarbonStore(storeConf, className);
+      distributedStores.put(storeName, store);
+      return store;
+    } catch (ClassNotFoundException | IllegalAccessException | 
InvocationTargetException |
+        InstantiationException e) {
+      throw new StoreException(e);
+    }
+  }
+
+  public static void removeDistributedStore(String storeName) throws 
IOException {
+    if (distributedStores.containsKey(storeName)) {
+      distributedStores.get(storeName).close();
+      distributedStores.remove(storeName);
+    }
+  }
+
+  public static CarbonStore getLocalStore(String storeName, StoreConf 
storeConf)
+      throws StoreException {
+    if (localStores.containsKey(storeName)) {
+      return localStores.get(storeName);
+    }
+
+    // create a new instance
+    try {
+      String className = "org.apache.carbondata.store.impl.LocalCarbonStore";
+      CarbonStore store = createCarbonStore(storeConf, className);
+      localStores.put(storeName, store);
+      return store;
+    } catch (ClassNotFoundException | IllegalAccessException | 
InvocationTargetException |
+        InstantiationException e) {
+      throw new StoreException(e);
+    }
+  }
+
+  public static void removeLocalStore(String storeName) throws IOException {
+    if (localStores.containsKey(storeName)) {
+      localStores.get(storeName).close();
+      localStores.remove(storeName);
+    }
+  }
+
+  private static CarbonStore createCarbonStore(StoreConf storeConf, String 
className)
+      throws ClassNotFoundException, InstantiationException, 
IllegalAccessException,
+      InvocationTargetException {
+    Constructor[] constructor = 
Class.forName(className).getDeclaredConstructors();
+    constructor[0].setAccessible(true);
+    return (CarbonStore) constructor[0].newInstance(storeConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
new file mode 100644
index 0000000..d35c133
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
+import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+/**
+ * Public interface to write and read data in CarbonStore
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface DataStore {
+
+  /**
+   * Load data into a Table
+   * @param load descriptor for load operation
+   * @throws IOException if network or disk IO error occurs
+   */
+  void loadData(LoadDescriptor load) throws IOException, StoreException;
+
+  /**
+   * Scan a Table and return matched rows
+   * @param select descriptor for scan operation, including required column, 
filter, etc
+   * @return matched rows
+   * @throws IOException if network or disk IO error occurs
+   */
+  List<CarbonRow> select(SelectDescriptor select) throws IOException, 
StoreException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
new file mode 100644
index 0000000..dea6873
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.store.api.descriptor.TableDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+/**
+ * Public interface to manage table in CarbonStore
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface MetaStore {
+  /**
+   * Create a Table
+   * @param table descriptor for create table operation
+   * @throws IOException if network or disk IO error occurs
+   */
+  void createTable(TableDescriptor table) throws IOException, StoreException;
+
+  /**
+   * Drop a Table, and remove all data in it
+   * @param table table identifier
+   * @throws IOException if network or disk IO error occurs
+   */
+  void dropTable(TableIdentifier table) throws IOException;
+
+  CarbonTable getTable(TableIdentifier table) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java 
b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
new file mode 100644
index 0000000..3f52eed
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+
+public interface SqlStore {
+
+  /**
+   * Execute a SQL statement
+   * @param sqlString SQL statement
+   * @return matched rows
+   * @throws IOException if network or disk IO error occurs
+   */
+  List<CarbonRow> sql(String sqlString) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java 
b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
new file mode 100644
index 0000000..98a670a
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.conf;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class StoreConf implements Serializable, Writable { // string key/value settings for a store
+
+  public static final String SELECT_PROJECTION = "carbon.select.projection"; // read by projection()
+  public static final String SELECT_FILTER = "carbon.select.filter"; // read by filter()
+  public static final String SELECT_LIMIT = "carbon.select.limit"; // read by limit()
+
+  public static final String SELECT_ID = "carbon.select.id"; // read by selectId()
+
+  public static final String WORKER_HOST = "carbon.worker.host"; // read by workerHost()
+  public static final String WORKER_PORT = "carbon.worker.port"; // read by workerPort()
+  public static final String WORKER_CORE_NUM = "carbon.worker.core.num"; // read by workerCoreNum()
+  public static final String MASTER_HOST = "carbon.master.host"; // read by masterHost()
+  public static final String MASTER_PORT = "carbon.master.port"; // read by masterPort()
+
+  public static final String STORE_TEMP_LOCATION = 
"carbon.store.temp.location"; // comma-separated list, see storeTempLocation()
+  public static final String STORE_LOCATION = "carbon.store.location"; // read by storeLocation()
+  public static final String STORE_NAME = "carbon.store.name"; // set by the two-arg constructor; no getter here
+
+  private Map<String, String> conf = new HashMap<>(); // backing map; every value is kept as a String
+
+  public StoreConf() { // empty configuration
+  }
+
+  public StoreConf(String storeName, String storeLocation) { // pre-populates STORE_NAME and STORE_LOCATION
+    conf.put(STORE_NAME, storeName);
+    conf.put(STORE_LOCATION, storeLocation);
+  }
+
+  public StoreConf(String confFilePath) { // populates this object via load(confFilePath)
+    load(confFilePath);
+  }
+
+  public StoreConf conf(String key, String value) { // fluent setter; replaces any previous value for key
+    conf.put(key, value);
+    return this;
+  }
+
+  public StoreConf conf(String key, int value) { // fluent setter; the int is stored in its decimal string form
+    conf.put(key, "" + value);
+    return this;
+  }
+
+  public void load(String filePath) { // delegates to StoreUtil; presumably reads a properties file into this - confirm
+    StoreUtil.loadProperties(filePath, this);
+  }
+
+  public void conf(StoreConf conf) { // copies every entry of the other conf, overwriting keys already present
+    this.conf.putAll(conf.conf);
+  }
+
+  public Object conf(String key) { // raw lookup, null when absent; NOTE(review): typed Object though values are Strings
+    return conf.get(key);
+  }
+
+  public String[] projection() { // null when SELECT_PROJECTION is not set
+    return stringArrayValue(SELECT_PROJECTION);
+  }
+
+  public String filter() { // null when SELECT_FILTER is not set
+    return stringValue(SELECT_FILTER);
+  }
+
+  public int limit() { // -1 when SELECT_LIMIT is not set
+    return intValue(SELECT_LIMIT);
+  }
+
+  public String masterHost() { // null when MASTER_HOST is not set
+    return stringValue(MASTER_HOST);
+  }
+
+  public int masterPort() { // -1 when MASTER_PORT is not set
+    return intValue(MASTER_PORT);
+  }
+
+  public String workerHost() { // null when WORKER_HOST is not set
+    return stringValue(WORKER_HOST);
+  }
+
+  public int workerPort() { // -1 when WORKER_PORT is not set
+    return intValue(WORKER_PORT);
+  }
+
+  public int workerCoreNum() { // -1 when WORKER_CORE_NUM is not set
+    return intValue(WORKER_CORE_NUM);
+  }
+
+  public String storeLocation() { // null when STORE_LOCATION is not set
+    return stringValue(STORE_LOCATION);
+  }
+
+  public String[] storeTempLocation() { // null when STORE_TEMP_LOCATION is not set
+    return stringArrayValue(STORE_TEMP_LOCATION);
+  }
+
+  public String selectId() { // null when SELECT_ID is not set
+    return stringValue(SELECT_ID);
+  }
+
+  public Configuration newHadoopConf() { // applies "carbon.hadoop."-prefixed entries to a Hadoop Configuration
+    Configuration hadoopConf = FileFactory.getConfiguration(); // NOTE(review): if this is a shared instance, set() below mutates it - confirm
+    for (Map.Entry<String, String> entry : conf.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key != null && value != null && key.startsWith("carbon.hadoop.")) {
+        hadoopConf.set(key.substring("carbon.hadoop.".length()), value); // prefix is stripped before setting
+      }
+    }
+    return hadoopConf;
+  }
+
+  private String stringValue(String key) { // value for key, or null when the key is absent
+    Object obj = conf.get(key);
+    if (obj == null) {
+      return null;
+    }
+    return obj.toString();
+  }
+
+  private int intValue(String key) { // parsed int for key; -1 stands in for "not set"
+    String value = conf.get(key);
+    if (value == null) {
+      return -1;
+    }
+    return Integer.parseInt(value); // throws NumberFormatException when the stored text is not an int
+  }
+
+  private String[] stringArrayValue(String key) { // comma-split value for key, or null when absent
+    String value = conf.get(key);
+    if (value == null) {
+      return null;
+    }
+    return value.split(",", -1); // limit -1 keeps trailing empty elements; elements are not trimmed
+  }
+
+  @Override public void write(DataOutput out) throws IOException { // wire format: entry count, then key/value pairs
+    Set<Map.Entry<String, String>> entries = conf.entrySet();
+    WritableUtils.writeVInt(out, conf.size());
+    for (Map.Entry<String, String> entry : entries) {
+      WritableUtils.writeString(out, entry.getKey());
+      WritableUtils.writeString(out, entry.getValue());
+    }
+  }
+
+  @Override public void readFields(DataInput in) throws IOException { // reads what write() produced
+    if (conf == null) {
+      conf = new HashMap<>();
+    }
+
+    int size = WritableUtils.readVInt(in); // NOTE(review): existing entries are not cleared, so a reused object merges old and new
+    String key, value;
+    for (int i = 0; i < size; i++) {
+      key = WritableUtils.readString(in);
+      value = WritableUtils.readString(in);
+      conf.put(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
new file mode 100644
index 0000000..c3a4ff7
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.descriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class LoadDescriptor { // parameters of a load: target table, input path, options, overwrite flag
+
+  private TableIdentifier table; // table to load into
+  private String inputPath; // path of the input to load
+  private Map<String, String> options; // load options; may be null when set through the constructor or setter
+  private boolean isOverwrite; // false unless set
+
+  private LoadDescriptor() { // used by Builder; leaves every field at its default
+  }
+
+  public LoadDescriptor(TableIdentifier table, String inputPath,
+      Map<String, String> options, boolean isOverwrite) {
+    Objects.requireNonNull(table); // throws NullPointerException when null
+    Objects.requireNonNull(inputPath); // throws NullPointerException when null
+    this.table = table;
+    this.inputPath = inputPath;
+    this.options = options; // kept by reference, not copied; null is accepted
+    this.isOverwrite = isOverwrite;
+  }
+
+  public TableIdentifier getTable() {
+    return table;
+  }
+
+  public void setTable(TableIdentifier table) { // no null check, unlike the public constructor
+    this.table = table;
+  }
+
+  public String getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(String inputPath) { // no null check, unlike the public constructor
+    this.inputPath = inputPath;
+  }
+
+  public Map<String, String> getOptions() { // returns the internal map itself, not a copy
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+
+  public boolean isOverwrite() {
+    return isOverwrite;
+  }
+
+  public void setOverwrite(boolean overwrite) {
+    isOverwrite = overwrite;
+  }
+
+  public static class Builder { // fluent construction; obtain one through builder()
+    private LoadDescriptor load; // the single instance this builder configures and returns
+    private Map<String, String> options; // accumulated by options(key, value)
+
+    private Builder() {
+      load = new LoadDescriptor();
+      options = new HashMap<>();
+    }
+
+    public Builder table(TableIdentifier tableIdentifier) {
+      load.setTable(tableIdentifier);
+      return this;
+    }
+
+    public Builder overwrite(boolean isOverwrite) {
+      load.setOverwrite(isOverwrite);
+      return this;
+    }
+
+    public Builder inputPath(String inputPath) {
+      load.setInputPath(inputPath);
+      return this;
+    }
+
+    public Builder options(String key, String value) { // adds one option; a repeated key replaces the earlier value
+      options.put(key, value);
+      return this;
+    }
+
+    public LoadDescriptor create() { // NOTE(review): table/inputPath are not null-checked on this path
+      load.setOptions(options); // the descriptor shares this map with the builder
+      return load; // NOTE(review): the same instance is returned on every call
+    }
+  }
+
+  public static Builder builder() { // entry point for the fluent API
+    return new Builder();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
new file mode 100644
index 0000000..c3627a9
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.descriptor;
+
+import java.util.Objects;
+
+import org.apache.carbondata.core.scan.expression.Expression;
+
+public class SelectDescriptor { // parameters of a select: table, projected columns, filter, row limit
+
+  private TableIdentifier table; // table to read from
+  private String[] projection; // names of the columns to return
+  private Expression filter; // filter expression; null is accepted by the constructor and setter
+  private long limit; // 0 if never set - NOTE(review): meaning of 0 or negative is up to the consumer, confirm
+
+  private SelectDescriptor() { // used by Builder; leaves every field at its default
+  }
+
+  public SelectDescriptor(TableIdentifier table, String[] projection,
+      Expression filter, long limit) {
+    Objects.requireNonNull(table); // throws NullPointerException when null
+    Objects.requireNonNull(projection); // throws NullPointerException when null
+    this.table = table;
+    this.projection = projection; // kept by reference, not copied
+    this.filter = filter;
+    this.limit = limit;
+  }
+
+  public TableIdentifier getTable() {
+    return table;
+  }
+
+  public void setTable(TableIdentifier table) { // no null check, unlike the public constructor
+    this.table = table;
+  }
+
+  public String[] getProjection() { // returns the internal array itself, not a copy
+    return projection;
+  }
+
+  public void setProjection(String[] projection) { // no null check, unlike the public constructor
+    this.projection = projection;
+  }
+
+  public Expression getFilter() {
+    return filter;
+  }
+
+  public void setFilter(Expression filter) {
+    this.filter = filter;
+  }
+
+  public long getLimit() {
+    return limit;
+  }
+
+  public void setLimit(long limit) { // value is stored without range validation
+    this.limit = limit;
+  }
+
+  public static class Builder { // fluent construction; obtain one through builder()
+    private SelectDescriptor select; // the single instance this builder configures and returns
+
+    private Builder() {
+      select = new SelectDescriptor();
+    }
+
+    public Builder table(TableIdentifier tableIdentifier) {
+      select.setTable(tableIdentifier);
+      return this;
+    }
+
+    public Builder select(String... columnNames) { // sets the projection; the varargs array is stored as-is
+      select.setProjection(columnNames);
+      return this;
+    }
+
+    public Builder filter(Expression filter) {
+      select.setFilter(filter);
+      return this;
+    }
+
+    public Builder limit(long limit) {
+      select.setLimit(limit);
+      return this;
+    }
+
+    public SelectDescriptor create() { // NOTE(review): table/projection are not null-checked on this path
+      return select; // NOTE(review): the same instance is returned on every call
+    }
+  }
+
+  public static Builder builder() { // entry point for the fluent API
+    return new Builder();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
new file mode 100644
index 0000000..2d677a8
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.descriptor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+public class TableDescriptor { // parameters for creating a table: identifier, schema, properties, path, comment
+
+  private boolean ifNotExists; // false unless set
+  private TableIdentifier table; // identifier of the table being described
+  private String tablePath; // table location; may be null
+  private Schema schema; // column definitions
+  private Map<String, String> properties; // table properties; may be null when set through the constructor or setter
+  private String comment; // table comment; may be null
+
+  private TableDescriptor() { // used by Builder; leaves every field at its default
+  }
+
+  public TableDescriptor(TableIdentifier table, Schema schema,
+      Map<String, String> properties, String tablePath, String comment, 
boolean ifNotExists) {
+    Objects.requireNonNull(table); // throws NullPointerException when null
+    Objects.requireNonNull(schema); // throws NullPointerException when null
+    this.table = table;
+    this.ifNotExists = ifNotExists;
+    this.schema = schema;
+    this.properties = properties; // kept by reference, not copied; null is accepted
+    this.tablePath = tablePath;
+    this.comment = comment;
+  }
+
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+
+  public void setIfNotExists(boolean ifNotExists) {
+    this.ifNotExists = ifNotExists;
+  }
+
+  public TableIdentifier getTable() {
+    return table;
+  }
+
+  public void setTable(TableIdentifier table) { // no null check, unlike the public constructor
+    this.table = table;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public void setSchema(Schema schema) { // no null check, unlike the public constructor
+    this.schema = schema;
+  }
+
+  public Map<String, String> getProperties() { // returns the internal map itself, not a copy
+    return properties;
+  }
+
+  public void setProperties(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
+  public String getComment() {
+    return comment;
+  }
+
+  public void setComment(String comment) {
+    this.comment = comment;
+  }
+
+  public void setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+  }
+
+  public String getTablePath() {
+    return tablePath;
+  }
+
+  public static class Builder { // fluent construction; obtain one through builder()
+
+    private TableDescriptor table; // the single instance this builder configures and returns
+    private List<Field> fields; // columns in the order they were added
+    private Map<String, String> tblProperties; // accumulated by tblProperties(key, value)
+
+    private Builder() {
+      table = new TableDescriptor();
+      fields = new ArrayList<>();
+      tblProperties = new HashMap<>();
+    }
+
+    public Builder ifNotExists() { // turns the flag on; there is no builder call to turn it off again
+      table.setIfNotExists(true);
+      return this;
+    }
+
+    public Builder table(TableIdentifier tableId) {
+      table.setTable(tableId);
+      return this;
+    }
+
+    public Builder tablePath(String tablePath) {
+      table.setTablePath(tablePath);
+      return this;
+    }
+
+    public Builder comment(String tableComment) {
+      table.setComment(tableComment);
+      return this;
+    }
+
+    public Builder column(String name, DataType dataType) { // appends a column with no comment
+      fields.add(new Field(name, dataType));
+      return this;
+    }
+
+    public Builder column(String name, DataType dataType, String comment) { // appends a column with a comment
+      Field field = new Field(name, dataType);
+      field.setColumnComment(comment);
+      fields.add(field);
+      return this;
+    }
+
+    public Builder column(String name, DataType dataType, int precision, int 
scale, String comment)
+    { // appends a column that also records precision and scale
+      Field field = new Field(name, dataType);
+      field.setColumnComment(comment);
+      field.setScale(scale);
+      field.setPrecision(precision);
+      fields.add(field);
+      return this;
+    }
+
+    public Builder tblProperties(String key, String value) { // adds one property; a repeated key replaces the earlier value
+      tblProperties.put(key, value);
+      return this;
+    }
+
+    public TableDescriptor create() { // NOTE(review): the table identifier is not null-checked on this path
+      Field[] fieldArray = new Field[fields.size()];
+      fieldArray = fields.toArray(fieldArray);
+      Schema schema = new Schema(fieldArray); // schema is built from the columns added so far
+      table.setSchema(schema);
+      table.setProperties(tblProperties); // the descriptor shares this map with the builder
+      return table; // NOTE(review): the same instance is returned on every call
+    }
+  }
+
+  public static Builder builder() { // entry point for the fluent API
+    return new Builder();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
new file mode 100644
index 0000000..ab8edf8
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.descriptor;
+
+public class TableIdentifier { // names a table by table name plus database name; no setters
+  private String tableName; // assigned only in the constructor
+  private String databaseName; // assigned only in the constructor
+
+  public TableIdentifier(String tableName, String databaseName) { // argument order: table first, then database
+    this.tableName = tableName; // stored without a null check
+    this.databaseName = databaseName; // stored without a null check
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+} // NOTE(review): equals/hashCode are not overridden, so two identifiers with the same names are not equal

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
new file mode 100644
index 0000000..728837d
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.exception;
+
+public class ExecutionTimeoutException extends RuntimeException { // unchecked exception type for execution timeouts
+
+} // NOTE(review): only the implicit no-arg constructor exists, so no message or cause can be attached

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
new file mode 100644
index 0000000..28b8a50
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.exception;
+
+public class SchedulerException extends RuntimeException { // unchecked exception type for scheduler errors
+
+  public SchedulerException(String message) { // message only; there is no constructor that takes a cause
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
 
b/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
new file mode 100644
index 0000000..315a09b
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.api.exception;
+
+public class StoreException extends Exception { // checked exception: callers must catch or declare it
+
+  public StoreException() { // no message, no cause
+    super();
+  }
+
+  public StoreException(String message) { // message only
+    super(message);
+  }
+
+  public StoreException(Exception e) { // wraps e as the cause; accepts Exception, not any Throwable
+    super(e);
+  }
+}

Reply via email to