http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java
deleted file mode 100644
index d826b32..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.io.Writable;
-
-@InterfaceAudience.Internal
-public class BaseResponse implements Serializable, Writable {
-  private int status;
-  private String message;
-
-  public BaseResponse() {
-  }
-
-  public BaseResponse(int status, String message) {
-    this.status = status;
-    this.message = message;
-  }
-
-  public int getStatus() {
-    return status;
-  }
-
-  public void setStatus(int status) {
-    this.status = status;
-  }
-
-  public String getMessage() {
-    return message;
-  }
-
-  public void setMessage(String message) {
-    this.message = message;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(status);
-    out.writeUTF(message);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    status = in.readInt();
-    message = in.readUTF();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java
deleted file mode 100644
index e79fad2..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-public class LoadDataRequest implements Serializable, Writable {
-
-  private CarbonLoadModel model;
-
-  public LoadDataRequest() {
-  }
-
-  public LoadDataRequest(CarbonLoadModel model) {
-    this.model = model;
-  }
-
-  public CarbonLoadModel getModel() {
-    return model;
-  }
-
-  public void setModel(CarbonLoadModel model) {
-    this.model = model;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeCompressedByteArray(out, StoreUtil.serialize(model));
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    byte[] bytes = WritableUtils.readCompressedByteArray(in);
-    model = (CarbonLoadModel) StoreUtil.deserialize(bytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
deleted file mode 100644
index 27dc38b..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.util.ObjectSerializationUtil;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-
-import org.apache.hadoop.io.Writable;
-
-@InterfaceAudience.Internal
-public class QueryRequest implements Serializable, Writable {
-  private int requestId;
-  private CarbonMultiBlockSplit split;
-  private TableInfo tableInfo;
-  private String[] projectColumns;
-  private Expression filterExpression;
-  private long limit;
-
-  public QueryRequest() {
-  }
-
-  public QueryRequest(int requestId, CarbonMultiBlockSplit split,
-      TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) {
-    this.requestId = requestId;
-    this.split = split;
-    this.tableInfo = tableInfo;
-    this.projectColumns = projectColumns;
-    this.filterExpression = filterExpression;
-    this.limit = limit;
-  }
-
-  public int getRequestId() {
-    return requestId;
-  }
-
-  public CarbonMultiBlockSplit getSplit() {
-    return split;
-  }
-
-  public TableInfo getTableInfo() {
-    return tableInfo;
-  }
-
-  public String[] getProjectColumns() {
-    return projectColumns;
-  }
-
-  public Expression getFilterExpression() {
-    return filterExpression;
-  }
-
-  public long getLimit() {
-    return limit;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(requestId);
-    split.write(out);
-    tableInfo.write(out);
-    out.writeInt(projectColumns.length);
-    for (String projectColumn : projectColumns) {
-      out.writeUTF(projectColumn);
-    }
-    String filter = ObjectSerializationUtil.convertObjectToString(filterExpression);
-    out.writeUTF(filter);
-    out.writeLong(limit);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    requestId = in.readInt();
-    split = new CarbonMultiBlockSplit();
-    split.readFields(in);
-    tableInfo = new TableInfo();
-    tableInfo.readFields(in);
-    projectColumns = new String[in.readInt()];
-    for (int i = 0; i < projectColumns.length; i++) {
-      projectColumns[i] = in.readUTF();
-    }
-    String filter = in.readUTF();
-    filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter);
-    limit = in.readLong();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
deleted file mode 100644
index 7ad9210..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.util.ObjectSerializationUtil;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-@InterfaceAudience.Internal
-public class QueryResponse extends BaseResponse implements Serializable, Writable {
-  private int queryId;
-  private Object[][] rows;
-
-  public QueryResponse() {
-    super();
-  }
-
-  public QueryResponse(int queryId, int status, String message, Object[][] rows) {
-    super(status, message);
-    this.queryId = queryId;
-    this.rows = rows;
-  }
-
-  public int getQueryId() {
-    return queryId;
-  }
-
-
-  public Object[][] getRows() {
-    return rows;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    out.writeInt(queryId);
-    WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows));
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    queryId = in.readInt();
-    try {
-      rows = (Object[][])ObjectSerializationUtil.deserialize(
-          WritableUtils.readCompressedByteArray(in));
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
deleted file mode 100644
index 2131e3b..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.io.Writable;
-
-@InterfaceAudience.Internal
-public class RegisterWorkerRequest implements Serializable, Writable {
-  private String hostAddress;
-  private int port;
-  private int cores;
-
-  public RegisterWorkerRequest() {
-  }
-
-  public RegisterWorkerRequest(String hostAddress, int port, int cores) {
-    this.hostAddress = hostAddress;
-    this.port = port;
-    this.cores = cores;
-  }
-
-  public String getHostAddress() {
-    return hostAddress;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public int getCores() {
-    return cores;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeUTF(hostAddress);
-    out.writeInt(port);
-    out.writeInt(cores);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    hostAddress = in.readUTF();
-    port = in.readInt();
-    cores = in.readInt();
-  }
-
-  @Override public String toString() {
-    return "RegisterWorkerRequest{" + "hostAddress='" + hostAddress + '\'' + ", port=" + port + '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
deleted file mode 100644
index 8465c90..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.io.Writable;
-
-@InterfaceAudience.Internal
-public class RegisterWorkerResponse implements Serializable, Writable {
-
-  private String workerId;
-
-  public RegisterWorkerResponse() {
-  }
-
-  public RegisterWorkerResponse(String workerId) {
-    this.workerId = workerId;
-  }
-
-  public String getWorkerId() {
-    return workerId;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeUTF(workerId);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    workerId = in.readUTF();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
deleted file mode 100644
index 7a25944..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.io.Writable;
-
-@InterfaceAudience.Internal
-public class ShutdownRequest implements Serializable, Writable {
-  private String reason;
-
-  public ShutdownRequest() {
-  }
-
-  public ShutdownRequest(String reason) {
-    this.reason = reason;
-  }
-
-  public String getReason() {
-    return reason;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeUTF(reason);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    reason = in.readUTF();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
deleted file mode 100644
index f6f329f..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.rpc.model;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.io.Writable;
-
-@InterfaceAudience.Internal
-public class ShutdownResponse implements Serializable, Writable {
-  private int status;
-  private String message;
-
-  public ShutdownResponse() {
-  }
-
-  public ShutdownResponse(int status, String message) {
-    this.status = status;
-    this.message = message;
-  }
-
-  public int getStatus() {
-    return status;
-  }
-
-  public String getMessage() {
-    return message;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(status);
-    out.writeUTF(message);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    status = in.readInt();
-    message = in.readUTF();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java
deleted file mode 100644
index 65d0786..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.scheduler;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.carbondata.store.rpc.StoreService;
-
-public class Schedulable {
-
-  private String id;
-  private String address;
-  private int port;
-  private int cores;
-  public StoreService service;
-  public AtomicInteger workload;
-
-  public Schedulable(String id, String address, int port, int cores, StoreService service) {
-    this.id = id;
-    this.address = address;
-    this.port = port;
-    this.cores = cores;
-    this.service = service;
-    this.workload = new AtomicInteger();
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public void setId(String id) {
-    this.id = id;
-  }
-
-  public String getAddress() {
-    return address;
-  }
-
-  public void setAddress(String address) {
-    this.address = address;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public void setPort(int port) {
-    this.port = port;
-  }
-
-  int getCores() {
-    return cores;
-  }
-
-  @Override public String toString() {
-    return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port
-        + '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java
deleted file mode 100644
index 1b4cdde..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.scheduler;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.store.exception.WorkerTooBusyException;
-import org.apache.carbondata.store.rpc.model.BaseResponse;
-import org.apache.carbondata.store.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.rpc.model.QueryRequest;
-import org.apache.carbondata.store.rpc.model.QueryResponse;
-
-/**
- * [[Master]] uses Scheduler to pick a Worker to send request
- */
-public class Scheduler {
-
-  private static LogService LOGGER = LogServiceFactory.getLogService(Scheduler.class.getName());
-
-  // mapping of worker IP address to worker instance
-  private Map<String, Schedulable> ipMapWorker = new HashMap<>();
-  private List<Schedulable> workers = new ArrayList<>();
-  private AtomicLong index = new AtomicLong(0);
-  private ExecutorService executors = Executors.newCachedThreadPool();
-
-  /**
-   * Pick a Worker according to the address and workload of the Worker
-   * Invoke the RPC and return Future result
-   */
-  public Future<QueryResponse> sendRequestAsync(final Schedulable worker,
-      final QueryRequest request) {
-
-    LOGGER.info("sending search request to worker " + worker);
-    worker.workload.incrementAndGet();
-    return executors.submit(new Callable<QueryResponse>() {
-      @Override public QueryResponse call() {
-        return worker.service.query(request);
-      }
-    });
-  }
-
-  public BaseResponse sendRequest(final Schedulable worker,
-      final LoadDataRequest request) {
-
-    LOGGER.info("sending load data request to worker " + worker);
-    worker.workload.incrementAndGet();
-    return worker.service.loadData(request);
-  }
-
-  public Schedulable pickWorker(String splitAddress) {
-    Schedulable worker = ipMapWorker.get(splitAddress);
-    // no local worker available, choose one worker randomly
-    if (worker == null) {
-      worker = pickNexWorker();
-    }
-    // check whether worker exceed max workload, if exceeded, pick next worker
-    int maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.getCores());
-    int numTry = workers.size();
-    do {
-      if (worker.workload.get() >= maxWorkload) {
-        LOGGER.info("worker " + worker + " reach limit, re-select worker...");
-        worker = pickNexWorker();
-        numTry = numTry - 1;
-      } else {
-        numTry = -1;
-      }
-    } while (numTry > 0);
-    if (numTry == 0) {
-      // tried so many times and still not able to find Worker
-      throw new WorkerTooBusyException(
-          "All workers are busy, number of workers: " + workers.size() + ", workload limit:"
-              + maxWorkload);
-    }
-
-    return worker;
-  }
-
-  public Schedulable pickNexWorker() {
-    return workers.get((int) (index.get() % workers.size()));
-  }
-
-  /**
-   * A new searcher is trying to register, add it to the map and connect to this searcher
-   */
-  public void addWorker(Schedulable schedulable) {
-    workers.add(schedulable);
-    ipMapWorker.put(schedulable.getAddress(), schedulable);
-  }
-
-  public void removeWorker(String address) {
-    Schedulable schedulable = ipMapWorker.get(address);
-    if (schedulable != null) {
-      ipMapWorker.remove(address);
-      workers.remove(schedulable);
-    }
-  }
-
-  public List<Schedulable> getAllWorkers() {
-    return workers;
-  }
-
-  public List<String> getAllWorkerAddresses() {
-    List<String> addresses = new ArrayList<>(workers.size());
-    for (Schedulable worker : workers) {
-      addresses.add(worker.getAddress());
-    }
-    return addresses;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
index fba3413..775669f 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
@@ -27,6 +27,7 @@ import java.io.ObjectOutputStream;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -35,13 +36,14 @@ import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.store.api.conf.StoreConf;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.PropertyConfigurator;
 
+@InterfaceAudience.Internal
 public class StoreUtil {
 
   private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java
deleted file mode 100644
index 6fa2191..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.worker;
-
-import java.io.IOException;
-import java.net.BindException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.store.conf.StoreConf;
-import org.apache.carbondata.store.rpc.RegistryService;
-import org.apache.carbondata.store.rpc.ServiceFactory;
-import org.apache.carbondata.store.rpc.StoreService;
-import org.apache.carbondata.store.rpc.impl.StoreServiceImpl;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-
-public class Worker {
-
-  private static LogService LOGGER = 
LogServiceFactory.getLogService(Worker.class.getName());
-
-  private String id;
-  private RegistryService registry;
-  private StoreConf conf;
-  private Configuration hadoopConf;
-  private RPC.Server server;
-
-  public Worker(StoreConf conf) {
-    this.conf = conf;
-    this.hadoopConf = this.conf.newHadoopConf();
-  }
-
-  public void start() {
-    try {
-      startService();
-      registerToMaster();
-    } catch (IOException e) {
-      LOGGER.error(e, "worker failed to start");
-    }
-  }
-
-  private void startService() throws IOException {
-    BindException exception;
-    // we will try to create service at worse case 100 times
-    int numTry = 100;
-    int coreNum = conf.workerCoreNum();
-    String host = conf.workerHost();
-    int port = conf.workerPort();
-    StoreService queryService = new StoreServiceImpl(this);
-    do {
-      try {
-        server = new RPC.Builder(hadoopConf)
-            .setNumHandlers(coreNum)
-            .setBindAddress(host)
-            .setPort(port)
-            .setProtocol(StoreService.class)
-            .setInstance(queryService)
-            .build();
-        server.start();
-
-        numTry = 0;
-        exception = null;
-      } catch (BindException e) {
-        // port is occupied, increase the port number and try again
-        exception = e;
-        port = port + 1;
-        numTry = numTry - 1;
-      }
-    } while (numTry > 0);
-
-    if (exception != null) {
-      // we have tried many times, but still failed to find an available port
-      LOGGER.error(exception, "worker failed to start");
-      throw exception;
-    }
-
-    conf.conf(StoreConf.WORKER_PORT, port);
-    LOGGER.info("worker started on " + host + ":" + port + " successfully");
-
-  }
-
-  public void stop() {
-    try {
-      stopService();
-    } catch (InterruptedException e) {
-      LOGGER.error(e, "worker failed to start");
-    }
-  }
-
-  private void stopService() throws InterruptedException {
-    if (server != null) {
-      server.stop();
-      server.join();
-      server = null;
-    }
-  }
-
-  private void registerToMaster() throws IOException {
-
-    LOGGER.info("trying to register to master " + conf.masterHost() + ":" + 
conf.masterPort());
-    if (registry == null) {
-      registry = ServiceFactory.createRegistryService(conf.masterHost(), 
conf.masterPort());
-    }
-    RegisterWorkerRequest request =
-        new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), 
conf.workerCoreNum());
-    try {
-      RegisterWorkerResponse response = registry.registerWorker(request);
-      id = response.getWorkerId();
-    } catch (Throwable throwable) {
-      LOGGER.error(throwable, "worker failed to register");
-      throw new IOException(throwable);
-    }
-
-    LOGGER.info("worker " + id + " registered successfully");
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 2) {
-      System.err.println("Usage: Worker <log4j file> <properties file>");
-      return;
-    }
-
-    StoreUtil.initLog4j(args[0]);
-    Worker worker = new Worker(new StoreConf(args[1]));
-    worker.start();
-  }
-
-  public StoreConf getConf() {
-    return conf;
-  }
-
-  public void setConf(StoreConf conf) {
-    this.conf = conf;
-  }
-
-  public Configuration getHadoopConf() {
-    return hadoopConf;
-  }
-
-  public void setHadoopConf(Configuration hadoopConf) {
-    this.hadoopConf = hadoopConf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
 
b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
new file mode 100644
index 0000000..2448660
--- /dev/null
+++ 
b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.store.api.CarbonStore;
+import org.apache.carbondata.store.api.CarbonStoreFactory;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
+import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.store.impl.worker.Worker;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * End-to-end test for the distributed {@link CarbonStore}: it starts one
+ * {@link Worker} in-process, then creates a table, loads a CSV segment,
+ * runs selects with and without a filter, and drops the table again.
+ */
+public class DistributedCarbonStoreTest {
+
+  private static String projectFolder;
+  private static CarbonStore store;
+
+  /**
+   * Resolves the project root, opens the distributed store described by
+   * store/conf/store.conf and starts a worker with the same configuration.
+   */
+  @BeforeClass
+  public static void beforeAll() throws IOException, StoreException {
+    // resolved once; the relative path starts at the test classpath root
+    projectFolder = new File(DistributedCarbonStoreTest.class.getResource("/").getPath() +
+        "../../../../").getCanonicalPath();
+    String confFile = projectFolder + "/store/conf/store.conf";
+    StoreConf storeConf = new StoreConf(confFile);
+
+    store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf);
+
+    // start worker
+    // NOTE(review): the worker is never stopped by this class; confirm whether
+    // afterAll() should shut it down so its port is released.
+    Worker worker = new Worker(storeConf);
+    worker.start();
+  }
+
+  /** Closes the shared store; does nothing when beforeAll() failed before creating it. */
+  @AfterClass
+  public static void afterAll() throws IOException {
+    if (store != null) {
+      store.close();
+    }
+  }
+
+  /** Calls TestUtil.cleanMdtFile() before each test and fails if it returns false. */
+  @Before
+  public void cleanFile() {
+    // Assert.assertTrue rather than the assert keyword: the call has to run
+    // even when the JVM is started without -ea.
+    Assert.assertTrue(TestUtil.cleanMdtFile());
+  }
+
+  /** Calls TestUtil.verifyMdtFile() after each test and fails if it returns true. */
+  @After
+  public void verifyDMFile() {
+    Assert.assertFalse(TestUtil.verifyMdtFile());
+  }
+
+  /**
+   * Creates table_1, loads data1.csv as one segment, then checks that an
+   * unfiltered select returns 5 rows under limit(5) and that filtering on
+   * intField = 11 returns exactly one row.
+   */
+  @Test
+  public void testSelect() throws IOException, StoreException {
+    TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default");
+    // dropped first so createTable below builds the table anew
+    store.dropTable(tableIdentifier);
+    TableDescriptor table = TableDescriptor
+        .builder()
+        .ifNotExists()
+        .table(tableIdentifier)
+        .comment("first table")
+        .column("shortField", DataTypes.SHORT, "short field")
+        .column("intField", DataTypes.INT, "int field")
+        .column("bigintField", DataTypes.LONG, "long field")
+        .column("doubleField", DataTypes.DOUBLE, "double field")
+        .column("stringField", DataTypes.STRING, "string field")
+        .column("timestampField", DataTypes.TIMESTAMP, "timestamp field")
+        .column("decimalField", DataTypes.createDecimalType(18, 2), "decimal field")
+        .column("dateField", DataTypes.DATE, "date field")
+        .column("charField", DataTypes.STRING, "char field")
+        // NOTE(review): declared as DOUBLE although the column is named floatField - confirm
+        .column("floatField", DataTypes.DOUBLE, "float field")
+        .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
+        .create();
+    store.createTable(table);
+
+    // load one segment from data1.csv with the "header" option set to "true"
+    LoadDescriptor load = LoadDescriptor
+        .builder()
+        .table(tableIdentifier)
+        .overwrite(false)
+        .inputPath(projectFolder + "/store/core/src/test/resources/data1.csv")
+        .options("header", "true")
+        .create();
+    store.loadData(load);
+
+    // select two columns without a filter; limit(5) caps the result at 5 rows
+    SelectDescriptor select = SelectDescriptor
+        .builder()
+        .table(tableIdentifier)
+        .select("intField", "stringField")
+        .limit(5)
+        .create();
+    List<CarbonRow> result = store.select(select);
+    Assert.assertEquals(5, result.size());
+
+    // select with filter intField = 11; exactly one row is expected
+    SelectDescriptor select2 = SelectDescriptor
+        .builder()
+        .table(tableIdentifier)
+        .select("intField", "stringField")
+        .filter(new EqualToExpression(
+            new ColumnExpression("intField", DataTypes.INT),
+            new LiteralExpression(11, DataTypes.INT)))
+        .limit(5)
+        .create();
+    List<CarbonRow> result2 = store.select(select2);
+    Assert.assertEquals(1, result2.size());
+
+    store.dropTable(tableIdentifier);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
 
b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
index c885a26..420c8cf 100644
--- 
a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
+++ 
b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -19,20 +19,50 @@ package org.apache.carbondata.store;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.sdk.file.TestUtil;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.store.api.CarbonStore;
+import org.apache.carbondata.store.api.CarbonStoreFactory;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
+import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.store.api.exception.StoreException;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class LocalCarbonStoreTest {
+
+  private static String projectFolder;
+  private static CarbonStore store;
+
+  @BeforeClass
+  public static void setup() throws IOException, StoreException {
+    // build the local store first; its temp location is set to "./temp"
+    StoreConf localConf = new StoreConf("test", "./");
+    localConf.conf(StoreConf.STORE_TEMP_LOCATION, "./temp");
+    store = CarbonStoreFactory.getLocalStore("LocalCarbonStoreTest", localConf);
+
+    // project root: "../../../../" relative to the test classpath root
+    String classpathRoot = LocalCarbonStoreTest.class.getResource("/").getPath();
+    File projectRoot = new File(classpathRoot + "../../../../");
+    projectFolder = projectRoot.getCanonicalPath();
+  }
+
+  @AfterClass
+  public static void afterAll() throws IOException {
+    store.close();  // closes the static store shared by the tests in this class
+  }
+
   @Before
   public void cleanFile() {
     assert (TestUtil.cleanMdtFile());
@@ -43,30 +73,63 @@ public class LocalCarbonStoreTest {
     assert (!TestUtil.verifyMdtFile());
   }
 
-  // TODO: complete this testcase
-  // Currently result rows are empty, because SDK is not writing table status 
file
-  // so that reader does not find any segment.
-  // Complete this testcase after flat folder reader is done.
   @Test
-  public void testWriteAndReadFiles() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
+  public void testWriteAndReadFiles() throws IOException, StoreException {
+    TableIdentifier tableIdentifier = new TableIdentifier("table_1", 
"default");
+    store.dropTable(tableIdentifier);  // dropped first so createTable below builds the table anew
+    TableDescriptor table = TableDescriptor
+        .builder()
+        .ifNotExists()
+        .table(tableIdentifier)
+        .comment("first table")
+        .column("shortField", DataTypes.SHORT, "short field")
+        .column("intField", DataTypes.INT, "int field")
+        .column("bigintField", DataTypes.LONG, "long field")
+        .column("doubleField", DataTypes.DOUBLE, "double field")
+        .column("stringField", DataTypes.STRING, "string field")
+        .column("timestampField", DataTypes.TIMESTAMP, "timestamp field")
+        .column("decimalField", DataTypes.createDecimalType(18, 2), "decimal 
field")
+        .column("dateField", DataTypes.DATE, "date field")
+        .column("charField", DataTypes.STRING, "char field")
+        .column("floatField", DataTypes.DOUBLE, "float field")  // NOTE(review): DOUBLE despite the name - confirm
+        .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
+        .create();
+    store.createTable(table);
 
-    TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
+    // load one segment from data1.csv with the "header" option set to "true"
+    LoadDescriptor load = LoadDescriptor
+        .builder()
+        .table(tableIdentifier)
+        .overwrite(false)
+        .inputPath(projectFolder + "/store/core/src/test/resources/data1.csv")
+        .options("header", "true")
+        .create();
+    store.loadData(load);
 
-    CarbonStore store = new LocalCarbonStore();
-    Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, 
null);
+    // select two columns without a filter; limit(5) caps the result at 5 rows
+    SelectDescriptor select = SelectDescriptor
+        .builder()
+        .table(tableIdentifier)
+        .select("intField", "stringField")
+        .limit(5)
+        .create();
+    List<CarbonRow> result = store.select(select);
+    Assert.assertEquals(5, result.size());
 
-    while (rows.hasNext()) {
-      CarbonRow row = rows.next();
-      System.out.println(row.toString());
-    }
+    // select with filter intField = 11; exactly one row is expected
+    SelectDescriptor select2 = SelectDescriptor
+        .builder()
+        .table(tableIdentifier)
+        .select("intField", "stringField")
+        .filter(new EqualToExpression(
+            new ColumnExpression("intField", DataTypes.INT),
+            new LiteralExpression(11, DataTypes.INT)))
+        .limit(5)
+        .create();
+    List<CarbonRow> result2 = store.select(select2);
+    Assert.assertEquals(1, result2.size());
 
-    FileUtils.deleteDirectory(new File(path));
+    store.dropTable(tableIdentifier);  // drop the table again once the checks are done
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java 
b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
index 9b9aa9e..f73591c 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.sdk.file;
+package org.apache.carbondata.store;
 
 import java.io.File;
 import java.io.FileFilter;
@@ -26,104 +26,16 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Schema;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.util.StoreUtil;
 
 import org.junit.Assert;
 
 public class TestUtil {
 
-  static void writeFilesAndVerify(Schema schema, String path) {
-    writeFilesAndVerify(schema, path, null);
-  }
-
-  static void writeFilesAndVerify(Schema schema, String path, String[] 
sortColumns) {
-    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true);
-  }
-
-  public static void writeFilesAndVerify(int rows, Schema schema, String path, 
boolean persistSchema) {
-    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, true);
-  }
-
-  public static void writeFilesAndVerify(Schema schema, String path, boolean 
persistSchema,
-      boolean isTransactionalTable) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, 
isTransactionalTable);
-  }
-
-  /**
-   * write file and verify
-   *
-   * @param rows                 number of rows
-   * @param schema               schema
-   * @param path                 table store path
-   * @param persistSchema        whether persist schema
-   * @param isTransactionalTable whether is transactional table
-   */
-  public static void writeFilesAndVerify(int rows, Schema schema, String path, 
boolean persistSchema,
-      boolean isTransactionalTable) {
-    writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, 
isTransactionalTable);
-  }
-
-  /**
-   * Invoke CarbonWriter API to write carbon files and assert the file is 
rewritten
-   * @param rows number of rows to write
-   * @param schema schema of the file
-   * @param path local write path
-   * @param sortColumns sort columns
-   * @param persistSchema true if want to persist schema file
-   * @param blockletSize blockletSize in the file, -1 for default size
-   * @param blockSize blockSize in the file, -1 for default size
-   * @param isTransactionalTable set to true if this is written for 
Transactional Table.
-   */
-  static void writeFilesAndVerify(int rows, Schema schema, String path, 
String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize, boolean 
isTransactionalTable) {
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .isTransactionalTable(isTransactionalTable)
-          .outputPath(path);
-      if (sortColumns != null) {
-        builder = builder.sortBy(sortColumns);
-      }
-      if (persistSchema) {
-        builder = builder.persistSchemaFile(true);
-      }
-      if (blockletSize != -1) {
-        builder = builder.withBlockletSize(blockletSize);
-      }
-      if (blockSize != -1) {
-        builder = builder.withBlockSize(blockSize);
-      }
-
-      CarbonWriter writer = builder.buildWriterForCSVInput(schema);
-
-      for (int i = 0; i < rows; i++) {
-        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), 
String.valueOf((double) i / 2)});
-      }
-      writer.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } catch (InvalidLoadOptionException l) {
-      l.printStackTrace();
-      Assert.fail(l.getMessage());
-    }
-
-    File segmentFolder = null;
-    if (isTransactionalTable) {
-      segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
-      Assert.assertTrue(segmentFolder.exists());
-    } else {
-      segmentFolder = new File(path);
-      Assert.assertTrue(segmentFolder.exists());
-    }
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return 
pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-  }
-
   /**
    * verify whether the file exists
    * if delete the file success or file not exists, then return true; 
otherwise return false
@@ -165,4 +77,5 @@ public class TestUtil {
       throw new RuntimeException("IO exception:", e);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/resources/data1.csv
----------------------------------------------------------------------
diff --git a/store/core/src/test/resources/data1.csv 
b/store/core/src/test/resources/data1.csv
new file mode 100644
index 0000000..cf732eb
--- /dev/null
+++ b/store/core/src/test/resources/data1.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField
+1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5
+5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5
+1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5
+1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5
+1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5
+3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5
+2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5
+1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5
+4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5
+1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/pom.xml
----------------------------------------------------------------------
diff --git a/store/horizon/pom.xml b/store/horizon/pom.xml
index 3665e53..7ae848a 100644
--- a/store/horizon/pom.xml
+++ b/store/horizon/pom.xml
@@ -47,21 +47,11 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.antlr</groupId>
-      <artifactId>antlr4-runtime</artifactId>
-      <version>4.7.1</version>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <version>${spring.version}</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/anltr/Expression.g4
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/anltr/Expression.g4 
b/store/horizon/src/main/anltr/Expression.g4
deleted file mode 100644
index 81688cd..0000000
--- a/store/horizon/src/main/anltr/Expression.g4
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * copy from SqlBase.g4 of Presto and Spark.
- */
-
-grammar Expression;
-
-parseFilter
- : booleanExpression EOF
- ;
-
-booleanExpression
-    : predicate
-    | left=booleanExpression operator=AND right=booleanExpression
-    | left=booleanExpression operator=OR right=booleanExpression
-    | '(' booleanExpression ')'
-    ;
-
-predicate
-    : left=primaryExpression comparisonOperator right=primaryExpression
-    | left=primaryExpression NOT? BETWEEN lower=primaryExpression AND 
upper=primaryExpression
-    | left=primaryExpression NOT? IN '(' primaryExpression (',' 
primaryExpression)* ')'
-    | left=primaryExpression IS NOT? NULL
-    ;
-
-primaryExpression
-    : constant                                           #constantDefault
-    | identifier                                         #columnReference
-    | base=identifier '.' fieldName=identifier           #dereference
-    | '(' booleanExpression ')'                          
#parenthesizedExpression
-    ;
-
-constant
-    : NULL                                               #nullLiteral
-    | number                                             #numericLiteral
-    | booleanValue                                       #booleanLiteral
-    | STRING+                                            #stringLiteral
-    ;
-
-identifier
-    : IDENTIFIER                                         #unquotedIdentifier
-    | BACKQUOTED_IDENTIFIER                              #backQuotedIdentifier
-    ;
-
-comparisonOperator
-    : EQ | NEQ | LT | LTE | GT | GTE
-    ;
-
-booleanValue
-    : TRUE | FALSE
-    ;
-
-number
-    : MINUS? DECIMAL_VALUE                               #decimalLiteral
-    | MINUS? INTEGER_VALUE                               #integerLiteral
-    | MINUS? BIGINT_LITERAL                              #bigIntLiteral
-    | MINUS? SMALLINT_LITERAL                            #smallIntLiteral
-    | MINUS? TINYINT_LITERAL                             #tinyIntLiteral
-    | MINUS? DOUBLE_LITERAL                              #doubleLiteral
-    | MINUS? BIGDECIMAL_LITERAL                          #bigDecimalLiteral
-    ;
-
-AND: 'AND';
-BETWEEN: 'BETWEEN';
-FALSE: 'FALSE';
-IN: 'IN';
-IS: 'IS';
-NOT: 'NOT';
-NULL: 'NULL';
-OR: 'OR';
-TRUE: 'TRUE';
-
-EQ  : '=';
-NEQ : '<>' | '!=';
-LT  : '<';
-LTE : '<=';
-GT  : '>';
-GTE : '>=';
-
-MINUS: '-';
-
-STRING
-    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
-    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
-    ;
-
-BIGINT_LITERAL
-    : DIGIT+ 'L'
-    ;
-
-SMALLINT_LITERAL
-    : DIGIT+ 'S'
-    ;
-
-TINYINT_LITERAL
-    : DIGIT+ 'Y'
-    ;
-
-INTEGER_VALUE
-    : DIGIT+
-    ;
-
-DECIMAL_VALUE
-    : DIGIT+ EXPONENT
-    | DECIMAL_DIGITS EXPONENT?
-    ;
-
-DOUBLE_LITERAL
-    : DIGIT+ EXPONENT? 'D'
-    | DECIMAL_DIGITS EXPONENT? 'D'
-    ;
-
-BIGDECIMAL_LITERAL
-    : DIGIT+ EXPONENT? 'BD'
-    | DECIMAL_DIGITS EXPONENT? 'BD'
-    ;
-
-IDENTIFIER
-    : (LETTER | DIGIT | '_')+
-    ;
-
-BACKQUOTED_IDENTIFIER
-    : '`' ( ~'`' | '``' )* '`'
-    ;
-
-fragment DECIMAL_DIGITS
-    : DIGIT+ '.' DIGIT*
-    | '.' DIGIT+
-    ;
-
-fragment EXPONENT
-    : 'E' [+-]? DIGIT+
-    ;
-
-fragment DIGIT
-    : [0-9]
-    ;
-
-fragment LETTER
-    : [A-Z]
-    ;
-
-WS
-    : [ \r\n\t]+ -> channel(HIDDEN)
-    ;
-
-// Catch-all for anything we can't recognize.
-// We use this to be able to ignore and recover all the text
-// when splitting statements with DelimiterLexer
-UNRECOGNIZED
-    : .
-    ;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/antlr/Expression.g4
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/antlr/Expression.g4 
b/store/horizon/src/main/antlr/Expression.g4
new file mode 100644
index 0000000..81688cd
--- /dev/null
+++ b/store/horizon/src/main/antlr/Expression.g4
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Copied from SqlBase.g4 of Presto and Spark.
+ */
+
+grammar Expression;
+
+parseFilter
+ : booleanExpression EOF  // entry rule: the whole input must be one boolean expression
+ ;
+
+booleanExpression
+    : predicate
+    | left=booleanExpression operator=AND right=booleanExpression  // listed before OR, so AND binds tighter
+    | left=booleanExpression operator=OR right=booleanExpression
+    | '(' booleanExpression ')'  // explicit grouping
+    ;
+
+predicate
+    : left=primaryExpression comparisonOperator right=primaryExpression  // binary comparison
+    | left=primaryExpression NOT? BETWEEN lower=primaryExpression AND 
upper=primaryExpression
+    | left=primaryExpression NOT? IN '(' primaryExpression (',' 
primaryExpression)* ')'
+    | left=primaryExpression IS NOT? NULL  // IS NULL and IS NOT NULL
+    ;
+
+primaryExpression
+    : constant                                           #constantDefault
+    | identifier                                         #columnReference
+    | base=identifier '.' fieldName=identifier           #dereference  // qualified name: base.fieldName
+    | '(' booleanExpression ')'                          
#parenthesizedExpression
+    ;
+
+constant
+    : NULL                                               #nullLiteral
+    | number                                             #numericLiteral
+    | booleanValue                                       #booleanLiteral
+    | STRING+                                            #stringLiteral  // one or more adjacent STRING tokens
+    ;
+
+identifier
+    : IDENTIFIER                                         #unquotedIdentifier
+    | BACKQUOTED_IDENTIFIER                              #backQuotedIdentifier
+    ;
+
+comparisonOperator
+    : EQ | NEQ | LT | LTE | GT | GTE
+    ;
+
+booleanValue
+    : TRUE | FALSE
+    ;
+
+number
+    : MINUS? DECIMAL_VALUE                               #decimalLiteral  // every alternative takes an optional leading MINUS
+    | MINUS? INTEGER_VALUE                               #integerLiteral
+    | MINUS? BIGINT_LITERAL                              #bigIntLiteral
+    | MINUS? SMALLINT_LITERAL                            #smallIntLiteral
+    | MINUS? TINYINT_LITERAL                             #tinyIntLiteral
+    | MINUS? DOUBLE_LITERAL                              #doubleLiteral
+    | MINUS? BIGDECIMAL_LITERAL                          #bigDecimalLiteral
+    ;
+
+AND: 'AND';
+BETWEEN: 'BETWEEN';
+FALSE: 'FALSE';
+IN: 'IN';
+IS: 'IS';
+NOT: 'NOT';
+NULL: 'NULL';
+OR: 'OR';
+TRUE: 'TRUE';
+
+EQ  : '=';
+NEQ : '<>' | '!=';
+LT  : '<';
+LTE : '<=';
+GT  : '>';
+GTE : '>=';
+
+MINUS: '-';
+
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : DIGIT+ 'L'
+    ;
+
+SMALLINT_LITERAL
+    : DIGIT+ 'S'
+    ;
+
+TINYINT_LITERAL
+    : DIGIT+ 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+DECIMAL_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D'
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD'
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+WS
+    : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+    : .
+    ;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java
new file mode 100644
index 0000000..da7bff7
--- /dev/null
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.antlr;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionLexer;
+import org.apache.carbondata.horizon.antlr.gen.ExpressionParser;
+
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+
+public class Parser {
+  public static Expression parseFilter(String filter, CarbonTable carbonTable) 
{
+    if (filter == null) {
+      return null;
+    }
+    CharStream input = new ANTLRNoCaseStringStream(filter);
+    ExpressionLexer lexer = new ExpressionLexer(input);
+    CommonTokenStream tokens = new CommonTokenStream(lexer);
+    ExpressionParser parser = new ExpressionParser(tokens);
+    ExpressionParser.ParseFilterContext tree = parser.parseFilter();
+    FilterVisitor visitor = new FilterVisitor(carbonTable);
+    return visitor.visitParseFilter(tree);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java
index e32ff07..d1f68de 100644
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java
@@ -2,12 +2,9 @@
 package org.apache.carbondata.horizon.antlr.gen;
 import org.antlr.v4.runtime.Lexer;
 import org.antlr.v4.runtime.CharStream;
-import org.antlr.v4.runtime.Token;
-import org.antlr.v4.runtime.TokenStream;
 import org.antlr.v4.runtime.*;
 import org.antlr.v4.runtime.atn.*;
 import org.antlr.v4.runtime.dfa.DFA;
-import org.antlr.v4.runtime.misc.*;
 
 @SuppressWarnings({"all", "warnings", "unchecked", "unused", "cast"})
 public class ExpressionLexer extends Lexer {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java
index 08139eb..c4701af 100644
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java
@@ -3,11 +3,8 @@ package org.apache.carbondata.horizon.antlr.gen;
 import org.antlr.v4.runtime.atn.*;
 import org.antlr.v4.runtime.dfa.DFA;
 import org.antlr.v4.runtime.*;
-import org.antlr.v4.runtime.misc.*;
 import org.antlr.v4.runtime.tree.*;
 import java.util.List;
-import java.util.Iterator;
-import java.util.ArrayList;
 
 @SuppressWarnings({"all", "warnings", "unchecked", "unused", "cast"})
 public class ExpressionParser extends Parser {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
new file mode 100644
index 0000000..eaa4583
--- /dev/null
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
+import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+/**
+ * Client to send REST request to Horizon service
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface HorizonClient extends Closeable {
+
+  /**
+   * Create a Table
+   * @param create descriptor for create table operation
+   * @throws IOException if network or disk IO error occurs
+   */
+  void createTable(CreateTableRequest create) throws IOException, 
StoreException;
+
+  /**
+   * Drop a Table, and remove all data in it
+   * @param drop descriptor for drop table operation
+   * @throws IOException if network or disk IO error occurs
+   */
+  void dropTable(DropTableRequest drop) throws IOException;
+
+  /**
+   * Load data into a Table
+   * @param load descriptor for load operation
+   * @throws IOException if network or disk IO error occurs
+   */
+  void loadData(LoadRequest load) throws IOException, StoreException;
+
+  /**
+   * Scan a Table and return matched rows
+   * @param select descriptor for scan operation, including required column, 
filter, etc
+   * @return matched rows
+   * @throws IOException if network or disk IO error occurs
+   */
+  List<CarbonRow> select(SelectRequest select) throws IOException, 
StoreException;
+
+  /**
+   * Execute a SQL statement
+   * @param sqlString SQL statement
+   * @return matched rows
+   * @throws IOException if network or disk IO error occurs
+   */
+  List<CarbonRow> sql(String sqlString) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
new file mode 100644
index 0000000..ba86180
--- /dev/null
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.horizon.rest.client.HorizonClient;
+import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
+import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
+import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+public class SimpleHorizonClient implements HorizonClient {
+
+  private RestTemplate restTemplate;
+  private String serviceUri;
+
+  public SimpleHorizonClient(String serviceUri) {
+    this.serviceUri = serviceUri;
+    this.restTemplate = new RestTemplate();
+  }
+
+  @Override
+  public void createTable(CreateTableRequest create) throws IOException, 
StoreException {
+    Objects.requireNonNull(create);
+    restTemplate.postForEntity(serviceUri + "/table/create", create, 
String.class);
+  }
+
+  @Override
+  public void dropTable(DropTableRequest drop) throws IOException {
+    Objects.requireNonNull(drop);
+    restTemplate.postForEntity(serviceUri + "/table/drop", drop, String.class);
+  }
+
+  @Override
+  public void loadData(LoadRequest load) throws IOException, StoreException {
+    Objects.requireNonNull(load);
+    restTemplate.postForEntity(serviceUri + "/table/load", load, String.class);
+  }
+
+  @Override
+  public List<CarbonRow> select(SelectRequest select) throws IOException, 
StoreException {
+    Objects.requireNonNull(select);
+    ResponseEntity<SelectResponse> response =
+        restTemplate.postForEntity(serviceUri + "/table/select", select, 
SelectResponse.class);
+    Object[][] rows = Objects.requireNonNull(response.getBody()).getRows();
+    List<CarbonRow> output = new ArrayList<>(rows.length);
+    for (Object[] row : rows) {
+      output.add(new CarbonRow(row));
+    }
+    return output;
+  }
+
+  @Override
+  public List<CarbonRow> sql(String sqlString) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
index 1f6f485..d7b7c5e 100644
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
@@ -27,10 +27,14 @@ public class Horizon {
   private static ConfigurableApplicationContext context;
 
   public static void main(String[] args) {
+    start(args);
+  }
+
+  public static void start(String[] args) {
     context = SpringApplication.run(Horizon.class, args);
   }
 
-  public static void close() {
+  public static void stop() {
     SpringApplication.exit(context);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
index 2089c1a..33ea50b 100644
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
+++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
@@ -16,21 +16,30 @@
  */
 package org.apache.carbondata.horizon.rest.controller;
 
-import java.util.UUID;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.horizon.rest.model.descriptor.LoadDescriptor;
-import org.apache.carbondata.horizon.rest.model.descriptor.SelectDescriptor;
-import org.apache.carbondata.horizon.rest.model.descriptor.TableDescriptor;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.horizon.antlr.Parser;
 import org.apache.carbondata.horizon.rest.model.validate.RequestValidator;
 import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
+import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
-import org.apache.carbondata.horizon.rest.service.HorizonService;
-import org.apache.carbondata.store.exception.StoreException;
+import org.apache.carbondata.store.api.CarbonStore;
+import org.apache.carbondata.store.api.CarbonStoreFactory;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
+import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.store.api.exception.StoreException;
 
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -45,48 +54,63 @@ public class HorizonController {
   private static LogService LOGGER =
       LogServiceFactory.getLogService(HorizonController.class.getName());
 
-  private HorizonService service;
+  private CarbonStore store;
 
-  public HorizonController() {
-    service = HorizonService.getInstance();
+  public HorizonController() throws StoreException {
+    String storeFile = System.getProperty("carbonstore.conf.file");
+    store = CarbonStoreFactory.getDistributedStore("GlobalStore", new 
StoreConf(storeFile));
   }
 
   @RequestMapping(value = "/table/create", produces = 
MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<String> createTable(
-      @RequestBody CreateTableRequest request) throws StoreException {
+      @RequestBody CreateTableRequest request) throws StoreException, 
IOException {
     RequestValidator.validateTable(request);
     TableDescriptor tableDescriptor = request.convertToDto();
-    boolean result = service.createTable(tableDescriptor);
-    return new ResponseEntity<>(String.valueOf(result), HttpStatus.OK);
+    store.createTable(tableDescriptor);
+    return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK);
+  }
+
+  @RequestMapping(value = "/table/drop", produces = 
MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<String> dropTable(
+      @RequestBody DropTableRequest request) throws StoreException, 
IOException {
+    RequestValidator.validateDrop(request);
+    store.dropTable(new TableIdentifier(request.getTableName(), 
request.getDatabaseName()));
+    return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK);
   }
 
   @RequestMapping(value = "/table/load", produces = 
MediaType.APPLICATION_JSON_VALUE)
-  public ResponseEntity<String> load(@RequestBody LoadRequest request) throws 
StoreException {
+  public ResponseEntity<String> load(@RequestBody LoadRequest request)
+      throws StoreException, IOException {
     RequestValidator.validateLoad(request);
     LoadDescriptor loadDescriptor = request.convertToDto();
-    boolean result = service.loadData(loadDescriptor);
-    return new ResponseEntity<>(String.valueOf(result), HttpStatus.OK);
+    store.loadData(loadDescriptor);
+    return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK);
   }
 
-
   @RequestMapping(value = "/table/select", produces = 
MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<SelectResponse> select(@RequestBody SelectRequest 
request)
-      throws StoreException {
+      throws StoreException, IOException {
     long start = System.currentTimeMillis();
     RequestValidator.validateSelect(request);
-    SelectDescriptor selectDescriptor = request.convertToDto();
-    selectDescriptor.setId(UUID.randomUUID().toString());
-    CarbonRow[] result = service.select(selectDescriptor);
-    Object[][] newResult = new Object[result.length][];
-    for (int i = newResult.length - 1; i >= 0; i--) {
-      newResult[i] = result[i].getData();
+    TableIdentifier table = new TableIdentifier(request.getTableName(), 
request.getDatabaseName());
+    CarbonTable carbonTable = store.getTable(table);
+    Expression expression = Parser.parseFilter(request.getFilter(), 
carbonTable);
+    SelectDescriptor selectDescriptor = new SelectDescriptor(
+        table, request.getSelect(), expression, request.getLimit());
+    List<CarbonRow> result = store.select(selectDescriptor);
+    Iterator<CarbonRow> iterator = result.iterator();
+    Object[][] output = new Object[result.size()][];
+    int i = 0;
+    while (iterator.hasNext()) {
+      output[i] = (iterator.next().getData());
+      i++;
     }
     long end = System.currentTimeMillis();
-    LOGGER.audit("[" + selectDescriptor.getId() +  "] HorizonController select 
" +
+    LOGGER.audit("[" + request.getRequestId() +  "] HorizonController select " 
+
         request.getDatabaseName() + "." + request.getTableName() +
         ", take time: " + (end - start) + " ms");
-    return new ResponseEntity<>(
-        new SelectResponse(selectDescriptor.getId(), newResult), 
HttpStatus.OK);
+
+    return new ResponseEntity<>(new SelectResponse(request, output), 
HttpStatus.OK);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java
----------------------------------------------------------------------
diff --git 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java
 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java
deleted file mode 100644
index ec2c0f4..0000000
--- 
a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.horizon.rest.model.descriptor;
-
-import java.util.Map;
-
-public class LoadDescriptor {
-
-  private String databaseName;
-  private String tableName;
-  private String inputPath;
-  private Map<String, String> options;
-  private boolean isOverwrite;
-
-  public LoadDescriptor() {
-  }
-
-  public LoadDescriptor(String databaseName, String tableName, String 
inputPaths,
-      Map<String, String> options, boolean isOverwrite) {
-    this.databaseName = databaseName;
-    this.tableName = tableName;
-    this.inputPath = inputPaths;
-    this.options = options;
-    this.isOverwrite = isOverwrite;
-  }
-
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public String getInputPath() {
-    return inputPath;
-  }
-
-  public void setInputPath(String inputPath) {
-    this.inputPath = inputPath;
-  }
-
-  public Map<String, String> getOptions() {
-    return options;
-  }
-
-  public void setOptions(Map<String, String> options) {
-    this.options = options;
-  }
-
-  public boolean isOverwrite() {
-    return isOverwrite;
-  }
-
-  public void setOverwrite(boolean overwrite) {
-    isOverwrite = overwrite;
-  }
-}

Reply via email to