http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java
new file mode 100644
index 0000000..304fd0f
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Internal
+public class QueryResponse extends BaseResponse implements Serializable, Writable {
+  private int queryId;
+  private Object[][] rows;
+
+  public QueryResponse() {
+    super();
+  }
+
+  public QueryResponse(int queryId, int status, String message, Object[][] rows) {
+    super(status, message);
+    this.queryId = queryId;
+    this.rows = rows;
+  }
+
+  public int getQueryId() {
+    return queryId;
+  }
+
+
+  public Object[][] getRows() {
+    return rows;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(queryId);
+    WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows));
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    queryId = in.readInt();
+    try {
+      rows = (Object[][])ObjectSerializationUtil.deserialize(
+          WritableUtils.readCompressedByteArray(in));
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+}
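
A minimal usage sketch of the Writable round-trip above (illustrative, not
part of the patch; DataOutputBuffer and DataInputBuffer are standard Hadoop
helpers, and the sample values are made up):

    import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class QueryResponseRoundTrip {
      public static void main(String[] args) throws Exception {
        // serialize a response the same way the RPC layer would
        QueryResponse original =
            new QueryResponse(42, 0, "", new Object[][] {{"c1", 100}});
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // read it back into a fresh instance
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        QueryResponse copy = new QueryResponse();
        copy.readFields(in);
        System.out.println(copy.getQueryId());   // prints 42
      }
    }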

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java
new file mode 100644
index 0000000..5f223d6
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class RegisterWorkerRequest implements Serializable, Writable {
+  private String hostAddress;
+  private int port;
+  private int cores;
+
+  public RegisterWorkerRequest() {
+  }
+
+  public RegisterWorkerRequest(String hostAddress, int port, int cores) {
+    this.hostAddress = hostAddress;
+    this.port = port;
+    this.cores = cores;
+  }
+
+  public String getHostAddress() {
+    return hostAddress;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public int getCores() {
+    return cores;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(hostAddress);
+    out.writeInt(port);
+    out.writeInt(cores);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    hostAddress = in.readUTF();
+    port = in.readInt();
+    cores = in.readInt();
+  }
+
+  @Override public String toString() {
+    return "RegisterWorkerRequest{" + "hostAddress='" + hostAddress + '\'' + 
", port=" + port + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java
new file mode 100644
index 0000000..16915e9
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class RegisterWorkerResponse implements Serializable, Writable {
+
+  private String workerId;
+
+  public RegisterWorkerResponse() {
+  }
+
+  public RegisterWorkerResponse(String workerId) {
+    this.workerId = workerId;
+  }
+
+  public String getWorkerId() {
+    return workerId;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(workerId);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    workerId = in.readUTF();
+  }
+}
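
Together with RegisterWorkerRequest above, this pair carries the worker
registration handshake. A hedged sketch of the client side (it mirrors
Worker.registerToMaster() later in this patch; host names and ports are
illustrative):

    import org.apache.carbondata.store.impl.rpc.RegistryService;
    import org.apache.carbondata.store.impl.rpc.ServiceFactory;
    import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
    import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;

    public class RegisterExample {
      public static void main(String[] args) throws Exception {
        // obtain an RPC proxy to the master's registry service
        RegistryService registry =
            ServiceFactory.createRegistryService("master-host", 10020);
        // announce this worker's address and core count ...
        RegisterWorkerRequest request =
            new RegisterWorkerRequest("worker-host", 10021, 4);
        // ... and keep the id the master assigns
        RegisterWorkerResponse response = registry.registerWorker(request);
        System.out.println("assigned workerId: " + response.getWorkerId());
      }
    }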

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java
new file mode 100644
index 0000000..86767e5
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class Scan implements Serializable, Writable {
+  private int requestId;
+  private CarbonMultiBlockSplit split;
+  private TableInfo tableInfo;
+  private String[] projectColumns;
+  private Expression filterExpression;
+  private long limit;
+
+  public Scan() {
+  }
+
+  public Scan(int requestId, CarbonMultiBlockSplit split,
+      TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) {
+    this.requestId = requestId;
+    this.split = split;
+    this.tableInfo = tableInfo;
+    this.projectColumns = projectColumns;
+    this.filterExpression = filterExpression;
+    this.limit = limit;
+  }
+
+  public int getRequestId() {
+    return requestId;
+  }
+
+  public CarbonMultiBlockSplit getSplit() {
+    return split;
+  }
+
+  public TableInfo getTableInfo() {
+    return tableInfo;
+  }
+
+  public String[] getProjectColumns() {
+    return projectColumns;
+  }
+
+  public Expression getFilterExpression() {
+    return filterExpression;
+  }
+
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(requestId);
+    split.write(out);
+    tableInfo.write(out);
+    out.writeInt(projectColumns.length);
+    for (String projectColumn : projectColumns) {
+      out.writeUTF(projectColumn);
+    }
+    String filter = ObjectSerializationUtil.convertObjectToString(filterExpression);
+    out.writeUTF(filter);
+    out.writeLong(limit);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    requestId = in.readInt();
+    split = new CarbonMultiBlockSplit();
+    split.readFields(in);
+    tableInfo = new TableInfo();
+    tableInfo.readFields(in);
+    projectColumns = new String[in.readInt()];
+    for (int i = 0; i < projectColumns.length; i++) {
+      projectColumns[i] = in.readUTF();
+    }
+    String filter = in.readUTF();
+    filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter);
+    limit = in.readLong();
+  }
+}
\ No newline at end of file
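
One detail worth noting in Scan: the filter Expression is not itself a
Writable, so write()/readFields() ferry it as a serialized string. A hedged
sketch of that helper pair in isolation (the column name and literal are made
up; the expression classes are CarbonData's standard ones):

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.scan.expression.ColumnExpression;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.core.scan.expression.LiteralExpression;
    import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
    import org.apache.carbondata.core.util.ObjectSerializationUtil;

    public class FilterSerDe {
      public static void main(String[] args) throws Exception {
        Expression filter = new EqualToExpression(
            new ColumnExpression("city", DataTypes.STRING),
            new LiteralExpression("shanghai", DataTypes.STRING));
        // Scan.write() encodes the expression like this ...
        String encoded = ObjectSerializationUtil.convertObjectToString(filter);
        // ... and Scan.readFields() turns it back into an Expression
        Expression decoded =
            (Expression) ObjectSerializationUtil.convertStringToObject(encoded);
        System.out.println(decoded.getClass().getSimpleName());
      }
    }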

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java
new file mode 100644
index 0000000..311963d
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownRequest implements Serializable, Writable {
+  private String reason;
+
+  public ShutdownRequest() {
+  }
+
+  public ShutdownRequest(String reason) {
+    this.reason = reason;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(reason);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    reason = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java
new file mode 100644
index 0000000..0143f48
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownResponse implements Serializable, Writable {
+  private int status;
+  private String message;
+
+  public ShutdownResponse() {
+  }
+
+  public ShutdownResponse(int status, String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(status);
+    out.writeUTF(message);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    status = in.readInt();
+    message = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
new file mode 100644
index 0000000..fd13b20
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.worker;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.impl.CarbonStoreBase;
+import org.apache.carbondata.store.impl.Status;
+import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
+import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * It handles request from master.
+ */
+@InterfaceAudience.Internal
+class RequestHandler {
+
+  private StoreConf storeConf;
+  private Configuration hadoopConf;
+
+  RequestHandler(StoreConf conf, Configuration hadoopConf) {
+    this.storeConf = conf;
+    this.hadoopConf = hadoopConf;
+  }
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RequestHandler.class.getName());
+
+  QueryResponse handleScan(Scan scan) {
+    try {
+      LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId()));
+      CarbonTable table = CarbonTable.buildFromTableInfo(scan.getTableInfo());
+      List<CarbonRow> rows = CarbonStoreBase.scan(table, scan);
+      LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId()));
+      return createSuccessResponse(scan, rows);
+    } catch (IOException e) {
+      LOGGER.error(e);
+      LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId()));
+      return createFailureResponse(scan, e);
+    }
+  }
+
+  ShutdownResponse handleShutdown(ShutdownRequest request) {
+    LOGGER.info("Shutting down worker...");
+    SearchModeDetailQueryExecutor.shutdownThreadPool();
+    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+    LOGGER.info("Worker shut down");
+    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
+  }
+
+  /**
+   * create a failure response
+   */
+  private QueryResponse createFailureResponse(Scan scan, Throwable throwable) {
+    return new QueryResponse(scan.getRequestId(), Status.FAILURE.ordinal(),
+        throwable.getMessage(), new Object[0][]);
+  }
+
+  /**
+   * create a success response with result rows
+   */
+  private QueryResponse createSuccessResponse(Scan scan, List<CarbonRow> rows) {
+    Iterator<CarbonRow> itor = rows.iterator();
+    Object[][] output = new Object[rows.size()][];
+    int i = 0;
+    while (itor.hasNext()) {
+      output[i++] = itor.next().getData();
+    }
+    return new QueryResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output);
+  }
+
+  public BaseResponse handleLoadData(LoadDataRequest request) {
+    DataLoadExecutor executor = null;
+    try {
+      CarbonLoadModel model = request.getModel();
+
+      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+      CarbonInputFormatUtil.createJobTrackerID(new Date());
+      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
+      Configuration configuration = new Configuration(hadoopConf);
+      StoreUtil.configureCSVInputFormat(configuration, model);
+      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
+      // Set up the attempt context required to use in the output committer.
+      TaskAttemptContext hadoopAttemptContext =
+          new TaskAttemptContextImpl(configuration, taskAttemptId);
+
+      CSVInputFormat format = new CSVInputFormat();
+      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);
+
+      CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
+      for (int index = 0; index < splits.size(); index++) {
+        readerIterators[index] = new CSVRecordReaderIterator(
+            format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
+            hadoopAttemptContext);
+      }
+
+      executor = new DataLoadExecutor();
+      executor.execute(model, storeConf.storeTempLocation(), readerIterators);
+
+      return new BaseResponse(Status.SUCCESS.ordinal(), "");
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle load data");
+      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
+    } catch (InterruptedException e) {
+      LOGGER.error(e, "Interrupted handle load data ");
+      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed to execute load data ");
+      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
+    } finally {
+      if (executor != null) {
+        executor.close();
+        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+      }
+    }
+  }
+}
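
On the calling side, the responses built here are interpreted by comparing
the carried status against Status ordinals. A hedged sketch (storeService is
assumed to be a StoreService RPC proxy and scan a Scan built elsewhere;
process() is a hypothetical consumer):

    QueryResponse response = storeService.query(scan);
    if (response.getStatus() == Status.SUCCESS.ordinal()) {
      for (Object[] row : response.getRows()) {
        process(row);   // hypothetical row consumer
      }
    } else {
      throw new IOException("worker failed: " + response.getMessage());
    }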

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
new file mode 100644
index 0000000..26f252c
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.worker;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.impl.rpc.StoreService;
+import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
+import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
+@InterfaceAudience.Internal
+public class StoreServiceImpl implements StoreService {
+
+  private Worker worker;
+  RequestHandler handler;
+
+  public StoreServiceImpl(Worker worker) {
+    this.worker = worker;
+    this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf());
+  }
+
+  @Override
+  public BaseResponse loadData(LoadDataRequest request) {
+    return handler.handleLoadData(request);
+  }
+
+  @Override
+  public QueryResponse query(Scan scan) {
+    return handler.handleScan(scan);
+  }
+
+  @Override
+  public ShutdownResponse shutdown(ShutdownRequest request) {
+    return handler.handleShutdown(request);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+      int clientMethodsHash) throws IOException {
+    return null;
+  }
+
+  public Worker getWorker() {
+    return worker;
+  }
+
+  public void setWorker(Worker worker) {
+    this.worker = worker;
+  }
+}
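
For context, the StoreService interface implemented above is defined
elsewhere in this patch; judging from the overrides here and the deleted
rpc.StoreService at the end of this mail, it is a Hadoop VersionedProtocol
along these lines (a hedged reconstruction, not the actual file):

    public interface StoreService extends VersionedProtocol {
      long versionID = 1L;   // assumed to mirror RegistryService.versionID
      BaseResponse loadData(LoadDataRequest request);
      QueryResponse query(Scan scan);
      ShutdownResponse shutdown(ShutdownRequest request);
    }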

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
new file mode 100644
index 0000000..a360e36
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.worker;
+
+import java.io.IOException;
+import java.net.BindException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.impl.rpc.RegistryService;
+import org.apache.carbondata.store.impl.rpc.ServiceFactory;
+import org.apache.carbondata.store.impl.rpc.StoreService;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+@InterfaceAudience.Internal
+public class Worker {
+
+  private static LogService LOGGER = LogServiceFactory.getLogService(Worker.class.getName());
+
+  private String id;
+  private RegistryService registry;
+  private StoreConf conf;
+  private Configuration hadoopConf;
+  private RPC.Server server;
+
+  public Worker(StoreConf conf) {
+    this.conf = conf;
+    this.hadoopConf = this.conf.newHadoopConf();
+  }
+
+  public void start() {
+    try {
+      startService();
+      registerToMaster();
+    } catch (IOException e) {
+      LOGGER.error(e, "worker failed to start");
+    }
+  }
+
+  private void startService() throws IOException {
+    BindException exception;
+    // try to create the service, retrying at most 100 times if the port is taken
+    int numTry = 100;
+    int coreNum = conf.workerCoreNum();
+    String host = conf.workerHost();
+    int port = conf.workerPort();
+    StoreService queryService = new StoreServiceImpl(this);
+    do {
+      try {
+        server = new RPC.Builder(hadoopConf)
+            .setNumHandlers(coreNum)
+            .setBindAddress(host)
+            .setPort(port)
+            .setProtocol(StoreService.class)
+            .setInstance(queryService)
+            .build();
+        server.start();
+
+        numTry = 0;
+        exception = null;
+      } catch (BindException e) {
+        // port is occupied, increase the port number and try again
+        exception = e;
+        port = port + 1;
+        numTry = numTry - 1;
+      }
+    } while (numTry > 0);
+
+    if (exception != null) {
+      // we have tried many times, but still failed to find an available port
+      LOGGER.error(exception, "worker failed to start");
+      throw exception;
+    }
+
+    conf.conf(StoreConf.WORKER_PORT, port);
+    LOGGER.info("worker started on " + host + ":" + port + " successfully");
+
+  }
+
+  public void stop() {
+    try {
+      stopService();
+    } catch (InterruptedException e) {
+      LOGGER.error(e, "worker failed to start");
+    }
+  }
+
+  private void stopService() throws InterruptedException {
+    if (server != null) {
+      server.stop();
+      server.join();
+      server = null;
+    }
+  }
+
+  private void registerToMaster() throws IOException {
+    LOGGER.info("trying to register to master " + conf.masterHost() + ":" + 
conf.masterPort());
+    if (registry == null) {
+      registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort());
+    }
+    RegisterWorkerRequest request =
+        new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum());
+    try {
+      RegisterWorkerResponse response = registry.registerWorker(request);
+      id = response.getWorkerId();
+    } catch (Throwable throwable) {
+      LOGGER.error(throwable, "worker failed to register");
+      throw new IOException(throwable);
+    }
+
+    LOGGER.info("worker " + id + " registered successfully");
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: Worker <log4j file> <properties file>");
+      return;
+    }
+
+    StoreUtil.initLog4j(args[0]);
+    Worker worker = new Worker(new StoreConf(args[1]));
+    worker.start();
+  }
+
+  public StoreConf getConf() {
+    return conf;
+  }
+
+  public void setConf(StoreConf conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public void setHadoopConf(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+}
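
A short usage sketch of the lifecycle above (the properties-file path is
illustrative):

    // start a worker exactly as Worker.main() does, then shut it down
    StoreConf conf = new StoreConf("/path/to/store.properties");
    Worker worker = new Worker(conf);
    worker.start();   // starts the RPC server, then registers with the master
    // ... serve scan/load requests ...
    worker.stop();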

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java b/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
deleted file mode 100644
index 0a724d9..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.master;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.locks.CarbonLockUtil;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.ThriftWriter;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.api.CarbonInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonLoaderUtil;
-import org.apache.carbondata.store.conf.StoreConf;
-import org.apache.carbondata.store.exception.ExecutionTimeoutException;
-import org.apache.carbondata.store.exception.StoreException;
-import org.apache.carbondata.store.rpc.RegistryService;
-import org.apache.carbondata.store.rpc.ServiceFactory;
-import org.apache.carbondata.store.rpc.StoreService;
-import org.apache.carbondata.store.rpc.impl.RegistryServiceImpl;
-import org.apache.carbondata.store.rpc.impl.Status;
-import org.apache.carbondata.store.rpc.model.BaseResponse;
-import org.apache.carbondata.store.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
-import org.apache.carbondata.store.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.scheduler.Schedulable;
-import org.apache.carbondata.store.scheduler.Scheduler;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Master of CarbonSearch.
- * It provides a Registry service for worker to register.
- * And it provides search API to fire RPC call to workers.
- */
-
-public class Master {
-
-  private static Master instance = null;
-
-  private static LogService LOGGER = LogServiceFactory.getLogService(Master.class.getName());
-
-  private Map<String, SoftReference<CarbonTable>> cacheTables;
-
-  // worker host address map to EndpointRef
-  private StoreConf conf;
-  private Configuration hadoopConf;
-  private Random random = new Random();
-  private RPC.Server registryServer = null;
-  private Scheduler scheduler = new Scheduler();
-
-  private Master(StoreConf conf) {
-    cacheTables = new HashMap<>();
-    this.conf = conf;
-    this.hadoopConf = this.conf.newHadoopConf();
-  }
-
-  /**
-   * start service and listen on port passed in constructor
-   */
-  public void startService() throws IOException {
-    if (registryServer == null) {
-
-      BindException exception;
-      // we will try to create service at worse case 100 times
-      int numTry = 100;
-      String host = conf.masterHost();
-      int port = conf.masterPort();
-      LOGGER.info("building registry-service on " + host + ":" + port);
-
-      RegistryService registryService = new RegistryServiceImpl(this);
-      do {
-        try {
-          registryServer = new RPC.Builder(hadoopConf).setBindAddress(host).setPort(port)
-              .setProtocol(RegistryService.class).setInstance(registryService).build();
-
-          registryServer.start();
-          numTry = 0;
-          exception = null;
-        } catch (BindException e) {
-          // port is occupied, increase the port number and try again
-          exception = e;
-          LOGGER.error(e, "start registry-service failed");
-          port = port + 1;
-          numTry = numTry - 1;
-        }
-      } while (numTry > 0);
-      if (exception != null) {
-        // we have tried many times, but still failed to find an available port
-        throw exception;
-      }
-      LOGGER.info("registry-service started");
-    } else {
-      LOGGER.info("Search mode master has already started");
-    }
-  }
-
-  public void stopService() throws InterruptedException {
-    if (registryServer != null) {
-      registryServer.stop();
-      registryServer.join();
-      registryServer = null;
-    }
-  }
-
-  public void stopAllWorkers() throws IOException {
-    for (Schedulable worker : getWorkers()) {
-      try {
-        worker.service.shutdown(new ShutdownRequest("user"));
-      } catch (Throwable throwable) {
-        throw new IOException(throwable);
-      }
-      scheduler.removeWorker(worker.getAddress());
-    }
-  }
-
-  /**
-   * A new searcher is trying to register, add it to the map and connect to this searcher
-   */
-  public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) throws IOException {
-    LOGGER.info(
-        "Receive Register request from worker " + request.getHostAddress() + 
":" + request.getPort()
-            + " with " + request.getCores() + " cores");
-    String workerId = UUID.randomUUID().toString();
-    String workerAddress = request.getHostAddress();
-    int workerPort = request.getPort();
-    LOGGER.info(
-        "connecting to worker " + request.getHostAddress() + ":" + 
request.getPort() + ", workerId "
-            + workerId);
-
-    StoreService searchService = ServiceFactory.createStoreService(workerAddress, workerPort);
-    scheduler.addWorker(
-        new Schedulable(workerId, workerAddress, workerPort, request.getCores(), searchService));
-    LOGGER.info("worker " + request + " registered");
-    return new RegisterWorkerResponse(workerId);
-  }
-
-  private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit)
-      throws IOException {
-    // in case of RPC success, collect all rows in response message
-    if (result.getQueryId() != queryId) {
-      throw new IOException(
-          "queryId in response does not match request: " + result.getQueryId() 
+ " != " + queryId);
-    }
-    if (result.getStatus() != Status.SUCCESS.ordinal()) {
-      throw new IOException("failure in worker: " + result.getMessage());
-    }
-    int rowCount = 0;
-    Object[][] rows = result.getRows();
-    for (Object[] row : rows) {
-      output.add(new CarbonRow(row));
-      rowCount++;
-      if (rowCount >= globalLimit) {
-        break;
-      }
-    }
-    LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + 
rowCount);
-    return rowCount;
-  }
-
-  private void onFailure(Throwable e) throws IOException {
-    throw new IOException("exception in worker: " + e.getMessage());
-  }
-
-  private void onTimeout() {
-    throw new ExecutionTimeoutException();
-  }
-
-  public String getTableFolder(String database, String tableName) {
-    return conf.storeLocation() + File.separator + database + File.separator + tableName;
-  }
-
-  public CarbonTable getTable(String database, String tableName) throws StoreException {
-    String tablePath = getTableFolder(database, tableName);
-    CarbonTable carbonTable;
-    SoftReference<CarbonTable> reference = cacheTables.get(tablePath);
-    if (reference != null) {
-      carbonTable = reference.get();
-      if (carbonTable != null) {
-        return carbonTable;
-      }
-    }
-
-    try {
-      org.apache.carbondata.format.TableInfo tableInfo =
-          CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
-      tableInfo1.setTablePath(tablePath);
-      carbonTable = CarbonTable.buildFromTableInfo(tableInfo1);
-      cacheTables.put(tablePath, new SoftReference<>(carbonTable));
-      return carbonTable;
-    } catch (IOException e) {
-      String message = "Failed to get table from " + tablePath;
-      LOGGER.error(e, message);
-      throw new StoreException(message);
-    }
-  }
-
-  public boolean createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException {
-    AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
-    boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
-    if (tableExists) {
-      if (ifNotExists) {
-        return true;
-      } else {
-        throw new IOException(
-            "car't create table " + tableInfo.getDatabaseName() + "." + 
tableInfo.getFactTable()
-                .getTableName() + ", because it already exists");
-      }
-    }
-
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    String databaseName = tableInfo.getDatabaseName();
-    String tableName = tableInfo.getFactTable().getTableName();
-    org.apache.carbondata.format.TableInfo thriftTableInfo =
-        schemaConverter.fromWrapperToExternalTableInfo(tableInfo, 
databaseName, tableName);
-
-    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
-    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
-    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
-    try {
-      if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-        boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType);
-        if (!isDirCreated) {
-          throw new IOException("Failed to create the metadata directory " + 
schemaMetadataPath);
-        }
-      }
-      ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
-      thriftWriter.open(FileWriteOperation.OVERWRITE);
-      thriftWriter.write(thriftTableInfo);
-      thriftWriter.close();
-      return true;
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to handle create table");
-      throw e;
-    }
-  }
-
-  private void openSegment(CarbonLoadModel loadModel, boolean isOverwriteTable) throws IOException {
-    try {
-      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, isOverwriteTable);
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to handle load data");
-      throw e;
-    }
-  }
-
-  private void closeSegment(CarbonLoadModel loadModel) throws IOException {
-    try {
-      CarbonLoaderUtil.updateTableStatusForFailure(loadModel, "");
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to close segment");
-      throw e;
-    }
-  }
-
-  private void commitSegment(CarbonLoadModel loadModel) throws IOException {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    String segmentId = loadModel.getSegmentId();
-    String segmentFileName = SegmentFileStore
-        .writeSegmentFile(carbonTable, segmentId, String.valueOf(loadModel.getFactTimeStamp()));
-
-    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
-    String tablePath = absoluteTableIdentifier.getTablePath();
-    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
-    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-    int retryCount = CarbonLockUtil
-        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
-    int maxTimeout = CarbonLockUtil
-        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
-    try {
-      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
-        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table 
status updation");
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metadataPath);
-        LoadMetadataDetails loadMetadataDetails = null;
-        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
-          // if the segments is in the list of marked for delete then update the status.
-          if (segmentId.equals(detail.getLoadName())) {
-            loadMetadataDetails = detail;
-            detail.setSegmentFile(segmentFileName);
-            break;
-          }
-        }
-        if (loadMetadataDetails == null) {
-          throw new IOException("can not find segment: " + segmentId);
-        }
-
-        CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.SUCCESS,
-            loadModel.getFactTimeStamp(), true);
-        CarbonLoaderUtil
-            .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, segmentId, carbonTable);
-
-        SegmentStatusManager
-            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
-      } else {
-        LOGGER.error(
-            "Not able to acquire the lock for Table status updation for table 
path " + tablePath);
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" 
+ tablePath);
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + tablePath + " during 
table status updation");
-      }
-    }
-  }
-
-  public boolean loadData(CarbonLoadModel loadModel, boolean isOverwrite) throws IOException {
-    Schedulable worker = scheduler.pickNexWorker();
-    try {
-      if (loadModel.getFactTimeStamp() == 0) {
-        loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime());
-      }
-      openSegment(loadModel, isOverwrite);
-      LoadDataRequest request = new LoadDataRequest(loadModel);
-      BaseResponse response = scheduler.sendRequest(worker, request);
-      if (Status.SUCCESS.ordinal() == response.getStatus()) {
-        commitSegment(loadModel);
-        return true;
-      } else {
-        closeSegment(loadModel);
-        throw new IOException(response.getMessage());
-      }
-    } finally {
-      worker.workload.decrementAndGet();
-    }
-  }
-
-  /**
-   * Execute search by firing RPC call to worker, return the result rows
-   *
-   * @param table       table to search
-   * @param columns     projection column names
-   * @param filter      filter expression
-   * @param globalLimit max number of rows required in Master
-   * @param localLimit  max number of rows required in Worker
-   * @return CarbonRow array
-   */
-  public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter,
-      long globalLimit, long localLimit) throws IOException {
-    Objects.requireNonNull(table);
-    Objects.requireNonNull(columns);
-    if (globalLimit < 0 || localLimit < 0) {
-      throw new IllegalArgumentException("limit should be positive");
-    }
-
-    int queryId = random.nextInt();
-
-    List<CarbonRow> output = new ArrayList<>();
-
-    // prune data and get a mapping of worker hostname to list of blocks,
-    // then add these blocks to the QueryRequest and fire the RPC call
-    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
-    Set<Map.Entry<String, List<Distributable>>> entries = nodeBlockMapping.entrySet();
-    List<Future<QueryResponse>> futures = new ArrayList<>(entries.size());
-    List<Schedulable> workers = new ArrayList<>(entries.size());
-    for (Map.Entry<String, List<Distributable>> entry : entries) {
-      CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey());
-      QueryRequest request =
-          new QueryRequest(queryId, split, table.getTableInfo(), columns, filter, localLimit);
-
-      // Find an Endpoind and send the request to it
-      // This RPC is non-blocking so that we do not need to wait before send to next worker
-      Schedulable worker = scheduler.pickWorker(entry.getKey());
-      workers.add(worker);
-      futures.add(scheduler.sendRequestAsync(worker, request));
-    }
-
-    int rowCount = 0;
-    int length = futures.size();
-    for (int i = 0; i < length; i++) {
-      Future<QueryResponse> future = futures.get(i);
-      Schedulable worker = workers.get(i);
-      if (rowCount < globalLimit) {
-        // wait for worker
-        QueryResponse response = null;
-        try {
-          response = future
-              .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS);
-        } catch (ExecutionException | InterruptedException e) {
-          onFailure(e);
-        } catch (TimeoutException t) {
-          onTimeout();
-        } finally {
-          worker.workload.decrementAndGet();
-        }
-        LOGGER.info("[QueryId: " + queryId + "] receive search response from 
worker " + worker);
-        rowCount += onSuccess(queryId, response, output, globalLimit);
-      }
-    }
-    CarbonRow[] rows = new CarbonRow[output.size()];
-    return output.toArray(rows);
-  }
-
-  /**
-   * Prune data by using CarbonInputFormat.getSplit
-   * Return a mapping of host address to list of block
-   */
-  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
-      Expression filter) throws IOException {
-    JobConf jobConf = new JobConf(new Configuration());
-    Job job = new Job(jobConf);
-    CarbonTableInputFormat format;
-    try {
-      format = CarbonInputFormatUtil
-          .createCarbonTableInputFormat(job, table, columns, filter, null, null, true);
-    } catch (InvalidConfigurationException e) {
-      throw new IOException(e.getMessage());
-    }
-
-    // We will do FG pruning in reader side, so don't do it here
-    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
-    List<InputSplit> splits = format.getSplits(job);
-    List<Distributable> blockInfos = new ArrayList<>(splits.size());
-    for (InputSplit split : splits) {
-      blockInfos.add((Distributable) split);
-    }
-    return CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, getWorkerAddresses(),
-        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
-  }
-
-  /**
-   * return hostname of all workers
-   */
-  public List<Schedulable> getWorkers() {
-    return scheduler.getAllWorkers();
-  }
-
-  private List<String> getWorkerAddresses() {
-    return scheduler.getAllWorkerAddresses();
-  }
-
-  public static synchronized Master getInstance(StoreConf conf) {
-    if (instance == null) {
-      instance = new Master(conf);
-    }
-    return instance;
-  }
-
-  public static void main(String[] args) throws InterruptedException {
-    if (args.length != 2) {
-      System.err.println("Usage: Master <log4j file> <properties file>");
-      return;
-    }
-
-    StoreUtil.initLog4j(args[0]);
-    StoreConf conf = new StoreConf(args[1]);
-    Master master = getInstance(conf);
-    master.stopService();
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
deleted file mode 100644
index 08a0e97..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
-
-import org.apache.hadoop.ipc.VersionedProtocol;
-
-@InterfaceAudience.Internal
-public interface RegistryService extends VersionedProtocol {
-  long versionID = 1L;
-  RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException;
-}

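The deleted RegistryService above was the master-side registration protocol. A worker would typically have reached it through the ServiceFactory shown next; a minimal client-side sketch, where the host/port values are placeholders and the RegisterWorkerRequest constructor arguments are assumed rather than taken from this diff:

    RegistryService registry = ServiceFactory.createRegistryService("master-host", 10020);
    // assumed constructor shape: worker address, worker RPC port, available cores
    RegisterWorkerRequest request = new RegisterWorkerRequest("worker-host", 10021, 4);
    RegisterWorkerResponse response = registry.registerWorker(request);
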
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
deleted file mode 100644
index d9d0f3e..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-
-@InterfaceAudience.Internal
-public class ServiceFactory {
-
-  public static StoreService createStoreService(String host, int port) throws IOException {
-    InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
-    return RPC.getProxy(
-        StoreService.class, StoreService.versionID, address, new Configuration());
-  }
-
-  public static RegistryService createRegistryService(String host, int port) throws IOException {
-    InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port);
-    return RPC.getProxy(
-        RegistryService.class, RegistryService.versionID, address, new Configuration());
-  }
-}

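Both deleted factory methods return Hadoop RPC client stubs; a proxy obtained from RPC.getProxy pins a connection and should be released with RPC.stopProxy once the caller is done. A hedged usage sketch with placeholder host and port:

    StoreService store = ServiceFactory.createStoreService("worker-host", 10021);
    try {
      // issue loadData/query/shutdown calls through the proxy here
    } finally {
      RPC.stopProxy(store);  // releases the underlying RPC connection
    }
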
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java
deleted file mode 100644
index 48dec79..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.rpc.model.BaseResponse;
-import org.apache.carbondata.store.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-import org.apache.carbondata.store.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.rpc.model.ShutdownResponse;
-
-import org.apache.hadoop.ipc.VersionedProtocol;
-
-@InterfaceAudience.Internal
-public interface StoreService extends VersionedProtocol {
-
-  long versionID = 1L;
-
-  BaseResponse loadData(LoadDataRequest request);
-
-  QueryResponse query(QueryRequest request);
-
-  ShutdownResponse shutdown(ShutdownRequest request);
-}

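A caller of the deleted StoreService protocol would branch on the status ordinal carried in the response. A minimal sketch, assuming getStatus()/getMessage() accessors on BaseResponse (implied by its constructor but not shown in this hunk) and an already-built QueryRequest:

    // storeService obtained via ServiceFactory.createStoreService(...)
    QueryResponse response = storeService.query(request);
    if (response.getStatus() == Status.SUCCESS.ordinal()) {
      Object[][] rows = response.getRows();          // one Object[] per result row
    } else {
      throw new IOException(response.getMessage());  // assumed accessor
    }
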
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
deleted file mode 100644
index 45e2dce..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.impl;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapDistributable;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * This is a special RecordReader that leverages FGDataMap before reading carbondata files
- * and returns CarbonRow objects
- */
-public class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
-
-  private static final LogService LOG =
-      LogServiceFactory.getLogService(IndexedRecordReader.class.getName());
-
-  private int queryId;
-  private CarbonTable table;
-
-  public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
-    super(queryModel, new CarbonRowReadSupport());
-    this.queryId = queryId;
-    this.table = table;
-  }
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit;
-    List<CarbonInputSplit> splits = mbSplit.getAllSplits();
-    List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
-    queryModel.setTableBlockInfos(list);
-
-    // prune the blocks with FGDataMap if there is one, based on the filter condition
-    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
-        queryModel.getFilterExpressionResolverTree());
-    if (fgDataMap != null) {
-      queryModel = prune(table, queryModel, mbSplit, fgDataMap);
-    } else {
-      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splits);
-      queryModel.setTableBlockInfos(tableBlockInfoList);
-    }
-
-    readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
-    try {
-      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
-    } catch (QueryExecutionException e) {
-      throw new InterruptedException(e.getMessage());
-    }
-  }
-
-  private DataMapExprWrapper chooseFGDataMap(
-      CarbonTable table,
-      FilterResolverIntf filterInterface) {
-    DataMapChooser chooser = null;
-    try {
-      chooser = new DataMapChooser(table);
-      return chooser.chooseFGDataMap(filterInterface);
-    } catch (IOException e) {
-      LOG.error(e);
-      return null;
-    }
-  }
-
-  /**
-   * If there is an FGDataMap defined for this table and a filter condition in the query,
-   * prune the splits by the DataMap, set the pruned splits into the QueryModel and return it
-   */
-  private QueryModel prune(CarbonTable table, QueryModel queryModel,
-      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
-    Objects.requireNonNull(datamap);
-    List<Segment> segments = new LinkedList<>();
-    HashMap<String, Integer> uniqueSegments = new HashMap<>();
-    LoadMetadataDetails[] loadMetadataDetails =
-        SegmentStatusManager.readLoadMetadata(
-            CarbonTablePath.getMetadataPath(table.getTablePath()));
-    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
-      if (uniqueSegments.get(segmentId) == null) {
-        segments.add(Segment.toSegment(segmentId,
-            new 
TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-                loadMetadataDetails)));
-        uniqueSegments.put(segmentId, 1);
-      } else {
-        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
-      }
-    }
-
-    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
-    List<ExtendedBlocklet> prunedBlocklets = new LinkedList<>();
-    for (int i = 0; i < distributables.size(); i++) {
-      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
-      prunedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
-    }
-
-    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
-    for (ExtendedBlocklet prunedBlocklet : prunedBlocklets) {
-      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
-    }
-
-    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
-    List<TableBlockInfo> blockToRead = new LinkedList<>();
-    for (TableBlockInfo block : blocks) {
-      if (pathToRead.containsKey(block.getFilePath())) {
-        // If this is not set, FineGrainBlocklet objects will not be created in
-        // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
-        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
-        blockToRead.add(block);
-      }
-    }
-    LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned blocks: %d",
-        queryId, blockToRead.size()));
-    queryModel.setTableBlockInfos(blockToRead);
-    return queryModel;
-  }
-
-  @Override
-  public void close() throws IOException {
-    logStatistics(rowCount, queryModel.getStatisticsRecorder());
-    // clear dictionary cache
-    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
-    if (null != columnToDictionaryMapping) {
-      for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
-        CarbonUtil.clearDictionaryCache(entry.getValue());
-      }
-    }
-
-    // close read support
-    readSupport.close();
-    try {
-      queryExecutor.finish();
-    } catch (QueryExecutionException e) {
-      throw new IOException(e);
-    }
-  }
-}

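The deleted reader is driven like any Hadoop RecordReader, which is exactly what the RequestHandler below does. A condensed sketch of that read loop (queryId, table, queryModel, mbSplit and limit come from the surrounding request handling):

    IndexedRecordReader reader = new IndexedRecordReader(queryId, table, queryModel);
    reader.initialize(mbSplit, null);      // triggers FG DataMap pruning if one exists
    List<CarbonRow> rows = new LinkedList<>();
    while (reader.nextKeyValue() && rows.size() < limit) {
      rows.add(reader.getCurrentValue());
    }
    reader.close();                        // clears dictionary cache, finishes the executor
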
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
deleted file mode 100644
index 03f9b2c..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.impl;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.master.Master;
-import org.apache.carbondata.store.rpc.RegistryService;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-
-@InterfaceAudience.Internal
-public class RegistryServiceImpl implements RegistryService {
-
-  private Master master;
-
-  public RegistryServiceImpl(Master master) {
-    this.master = master;
-  }
-
-  @Override
-  public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException {
-    return master.addWorker(request);
-  }
-
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    return versionID;
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-      int clientMethodsHash) throws IOException {
-    return null;
-  }
-}

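To serve the deleted implementation above, the master hosted it behind a Hadoop RPC server. The server wiring lived in Master rather than in this hunk; a hedged sketch of the standard Hadoop pattern, with placeholder bind address, port and handler count:

    Configuration conf = new Configuration();
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(RegistryService.class)
        .setInstance(new RegistryServiceImpl(master))
        .setBindAddress("master-host")
        .setPort(10020)
        .setNumHandlers(4)
        .build();
    server.start();
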
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
deleted file mode 100644
index 3b98019..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.impl;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.conf.StoreConf;
-import org.apache.carbondata.store.rpc.model.BaseResponse;
-import org.apache.carbondata.store.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-import org.apache.carbondata.store.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.rpc.model.ShutdownResponse;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * It handles requests from the master.
- */
-@InterfaceAudience.Internal
-class RequestHandler {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RequestHandler.class.getName());
-
-  private StoreConf conf;
-  private Configuration hadoopConf;
-
-  public RequestHandler(StoreConf conf, Configuration hadoopConf) {
-    this.conf = conf;
-    this.hadoopConf = hadoopConf;
-  }
-
-  QueryResponse handleSearch(QueryRequest request) {
-    try {
-      LOGGER.info(String.format("[QueryId:%d] received search request", request.getRequestId()));
-      List<CarbonRow> rows = handleRequest(request);
-      LOGGER.info(String.format("[QueryId:%d] sending success response", request.getRequestId()));
-      return createSuccessResponse(request, rows);
-    } catch (IOException e) {
-      LOGGER.error(e);
-      LOGGER.info(String.format("[QueryId:%d] sending failure response", request.getRequestId()));
-      return createFailureResponse(request, e);
-    }
-  }
-
-  ShutdownResponse handleShutdown(ShutdownRequest request) {
-    LOGGER.info("Shutting down worker...");
-    SearchModeDetailQueryExecutor.shutdownThreadPool();
-    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
-    LOGGER.info("Worker shut down");
-    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
-  }
-
-  /**
-   * Builds {@link QueryModel} and reads data from files
-   */
-  private List<CarbonRow> handleRequest(QueryRequest request) throws IOException {
-    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
-    carbonTaskInfo.setTaskId(System.nanoTime());
-    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
-    CarbonMultiBlockSplit mbSplit = request.getSplit();
-    long limit = request.getLimit();
-    TableInfo tableInfo = request.getTableInfo();
-    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
-    QueryModel queryModel = createQueryModel(table, request);
-
-    LOGGER.info(String.format("[QueryId:%d] %s, number of blocks: %d", request.getRequestId(),
-        queryModel.toString(), mbSplit.getAllSplits().size()));
-
-    // read all rows by the reader
-    List<CarbonRow> rows = new LinkedList<>();
-    try (CarbonRecordReader<CarbonRow> reader =
-        new IndexedRecordReader(request.getRequestId(), table, queryModel)) {
-      reader.initialize(mbSplit, null);
-
-      // loop to read the required number of rows.
-      // By default, if the user does not specify a limit value, limit is Long.MAX_VALUE
-      long rowCount = 0;
-      while (reader.nextKeyValue() && rowCount < limit) {
-        rows.add(reader.getCurrentValue());
-        rowCount++;
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-    LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
-        request.getRequestId(), rows.size()));
-    return rows;
-  }
-
-  private QueryModel createQueryModel(CarbonTable table, QueryRequest request) {
-    String[] projectColumns = request.getProjectColumns();
-    Expression filter = request.getFilterExpression();
-    return new QueryModelBuilder(table).projectColumns(projectColumns).filterExpression(filter)
-        .build();
-  }
-
-  /**
-   * create a failure response
-   */
-  private QueryResponse createFailureResponse(QueryRequest request, Throwable throwable) {
-    return new QueryResponse(request.getRequestId(), Status.FAILURE.ordinal(),
-        throwable.getMessage(), new Object[0][]);
-  }
-
-  /**
-   * create a success response with result rows
-   */
-  private QueryResponse createSuccessResponse(QueryRequest request, List<CarbonRow> rows) {
-    Iterator<CarbonRow> itor = rows.iterator();
-    Object[][] output = new Object[rows.size()][];
-    int i = 0;
-    while (itor.hasNext()) {
-      output[i++] = itor.next().getData();
-    }
-    return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output);
-  }
-
-  public BaseResponse handleLoadData(LoadDataRequest request) {
-    DataLoadExecutor executor = null;
-    try {
-      CarbonLoadModel model = request.getModel();
-
-      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
-      CarbonInputFormatUtil.createJobTrackerID(new Date());
-      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
-      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
-      Configuration configuration = new Configuration(hadoopConf);
-      StoreUtil.configureCSVInputFormat(configuration, model);
-      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
-      // Set up the attempt context required to use in the output committer.
-      TaskAttemptContext hadoopAttemptContext =
-          new TaskAttemptContextImpl(configuration, taskAttemptId);
-
-      CSVInputFormat format = new CSVInputFormat();
-      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);
-
-      CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
-      for (int index = 0; index < splits.size(); index++) {
-        readerIterators[index] = new CSVRecordReaderIterator(
-            format.createRecordReader(splits.get(index), hadoopAttemptContext),
-            splits.get(index), hadoopAttemptContext);
-      }
-
-      executor = new DataLoadExecutor();
-      executor.execute(model, conf.storeTempLocation(), readerIterators);
-
-      return new BaseResponse(Status.SUCCESS.ordinal(), "");
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to handle load data");
-      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
-    } catch (InterruptedException e) {
-      LOGGER.error(e, "Interrupted while handling load data");
-      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
-    } catch (Exception e) {
-      LOGGER.error(e, "Failed to execute load data");
-      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
-    } finally {
-      if (executor != null) {
-        executor.close();
-        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
deleted file mode 100644
index 9bcd397..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.impl;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Status of RPC response
- */
-@InterfaceAudience.Internal
-public enum Status {
-  SUCCESS, FAILURE
-}

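Because the deleted enum crosses the wire as an int via ordinal() (see RequestHandler above), receivers compare raw ordinals rather than enum constants, so the declaration order of SUCCESS and FAILURE must never change. A small illustration:

    int code = Status.SUCCESS.ordinal();            // 0, serialized into the response
    boolean ok = code == Status.SUCCESS.ordinal();  // receiver-side check
    Status decoded = Status.values()[code];         // or decode back to the enum
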
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java
deleted file mode 100644
index ac3b199..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.impl;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.rpc.StoreService;
-import org.apache.carbondata.store.rpc.model.BaseResponse;
-import org.apache.carbondata.store.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-import org.apache.carbondata.store.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.rpc.model.ShutdownResponse;
-import org.apache.carbondata.store.worker.Worker;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-
-@InterfaceAudience.Internal
-public class StoreServiceImpl implements StoreService {
-
-  private Worker worker;
-  RequestHandler handler;
-
-  public StoreServiceImpl(Worker worker) {
-    this.worker = worker;
-    this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf());
-  }
-
-  @Override
-  public BaseResponse loadData(LoadDataRequest request) {
-    return handler.handleLoadData(request);
-  }
-
-  @Override
-  public QueryResponse query(QueryRequest request) {
-    return handler.handleSearch(request);
-  }
-
-  @Override
-  public ShutdownResponse shutdown(ShutdownRequest request) {
-    return handler.handleShutdown(request);
-  }
-
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    return versionID;
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-      int clientMethodsHash) throws IOException {
-    return null;
-  }
-
-  public Worker getWorker() {
-    return worker;
-  }
-
-  public void setWorker(Worker worker) {
-    this.worker = worker;
-  }
-}
