http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java deleted file mode 100644 index d826b32..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -import org.apache.hadoop.io.Writable; - -@InterfaceAudience.Internal -public class BaseResponse implements Serializable, Writable { - private int status; - private String message; - - public BaseResponse() { - } - - public BaseResponse(int status, String message) { - this.status = status; - this.message = message; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(status); - out.writeUTF(message); - } - - @Override - public void readFields(DataInput in) throws IOException { - status = in.readInt(); - message = in.readUTF(); - } -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java deleted file mode 100644 index e79fad2..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; - -public class LoadDataRequest implements Serializable, Writable { - - private CarbonLoadModel model; - - public LoadDataRequest() { - } - - public LoadDataRequest(CarbonLoadModel model) { - this.model = model; - } - - public CarbonLoadModel getModel() { - return model; - } - - public void setModel(CarbonLoadModel model) { - this.model = model; - } - - @Override - public void write(DataOutput out) throws IOException { - WritableUtils.writeCompressedByteArray(out, StoreUtil.serialize(model)); - } - - @Override - public void readFields(DataInput in) throws IOException { - byte[] bytes = WritableUtils.readCompressedByteArray(in); - model = (CarbonLoadModel) StoreUtil.deserialize(bytes); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java deleted file mode 100644 index 27dc38b..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.util.ObjectSerializationUtil; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; - -import org.apache.hadoop.io.Writable; - -@InterfaceAudience.Internal -public class QueryRequest implements Serializable, Writable { - private int requestId; - private CarbonMultiBlockSplit split; - private TableInfo tableInfo; - private String[] projectColumns; - private Expression filterExpression; - private long limit; - - public QueryRequest() { - } - - public QueryRequest(int requestId, CarbonMultiBlockSplit split, - TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) { - this.requestId = requestId; - this.split = split; - this.tableInfo = tableInfo; - this.projectColumns = projectColumns; - this.filterExpression = filterExpression; - this.limit = limit; - } - - public int getRequestId() { - return requestId; - } - - public CarbonMultiBlockSplit getSplit() { - return split; - } - - public TableInfo getTableInfo() { - return tableInfo; - } - - public String[] getProjectColumns() { - return projectColumns; - } - - public Expression getFilterExpression() { - return filterExpression; - } - - public long getLimit() { - return limit; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(requestId); - split.write(out); - tableInfo.write(out); - out.writeInt(projectColumns.length); - for (String projectColumn : projectColumns) { - out.writeUTF(projectColumn); - } - String filter = ObjectSerializationUtil.convertObjectToString(filterExpression); - out.writeUTF(filter); - out.writeLong(limit); - } - - @Override - public void readFields(DataInput in) throws IOException { - requestId = in.readInt(); - split = new CarbonMultiBlockSplit(); - split.readFields(in); - tableInfo = new TableInfo(); - tableInfo.readFields(in); - projectColumns = new String[in.readInt()]; - for (int i = 0; i < projectColumns.length; i++) { - projectColumns[i] = in.readUTF(); - } - String filter = in.readUTF(); - filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter); - limit = in.readLong(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java deleted file mode 100644 index 7ad9210..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.util.ObjectSerializationUtil; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; - -@InterfaceAudience.Internal -public class QueryResponse extends BaseResponse implements Serializable, Writable { - private int queryId; - private Object[][] rows; - - public QueryResponse() { - super(); - } - - public QueryResponse(int queryId, int status, String message, Object[][] rows) { - super(status, message); - this.queryId = queryId; - this.rows = rows; - } - - public int getQueryId() { - return queryId; - } - - - public Object[][] getRows() { - return rows; - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeInt(queryId); - WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows)); - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - queryId = in.readInt(); - try { - rows = (Object[][])ObjectSerializationUtil.deserialize( - WritableUtils.readCompressedByteArray(in)); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java deleted file mode 100644 index 2131e3b..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -import org.apache.hadoop.io.Writable; - -@InterfaceAudience.Internal -public class RegisterWorkerRequest implements Serializable, Writable { - private String hostAddress; - private int port; - private int cores; - - public RegisterWorkerRequest() { - } - - public RegisterWorkerRequest(String hostAddress, int port, int cores) { - this.hostAddress = hostAddress; - this.port = port; - this.cores = cores; - } - - public String getHostAddress() { - return hostAddress; - } - - public int getPort() { - return port; - } - - public int getCores() { - return cores; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(hostAddress); - out.writeInt(port); - out.writeInt(cores); - } - - @Override - public void readFields(DataInput in) throws IOException { - hostAddress = in.readUTF(); - port = in.readInt(); - cores = in.readInt(); - } - - @Override public String toString() { - return "RegisterWorkerRequest{" + "hostAddress='" + hostAddress + '\'' + ", port=" + port + '}'; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java deleted file mode 100644 index 8465c90..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerResponse.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -import org.apache.hadoop.io.Writable; - -@InterfaceAudience.Internal -public class RegisterWorkerResponse implements Serializable, Writable { - - private String workerId; - - public RegisterWorkerResponse() { - } - - public RegisterWorkerResponse(String workerId) { - this.workerId = workerId; - } - - public String getWorkerId() { - return workerId; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(workerId); - } - - @Override - public void readFields(DataInput in) throws IOException { - workerId = in.readUTF(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java deleted file mode 100644 index 7a25944..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -import org.apache.hadoop.io.Writable; - -@InterfaceAudience.Internal -public class ShutdownRequest implements Serializable, Writable { - private String reason; - - public ShutdownRequest() { - } - - public ShutdownRequest(String reason) { - this.reason = reason; - } - - public String getReason() { - return reason; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(reason); - } - - @Override - public void readFields(DataInput in) throws IOException { - reason = in.readUTF(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java deleted file mode 100644 index f6f329f..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.model; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -import org.apache.hadoop.io.Writable; - -@InterfaceAudience.Internal -public class ShutdownResponse implements Serializable, Writable { - private int status; - private String message; - - public ShutdownResponse() { - } - - public ShutdownResponse(int status, String message) { - this.status = status; - this.message = message; - } - - public int getStatus() { - return status; - } - - public String getMessage() { - return message; - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(status); - out.writeUTF(message); - } - - @Override - public void readFields(DataInput in) throws IOException { - status = in.readInt(); - message = in.readUTF(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java deleted file mode 100644 index 65d0786..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.scheduler; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.carbondata.store.rpc.StoreService; - -public class Schedulable { - - private String id; - private String address; - private int port; - private int cores; - public StoreService service; - public AtomicInteger workload; - - public Schedulable(String id, String address, int port, int cores, StoreService service) { - this.id = id; - this.address = address; - this.port = port; - this.cores = cores; - this.service = service; - this.workload = new AtomicInteger(); - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getAddress() { - return address; - } - - public void setAddress(String address) { - this.address = address; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - int getCores() { - return cores; - } - - @Override public String toString() { - return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port - + '}'; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java deleted file mode 100644 index 1b4cdde..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.scheduler; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.store.exception.WorkerTooBusyException; -import org.apache.carbondata.store.rpc.model.BaseResponse; -import org.apache.carbondata.store.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; - -/** - * [[Master]] uses Scheduler to pick a Worker to send request - */ -public class Scheduler { - - private static LogService LOGGER = LogServiceFactory.getLogService(Scheduler.class.getName()); - - // mapping of worker IP address to worker instance - private Map<String, Schedulable> ipMapWorker = new HashMap<>(); - private List<Schedulable> workers = new ArrayList<>(); - private AtomicLong index = new AtomicLong(0); - private ExecutorService executors = Executors.newCachedThreadPool(); - - /** - * Pick a Worker according to the address and workload of the Worker - * Invoke the RPC and return Future result - */ - public Future<QueryResponse> sendRequestAsync(final Schedulable worker, - final QueryRequest request) { - - LOGGER.info("sending search request to worker " + worker); - worker.workload.incrementAndGet(); - return executors.submit(new Callable<QueryResponse>() { - @Override public QueryResponse call() { - return worker.service.query(request); - } - }); - } - - public BaseResponse sendRequest(final Schedulable worker, - final LoadDataRequest request) { - - LOGGER.info("sending load data request to worker " + worker); - worker.workload.incrementAndGet(); - return worker.service.loadData(request); - } - - public Schedulable pickWorker(String splitAddress) { - Schedulable worker = ipMapWorker.get(splitAddress); - // no local worker available, choose one worker randomly - if (worker == null) { - worker = pickNexWorker(); - } - // check whether worker exceed max workload, if exceeded, pick next worker - int maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.getCores()); - int numTry = workers.size(); - do { - if (worker.workload.get() >= maxWorkload) { - LOGGER.info("worker " + worker + " reach limit, re-select worker..."); - worker = pickNexWorker(); - numTry = numTry - 1; - } else { - numTry = -1; - } - } while (numTry > 0); - if (numTry == 0) { - // tried so many times and still not able to find Worker - throw new WorkerTooBusyException( - "All workers are busy, number of workers: " + workers.size() + ", workload limit:" - + maxWorkload); - } - - return worker; - } - - public Schedulable pickNexWorker() { - return workers.get((int) (index.get() % workers.size())); - } - - /** - * A new searcher is trying to register, add it to the map and connect to this searcher - */ - public void addWorker(Schedulable schedulable) { - workers.add(schedulable); - ipMapWorker.put(schedulable.getAddress(), schedulable); - } - - public void removeWorker(String address) { - Schedulable schedulable = ipMapWorker.get(address); - if (schedulable != null) { - ipMapWorker.remove(address); - workers.remove(schedulable); - } - } - - public List<Schedulable> getAllWorkers() { - return workers; - } - - public List<String> getAllWorkerAddresses() { - List<String> addresses = new ArrayList<>(workers.size()); - for (Schedulable worker : workers) { - addresses.add(worker.getAddress()); - } - return addresses; - } -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java index fba3413..775669f 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java +++ b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java @@ -27,6 +27,7 @@ import java.io.ObjectOutputStream; import java.util.Map; import java.util.Properties; +import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -35,13 +36,14 @@ import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.store.conf.StoreConf; +import org.apache.carbondata.store.api.conf.StoreConf; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.PropertyConfigurator; +@InterfaceAudience.Internal public class StoreUtil { private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java deleted file mode 100644 index 6fa2191..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.worker; - -import java.io.IOException; -import java.net.BindException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.store.conf.StoreConf; -import org.apache.carbondata.store.rpc.RegistryService; -import org.apache.carbondata.store.rpc.ServiceFactory; -import org.apache.carbondata.store.rpc.StoreService; -import org.apache.carbondata.store.rpc.impl.StoreServiceImpl; -import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; -import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; - -public class Worker { - - private static LogService LOGGER = LogServiceFactory.getLogService(Worker.class.getName()); - - private String id; - private RegistryService registry; - private StoreConf conf; - private Configuration hadoopConf; - private RPC.Server server; - - public Worker(StoreConf conf) { - this.conf = conf; - this.hadoopConf = this.conf.newHadoopConf(); - } - - public void start() { - try { - startService(); - registerToMaster(); - } catch (IOException e) { - LOGGER.error(e, "worker failed to start"); - } - } - - private void startService() throws IOException { - BindException exception; - // we will try to create service at worse case 100 times - int numTry = 100; - int coreNum = conf.workerCoreNum(); - String host = conf.workerHost(); - int port = conf.workerPort(); - StoreService queryService = new StoreServiceImpl(this); - do { - try { - server = new RPC.Builder(hadoopConf) - .setNumHandlers(coreNum) - .setBindAddress(host) - .setPort(port) - .setProtocol(StoreService.class) - .setInstance(queryService) - .build(); - server.start(); - - numTry = 0; - exception = null; - } catch (BindException e) { - // port is occupied, increase the port number and try again - exception = e; - port = port + 1; - numTry = numTry - 1; - } - } while (numTry > 0); - - if (exception != null) { - // we have tried many times, but still failed to find an available port - LOGGER.error(exception, "worker failed to start"); - throw exception; - } - - conf.conf(StoreConf.WORKER_PORT, port); - LOGGER.info("worker started on " + host + ":" + port + " successfully"); - - } - - public void stop() { - try { - stopService(); - } catch (InterruptedException e) { - LOGGER.error(e, "worker failed to start"); - } - } - - private void stopService() throws InterruptedException { - if (server != null) { - server.stop(); - server.join(); - server = null; - } - } - - private void registerToMaster() throws IOException { - - LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort()); - if (registry == null) { - registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort()); - } - RegisterWorkerRequest request = - new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum()); - try { - RegisterWorkerResponse response = registry.registerWorker(request); - id = response.getWorkerId(); - } catch (Throwable throwable) { - LOGGER.error(throwable, "worker failed to register"); - throw new IOException(throwable); - } - - LOGGER.info("worker " + id + " registered successfully"); - } - - public String getId() { - return id; - } - - public static void main(String[] args) { - if (args.length != 2) { - System.err.println("Usage: Worker <log4j file> <properties file>"); - return; - } - - StoreUtil.initLog4j(args[0]); - Worker worker = new Worker(new StoreConf(args[1])); - worker.start(); - } - - public StoreConf getConf() { - return conf; - } - - public void setConf(StoreConf conf) { - this.conf = conf; - } - - public Configuration getHadoopConf() { - return hadoopConf; - } - - public void setHadoopConf(Configuration hadoopConf) { - this.hadoopConf = hadoopConf; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java new file mode 100644 index 0000000..2448660 --- /dev/null +++ b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.store.api.CarbonStore; +import org.apache.carbondata.store.api.CarbonStoreFactory; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.descriptor.LoadDescriptor; +import org.apache.carbondata.store.api.descriptor.SelectDescriptor; +import org.apache.carbondata.store.api.descriptor.TableDescriptor; +import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.store.impl.worker.Worker; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class DistributedCarbonStoreTest { + + private static String projectFolder; + private static CarbonStore store; + + @BeforeClass + public static void beforeAll() throws IOException, StoreException { + projectFolder = new File(DistributedCarbonStoreTest.class.getResource("/").getPath() + + "../../../../").getCanonicalPath(); + String confFile = projectFolder + "/store/conf/store.conf"; + StoreConf storeConf = new StoreConf(confFile); + + store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf); + projectFolder = new File(LocalCarbonStoreTest.class.getResource("/").getPath() + "../../../../") + .getCanonicalPath(); + + // start worker + Worker worker = new Worker(storeConf); + worker.start(); + } + + @AfterClass + public static void afterAll() throws IOException { + store.close(); + } + + @Before + public void cleanFile() { + assert (TestUtil.cleanMdtFile()); + } + + @After + public void verifyDMFile() { + assert (!TestUtil.verifyMdtFile()); + } + + @Test + public void testSelect() throws IOException, StoreException { + TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default"); + store.dropTable(tableIdentifier); + TableDescriptor table = TableDescriptor + .builder() + .ifNotExists() + .table(tableIdentifier) + .comment("first table") + .column("shortField", DataTypes.SHORT, "short field") + .column("intField", DataTypes.INT, "int field") + .column("bigintField", DataTypes.LONG, "long field") + .column("doubleField", DataTypes.DOUBLE, "double field") + .column("stringField", DataTypes.STRING, "string field") + .column("timestampField", DataTypes.TIMESTAMP, "timestamp field") + .column("decimalField", DataTypes.createDecimalType(18, 2), "decimal field") + .column("dateField", DataTypes.DATE, "date field") + .column("charField", DataTypes.STRING, "char field") + .column("floatField", DataTypes.DOUBLE, "float field") + .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField") + .create(); + store.createTable(table); + + // load one segment + LoadDescriptor load = LoadDescriptor + .builder() + .table(tableIdentifier) + .overwrite(false) + .inputPath(projectFolder + "/store/core/src/test/resources/data1.csv") + .options("header", "true") + .create(); + store.loadData(load); + + // select row + SelectDescriptor select = SelectDescriptor + .builder() + .table(tableIdentifier) + .select("intField", "stringField") + .limit(5) + .create(); + List<CarbonRow> result = store.select(select); + Assert.assertEquals(5, result.size()); + + // select row with filter + SelectDescriptor select2 = SelectDescriptor + .builder() + .table(tableIdentifier) + .select("intField", "stringField") + .filter(new EqualToExpression( + new ColumnExpression("intField", DataTypes.INT), + new LiteralExpression(11, DataTypes.INT))) + .limit(5) + .create(); + List<CarbonRow> result2 = store.select(select2); + Assert.assertEquals(1, result2.size()); + + store.dropTable(tableIdentifier); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java index c885a26..420c8cf 100644 --- a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java +++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java @@ -19,20 +19,50 @@ package org.apache.carbondata.store; import java.io.File; import java.io.IOException; -import java.util.Iterator; +import java.util.List; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.sdk.file.Field; -import org.apache.carbondata.sdk.file.Schema; -import org.apache.carbondata.sdk.file.TestUtil; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.store.api.CarbonStore; +import org.apache.carbondata.store.api.CarbonStoreFactory; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.descriptor.LoadDescriptor; +import org.apache.carbondata.store.api.descriptor.SelectDescriptor; +import org.apache.carbondata.store.api.descriptor.TableDescriptor; +import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.store.api.exception.StoreException; -import org.apache.commons.io.FileUtils; import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class LocalCarbonStoreTest { + + private static String projectFolder; + private static CarbonStore store; + + @BeforeClass + public static void setup() throws IOException, StoreException { + StoreConf conf = new StoreConf("test", "./"); + conf.conf(StoreConf.STORE_TEMP_LOCATION, "./temp"); + store = CarbonStoreFactory.getLocalStore("LocalCarbonStoreTest", conf); + projectFolder = new File(LocalCarbonStoreTest.class.getResource("/").getPath() + "../../../../") + .getCanonicalPath(); + } + + @AfterClass + public static void afterAll() throws IOException { + store.close(); + } + @Before public void cleanFile() { assert (TestUtil.cleanMdtFile()); @@ -43,30 +73,63 @@ public class LocalCarbonStoreTest { assert (!TestUtil.verifyMdtFile()); } - // TODO: complete this testcase - // Currently result rows are empty, because SDK is not writing table status file - // so that reader does not find any segment. - // Complete this testcase after flat folder reader is done. @Test - public void testWriteAndReadFiles() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); + public void testWriteAndReadFiles() throws IOException, StoreException { + TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default"); + store.dropTable(tableIdentifier); + TableDescriptor table = TableDescriptor + .builder() + .ifNotExists() + .table(tableIdentifier) + .comment("first table") + .column("shortField", DataTypes.SHORT, "short field") + .column("intField", DataTypes.INT, "int field") + .column("bigintField", DataTypes.LONG, "long field") + .column("doubleField", DataTypes.DOUBLE, "double field") + .column("stringField", DataTypes.STRING, "string field") + .column("timestampField", DataTypes.TIMESTAMP, "timestamp field") + .column("decimalField", DataTypes.createDecimalType(18, 2), "decimal field") + .column("dateField", DataTypes.DATE, "date field") + .column("charField", DataTypes.STRING, "char field") + .column("floatField", DataTypes.DOUBLE, "float field") + .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField") + .create(); + store.createTable(table); - TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true); + // load one segment + LoadDescriptor load = LoadDescriptor + .builder() + .table(tableIdentifier) + .overwrite(false) + .inputPath(projectFolder + "/store/core/src/test/resources/data1.csv") + .options("header", "true") + .create(); + store.loadData(load); - CarbonStore store = new LocalCarbonStore(); - Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null); + // select row + SelectDescriptor select = SelectDescriptor + .builder() + .table(tableIdentifier) + .select("intField", "stringField") + .limit(5) + .create(); + List<CarbonRow> result = store.select(select); + Assert.assertEquals(5, result.size()); - while (rows.hasNext()) { - CarbonRow row = rows.next(); - System.out.println(row.toString()); - } + // select row with filter + SelectDescriptor select2 = SelectDescriptor + .builder() + .table(tableIdentifier) + .select("intField", "stringField") + .filter(new EqualToExpression( + new ColumnExpression("intField", DataTypes.INT), + new LiteralExpression(11, DataTypes.INT))) + .limit(5) + .create(); + List<CarbonRow> result2 = store.select(select2); + Assert.assertEquals(1, result2.size()); - FileUtils.deleteDirectory(new File(path)); + store.dropTable(tableIdentifier); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java index 9b9aa9e..f73591c 100644 --- a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java +++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.carbondata.sdk.file; +package org.apache.carbondata.store; import java.io.File; import java.io.FileFilter; @@ -26,104 +26,16 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.sdk.file.CarbonWriter; +import org.apache.carbondata.sdk.file.CarbonWriterBuilder; +import org.apache.carbondata.sdk.file.Schema; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.util.StoreUtil; import org.junit.Assert; public class TestUtil { - static void writeFilesAndVerify(Schema schema, String path) { - writeFilesAndVerify(schema, path, null); - } - - static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { - writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true); - } - - public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema) { - writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, true); - } - - public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema, - boolean isTransactionalTable) { - writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, isTransactionalTable); - } - - /** - * write file and verify - * - * @param rows number of rows - * @param schema schema - * @param path table store path - * @param persistSchema whether persist schema - * @param isTransactionalTable whether is transactional table - */ - public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema, - boolean isTransactionalTable) { - writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable); - } - - /** - * Invoke CarbonWriter API to write carbon files and assert the file is rewritten - * @param rows number of rows to write - * @param schema schema of the file - * @param path local write path - * @param sortColumns sort columns - * @param persistSchema true if want to persist schema file - * @param blockletSize blockletSize in the file, -1 for default size - * @param blockSize blockSize in the file, -1 for default size - * @param isTransactionalTable set to true if this is written for Transactional Table. - */ - static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, - boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) { - try { - CarbonWriterBuilder builder = CarbonWriter.builder() - .isTransactionalTable(isTransactionalTable) - .outputPath(path); - if (sortColumns != null) { - builder = builder.sortBy(sortColumns); - } - if (persistSchema) { - builder = builder.persistSchemaFile(true); - } - if (blockletSize != -1) { - builder = builder.withBlockletSize(blockletSize); - } - if (blockSize != -1) { - builder = builder.withBlockSize(blockSize); - } - - CarbonWriter writer = builder.buildWriterForCSVInput(schema); - - for (int i = 0; i < rows; i++) { - writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); - } - writer.close(); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } catch (InvalidLoadOptionException l) { - l.printStackTrace(); - Assert.fail(l.getMessage()); - } - - File segmentFolder = null; - if (isTransactionalTable) { - segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); - } else { - segmentFolder = new File(path); - Assert.assertTrue(segmentFolder.exists()); - } - - File[] dataFiles = segmentFolder.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); - } - }); - Assert.assertNotNull(dataFiles); - Assert.assertTrue(dataFiles.length > 0); - } - /** * verify whether the file exists * if delete the file success or file not exists, then return true; otherwise return false @@ -165,4 +77,5 @@ public class TestUtil { throw new RuntimeException("IO exception:", e); } } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/test/resources/data1.csv ---------------------------------------------------------------------- diff --git a/store/core/src/test/resources/data1.csv b/store/core/src/test/resources/data1.csv new file mode 100644 index 0000000..cf732eb --- /dev/null +++ b/store/core/src/test/resources/data1.csv @@ -0,0 +1,11 @@ +shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField +1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5 +5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5 +1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5 +1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5 +1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5 +3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5 +2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5 +1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5 +4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5 +1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5 http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/pom.xml ---------------------------------------------------------------------- diff --git a/store/horizon/pom.xml b/store/horizon/pom.xml index 3665e53..7ae848a 100644 --- a/store/horizon/pom.xml +++ b/store/horizon/pom.xml @@ -47,21 +47,11 @@ </exclusions> </dependency> <dependency> - <groupId>org.antlr</groupId> - <artifactId>antlr4-runtime</artifactId> - <version>4.7.1</version> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>${spring.version}</version> http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/anltr/Expression.g4 ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/anltr/Expression.g4 b/store/horizon/src/main/anltr/Expression.g4 deleted file mode 100644 index 81688cd..0000000 --- a/store/horizon/src/main/anltr/Expression.g4 +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * copy from SqlBase.g4 of Presto and Spark. - */ - -grammar Expression; - -parseFilter - : booleanExpression EOF - ; - -booleanExpression - : predicate - | left=booleanExpression operator=AND right=booleanExpression - | left=booleanExpression operator=OR right=booleanExpression - | '(' booleanExpression ')' - ; - -predicate - : left=primaryExpression comparisonOperator right=primaryExpression - | left=primaryExpression NOT? BETWEEN lower=primaryExpression AND upper=primaryExpression - | left=primaryExpression NOT? IN '(' primaryExpression (',' primaryExpression)* ')' - | left=primaryExpression IS NOT? NULL - ; - -primaryExpression - : constant #constantDefault - | identifier #columnReference - | base=identifier '.' fieldName=identifier #dereference - | '(' booleanExpression ')' #parenthesizedExpression - ; - -constant - : NULL #nullLiteral - | number #numericLiteral - | booleanValue #booleanLiteral - | STRING+ #stringLiteral - ; - -identifier - : IDENTIFIER #unquotedIdentifier - | BACKQUOTED_IDENTIFIER #backQuotedIdentifier - ; - -comparisonOperator - : EQ | NEQ | LT | LTE | GT | GTE - ; - -booleanValue - : TRUE | FALSE - ; - -number - : MINUS? DECIMAL_VALUE #decimalLiteral - | MINUS? INTEGER_VALUE #integerLiteral - | MINUS? BIGINT_LITERAL #bigIntLiteral - | MINUS? SMALLINT_LITERAL #smallIntLiteral - | MINUS? TINYINT_LITERAL #tinyIntLiteral - | MINUS? DOUBLE_LITERAL #doubleLiteral - | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral - ; - -AND: 'AND'; -BETWEEN: 'BETWEEN'; -FALSE: 'FALSE'; -IN: 'IN'; -IS: 'IS'; -NOT: 'NOT'; -NULL: 'NULL'; -OR: 'OR'; -TRUE: 'TRUE'; - -EQ : '='; -NEQ : '<>' | '!='; -LT : '<'; -LTE : '<='; -GT : '>'; -GTE : '>='; - -MINUS: '-'; - -STRING - : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' - | '"' ( ~('"'|'\\') | ('\\' .) )* '"' - ; - -BIGINT_LITERAL - : DIGIT+ 'L' - ; - -SMALLINT_LITERAL - : DIGIT+ 'S' - ; - -TINYINT_LITERAL - : DIGIT+ 'Y' - ; - -INTEGER_VALUE - : DIGIT+ - ; - -DECIMAL_VALUE - : DIGIT+ EXPONENT - | DECIMAL_DIGITS EXPONENT? - ; - -DOUBLE_LITERAL - : DIGIT+ EXPONENT? 'D' - | DECIMAL_DIGITS EXPONENT? 'D' - ; - -BIGDECIMAL_LITERAL - : DIGIT+ EXPONENT? 'BD' - | DECIMAL_DIGITS EXPONENT? 'BD' - ; - -IDENTIFIER - : (LETTER | DIGIT | '_')+ - ; - -BACKQUOTED_IDENTIFIER - : '`' ( ~'`' | '``' )* '`' - ; - -fragment DECIMAL_DIGITS - : DIGIT+ '.' DIGIT* - | '.' DIGIT+ - ; - -fragment EXPONENT - : 'E' [+-]? DIGIT+ - ; - -fragment DIGIT - : [0-9] - ; - -fragment LETTER - : [A-Z] - ; - -WS - : [ \r\n\t]+ -> channel(HIDDEN) - ; - -// Catch-all for anything we can't recognize. -// We use this to be able to ignore and recover all the text -// when splitting statements with DelimiterLexer -UNRECOGNIZED - : . - ; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/antlr/Expression.g4 ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/antlr/Expression.g4 b/store/horizon/src/main/antlr/Expression.g4 new file mode 100644 index 0000000..81688cd --- /dev/null +++ b/store/horizon/src/main/antlr/Expression.g4 @@ -0,0 +1,163 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * copy from SqlBase.g4 of Presto and Spark. + */ + +grammar Expression; + +parseFilter + : booleanExpression EOF + ; + +booleanExpression + : predicate + | left=booleanExpression operator=AND right=booleanExpression + | left=booleanExpression operator=OR right=booleanExpression + | '(' booleanExpression ')' + ; + +predicate + : left=primaryExpression comparisonOperator right=primaryExpression + | left=primaryExpression NOT? BETWEEN lower=primaryExpression AND upper=primaryExpression + | left=primaryExpression NOT? IN '(' primaryExpression (',' primaryExpression)* ')' + | left=primaryExpression IS NOT? NULL + ; + +primaryExpression + : constant #constantDefault + | identifier #columnReference + | base=identifier '.' fieldName=identifier #dereference + | '(' booleanExpression ')' #parenthesizedExpression + ; + +constant + : NULL #nullLiteral + | number #numericLiteral + | booleanValue #booleanLiteral + | STRING+ #stringLiteral + ; + +identifier + : IDENTIFIER #unquotedIdentifier + | BACKQUOTED_IDENTIFIER #backQuotedIdentifier + ; + +comparisonOperator + : EQ | NEQ | LT | LTE | GT | GTE + ; + +booleanValue + : TRUE | FALSE + ; + +number + : MINUS? DECIMAL_VALUE #decimalLiteral + | MINUS? INTEGER_VALUE #integerLiteral + | MINUS? BIGINT_LITERAL #bigIntLiteral + | MINUS? SMALLINT_LITERAL #smallIntLiteral + | MINUS? TINYINT_LITERAL #tinyIntLiteral + | MINUS? DOUBLE_LITERAL #doubleLiteral + | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral + ; + +AND: 'AND'; +BETWEEN: 'BETWEEN'; +FALSE: 'FALSE'; +IN: 'IN'; +IS: 'IS'; +NOT: 'NOT'; +NULL: 'NULL'; +OR: 'OR'; +TRUE: 'TRUE'; + +EQ : '='; +NEQ : '<>' | '!='; +LT : '<'; +LTE : '<='; +GT : '>'; +GTE : '>='; + +MINUS: '-'; + +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : DIGIT+ 'L' + ; + +SMALLINT_LITERAL + : DIGIT+ 'S' + ; + +TINYINT_LITERAL + : DIGIT+ 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +DECIMAL_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 'D' + | DECIMAL_DIGITS EXPONENT? 'D' + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +WS + : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for anything we can't recognize. +// We use this to be able to ignore and recover all the text +// when splitting statements with DelimiterLexer +UNRECOGNIZED + : . + ; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java new file mode 100644 index 0000000..da7bff7 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/Parser.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.horizon.antlr; + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.horizon.antlr.gen.ExpressionLexer; +import org.apache.carbondata.horizon.antlr.gen.ExpressionParser; + +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CommonTokenStream; + +public class Parser { + public static Expression parseFilter(String filter, CarbonTable carbonTable) { + if (filter == null) { + return null; + } + CharStream input = new ANTLRNoCaseStringStream(filter); + ExpressionLexer lexer = new ExpressionLexer(input); + CommonTokenStream tokens = new CommonTokenStream(lexer); + ExpressionParser parser = new ExpressionParser(tokens); + ExpressionParser.ParseFilterContext tree = parser.parseFilter(); + FilterVisitor visitor = new FilterVisitor(carbonTable); + return visitor.visitParseFilter(tree); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java index e32ff07..d1f68de 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionLexer.java @@ -2,12 +2,9 @@ package org.apache.carbondata.horizon.antlr.gen; import org.antlr.v4.runtime.Lexer; import org.antlr.v4.runtime.CharStream; -import org.antlr.v4.runtime.Token; -import org.antlr.v4.runtime.TokenStream; import org.antlr.v4.runtime.*; import org.antlr.v4.runtime.atn.*; import org.antlr.v4.runtime.dfa.DFA; -import org.antlr.v4.runtime.misc.*; @SuppressWarnings({"all", "warnings", "unchecked", "unused", "cast"}) public class ExpressionLexer extends Lexer { http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java index 08139eb..c4701af 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionParser.java @@ -3,11 +3,8 @@ package org.apache.carbondata.horizon.antlr.gen; import org.antlr.v4.runtime.atn.*; import org.antlr.v4.runtime.dfa.DFA; import org.antlr.v4.runtime.*; -import org.antlr.v4.runtime.misc.*; import org.antlr.v4.runtime.tree.*; import java.util.List; -import java.util.Iterator; -import java.util.ArrayList; @SuppressWarnings({"all", "warnings", "unchecked", "unused", "cast"}) public class ExpressionParser extends Parser { http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java new file mode 100644 index 0000000..eaa4583 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.horizon.rest.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; +import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; +import org.apache.carbondata.horizon.rest.model.view.LoadRequest; +import org.apache.carbondata.horizon.rest.model.view.SelectRequest; +import org.apache.carbondata.store.api.exception.StoreException; + +/** + * Client to send REST request to Horizon service + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface HorizonClient extends Closeable { + + /** + * Create a Table + * @param create descriptor for create table operation + * @throws IOException if network or disk IO error occurs + */ + void createTable(CreateTableRequest create) throws IOException, StoreException; + + /** + * Drop a Table, and remove all data in it + * @param table table identifier + * @throws IOException if network or disk IO error occurs + */ + void dropTable(DropTableRequest drop) throws IOException; + + /** + * Load data into a Table + * @param load descriptor for load operation + * @throws IOException if network or disk IO error occurs + */ + void loadData(LoadRequest load) throws IOException, StoreException; + + /** + * Scan a Table and return matched rows + * @param select descriptor for scan operation, including required column, filter, etc + * @return matched rows + * @throws IOException if network or disk IO error occurs + */ + List<CarbonRow> select(SelectRequest select) throws IOException, StoreException; + + /** + * Executor a SQL statement + * @param sqlString SQL statement + * @return matched rows + * @throws IOException if network or disk IO error occurs + */ + List<CarbonRow> sql(String sqlString) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java new file mode 100644 index 0000000..ba86180 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.horizon.rest.client.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.horizon.rest.client.HorizonClient; +import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; +import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; +import org.apache.carbondata.horizon.rest.model.view.LoadRequest; +import org.apache.carbondata.horizon.rest.model.view.SelectRequest; +import org.apache.carbondata.horizon.rest.model.view.SelectResponse; +import org.apache.carbondata.store.api.exception.StoreException; + +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; + +public class SimpleHorizonClient implements HorizonClient { + + private RestTemplate restTemplate; + private String serviceUri; + + public SimpleHorizonClient(String serviceUri) { + this.serviceUri = serviceUri; + this.restTemplate = new RestTemplate(); + } + + @Override + public void createTable(CreateTableRequest create) throws IOException, StoreException { + Objects.requireNonNull(create); + restTemplate.postForEntity(serviceUri + "/table/create", create, String.class); + } + + @Override + public void dropTable(DropTableRequest drop) throws IOException { + Objects.requireNonNull(drop); + restTemplate.postForEntity(serviceUri + "/table/drop", drop, String.class); + } + + @Override + public void loadData(LoadRequest load) throws IOException, StoreException { + Objects.requireNonNull(load); + restTemplate.postForEntity(serviceUri + "/table/load", load, String.class); + } + + @Override + public List<CarbonRow> select(SelectRequest select) throws IOException, StoreException { + Objects.requireNonNull(select); + ResponseEntity<SelectResponse> response = + restTemplate.postForEntity(serviceUri + "/table/select", select, SelectResponse.class); + Object[][] rows = Objects.requireNonNull(response.getBody()).getRows(); + List<CarbonRow> output = new ArrayList<>(rows.length); + for (Object[] row : rows) { + output.add(new CarbonRow(row)); + } + return output; + } + + @Override + public List<CarbonRow> sql(String sqlString) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java index 1f6f485..d7b7c5e 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java @@ -27,10 +27,14 @@ public class Horizon { private static ConfigurableApplicationContext context; public static void main(String[] args) { + start(args); + } + + public static void start(String[] args) { context = SpringApplication.run(Horizon.class, args); } - public static void close() { + public static void stop() { SpringApplication.exit(context); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java index 2089c1a..33ea50b 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java @@ -16,21 +16,30 @@ */ package org.apache.carbondata.horizon.rest.controller; -import java.util.UUID; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.horizon.rest.model.descriptor.LoadDescriptor; -import org.apache.carbondata.horizon.rest.model.descriptor.SelectDescriptor; -import org.apache.carbondata.horizon.rest.model.descriptor.TableDescriptor; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.horizon.antlr.Parser; import org.apache.carbondata.horizon.rest.model.validate.RequestValidator; import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; +import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; import org.apache.carbondata.horizon.rest.model.view.LoadRequest; import org.apache.carbondata.horizon.rest.model.view.SelectRequest; import org.apache.carbondata.horizon.rest.model.view.SelectResponse; -import org.apache.carbondata.horizon.rest.service.HorizonService; -import org.apache.carbondata.store.exception.StoreException; +import org.apache.carbondata.store.api.CarbonStore; +import org.apache.carbondata.store.api.CarbonStoreFactory; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.descriptor.LoadDescriptor; +import org.apache.carbondata.store.api.descriptor.SelectDescriptor; +import org.apache.carbondata.store.api.descriptor.TableDescriptor; +import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.store.api.exception.StoreException; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -45,48 +54,63 @@ public class HorizonController { private static LogService LOGGER = LogServiceFactory.getLogService(HorizonController.class.getName()); - private HorizonService service; + private CarbonStore store; - public HorizonController() { - service = HorizonService.getInstance(); + public HorizonController() throws StoreException { + String storeFile = System.getProperty("carbonstore.conf.file"); + store = CarbonStoreFactory.getDistributedStore("GlobalStore", new StoreConf(storeFile)); } @RequestMapping(value = "/table/create", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<String> createTable( - @RequestBody CreateTableRequest request) throws StoreException { + @RequestBody CreateTableRequest request) throws StoreException, IOException { RequestValidator.validateTable(request); TableDescriptor tableDescriptor = request.convertToDto(); - boolean result = service.createTable(tableDescriptor); - return new ResponseEntity<>(String.valueOf(result), HttpStatus.OK); + store.createTable(tableDescriptor); + return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK); + } + + @RequestMapping(value = "/table/drop", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity<String> dropTable( + @RequestBody DropTableRequest request) throws StoreException, IOException { + RequestValidator.validateDrop(request); + store.dropTable(new TableIdentifier(request.getTableName(), request.getDatabaseName())); + return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK); } @RequestMapping(value = "/table/load", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<String> load(@RequestBody LoadRequest request) throws StoreException { + public ResponseEntity<String> load(@RequestBody LoadRequest request) + throws StoreException, IOException { RequestValidator.validateLoad(request); LoadDescriptor loadDescriptor = request.convertToDto(); - boolean result = service.loadData(loadDescriptor); - return new ResponseEntity<>(String.valueOf(result), HttpStatus.OK); + store.loadData(loadDescriptor); + return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK); } - @RequestMapping(value = "/table/select", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<SelectResponse> select(@RequestBody SelectRequest request) - throws StoreException { + throws StoreException, IOException { long start = System.currentTimeMillis(); RequestValidator.validateSelect(request); - SelectDescriptor selectDescriptor = request.convertToDto(); - selectDescriptor.setId(UUID.randomUUID().toString()); - CarbonRow[] result = service.select(selectDescriptor); - Object[][] newResult = new Object[result.length][]; - for (int i = newResult.length - 1; i >= 0; i--) { - newResult[i] = result[i].getData(); + TableIdentifier table = new TableIdentifier(request.getTableName(), request.getDatabaseName()); + CarbonTable carbonTable = store.getTable(table); + Expression expression = Parser.parseFilter(request.getFilter(), carbonTable); + SelectDescriptor selectDescriptor = new SelectDescriptor( + table, request.getSelect(), expression, request.getLimit()); + List<CarbonRow> result = store.select(selectDescriptor); + Iterator<CarbonRow> iterator = result.iterator(); + Object[][] output = new Object[result.size()][]; + int i = 0; + while (iterator.hasNext()) { + output[i] = (iterator.next().getData()); + i++; } long end = System.currentTimeMillis(); - LOGGER.audit("[" + selectDescriptor.getId() + "] HorizonController select " + + LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " + request.getDatabaseName() + "." + request.getTableName() + ", take time: " + (end - start) + " ms"); - return new ResponseEntity<>( - new SelectResponse(selectDescriptor.getId(), newResult), HttpStatus.OK); + + return new ResponseEntity<>(new SelectResponse(request, output), HttpStatus.OK); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java deleted file mode 100644 index ec2c0f4..0000000 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/descriptor/LoadDescriptor.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.horizon.rest.model.descriptor; - -import java.util.Map; - -public class LoadDescriptor { - - private String databaseName; - private String tableName; - private String inputPath; - private Map<String, String> options; - private boolean isOverwrite; - - public LoadDescriptor() { - } - - public LoadDescriptor(String databaseName, String tableName, String inputPaths, - Map<String, String> options, boolean isOverwrite) { - this.databaseName = databaseName; - this.tableName = tableName; - this.inputPath = inputPaths; - this.options = options; - this.isOverwrite = isOverwrite; - } - - public String getDatabaseName() { - return databaseName; - } - - public void setDatabaseName(String databaseName) { - this.databaseName = databaseName; - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - public String getInputPath() { - return inputPath; - } - - public void setInputPath(String inputPath) { - this.inputPath = inputPath; - } - - public Map<String, String> getOptions() { - return options; - } - - public void setOptions(Map<String, String> options) { - this.options = options; - } - - public boolean isOverwrite() { - return isOverwrite; - } - - public void setOverwrite(boolean overwrite) { - isOverwrite = overwrite; - } -}