http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java
new file mode 100644
index 0000000..ac3b199
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.rpc.StoreService;
+import org.apache.carbondata.store.rpc.model.BaseResponse;
+import org.apache.carbondata.store.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.rpc.model.ShutdownResponse;
+import org.apache.carbondata.store.worker.Worker;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
+@InterfaceAudience.Internal
+public class StoreServiceImpl implements StoreService {
+
+  private Worker worker;
+  private RequestHandler handler;
+
+  public StoreServiceImpl(Worker worker) {
+    this.worker = worker;
+    this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf());
+  }
+
+  @Override
+  public BaseResponse loadData(LoadDataRequest request) {
+    return handler.handleLoadData(request);
+  }
+
+  @Override
+  public QueryResponse query(QueryRequest request) {
+    return handler.handleSearch(request);
+  }
+
+  @Override
+  public ShutdownResponse shutdown(ShutdownRequest request) {
+    return handler.handleShutdown(request);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+      int clientMethodsHash) throws IOException {
+    return null;
+  }
+
+  public Worker getWorker() {
+    return worker;
+  }
+
+  public void setWorker(Worker worker) {
+    this.worker = worker;
+  }
+}

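StoreServiceImpl is the server half of a plain Hadoop RPC protocol. A minimal
client-side sketch, assuming StoreService declares the customary Hadoop RPC
versionID constant (ServiceFactory, not shown in this excerpt, presumably
wraps the same call):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    // obtain a typed proxy to a worker's StoreService; host/port are placeholders
    StoreService store = RPC.getProxy(StoreService.class, StoreService.versionID,
        new InetSocketAddress("worker-host", 10021), new Configuration());
    BaseResponse response = store.loadData(new LoadDataRequest(loadModel));
    RPC.stopProxy(store);
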
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java
new file mode 100644
index 0000000..d826b32
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class BaseResponse implements Serializable, Writable {
+  private int status;
+  private String message;
+
+  public BaseResponse() {
+  }
+
+  public BaseResponse(int status, String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public void setStatus(int status) {
+    this.status = status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(status);
+    out.writeUTF(message);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    status = in.readInt();
+    message = in.readUTF();
+  }
+}

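BaseResponse doubles as a Hadoop Writable, so write() and readFields() must
visit the fields in the same order (status, then message). A round-trip sketch
using Hadoop's in-memory buffers:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    BaseResponse original = new BaseResponse(0, "OK");
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);                       // status first, then message

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    BaseResponse copy = new BaseResponse();
    copy.readFields(in);                       // must read in the same order

Note that writeUTF(message) throws NullPointerException for a null message, so
senders should always set one, even if it is the empty string.
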
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java
new file mode 100644
index 0000000..e79fad2
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class LoadDataRequest implements Serializable, Writable {
+
+  private CarbonLoadModel model;
+
+  public LoadDataRequest() {
+  }
+
+  public LoadDataRequest(CarbonLoadModel model) {
+    this.model = model;
+  }
+
+  public CarbonLoadModel getModel() {
+    return model;
+  }
+
+  public void setModel(CarbonLoadModel model) {
+    this.model = model;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeCompressedByteArray(out, StoreUtil.serialize(model));
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] bytes = WritableUtils.readCompressedByteArray(in);
+    model = (CarbonLoadModel) StoreUtil.deserialize(bytes);
+  }
+}

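CarbonLoadModel is a large Serializable object, so the request compresses the
serialized bytes through WritableUtils instead of writing them raw. The helper
pair works for any byte payload; a sketch with a placeholder payload:

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    static byte[] compressedRoundTrip(byte[] payload) throws IOException {
      DataOutputBuffer out = new DataOutputBuffer();
      WritableUtils.writeCompressedByteArray(out, payload);  // gzip + length prefix
      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      return WritableUtils.readCompressedByteArray(in);      // inverse of the write
    }
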
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
index 033f1a5..7ad9210 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
@@ -29,19 +29,17 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
 @InterfaceAudience.Internal
-public class QueryResponse implements Serializable, Writable {
+public class QueryResponse extends BaseResponse implements Serializable, Writable {
   private int queryId;
-  private int status;
-  private String message;
   private Object[][] rows;
 
   public QueryResponse() {
+    super();
   }
 
+  public QueryResponse(int queryId, int status, String message, Object[][] rows) {
+    super(status, message);
     this.queryId = queryId;
-    this.status = status;
-    this.message = message;
     this.rows = rows;
   }
 
@@ -49,13 +47,6 @@ public class QueryResponse implements Serializable, Writable {
     return queryId;
   }
 
-  public int getStatus() {
-    return status;
-  }
-
-  public String getMessage() {
-    return message;
-  }
 
   public Object[][] getRows() {
     return rows;
@@ -63,17 +54,15 @@ public class QueryResponse implements Serializable, Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
+    super.write(out);
     out.writeInt(queryId);
-    out.writeInt(status);
-    out.writeUTF(message);
+    WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows));
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
     queryId = in.readInt();
-    status = in.readInt();
-    message = in.readUTF();
     try {
       rows = (Object[][])ObjectSerializationUtil.deserialize(
           WritableUtils.readCompressedByteArray(in));

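Note that hoisting status/message into BaseResponse changes the wire order:
the old layout was queryId, status, message, rows, while the new super-first
layout is status, message, queryId, rows. Master and Worker must therefore run
the same version of the protocol. The general pattern for a Writable subclass
is to delegate to the parent first and mirror it exactly on read:

    @Override
    public void write(DataOutput out) throws IOException {
      super.write(out);        // parent fields define the front of the record
      out.writeInt(queryId);   // then subclass fields, in a fixed order
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      super.readFields(in);    // must mirror write() exactly
      queryId = in.readInt();
    }
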
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
index 894948b..2131e3b 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
@@ -66,4 +66,8 @@ public class RegisterWorkerRequest implements Serializable, Writable {
     port = in.readInt();
     cores = in.readInt();
   }
+
+  @Override public String toString() {
+    return "RegisterWorkerRequest{" + "hostAddress='" + hostAddress + '\'' + ", port=" + port + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java
new file mode 100644
index 0000000..65d0786
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.scheduler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.store.rpc.StoreService;
+
+public class Schedulable {
+
+  private String id;
+  private String address;
+  private int port;
+  private int cores;
+  public StoreService service;
+  public AtomicInteger workload;
+
+  public Schedulable(String id, String address, int port, int cores, StoreService service) {
+    this.id = id;
+    this.address = address;
+    this.port = port;
+    this.cores = cores;
+    this.service = service;
+    this.workload = new AtomicInteger();
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public void setAddress(String address) {
+    this.address = address;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  int getCores() {
+    return cores;
+  }
+
+  @Override public String toString() {
+    return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port
+        + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java
new file mode 100644
index 0000000..1b4cdde
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.scheduler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.store.exception.WorkerTooBusyException;
+import org.apache.carbondata.store.rpc.model.BaseResponse;
+import org.apache.carbondata.store.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+
+/**
+ * [[Master]] uses Scheduler to pick a Worker to send request
+ */
+public class Scheduler {
+
+  private static LogService LOGGER = LogServiceFactory.getLogService(Scheduler.class.getName());
+
+  // mapping of worker IP address to worker instance
+  private Map<String, Schedulable> ipMapWorker = new HashMap<>();
+  private List<Schedulable> workers = new ArrayList<>();
+  private AtomicLong index = new AtomicLong(0);
+  private ExecutorService executors = Executors.newCachedThreadPool();
+
+  /**
+   * Pick a Worker according to the address and workload of the Worker
+   * Invoke the RPC and return Future result
+   */
+  public Future<QueryResponse> sendRequestAsync(final Schedulable worker,
+      final QueryRequest request) {
+
+    LOGGER.info("sending search request to worker " + worker);
+    worker.workload.incrementAndGet();
+    return executors.submit(new Callable<QueryResponse>() {
+      @Override public QueryResponse call() {
+        return worker.service.query(request);
+      }
+    });
+  }
+
+  public BaseResponse sendRequest(final Schedulable worker,
+      final LoadDataRequest request) {
+
+    LOGGER.info("sending load data request to worker " + worker);
+    worker.workload.incrementAndGet();
+    return worker.service.loadData(request);
+  }
+
+  public Schedulable pickWorker(String splitAddress) {
+    Schedulable worker = ipMapWorker.get(splitAddress);
+    // no local worker available, choose the next worker in round-robin order
+    if (worker == null) {
+      worker = pickNextWorker();
+    }
+    // check whether the worker exceeds its max workload; if so, pick the next worker
+    int maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.getCores());
+    int numTry = workers.size();
+    do {
+      if (worker.workload.get() >= maxWorkload) {
+        LOGGER.info("worker " + worker + " reached its limit, re-selecting worker...");
+        worker = pickNextWorker();
+        numTry = numTry - 1;
+      } else {
+        numTry = -1;
+      }
+    } while (numTry > 0);
+    if (numTry == 0) {
+      // tried so many times and still not able to find Worker
+      throw new WorkerTooBusyException(
+          "All workers are busy, number of workers: " + workers.size() + ", workload limit:"
+              + maxWorkload);
+    }
+
+    return worker;
+  }
+
+  public Schedulable pickNextWorker() {
+    // getAndIncrement advances the round-robin index on every call
+    return workers.get((int) (index.getAndIncrement() % workers.size()));
+  }
+
+  /**
+   * A new worker is trying to register, add it to the map and connect to this worker
+   */
+  public void addWorker(Schedulable schedulable) {
+    workers.add(schedulable);
+    ipMapWorker.put(schedulable.getAddress(), schedulable);
+  }
+
+  public void removeWorker(String address) {
+    Schedulable schedulable = ipMapWorker.get(address);
+    if (schedulable != null) {
+      ipMapWorker.remove(address);
+      workers.remove(schedulable);
+    }
+  }
+
+  public List<Schedulable> getAllWorkers() {
+    return workers;
+  }
+
+  public List<String> getAllWorkerAddresses() {
+    List<String> addresses = new ArrayList<>(workers.size());
+    for (Schedulable worker : workers) {
+      addresses.add(worker.getAddress());
+    }
+    return addresses;
+  }
+}
+

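End to end, the Master registers each worker as it connects, then asks the
Scheduler for a worker per block split and fires the RPC. A usage sketch (the
service stubs and the request are placeholders):

    Scheduler scheduler = new Scheduler();
    scheduler.addWorker(new Schedulable("w1", "10.0.0.1", 10021, 4, service1));
    scheduler.addWorker(new Schedulable("w2", "10.0.0.2", 10021, 4, service2));

    // prefer the worker co-located with the split, fall back to round-robin;
    // throws WorkerTooBusyException when every worker is at its limit
    Schedulable worker = scheduler.pickWorker("10.0.0.1");
    Future<QueryResponse> future = scheduler.sendRequestAsync(worker, request);
    QueryResponse response = future.get();
    worker.workload.decrementAndGet();  // caller releases the workload slot

Only the increment lives in sendRequestAsync; nothing in this class decrements
workload, so the caller must do it once the response arrives, as the deleted
Master.scala below did in its finally block.
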
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
new file mode 100644
index 0000000..fba3413
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.store.conf.StoreConf;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.PropertyConfigurator;
+
+public class StoreUtil {
+
+  private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName());
+
+  public static void loadProperties(String filePath, StoreConf conf) {
+    InputStream input = null;
+    try {
+      input = new FileInputStream(filePath);
+      Properties prop = new Properties();
+      prop.load(input);
+      for (Map.Entry<Object, Object> entry : prop.entrySet()) {
+        conf.conf(entry.getKey().toString(), entry.getValue().toString());
+      }
+      LOGGER.audit("loaded properties: " + filePath);
+    } catch (IOException ex) {
+      LOGGER.error(ex, "Failed to load properties file " + filePath);
+    } finally {
+      if (input != null) {
+        try {
+          input.close();
+        } catch (IOException e) {
+          LOGGER.error(e);
+        }
+      }
+    }
+  }
+
+  public static void initLog4j(String propertiesFilePath) {
+    BasicConfigurator.configure();
+    PropertyConfigurator.configure(propertiesFilePath);
+  }
+
+  public static byte[] serialize(Object object) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(object);
+      oos.close(); // close() flushes ObjectOutputStream's internal buffer into baos
+    } catch (IOException e) {
+      LOGGER.error(e);
+    }
+    return baos.toByteArray();
+  }
+
+  public static Object deserialize(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    try {
+      ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+      return ois.readObject();
+    } catch (IOException e) {
+      LOGGER.error(e);
+    } catch (ClassNotFoundException e) {
+      LOGGER.error(e);
+    }
+    return null;
+  }
+
+  public static void configureCSVInputFormat(Configuration configuration,
+      CarbonLoadModel carbonLoadModel) {
+    CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar());
+    CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter());
+    CSVInputFormat.setSkipEmptyLine(configuration, carbonLoadModel.getSkipEmptyLine());
+    CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar());
+    CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns());
+    CSVInputFormat.setNumberOfColumns(configuration,
+        "" + carbonLoadModel.getCsvHeaderColumns().length);
+
+    CSVInputFormat.setHeaderExtractionEnabled(
+        configuration,
+        carbonLoadModel.getCsvHeader() == null ||
+            StringUtils.isEmpty(carbonLoadModel.getCsvHeader()));
+
+    CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar());
+
+    CSVInputFormat.setReadBufferSize(
+        configuration,
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+  }
+
+  public static void clearUnsafeMemory(long taskId) {
+    UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId);
+    UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId);
+  }
+
+}

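StoreUtil.serialize/deserialize wrap plain Java serialization, so any payload
routed through them (such as CarbonLoadModel) must implement Serializable, and
deserialize returns null on failure instead of throwing. A quick round-trip
check:

    import java.util.ArrayList;
    import java.util.List;

    List<String> rows = new ArrayList<>();
    rows.add("a");
    rows.add("b");
    byte[] bytes = StoreUtil.serialize(rows);                // any Serializable
    @SuppressWarnings("unchecked")
    List<String> copy = (List<String>) StoreUtil.deserialize(bytes);
    assert copy != null && "b".equals(copy.get(1));          // null signals failure
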
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java
new file mode 100644
index 0000000..6fa2191
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.worker;
+
+import java.io.IOException;
+import java.net.BindException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.store.rpc.RegistryService;
+import org.apache.carbondata.store.rpc.ServiceFactory;
+import org.apache.carbondata.store.rpc.StoreService;
+import org.apache.carbondata.store.rpc.impl.StoreServiceImpl;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+public class Worker {
+
+  private static LogService LOGGER = LogServiceFactory.getLogService(Worker.class.getName());
+
+  private String id;
+  private RegistryService registry;
+  private StoreConf conf;
+  private Configuration hadoopConf;
+  private RPC.Server server;
+
+  public Worker(StoreConf conf) {
+    this.conf = conf;
+    this.hadoopConf = this.conf.newHadoopConf();
+  }
+
+  public void start() {
+    try {
+      startService();
+      registerToMaster();
+    } catch (IOException e) {
+      LOGGER.error(e, "worker failed to start");
+    }
+  }
+
+  private void startService() throws IOException {
+    BindException exception;
+    // we will try to create the service up to 100 times in the worst case
+    int numTry = 100;
+    int coreNum = conf.workerCoreNum();
+    String host = conf.workerHost();
+    int port = conf.workerPort();
+    StoreService queryService = new StoreServiceImpl(this);
+    do {
+      try {
+        server = new RPC.Builder(hadoopConf)
+            .setNumHandlers(coreNum)
+            .setBindAddress(host)
+            .setPort(port)
+            .setProtocol(StoreService.class)
+            .setInstance(queryService)
+            .build();
+        server.start();
+
+        numTry = 0;
+        exception = null;
+      } catch (BindException e) {
+        // port is occupied, increase the port number and try again
+        exception = e;
+        port = port + 1;
+        numTry = numTry - 1;
+      }
+    } while (numTry > 0);
+
+    if (exception != null) {
+      // we have tried many times, but still failed to find an available port
+      LOGGER.error(exception, "worker failed to start");
+      throw exception;
+    }
+
+    conf.conf(StoreConf.WORKER_PORT, port);
+    LOGGER.info("worker started on " + host + ":" + port + " successfully");
+
+  }
+
+  public void stop() {
+    try {
+      stopService();
+    } catch (InterruptedException e) {
+      LOGGER.error(e, "worker failed to stop");
+    }
+  }
+
+  private void stopService() throws InterruptedException {
+    if (server != null) {
+      server.stop();
+      server.join();
+      server = null;
+    }
+  }
+
+  private void registerToMaster() throws IOException {
+
+    LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort());
+    if (registry == null) {
+      registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort());
+    }
+    RegisterWorkerRequest request =
+        new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum());
+    try {
+      RegisterWorkerResponse response = registry.registerWorker(request);
+      id = response.getWorkerId();
+    } catch (Throwable throwable) {
+      LOGGER.error(throwable, "worker failed to register");
+      throw new IOException(throwable);
+    }
+
+    LOGGER.info("worker " + id + " registered successfully");
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: Worker <log4j file> <properties file>");
+      return;
+    }
+
+    StoreUtil.initLog4j(args[0]);
+    Worker worker = new Worker(new StoreConf(args[1]));
+    worker.start();
+  }
+
+  public StoreConf getConf() {
+    return conf;
+  }
+
+  public void setConf(StoreConf conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public void setHadoopConf(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+}

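Besides the main() entry point, a Worker can be embedded in an existing JVM;
a minimal sketch (the two file paths are placeholders):

    // start a worker programmatically
    StoreUtil.initLog4j("conf/log4j.properties");
    Worker worker = new Worker(new StoreConf("conf/store.properties"));
    worker.start();   // binds the RPC server, then registers with the Master

    // ... later, on shutdown
    worker.stop();    // stops and joins the RPC server
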
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
deleted file mode 100644
index 2109251..0000000
--- a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
-import java.util.concurrent.{ExecutionException, Future, TimeoutException, TimeUnit}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.ipc.RPC
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.Distributable
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.Expression
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.store.rpc.{RegistryService, ServiceFactory}
-import org.apache.carbondata.store.rpc.impl.{RegistryServiceImpl, Status}
-import org.apache.carbondata.store.rpc.model._
-
-/**
- * Master of CarbonSearch.
- * It provides a Registry service for worker to register.
- * And it provides search API to fire RPC call to workers.
- */
-@InterfaceAudience.Internal
-private[store] class Master {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  // worker host address map to EndpointRef
-
-  private val random = new Random
-
-  private var registryServer: RPC.Server = _
-
-  private val scheduler: Scheduler = new Scheduler
-
-  def buildServer(serverHost: String, serverPort: Int): RPC.Server = {
-    val hadoopConf = FileFactory.getConfiguration
-    val builder = new RPC.Builder(hadoopConf)
-    builder
-      .setBindAddress(serverHost)
-      .setPort(serverPort)
-      .setProtocol(classOf[RegistryService])
-      .setInstance(new RegistryServiceImpl(this))
-      .build
-  }
-
-  /** start service and listen on port passed in constructor */
-  def startService(): Unit = {
-    if (registryServer == null) {
-      LOG.info("Start search mode master thread")
-      val isStarted: AtomicBoolean = new AtomicBoolean(false)
-      new Thread(new Runnable {
-        override def run(): Unit = {
-          val hostAddress = InetAddress.getLocalHost.getHostAddress
-          var port = CarbonProperties.getSearchMasterPort
-          var exception: BindException = null
-          var numTry = 100  // we will try to create service at worse case 100 times
-          do {
-            try {
-              LOG.info(s"building registry-service on $hostAddress:$port")
-              registryServer = buildServer(hostAddress, port)
-              numTry = 0
-            } catch {
-              case e: BindException =>
-                // port is occupied, increase the port number and try again
-                exception = e
-                LOG.error(s"start registry-service failed: ${e.getMessage}")
-                port = port + 1
-                numTry = numTry - 1
-            }
-          } while (numTry > 0)
-          if (registryServer == null) {
-            // we have tried many times, but still failed to find an available port
-            throw exception
-          }
-          if (isStarted.compareAndSet(false, false)) {
-            synchronized {
-              isStarted.compareAndSet(false, true)
-            }
-          }
-          LOG.info("starting registry-service")
-          registryServer.start()
-          LOG.info("registry-service started")
-        }
-      }).start()
-      var count = 0
-      val countThreshold = 5000
-      while (isStarted.compareAndSet(false, false) && count < countThreshold) {
-        LOG.info(s"Waiting search mode master to start, retrying $count times")
-        Thread.sleep(10)
-        count = count + 1
-      }
-      if (count >= countThreshold) {
-        LOG.error(s"Search mode try $countThreshold times to start master but failed")
-        throw new RuntimeException(
-          s"Search mode try $countThreshold times to start master but failed")
-      } else {
-        LOG.info("Search mode master started")
-      }
-    } else {
-      LOG.info("Search mode master has already started")
-    }
-  }
-
-  def stopService(): Unit = {
-    if (registryServer != null) {
-      registryServer.stop()
-      registryServer.join()
-      registryServer = null
-    }
-  }
-
-  def stopAllWorkers(): Unit = {
-    scheduler.getAllWorkers.toSeq.foreach { case (address, schedulable) =>
-      val response = try {
-        schedulable.service.shutdown(new ShutdownRequest("user"))
-      } catch {
-        case throwable: Throwable =>
-          throw new IOException(throwable)
-      }
-      scheduler.removeWorker(address)
-    }
-  }
-
-  /** A new searcher is trying to register, add it to the map and connect to this searcher */
-  def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
-    LOG.info(s"Receive Register request from worker ${request.getHostAddress}:${request.getPort} " +
-             s"with ${request.getCores} cores")
-    val workerId = UUID.randomUUID().toString
-    val workerAddress = request.getHostAddress
-    val workerPort = request.getPort
-    LOG.info(s"connecting to worker ${request.getHostAddress}:${request.getPort}, " +
-             s"workerId $workerId")
-
-    val searchService = ServiceFactory.createSearchService(workerAddress, workerPort)
-    scheduler.addWorker(workerAddress,
-      new Schedulable(workerId, workerAddress, workerPort, request.getCores, searchService))
-    LOG.info(s"worker ${request.getHostAddress}:${request.getPort} registered")
-    new RegisterWorkerResponse(workerId)
-  }
-
-  /**
-   * Execute search by firing RPC call to worker, return the result rows
-   * @param table table to search
-   * @param columns projection column names
-   * @param filter filter expression
-   * @param globalLimit max number of rows required in Master
-   * @param localLimit max number of rows required in Worker
-   * @return
-   */
-  def search(table: CarbonTable, columns: Array[String], filter: Expression,
-      globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
-    Objects.requireNonNull(table)
-    Objects.requireNonNull(columns)
-    if (globalLimit < 0 || localLimit < 0) {
-      throw new IllegalArgumentException("limit should be positive")
-    }
-
-    val queryId = random.nextInt
-    var rowCount = 0
-    val output = new ArrayBuffer[CarbonRow]
-
-    def onSuccess(result: QueryResponse): Unit = {
-      // in case of RPC success, collect all rows in response message
-      if (result.getQueryId != queryId) {
-        throw new IOException(
-          s"queryId in response does not match request: ${result.getQueryId} != $queryId")
-      }
-      if (result.getStatus != Status.SUCCESS.ordinal()) {
-        throw new IOException(s"failure in worker: ${ result.getMessage }")
-      }
-
-      val itor = result.getRows.iterator
-      while (itor.hasNext && rowCount < globalLimit) {
-        output += new CarbonRow(itor.next())
-        rowCount = rowCount + 1
-      }
-      LOG.info(s"[QueryId:$queryId] accumulated result size $rowCount")
-    }
-    def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
-    def onTimedout() = throw new ExecutionTimeoutException()
-
-    // prune data and get a mapping of worker hostname to list of blocks,
-    // then add these blocks to the QueryRequest and fire the RPC call
-    val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
-    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
-      // Build a QueryRequest
-      val split = new CarbonMultiBlockSplit(blocks, splitAddress)
-      val request =
-        new QueryRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
-
-      // Find an Endpoind and send the request to it
-      // This RPC is non-blocking so that we do not need to wait before send to next worker
-      scheduler.sendRequestAsync(splitAddress, request)
-    }
-
-    // loop to get the result of each Worker
-    tuple.foreach { case (worker: Schedulable, future: Future[QueryResponse]) =>
-
-      // if we have enough data already, we do not need to collect more result
-      if (rowCount < globalLimit) {
-        // wait for worker
-        val response = try {
-          future.get(CarbonProperties.getInstance().getQueryTimeout.toLong, TimeUnit.SECONDS)
-        } catch {
-          case e: ExecutionException => onFaiure(e)
-          case t: TimeoutException => onTimedout()
-        } finally {
-          worker.workload.decrementAndGet()
-        }
-        LOG.info(s"[QueryId:$queryId] receive search response from worker " +
-                 s"${worker.address}:${worker.port}")
-        onSuccess(response)
-      }
-    }
-    output.toArray
-  }
-
-  /**
-   * Prune data by using CarbonInputFormat.getSplit
-   * Return a mapping of host address to list of block
-   */
-  private def pruneBlock(
-      table: CarbonTable,
-      columns: Array[String],
-      filter: Expression): JMap[String, JList[Distributable]] = {
-    val jobConf = new JobConf(new Configuration)
-    val job = new Job(jobConf)
-    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
-      job, table, columns, filter, null, null)
-
-    // We will do FG pruning in reader side, so don't do it here
-    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
-    val splits = format.getSplits(job)
-    val distributables = splits.asScala.map { split =>
-      split.asInstanceOf[Distributable]
-    }
-    CarbonLoaderUtil.nodeBlockMapping(
-      distributables.asJava,
-      -1,
-      getWorkers.asJava,
-      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST,
-      null)
-  }
-
-  /** return hostname of all workers */
-  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
-}
-
-// Exception if execution timed out in search mode
-class ExecutionTimeoutException extends RuntimeException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
deleted file mode 100644
index fb3ef86..0000000
--- a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store
-
-import java.io.IOException
-import java.util.concurrent.{Callable, Executors, Future}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.store.rpc.QueryService
-import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse}
-
-/**
- * [[Master]] uses Scheduler to pick a Worker to send request
- */
-@InterfaceAudience.Internal
-private[store] class Scheduler {
-  // mapping of worker IP address to worker instance
-  private val workers = mutable.Map[String, Schedulable]()
-  private val random = new Random()
-  private val executors = Executors.newCachedThreadPool()
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Pick a Worker according to the address and workload of the Worker
-   * Invoke the RPC and return Future result
-   */
-  def sendRequestAsync(
-      splitAddress: String,
-      request: QueryRequest): (Schedulable, Future[QueryResponse]) = {
-    require(splitAddress != null)
-    if (workers.isEmpty) {
-      throw new IOException("No worker is available")
-    }
-    var worker: Schedulable = pickWorker(splitAddress)
-
-    // check whether worker exceed max workload, if exceeded, pick next worker
-    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
-    var numTry = workers.size
-    do {
-      if (worker.workload.get() >= maxWorkload) {
-        LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...")
-        worker = pickNextWorker(worker)
-        numTry = numTry - 1
-      } else {
-        numTry = -1
-      }
-    } while (numTry > 0)
-    if (numTry == 0) {
-      // tried so many times and still not able to find Worker
-      throw new WorkerTooBusyException(
-        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
-    }
-    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
-    val future = executors.submit(
-      new Callable[QueryResponse] {
-        override def call(): QueryResponse = worker.service.query(request)
-      }
-    )
-    worker.workload.incrementAndGet()
-    (worker, future)
-  }
-
-  private def pickWorker[T: ClassTag](splitAddress: String) = {
-    try {
-      workers(splitAddress)
-    } catch {
-      case e: NoSuchElementException =>
-        // no local worker available, choose one worker randomly
-        pickRandomWorker()
-    }
-  }
-
-  /** pick a worker randomly */
-  private def pickRandomWorker() = {
-    val index = random.nextInt(workers.size)
-    workers.toSeq(index)._2
-  }
-
-  /** pick the next worker of the input worker in the [[Scheduler.workers]] */
-  private def pickNextWorker(worker: Schedulable) = {
-    val index = workers.zipWithIndex.find { case ((address, w), index) =>
-      w == worker
-    }.get._2
-    if (index == workers.size - 1) {
-      workers.toSeq.head._2
-    } else {
-      workers.toSeq(index + 1)._2
-    }
-  }
-
-  /** A new searcher is trying to register, add it to the map and connect to this searcher */
-  def addWorker(address: String, schedulable: Schedulable): Unit = {
-    require(schedulable != null)
-    require(address.equals(schedulable.address))
-    workers(address) = schedulable
-  }
-
-  def removeWorker(address: String): Unit = {
-    workers.remove(address)
-  }
-
-  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
-}
-
-/**
- * Represent a Worker which [[Scheduler]] can send
- * Search request on it
- * @param id Worker ID, a UUID string
- * @param cores, number of cores in Worker
- * @param service RPC service reference
- * @param workload number of outstanding request sent to Worker
- */
-private[store] class Schedulable(
-    val id: String,
-    val address: String,
-    val port: Int,
-    val cores: Int,
-    val service: QueryService,
-    var workload: AtomicInteger) {
-  def this(id: String, address: String, port: Int, cores: Int, service: QueryService) = {
-    this(id, address, port, cores, service, new AtomicInteger())
-  }
-}
-
-class WorkerTooBusyException(message: String) extends RuntimeException(message)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
----------------------------------------------------------------------
diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
deleted file mode 100644
index 2ded00b..0000000
--- a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store
-
-import java.io.IOException
-import java.net.{BindException, InetAddress}
-
-import org.apache.hadoop.ipc.RPC
-
-import org.apache.carbondata.common.annotations.InterfaceAudience
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.store.rpc.{QueryService, RegistryService, ServiceFactory}
-import org.apache.carbondata.store.rpc.impl.QueryServiceImpl
-import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest
-
-@InterfaceAudience.Internal
-private[store] object Worker {
-  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  private val hostAddress = InetAddress.getLocalHost.getHostAddress
-  private var port: Int = _
-  private var registry: RegistryService = _
-
-  def init(masterHostAddress: String, masterPort: Int): Unit = {
-    LOG.info(s"initializing worker...")
-    startService()
-    LOG.info(s"registering to master $masterHostAddress:$masterPort")
-    val workerId = registerToMaster(masterHostAddress, masterPort)
-    LOG.info(s"worker registered to master, workerId: $workerId")
-  }
-
-  def buildServer(serverHost: String, serverPort: Int): RPC.Server = {
-    val hadoopConf = FileFactory.getConfiguration
-    val builder = new RPC.Builder(hadoopConf)
-    builder
-      .setNumHandlers(Runtime.getRuntime.availableProcessors)
-      .setBindAddress(serverHost)
-      .setPort(serverPort)
-      .setProtocol(classOf[QueryService])
-      .setInstance(new QueryServiceImpl)
-      .build
-  }
-
-  /**
-   * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
-   */
-  private def startService(): Unit = {
-    new Thread(new Runnable {
-      override def run(): Unit = {
-        port = CarbonProperties.getSearchWorkerPort
-        var searchServer: RPC.Server = null
-        var exception: BindException = null
-        var numTry = 100  // we will try to create service at worse case 100 times
-        do {
-          try {
-            LOG.info(s"building search-service on $hostAddress:$port")
-            searchServer = buildServer(hostAddress, port)
-            numTry = 0
-          } catch {
-            case e: BindException =>
-              // port is occupied, increase the port number and try again
-              exception = e
-              LOG.error(s"start search-service failed: ${e.getMessage}")
-              port = port + 1
-              numTry = numTry - 1
-          }
-        } while (numTry > 0)
-        if (searchServer == null) {
-          // we have tried many times, but still failed to find an available port
-          throw exception
-        }
-        LOG.info("starting search-service")
-        searchServer.start()
-        LOG.info("search-service started")
-      }
-    }).start()
-  }
-
-  private def registerToMaster(registryHostAddress: String, registryPort: Int): String = {
-    LOG.info(s"trying to register to master $registryHostAddress:$registryPort")
-    if (registry == null) {
-      registry = ServiceFactory.createRegistryService(registryHostAddress, 
registryPort)
-    }
-    val cores = Runtime.getRuntime.availableProcessors()
-    val request = new RegisterWorkerRequest(hostAddress, port, cores)
-    val response = try {
-      registry.registerWorker(request)
-    } catch {
-      case throwable: Throwable =>
-        LOG.error(s"worker failed to registered: $throwable")
-        throw new IOException(throwable)
-    }
-
-    LOG.info("worker registered")
-    response.getWorkerId
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
deleted file mode 100644
index 95e7335..0000000
--- a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store
-
-import org.apache.hadoop.ipc.ProtocolSignature
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.store.rpc.QueryService
-import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse, ShutdownRequest, ShutdownResponse}
-
-class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
-
-  var scheduler: Scheduler = _
-  var w1: Schedulable = _
-  var w2: Schedulable = _
-  var w3: Schedulable = _
-
-  override def beforeEach(): Unit = {
-    scheduler = new Scheduler()
-    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
-    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
-    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
-
-    scheduler.addWorker("1.1.1.1", w1)
-    scheduler.addWorker("1.1.1.2", w2)
-    scheduler.addWorker("1.1.1.3", w3)
-  }
-
-  test("test addWorker, removeWorker, getAllWorkers") {
-    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    scheduler.removeWorker("1.1.1.2")
-    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
-
-    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
-    scheduler.addWorker("1.1.1.4", w4)
-    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
-    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
-  }
-
-  test("test normal schedule") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r1.id)
-    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r2.id)
-    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r3.id)
-    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-    assertResult(w1.id)(r4.id)
-    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-    assertResult(w2.id)(r5.id)
-    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    assertResult(w3.id)(r6.id)
-  }
-
-  test("test worker unavailable") {
-    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
-    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
-  }
-
-  test("test reschedule when target worker is overload") {
-    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    // it must be worker1 since worker3 exceeds the max workload
-    assertResult(w1.id)(r.id)
-  }
-
-  test("test all workers are overload") {
-    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
-    (1 to 40).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-  }
-
-  test("test user configured overload param") {
-    val original = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-
-    CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
-
-    (1 to 3).foreach { i =>
-      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
-      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
-      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    val e = intercept[WorkerTooBusyException] {
-      scheduler.sendRequestAsync("1.1.1.3", null)
-    }
-
-    if (original != null) {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
-    }
-  }
-
-  test("test invalid property") {
-    intercept[IllegalArgumentException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
-    }
-    var value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-
-    intercept[NumberFormatException] {
-      CarbonProperties.getInstance().addProperty(
-        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
-    }
-    value = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
-    assertResult(null)(value)
-  }
-}
-
-class DummyRef extends QueryService {
-  override def query(request: QueryRequest): QueryResponse = ???
-
-  override def shutdown(request: ShutdownRequest): ShutdownResponse = ???
-
-  override def getProtocolVersion(protocol: String,
-      clientVersion: Long): Long = ???
-
-  override def getProtocolSignature(protocol: String,
-      clientVersion: Long,
-      clientMethodsHash: Int): ProtocolSignature = ???
-}
\ No newline at end of file
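
The suite deleted above covered a workload-capped scheduling policy: prefer the
requested worker, reschedule to another worker when the target exceeds its
limit, and fail once every worker is saturated. A minimal Java sketch of that
policy follows, with hypothetical names rather than CarbonData's Scheduler API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkloadLimitedScheduler {
  private final Map<String, AtomicInteger> workload = new ConcurrentHashMap<>();
  private final int maxWorkload;  // e.g. cores * 10, or a user-configured limit

  public WorkloadLimitedScheduler(int maxWorkload) {
    this.maxWorkload = maxWorkload;
  }

  public void addWorker(String address) {
    workload.putIfAbsent(address, new AtomicInteger());
  }

  // Pick the preferred worker if it is under the limit, else any worker with
  // spare capacity; a reservation is rolled back whenever a pick fails.
  public String pick(String preferred) {
    AtomicInteger load = workload.get(preferred);
    if (load != null && load.incrementAndGet() <= maxWorkload) {
      return preferred;  // normal schedule
    }
    if (load != null) {
      load.decrementAndGet();
    }
    for (Map.Entry<String, AtomicInteger> e : workload.entrySet()) {
      if (e.getValue().incrementAndGet() <= maxWorkload) {
        return e.getKey();  // rescheduled to a less-loaded worker
      }
      e.getValue().decrementAndGet();
    }
    // the deleted suite expected WorkerTooBusyException at this point
    throw new IllegalStateException("all workers are too busy");
  }
}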

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/pom.xml
----------------------------------------------------------------------
diff --git a/store/horizon/pom.xml b/store/horizon/pom.xml
new file mode 100644
index 0000000..3665e53
--- /dev/null
+++ b/store/horizon/pom.xml
@@ -0,0 +1,95 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-horizon</artifactId>
+  <name>Apache CarbonData :: Horizon</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+    <spring.version>2.0.2.RELEASE</spring.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+      <version>${spring.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+      <version>4.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <version>${spring.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.8.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.9.5</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
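
With the groupId typo in the gson exclusion fixed above, the exclusion can
actually match. One way to check the resulting tree (plain Maven, nothing
specific to this module) is:

  mvn dependency:tree -pl store/horizon -Dincludes=com.google.code.gson

which should report gson only via the explicit test-scoped dependency.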

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/anltr/Expression.g4
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/anltr/Expression.g4 b/store/horizon/src/main/anltr/Expression.g4
new file mode 100644
index 0000000..81688cd
--- /dev/null
+++ b/store/horizon/src/main/anltr/Expression.g4
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Copied from SqlBase.g4 of Presto and Spark.
+ */
+
+grammar Expression;
+
+parseFilter
+ : booleanExpression EOF
+ ;
+
+booleanExpression
+    : predicate
+    | left=booleanExpression operator=AND right=booleanExpression
+    | left=booleanExpression operator=OR right=booleanExpression
+    | '(' booleanExpression ')'
+    ;
+
+predicate
+    : left=primaryExpression comparisonOperator right=primaryExpression
+    | left=primaryExpression NOT? BETWEEN lower=primaryExpression AND upper=primaryExpression
+    | left=primaryExpression NOT? IN '(' primaryExpression (',' primaryExpression)* ')'
+    | left=primaryExpression IS NOT? NULL
+    ;
+
+primaryExpression
+    : constant                                           #constantDefault
+    | identifier                                         #columnReference
+    | base=identifier '.' fieldName=identifier           #dereference
+    | '(' booleanExpression ')'                          #parenthesizedExpression
+    ;
+
+constant
+    : NULL                                               #nullLiteral
+    | number                                             #numericLiteral
+    | booleanValue                                       #booleanLiteral
+    | STRING+                                            #stringLiteral
+    ;
+
+identifier
+    : IDENTIFIER                                         #unquotedIdentifier
+    | BACKQUOTED_IDENTIFIER                              #backQuotedIdentifier
+    ;
+
+comparisonOperator
+    : EQ | NEQ | LT | LTE | GT | GTE
+    ;
+
+booleanValue
+    : TRUE | FALSE
+    ;
+
+number
+    : MINUS? DECIMAL_VALUE                               #decimalLiteral
+    | MINUS? INTEGER_VALUE                               #integerLiteral
+    | MINUS? BIGINT_LITERAL                              #bigIntLiteral
+    | MINUS? SMALLINT_LITERAL                            #smallIntLiteral
+    | MINUS? TINYINT_LITERAL                             #tinyIntLiteral
+    | MINUS? DOUBLE_LITERAL                              #doubleLiteral
+    | MINUS? BIGDECIMAL_LITERAL                          #bigDecimalLiteral
+    ;
+
+AND: 'AND';
+BETWEEN: 'BETWEEN';
+FALSE: 'FALSE';
+IN: 'IN';
+IS: 'IS';
+NOT: 'NOT';
+NULL: 'NULL';
+OR: 'OR';
+TRUE: 'TRUE';
+
+EQ  : '=';
+NEQ : '<>' | '!=';
+LT  : '<';
+LTE : '<=';
+GT  : '>';
+GTE : '>=';
+
+MINUS: '-';
+
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : DIGIT+ 'L'
+    ;
+
+SMALLINT_LITERAL
+    : DIGIT+ 'S'
+    ;
+
+TINYINT_LITERAL
+    : DIGIT+ 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+DECIMAL_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D'
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD'
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+WS
+    : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+    : .
+    ;
\ No newline at end of file
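
For orientation, when paired with the case-insensitive stream below, the
grammar accepts filters such as the following (hypothetical column names):

  name = 'Alice' AND age BETWEEN 18 AND 30
  city IN ('SF', 'NYC') OR salary >= 100000.0
  `comment` IS NOT NULL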

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java
new file mode 100644
index 0000000..1032c51
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.antlr;
+
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.IntStream;
+
+public class ANTLRNoCaseStringStream extends ANTLRInputStream {
+
+  public ANTLRNoCaseStringStream(String input) {
+    super(input);
+  }
+
+  @Override public int LA(int i) {
+    int la = super.LA(i);
+    if (la == 0 || la == IntStream.EOF) {
+      return la;
+    } else {
+      return Character.toUpperCase(la);
+    }
+  }
+
+}
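
The stream upper-cases only the lookahead the lexer inspects, so keywords match
the upper-case lexer rules while token text (string literals in particular)
keeps its original characters. A minimal usage sketch, assuming the
ANTLR-generated ExpressionLexer and ExpressionParser are on the classpath:

import org.antlr.v4.runtime.CommonTokenStream;

import org.apache.carbondata.horizon.antlr.ANTLRNoCaseStringStream;
import org.apache.carbondata.horizon.antlr.gen.ExpressionLexer;
import org.apache.carbondata.horizon.antlr.gen.ExpressionParser;

public class NoCaseParseExample {
  public static void main(String[] args) {
    // lower-case keywords still lex, because LA() upper-cases the lookahead
    ExpressionLexer lexer =
        new ExpressionLexer(new ANTLRNoCaseStringStream("age between 18 and 30"));
    ExpressionParser parser = new ExpressionParser(new CommonTokenStream(lexer));
    // parseFilter is the grammar's entry rule (booleanExpression EOF)
    System.out.println(parser.parseFilter().toStringTree(parser));
  }
}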

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java
new file mode 100644
index 0000000..0f28ea9
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.antlr;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression;
+import org.apache.carbondata.core.scan.expression.conditional.NotInExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.expression.logical.RangeExpression;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionBaseVisitor;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionParser;
+
+public class FilterVisitor extends ExpressionBaseVisitor<Expression> {
+
+  private CarbonTable carbonTable;
+
+  public FilterVisitor(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+  }
+
+  public ColumnExpression getColumnExpression(String columnName) {
+    return getColumnExpression(carbonTable.getTableName(), columnName);
+  }
+
+  public ColumnExpression getColumnExpression(String tableName, String columnName) {
+    CarbonColumn column = carbonTable.getColumnByName(tableName, columnName);
+    if (column == null) {
+      throw new RuntimeException("column does not exist: " + tableName + "." + columnName);
+    }
+    return new ColumnExpression(column.getColName(), column.getDataType());
+  }
+
+  @Override
+  public Expression visitParseFilter(ExpressionParser.ParseFilterContext ctx) {
+    return visitBooleanExpression(ctx.booleanExpression());
+  }
+
+  @Override
+  public Expression visitBooleanExpression(ExpressionParser.BooleanExpressionContext ctx) {
+    if (ctx.AND() != null) {
+      return new AndExpression(visitBooleanExpression(ctx.left), visitBooleanExpression(ctx.right));
+    } else if (ctx.OR() != null) {
+      return new OrExpression(visitBooleanExpression(ctx.left), visitBooleanExpression(ctx.right));
+    } else if (ctx.predicate() != null) {
+      return visitPredicate(ctx.predicate());
+    } else if (!ctx.booleanExpression().isEmpty()) {
+      return visitBooleanExpression(ctx.booleanExpression().get(0));
+    }
+
+    return super.visitBooleanExpression(ctx);
+  }
+
+  @Override
+  public Expression visitPredicate(ExpressionParser.PredicateContext ctx) {
+    ExpressionParser.ComparisonOperatorContext comparison = ctx.comparisonOperator();
+    if (comparison != null) {
+      if (comparison.EQ() != null) {
+        return new EqualToExpression(visit(ctx.left), visit(ctx.right));
+      } else if (comparison.GT() != null) {
+        return new GreaterThanExpression(visit(ctx.left), visit(ctx.right));
+      } else if (comparison.GTE() != null) {
+        return new GreaterThanEqualToExpression(visit(ctx.left), visit(ctx.right));
+      } else if (comparison.LT() != null) {
+        return new LessThanExpression(visit(ctx.left), visit(ctx.right));
+      } else if (comparison.LTE() != null) {
+        return new LessThanEqualToExpression(visit(ctx.left), visit(ctx.right));
+      } else if (comparison.NEQ() != null) {
+        return new NotEqualsExpression(visit(ctx.left), visit(ctx.right));
+      }
+    } else if (ctx.BETWEEN() != null) {
+      if (ctx.NOT() != null) {
+        return new RangeExpression(new GreaterThanExpression(visit(ctx.left), visit(ctx.upper)),
+            new LessThanExpression(visit(ctx.left), visit(ctx.lower)));
+      } else {
+        return new RangeExpression(
+            new GreaterThanEqualToExpression(visit(ctx.left), visit(ctx.lower)),
+            new LessThanEqualToExpression(visit(ctx.left), visit(ctx.upper)));
+      }
+    } else if (ctx.IN() != null) {
+      List<Expression> listExpression = new ArrayList<Expression>();
+      List<ExpressionParser.PrimaryExpressionContext> primaryExpressionContexts =
+          ctx.primaryExpression();
+      for (ExpressionParser.PrimaryExpressionContext primary : primaryExpressionContexts) {
+        if (ctx.left != primary) {
+          listExpression.add(visit(primary));
+        }
+      }
+      if (ctx.NOT() != null) {
+        return new NotInExpression(visit(ctx.left), new ListExpression(listExpression));
+      } else {
+        return new InExpression(visit(ctx.left), new ListExpression(listExpression));
+      }
+    } else if (ctx.NULL() != null) {
+      if (ctx.NOT() == null) {
+        return new EqualToExpression(
+            visit(ctx.left), new LiteralExpression(null, DataTypes.STRING), true);
+      } else {
+        return new NotEqualsExpression(visit(ctx.left),
+            new LiteralExpression(null, DataTypes.STRING), true);
+      }
+    }
+    return super.visitPredicate(ctx);
+  }
+
+  @Override
+  public Expression visitNullLiteral(ExpressionParser.NullLiteralContext ctx) {
+    return null;
+  }
+
+  @Override
+  public Expression visitDecimalLiteral(ExpressionParser.DecimalLiteralContext ctx) {
+    return new LiteralExpression(new BigDecimal(ctx.getText()),
+        DataTypes.createDefaultDecimalType());
+  }
+
+  @Override
+  public Expression visitIntegerLiteral(ExpressionParser.IntegerLiteralContext ctx) {
+    return new LiteralExpression(Integer.parseInt(ctx.getText()), DataTypes.INT);
+  }
+
+  @Override
+  public Expression visitBigIntLiteral(ExpressionParser.BigIntLiteralContext ctx) {
+    // getText() includes the trailing 'L' suffix; strip it before parsing
+    String text = ctx.getText();
+    return new LiteralExpression(
+        Long.parseLong(text.substring(0, text.length() - 1)), DataTypes.LONG);
+  }
+
+  @Override
+  public Expression visitSmallIntLiteral(ExpressionParser.SmallIntLiteralContext ctx) {
+    // getText() includes the trailing 'S' suffix; strip it before parsing
+    String text = ctx.getText();
+    return new LiteralExpression(
+        Short.parseShort(text.substring(0, text.length() - 1)), DataTypes.SHORT);
+  }
+
+  @Override
+  public Expression visitTinyIntLiteral(ExpressionParser.TinyIntLiteralContext ctx) {
+    // getText() includes the trailing 'Y' suffix; strip it before parsing
+    String text = ctx.getText();
+    return new LiteralExpression(
+        Short.parseShort(text.substring(0, text.length() - 1)), DataTypes.SHORT);
+  }
+
+  @Override
+  public Expression visitDoubleLiteral(ExpressionParser.DoubleLiteralContext ctx) {
+    // Double.parseDouble accepts the trailing 'D' suffix directly
+    return new LiteralExpression(Double.parseDouble(ctx.getText()), DataTypes.DOUBLE);
+  }
+
+  @Override
+  public Expression visitBigDecimalLiteral(ExpressionParser.BigDecimalLiteralContext ctx) {
+    // getText() includes the trailing 'BD' suffix; strip it before parsing
+    String text = ctx.getText();
+    return new LiteralExpression(new BigDecimal(text.substring(0, text.length() - 2)),
+        DataTypes.createDefaultDecimalType());
+  }
+
+  @Override
+  public Expression visitBooleanLiteral(ExpressionParser.BooleanLiteralContext ctx) {
+    ExpressionParser.BooleanValueContext booleanValueContext = ctx.booleanValue();
+    if (booleanValueContext.FALSE() != null) {
+      return new LiteralExpression(false, DataTypes.BOOLEAN);
+    } else {
+      return new LiteralExpression(true, DataTypes.BOOLEAN);
+    }
+  }
+
+  @Override
+  public Expression visitStringLiteral(ExpressionParser.StringLiteralContext ctx) {
+    // getText() includes the surrounding quotes; strip them
+    String text = ctx.getText();
+    return new LiteralExpression(text.substring(1, text.length() - 1), DataTypes.STRING);
+  }
+
+  @Override
+  public Expression visitUnquotedIdentifier(ExpressionParser.UnquotedIdentifierContext ctx) {
+    return getColumnExpression(ctx.getText());
+  }
+
+  private String identifier(String identifier) {
+    // strip the backquotes from a back-quoted identifier
+    return identifier.replace("`", "");
+  }
+
+  @Override
+  public Expression visitBackQuotedIdentifier(ExpressionParser.BackQuotedIdentifierContext ctx) {
+    return getColumnExpression(identifier(ctx.getText()));
+  }
+
+  @Override
+  public Expression visitDereference(ExpressionParser.DereferenceContext ctx) {
+    String tableName = identifier(ctx.base.getText());
+    String columnName = identifier(ctx.fieldName.getText());
+    return getColumnExpression(tableName, columnName);
+  }
+}
+
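
Putting the pieces together, a sketch of end-to-end use, assuming a resolved
CarbonTable instance and the generated lexer/parser (this helper class is
illustrative, not part of the commit):

import org.antlr.v4.runtime.CommonTokenStream;

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.horizon.antlr.ANTLRNoCaseStringStream;
import org.apache.carbondata.horizon.antlr.FilterVisitor;
import org.apache.carbondata.horizon.antlr.gen.ExpressionLexer;
import org.apache.carbondata.horizon.antlr.gen.ExpressionParser;

public final class FilterParseUtil {
  private FilterParseUtil() { }

  // Translate a filter string into a CarbonData Expression tree.
  public static Expression parse(String filter, CarbonTable table) {
    ExpressionLexer lexer = new ExpressionLexer(new ANTLRNoCaseStringStream(filter));
    ExpressionParser parser = new ExpressionParser(new CommonTokenStream(lexer));
    return new FilterVisitor(table).visitParseFilter(parser.parseFilter());
  }
}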

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens
new file mode 100644
index 0000000..0ba3c59
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens
@@ -0,0 +1,51 @@
+T__0=1
+T__1=2
+T__2=3
+T__3=4
+AND=5
+BETWEEN=6
+FALSE=7
+IN=8
+IS=9
+NOT=10
+NULL=11
+OR=12
+TRUE=13
+EQ=14
+NEQ=15
+LT=16
+LTE=17
+GT=18
+GTE=19
+MINUS=20
+STRING=21
+BIGINT_LITERAL=22
+SMALLINT_LITERAL=23
+TINYINT_LITERAL=24
+INTEGER_VALUE=25
+DECIMAL_VALUE=26
+DOUBLE_LITERAL=27
+BIGDECIMAL_LITERAL=28
+IDENTIFIER=29
+BACKQUOTED_IDENTIFIER=30
+WS=31
+UNRECOGNIZED=32
+'('=1
+')'=2
+','=3
+'.'=4
+'AND'=5
+'BETWEEN'=6
+'FALSE'=7
+'IN'=8
+'IS'=9
+'NOT'=10
+'NULL'=11
+'OR'=12
+'TRUE'=13
+'='=14
+'<'=16
+'<='=17
+'>'=18
+'>='=19
+'-'=20

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java
new file mode 100644
index 0000000..42ef9c8
--- /dev/null
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java
@@ -0,0 +1,168 @@
+// Generated from /home/david/Documents/code/carbondata/store/horizon/src/main/anltr/Expression.g4 by ANTLR 4.7
+package org.apache.carbondata.horizon.antlr.gen;
+import org.antlr.v4.runtime.tree.AbstractParseTreeVisitor;
+
+/**
+ * This class provides an empty implementation of {@link ExpressionVisitor},
+ * which can be extended to create a visitor which only needs to handle a subset
+ * of the available methods.
+ *
+ * @param <T> The return type of the visit operation. Use {@link Void} for
+ * operations with no return type.
+ */
+public class ExpressionBaseVisitor<T> extends AbstractParseTreeVisitor<T> implements ExpressionVisitor<T> {
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitParseFilter(ExpressionParser.ParseFilterContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitBooleanExpression(ExpressionParser.BooleanExpressionContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitPredicate(ExpressionParser.PredicateContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitConstantDefault(ExpressionParser.ConstantDefaultContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitColumnReference(ExpressionParser.ColumnReferenceContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitDereference(ExpressionParser.DereferenceContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitParenthesizedExpression(ExpressionParser.ParenthesizedExpressionContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitNullLiteral(ExpressionParser.NullLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitNumericLiteral(ExpressionParser.NumericLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitBooleanLiteral(ExpressionParser.BooleanLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitStringLiteral(ExpressionParser.StringLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitUnquotedIdentifier(ExpressionParser.UnquotedIdentifierContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitBackQuotedIdentifier(ExpressionParser.BackQuotedIdentifierContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitComparisonOperator(ExpressionParser.ComparisonOperatorContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitBooleanValue(ExpressionParser.BooleanValueContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitDecimalLiteral(ExpressionParser.DecimalLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitIntegerLiteral(ExpressionParser.IntegerLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitBigIntLiteral(ExpressionParser.BigIntLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitSmallIntLiteral(ExpressionParser.SmallIntLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitTinyIntLiteral(ExpressionParser.TinyIntLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitDoubleLiteral(ExpressionParser.DoubleLiteralContext ctx) { return visitChildren(ctx); }
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p>The default implementation returns the result of calling
+	 * {@link #visitChildren} on {@code ctx}.</p>
+	 */
+	@Override public T visitBigDecimalLiteral(ExpressionParser.BigDecimalLiteralContext ctx) { return visitChildren(ctx); }
+}
\ No newline at end of file
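
Because the base visitor supplies a default for every rule, subclasses override
only the rules they need (FilterVisitor above is one example). Another
illustration, hypothetical and not part of this commit: a visitor that merely
collects the column names referenced in a filter.

import java.util.HashSet;
import java.util.Set;

public class ColumnCollector extends ExpressionBaseVisitor<Void> {
  private final Set<String> columns = new HashSet<>();

  @Override
  public Void visitUnquotedIdentifier(ExpressionParser.UnquotedIdentifierContext ctx) {
    columns.add(ctx.getText());
    return null;
  }

  @Override
  public Void visitBackQuotedIdentifier(ExpressionParser.BackQuotedIdentifierContext ctx) {
    columns.add(ctx.getText().replace("`", ""));  // strip the backquotes
    return null;
  }

  public Set<String> columns() {
    return columns;
  }
}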
