http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java new file mode 100644 index 0000000..304fd0f --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/QueryResponse.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.util.ObjectSerializationUtil; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +@InterfaceAudience.Internal +public class QueryResponse extends BaseResponse implements Serializable, Writable { + private int queryId; + private Object[][] rows; + + public QueryResponse() { + super(); + } + + public QueryResponse(int queryId, int status, String message, Object[][] rows) { + super(status, message); + this.queryId = queryId; + this.rows = rows; + } + + public int getQueryId() { + return queryId; + } + + + public Object[][] getRows() { + return rows; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(queryId); + WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows)); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + queryId = in.readInt(); + try { + rows = (Object[][])ObjectSerializationUtil.deserialize( + WritableUtils.readCompressedByteArray(in)); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java new file mode 100644 index 0000000..5f223d6 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class RegisterWorkerRequest implements Serializable, Writable { + private String hostAddress; + private int port; + private int cores; + + public RegisterWorkerRequest() { + } + + public RegisterWorkerRequest(String hostAddress, int port, int cores) { + this.hostAddress = hostAddress; + this.port = port; + this.cores = cores; + } + + public String getHostAddress() { + return hostAddress; + } + + public int getPort() { + return port; + } + + public int getCores() { + return cores; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(hostAddress); + out.writeInt(port); + out.writeInt(cores); + } + + @Override + public void readFields(DataInput in) throws IOException { + hostAddress = in.readUTF(); + port = in.readInt(); + cores = in.readInt(); + } + + @Override public String toString() { + return "RegisterWorkerRequest{" + "hostAddress='" + hostAddress + '\'' + ", port=" + port + '}'; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java new file mode 100644 index 0000000..16915e9 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/RegisterWorkerResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class RegisterWorkerResponse implements Serializable, Writable { + + private String workerId; + + public RegisterWorkerResponse() { + } + + public RegisterWorkerResponse(String workerId) { + this.workerId = workerId; + } + + public String getWorkerId() { + return workerId; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(workerId); + } + + @Override + public void readFields(DataInput in) throws IOException { + workerId = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java new file mode 100644 index 0000000..86767e5 --- /dev/null +++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/Scan.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class Scan implements Serializable, Writable { + private int requestId; + private CarbonMultiBlockSplit split; + private TableInfo tableInfo; + private String[] projectColumns; + private Expression filterExpression; + private long limit; + + public Scan() { + } + + public Scan(int requestId, CarbonMultiBlockSplit split, + TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) { + this.requestId = requestId; + this.split = split; + this.tableInfo = tableInfo; + 
this.projectColumns = projectColumns; + this.filterExpression = filterExpression; + this.limit = limit; + } + + public int getRequestId() { + return requestId; + } + + public CarbonMultiBlockSplit getSplit() { + return split; + } + + public TableInfo getTableInfo() { + return tableInfo; + } + + public String[] getProjectColumns() { + return projectColumns; + } + + public Expression getFilterExpression() { + return filterExpression; + } + + public long getLimit() { + return limit; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(requestId); + split.write(out); + tableInfo.write(out); + out.writeInt(projectColumns.length); + for (String projectColumn : projectColumns) { + out.writeUTF(projectColumn); + } + String filter = ObjectSerializationUtil.convertObjectToString(filterExpression); + out.writeUTF(filter); + out.writeLong(limit); + } + + @Override + public void readFields(DataInput in) throws IOException { + requestId = in.readInt(); + split = new CarbonMultiBlockSplit(); + split.readFields(in); + tableInfo = new TableInfo(); + tableInfo.readFields(in); + projectColumns = new String[in.readInt()]; + for (int i = 0; i < projectColumns.length; i++) { + projectColumns[i] = in.readUTF(); + } + String filter = in.readUTF(); + filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter); + limit = in.readLong(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java new file mode 100644 index 0000000..311963d --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownRequest.java 
@@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class ShutdownRequest implements Serializable, Writable { + private String reason; + + public ShutdownRequest() { + } + + public ShutdownRequest(String reason) { + this.reason = reason; + } + + public String getReason() { + return reason; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(reason); + } + + @Override + public void readFields(DataInput in) throws IOException { + reason = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java new file mode 100644 index 0000000..0143f48 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/ShutdownResponse.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class ShutdownResponse implements Serializable, Writable { + private int status; + private String message; + + public ShutdownResponse() { + } + + public ShutdownResponse(int status, String message) { + this.status = status; + this.message = message; + } + + public int getStatus() { + return status; + } + + public String getMessage() { + return message; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(status); + out.writeUTF(message); + } + + @Override + public void readFields(DataInput in) throws IOException { + status = in.readInt(); + message = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java new file mode 100644 index 0000000..fd13b20 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.worker; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; +import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.impl.CarbonStoreBase; +import org.apache.carbondata.store.impl.Status; +import org.apache.carbondata.store.impl.rpc.model.BaseResponse; +import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.impl.rpc.model.QueryResponse; +import org.apache.carbondata.store.impl.rpc.model.Scan; +import 
org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * It handles request from master. + */ +@InterfaceAudience.Internal +class RequestHandler { + + private StoreConf storeConf; + private Configuration hadoopConf; + + RequestHandler(StoreConf conf, Configuration hadoopConf) { + this.storeConf = conf; + this.hadoopConf = hadoopConf; + } + + private static final LogService LOGGER = + LogServiceFactory.getLogService(RequestHandler.class.getName()); + + QueryResponse handleScan(Scan scan) { + try { + LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId())); + CarbonTable table = CarbonTable.buildFromTableInfo(scan.getTableInfo()); + List<CarbonRow> rows = CarbonStoreBase.scan(table, scan); + LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId())); + return createSuccessResponse(scan, rows); + } catch (IOException e) { + LOGGER.error(e); + LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId())); + return createFailureResponse(scan, e); + } + } + + ShutdownResponse handleShutdown(ShutdownRequest request) { + LOGGER.info("Shutting down worker..."); + SearchModeDetailQueryExecutor.shutdownThreadPool(); + SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); + LOGGER.info("Worker shut down"); + return new ShutdownResponse(Status.SUCCESS.ordinal(), ""); + } + + /** + * create a failure 
response + */ + private QueryResponse createFailureResponse(Scan scan, Throwable throwable) { + return new QueryResponse(scan.getRequestId(), Status.FAILURE.ordinal(), + throwable.getMessage(), new Object[0][]); + } + + /** + * create a success response with result rows + */ + private QueryResponse createSuccessResponse(Scan scan, List<CarbonRow> rows) { + Iterator<CarbonRow> itor = rows.iterator(); + Object[][] output = new Object[rows.size()][]; + int i = 0; + while (itor.hasNext()) { + output[i++] = itor.next().getData(); + } + return new QueryResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output); + } + + public BaseResponse handleLoadData(LoadDataRequest request) { + DataLoadExecutor executor = null; + try { + CarbonLoadModel model = request.getModel(); + + JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); + CarbonInputFormatUtil.createJobTrackerID(new Date()); + TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); + TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0); + Configuration configuration = new Configuration(hadoopConf); + StoreUtil.configureCSVInputFormat(configuration, model); + configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath()); + // Set up the attempt context required to use in the output committer. 
+ TaskAttemptContext hadoopAttemptContext = + new TaskAttemptContextImpl(configuration, taskAttemptId); + + CSVInputFormat format = new CSVInputFormat(); + List<InputSplit> splits = format.getSplits(hadoopAttemptContext); + + CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()]; + for (int index = 0; index < splits.size(); index++) { + readerIterators[index] = new CSVRecordReaderIterator( + format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index), + hadoopAttemptContext); + } + + executor = new DataLoadExecutor(); + executor.execute(model, storeConf.storeTempLocation(), readerIterators); + + return new BaseResponse(Status.SUCCESS.ordinal(), ""); + } catch (IOException e) { + LOGGER.error(e, "Failed to handle load data"); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } catch (InterruptedException e) { + LOGGER.error(e, "Interrupted handle load data "); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } catch (Exception e) { + LOGGER.error(e, "Failed to execute load data "); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } finally { + if (executor != null) { + executor.close(); + StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java new file mode 100644 index 0000000..26f252c --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.worker; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.impl.rpc.StoreService; +import org.apache.carbondata.store.impl.rpc.model.BaseResponse; +import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.impl.rpc.model.QueryResponse; +import org.apache.carbondata.store.impl.rpc.model.Scan; +import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse; + +import org.apache.hadoop.ipc.ProtocolSignature; + +@InterfaceAudience.Internal +public class StoreServiceImpl implements StoreService { + + private Worker worker; + RequestHandler handler; + + public StoreServiceImpl(Worker worker) { + this.worker = worker; + this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf()); + } + + @Override + public BaseResponse loadData(LoadDataRequest request) { + return handler.handleLoadData(request); + } + + @Override + public QueryResponse query(Scan scan) { + return handler.handleScan(scan); + } + + @Override + public ShutdownResponse shutdown(ShutdownRequest request) { + return 
handler.handleShutdown(request); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } + + public Worker getWorker() { + return worker; + } + + public void setWorker(Worker worker) { + this.worker = worker; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java new file mode 100644 index 0000000..a360e36 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.store.impl.worker;
+
+import java.io.IOException;
+import java.net.BindException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.impl.rpc.RegistryService;
+import org.apache.carbondata.store.impl.rpc.ServiceFactory;
+import org.apache.carbondata.store.impl.rpc.StoreService;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * Worker process of the store.
+ *
+ * <p>On {@link #start()} it brings up a Hadoop RPC server that serves the
+ * {@link StoreService} protocol, and then registers itself with the master through
+ * {@link RegistryService}. The id returned by the master is kept and exposed by
+ * {@link #getId()}.
+ */
+@InterfaceAudience.Internal
+public class Worker {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(Worker.class.getName());
+
+  /** Maximum number of consecutive ports tried when the configured port is occupied. */
+  private static final int MAX_BIND_ATTEMPTS = 100;
+
+  // id assigned by the master on successful registration, null until then
+  private String id;
+  // RPC proxy to the master's registry, created lazily on first registration
+  private RegistryService registry;
+  private StoreConf conf;
+  private Configuration hadoopConf;
+  // RPC server of this worker, null while the service is not running
+  private RPC.Server server;
+
+  /**
+   * Creates a worker from the given configuration. The Hadoop configuration is derived
+   * once from {@code conf} at construction time.
+   *
+   * @param conf store configuration (worker host/port/cores, master host/port)
+   */
+  public Worker(StoreConf conf) {
+    this.conf = conf;
+    this.hadoopConf = this.conf.newHadoopConf();
+  }
+
+  /**
+   * Starts the RPC service and registers this worker to the master.
+   *
+   * <p>An {@link IOException} from either step is logged and not propagated to the caller.
+   */
+  public void start() {
+    try {
+      startService();
+      registerToMaster();
+    } catch (IOException e) {
+      // NOTE(review): if registration fails, the RPC server started above keeps running;
+      // confirm whether callers expect it to be stopped in that case.
+      LOGGER.error(e, "worker failed to start");
+    }
+  }
+
+  /**
+   * Builds and starts the RPC server. If the port is occupied, the next port number is
+   * tried, up to {@link #MAX_BIND_ATTEMPTS} attempts in total. The port finally used is
+   * written back into the configuration under {@code StoreConf.WORKER_PORT}.
+   *
+   * @throws IOException if the server cannot be built; the last {@link BindException}
+   *     is rethrown when every attempted port is occupied
+   */
+  private void startService() throws IOException {
+    int coreNum = conf.workerCoreNum();
+    String host = conf.workerHost();
+    int port = conf.workerPort();
+    StoreService queryService = new StoreServiceImpl(this);
+
+    BindException lastFailure = null;
+    // we will try to create the service at worst case MAX_BIND_ATTEMPTS times
+    for (int attempt = 0; attempt < MAX_BIND_ATTEMPTS; attempt++) {
+      try {
+        server = new RPC.Builder(hadoopConf)
+            .setNumHandlers(coreNum)
+            .setBindAddress(host)
+            .setPort(port)
+            .setProtocol(StoreService.class)
+            .setInstance(queryService)
+            .build();
+        server.start();
+        lastFailure = null;
+        break;
+      } catch (BindException e) {
+        // port is occupied, increase the port number and try again
+        lastFailure = e;
+        port = port + 1;
+      }
+    }
+
+    if (lastFailure != null) {
+      // we have tried many times, but still failed to find an available port
+      LOGGER.error(lastFailure, "worker failed to start");
+      throw lastFailure;
+    }
+
+    // publish the port actually bound, registerToMaster() reads it from the configuration
+    conf.conf(StoreConf.WORKER_PORT, port);
+    LOGGER.info("worker started on " + host + ":" + port + " successfully");
+  }
+
+  /**
+   * Stops the RPC service if it is running.
+   *
+   * <p>If the calling thread is interrupted while waiting for the server to terminate,
+   * the failure is logged and the thread's interrupt status is restored.
+   */
+  public void stop() {
+    try {
+      stopService();
+    } catch (InterruptedException e) {
+      LOGGER.error(e, "worker failed to stop");
+      // keep the interruption visible to the caller instead of silently consuming it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Stops the RPC server, waits for it to terminate and clears the reference.
+   * Does nothing when no server is running.
+   *
+   * @throws InterruptedException if interrupted while waiting for the server to terminate
+   */
+  private void stopService() throws InterruptedException {
+    if (server != null) {
+      server.stop();
+      server.join();
+      server = null;
+    }
+  }
+
+  /**
+   * Sends a register request (worker host, port and core number taken from the
+   * configuration) to the master and stores the worker id from the response.
+   *
+   * @throws IOException if the registry proxy cannot be created, or wrapping any failure
+   *     raised by the register call
+   */
+  private void registerToMaster() throws IOException {
+    LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort());
+    if (registry == null) {
+      registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort());
+    }
+    RegisterWorkerRequest request =
+        new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum());
+    try {
+      RegisterWorkerResponse response = registry.registerWorker(request);
+      id = response.getWorkerId();
+    } catch (Throwable throwable) {
+      LOGGER.error(throwable, "worker failed to register");
+      throw new IOException(throwable);
+    }
+
+    LOGGER.info("worker " + id + " registered successfully");
+  }
+
+  /**
+   * @return the id assigned by the master, or null if this worker has not registered yet
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Command line entry point.
+   *
+   * @param args {@code args[0]} is the log4j file, {@code args[1]} is the properties file
+   */
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: Worker <log4j file> <properties file>");
+      return;
+    }
+
+    StoreUtil.initLog4j(args[0]);
+    Worker worker = new Worker(new StoreConf(args[1]));
+    worker.start();
+  }
+
+  public StoreConf getConf() {
+    return conf;
+  }
+
+  /**
+   * Replaces the store configuration. The Hadoop configuration derived in the constructor
+   * is not refreshed by this call.
+   */
+  public void setConf(StoreConf conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public void setHadoopConf(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java b/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
deleted file mode 100644
index 0a724d9..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/master/Master.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.carbondata.store.master; - -import java.io.File; -import java.io.IOException; -import java.lang.ref.SoftReference; -import java.net.BindException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.block.Distributable; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.exception.InvalidConfigurationException; -import org.apache.carbondata.core.fileoperations.FileWriteOperation; -import org.apache.carbondata.core.locks.CarbonLockUtil; -import org.apache.carbondata.core.locks.ICarbonLock; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.SegmentFileStore; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.mutate.CarbonUpdateUtil; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatus; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.CarbonProperties; 
-import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.ThriftWriter; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.api.CarbonInputFormat; -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.processing.util.CarbonLoaderUtil; -import org.apache.carbondata.store.conf.StoreConf; -import org.apache.carbondata.store.exception.ExecutionTimeoutException; -import org.apache.carbondata.store.exception.StoreException; -import org.apache.carbondata.store.rpc.RegistryService; -import org.apache.carbondata.store.rpc.ServiceFactory; -import org.apache.carbondata.store.rpc.StoreService; -import org.apache.carbondata.store.rpc.impl.RegistryServiceImpl; -import org.apache.carbondata.store.rpc.impl.Status; -import org.apache.carbondata.store.rpc.model.BaseResponse; -import org.apache.carbondata.store.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; -import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; -import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; -import org.apache.carbondata.store.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.scheduler.Schedulable; -import org.apache.carbondata.store.scheduler.Scheduler; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; - -/** - * Master of CarbonSearch. - * It provides a Registry service for worker to register. 
- * And it provides search API to fire RPC call to workers. - */ - -public class Master { - - private static Master instance = null; - - private static LogService LOGGER = LogServiceFactory.getLogService(Master.class.getName()); - - private Map<String, SoftReference<CarbonTable>> cacheTables; - - // worker host address map to EndpointRef - private StoreConf conf; - private Configuration hadoopConf; - private Random random = new Random(); - private RPC.Server registryServer = null; - private Scheduler scheduler = new Scheduler(); - - private Master(StoreConf conf) { - cacheTables = new HashMap<>(); - this.conf = conf; - this.hadoopConf = this.conf.newHadoopConf(); - } - - /** - * start service and listen on port passed in constructor - */ - public void startService() throws IOException { - if (registryServer == null) { - - BindException exception; - // we will try to create service at worse case 100 times - int numTry = 100; - String host = conf.masterHost(); - int port = conf.masterPort(); - LOGGER.info("building registry-service on " + host + ":" + port); - - RegistryService registryService = new RegistryServiceImpl(this); - do { - try { - registryServer = new RPC.Builder(hadoopConf).setBindAddress(host).setPort(port) - .setProtocol(RegistryService.class).setInstance(registryService).build(); - - registryServer.start(); - numTry = 0; - exception = null; - } catch (BindException e) { - // port is occupied, increase the port number and try again - exception = e; - LOGGER.error(e, "start registry-service failed"); - port = port + 1; - numTry = numTry - 1; - } - } while (numTry > 0); - if (exception != null) { - // we have tried many times, but still failed to find an available port - throw exception; - } - LOGGER.info("registry-service started"); - } else { - LOGGER.info("Search mode master has already started"); - } - } - - public void stopService() throws InterruptedException { - if (registryServer != null) { - registryServer.stop(); - registryServer.join(); - 
registryServer = null; - } - } - - public void stopAllWorkers() throws IOException { - for (Schedulable worker : getWorkers()) { - try { - worker.service.shutdown(new ShutdownRequest("user")); - } catch (Throwable throwable) { - throw new IOException(throwable); - } - scheduler.removeWorker(worker.getAddress()); - } - } - - /** - * A new searcher is trying to register, add it to the map and connect to this searcher - */ - public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) throws IOException { - LOGGER.info( - "Receive Register request from worker " + request.getHostAddress() + ":" + request.getPort() - + " with " + request.getCores() + " cores"); - String workerId = UUID.randomUUID().toString(); - String workerAddress = request.getHostAddress(); - int workerPort = request.getPort(); - LOGGER.info( - "connecting to worker " + request.getHostAddress() + ":" + request.getPort() + ", workerId " - + workerId); - - StoreService searchService = ServiceFactory.createStoreService(workerAddress, workerPort); - scheduler.addWorker( - new Schedulable(workerId, workerAddress, workerPort, request.getCores(), searchService)); - LOGGER.info("worker " + request + " registered"); - return new RegisterWorkerResponse(workerId); - } - - private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit) - throws IOException { - // in case of RPC success, collect all rows in response message - if (result.getQueryId() != queryId) { - throw new IOException( - "queryId in response does not match request: " + result.getQueryId() + " != " + queryId); - } - if (result.getStatus() != Status.SUCCESS.ordinal()) { - throw new IOException("failure in worker: " + result.getMessage()); - } - int rowCount = 0; - Object[][] rows = result.getRows(); - for (Object[] row : rows) { - output.add(new CarbonRow(row)); - rowCount++; - if (rowCount >= globalLimit) { - break; - } - } - LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + 
rowCount); - return rowCount; - } - - private void onFailure(Throwable e) throws IOException { - throw new IOException("exception in worker: " + e.getMessage()); - } - - private void onTimeout() { - throw new ExecutionTimeoutException(); - } - - public String getTableFolder(String database, String tableName) { - return conf.storeLocation() + File.separator + database + File.separator + tableName; - } - - public CarbonTable getTable(String database, String tableName) throws StoreException { - String tablePath = getTableFolder(database, tableName); - CarbonTable carbonTable; - SoftReference<CarbonTable> reference = cacheTables.get(tablePath); - if (reference != null) { - carbonTable = reference.get(); - if (carbonTable != null) { - return carbonTable; - } - } - - try { - org.apache.carbondata.format.TableInfo tableInfo = - CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath)); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", ""); - tableInfo1.setTablePath(tablePath); - carbonTable = CarbonTable.buildFromTableInfo(tableInfo1); - cacheTables.put(tablePath, new SoftReference<>(carbonTable)); - return carbonTable; - } catch (IOException e) { - String message = "Failed to get table from " + tablePath; - LOGGER.error(e, message); - throw new StoreException(message); - } - } - - public boolean createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException { - AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier(); - boolean tableExists = FileFactory.isFileExist(identifier.getTablePath()); - if (tableExists) { - if (ifNotExists) { - return true; - } else { - throw new IOException( - "car't create table " + tableInfo.getDatabaseName() + "." 
+ tableInfo.getFactTable() - .getTableName() + ", because it already exists"); - } - } - - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - String databaseName = tableInfo.getDatabaseName(); - String tableName = tableInfo.getFactTable().getTableName(); - org.apache.carbondata.format.TableInfo thriftTableInfo = - schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName); - - String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); - String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); - FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath); - try { - if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { - boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType); - if (!isDirCreated) { - throw new IOException("Failed to create the metadata directory " + schemaMetadataPath); - } - } - ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); - thriftWriter.open(FileWriteOperation.OVERWRITE); - thriftWriter.write(thriftTableInfo); - thriftWriter.close(); - return true; - } catch (IOException e) { - LOGGER.error(e, "Failed to handle create table"); - throw e; - } - } - - private void openSegment(CarbonLoadModel loadModel, boolean isOverwriteTable) throws IOException { - try { - CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, isOverwriteTable); - } catch (IOException e) { - LOGGER.error(e, "Failed to handle load data"); - throw e; - } - } - - private void closeSegment(CarbonLoadModel loadModel) throws IOException { - try { - CarbonLoaderUtil.updateTableStatusForFailure(loadModel, ""); - } catch (IOException e) { - LOGGER.error(e, "Failed to close segment"); - throw e; - } - } - - private void commitSegment(CarbonLoadModel loadModel) throws IOException { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); - String segmentId = loadModel.getSegmentId(); - String 
segmentFileName = SegmentFileStore - .writeSegmentFile(carbonTable, segmentId, String.valueOf(loadModel.getFactTimeStamp())); - - AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier(); - String tablePath = absoluteTableIdentifier.getTablePath(); - String metadataPath = CarbonTablePath.getMetadataPath(tablePath); - String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); - - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); - int retryCount = CarbonLockUtil - .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); - int maxTimeout = CarbonLockUtil - .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); - try { - if (carbonLock.lockWithRetries(retryCount, maxTimeout)) { - LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation"); - LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(metadataPath); - LoadMetadataDetails loadMetadataDetails = null; - for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) { - // if the segments is in the list of marked for delete then update the status. 
- if (segmentId.equals(detail.getLoadName())) { - loadMetadataDetails = detail; - detail.setSegmentFile(segmentFileName); - break; - } - } - if (loadMetadataDetails == null) { - throw new IOException("can not find segment: " + segmentId); - } - - CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.SUCCESS, - loadModel.getFactTimeStamp(), true); - CarbonLoaderUtil - .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, segmentId, carbonTable); - - SegmentStatusManager - .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); - } else { - LOGGER.error( - "Not able to acquire the lock for Table status updation for table path " + tablePath); - } - } finally { - if (carbonLock.unlock()) { - LOGGER.info("Table unlocked successfully after table status updation" + tablePath); - } else { - LOGGER.error( - "Unable to unlock Table lock for table" + tablePath + " during table status updation"); - } - } - } - - public boolean loadData(CarbonLoadModel loadModel, boolean isOverwrite) throws IOException { - Schedulable worker = scheduler.pickNexWorker(); - try { - if (loadModel.getFactTimeStamp() == 0) { - loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime()); - } - openSegment(loadModel, isOverwrite); - LoadDataRequest request = new LoadDataRequest(loadModel); - BaseResponse response = scheduler.sendRequest(worker, request); - if (Status.SUCCESS.ordinal() == response.getStatus()) { - commitSegment(loadModel); - return true; - } else { - closeSegment(loadModel); - throw new IOException(response.getMessage()); - } - } finally { - worker.workload.decrementAndGet(); - } - } - - /** - * Execute search by firing RPC call to worker, return the result rows - * - * @param table table to search - * @param columns projection column names - * @param filter filter expression - * @param globalLimit max number of rows required in Master - * @param localLimit max number of rows required in Worker - * @return CarbonRow array - */ - public 
CarbonRow[] search(CarbonTable table, String[] columns, Expression filter, - long globalLimit, long localLimit) throws IOException { - Objects.requireNonNull(table); - Objects.requireNonNull(columns); - if (globalLimit < 0 || localLimit < 0) { - throw new IllegalArgumentException("limit should be positive"); - } - - int queryId = random.nextInt(); - - List<CarbonRow> output = new ArrayList<>(); - - // prune data and get a mapping of worker hostname to list of blocks, - // then add these blocks to the QueryRequest and fire the RPC call - Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter); - Set<Map.Entry<String, List<Distributable>>> entries = nodeBlockMapping.entrySet(); - List<Future<QueryResponse>> futures = new ArrayList<>(entries.size()); - List<Schedulable> workers = new ArrayList<>(entries.size()); - for (Map.Entry<String, List<Distributable>> entry : entries) { - CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey()); - QueryRequest request = - new QueryRequest(queryId, split, table.getTableInfo(), columns, filter, localLimit); - - // Find an Endpoind and send the request to it - // This RPC is non-blocking so that we do not need to wait before send to next worker - Schedulable worker = scheduler.pickWorker(entry.getKey()); - workers.add(worker); - futures.add(scheduler.sendRequestAsync(worker, request)); - } - - int rowCount = 0; - int length = futures.size(); - for (int i = 0; i < length; i++) { - Future<QueryResponse> future = futures.get(i); - Schedulable worker = workers.get(i); - if (rowCount < globalLimit) { - // wait for worker - QueryResponse response = null; - try { - response = future - .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS); - } catch (ExecutionException | InterruptedException e) { - onFailure(e); - } catch (TimeoutException t) { - onTimeout(); - } finally { - worker.workload.decrementAndGet(); - } - LOGGER.info("[QueryId: " + queryId 
+ "] receive search response from worker " + worker); - rowCount += onSuccess(queryId, response, output, globalLimit); - } - } - CarbonRow[] rows = new CarbonRow[output.size()]; - return output.toArray(rows); - } - - /** - * Prune data by using CarbonInputFormat.getSplit - * Return a mapping of host address to list of block - */ - private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns, - Expression filter) throws IOException { - JobConf jobConf = new JobConf(new Configuration()); - Job job = new Job(jobConf); - CarbonTableInputFormat format; - try { - format = CarbonInputFormatUtil - .createCarbonTableInputFormat(job, table, columns, filter, null, null, true); - } catch (InvalidConfigurationException e) { - throw new IOException(e.getMessage()); - } - - // We will do FG pruning in reader side, so don't do it here - CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false); - List<InputSplit> splits = format.getSplits(job); - List<Distributable> blockInfos = new ArrayList<>(splits.size()); - for (InputSplit split : splits) { - blockInfos.add((Distributable) split); - } - return CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, getWorkerAddresses(), - CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null); - } - - /** - * return hostname of all workers - */ - public List<Schedulable> getWorkers() { - return scheduler.getAllWorkers(); - } - - private List<String> getWorkerAddresses() { - return scheduler.getAllWorkerAddresses(); - } - - public static synchronized Master getInstance(StoreConf conf) { - if (instance == null) { - instance = new Master(conf); - } - return instance; - } - - public static void main(String[] args) throws InterruptedException { - if (args.length != 2) { - System.err.println("Usage: Master <log4j file> <properties file>"); - return; - } - - StoreUtil.initLog4j(args[0]); - StoreConf conf = new StoreConf(args[1]); - Master master = getInstance(conf); - master.stopService(); - } - -} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java deleted file mode 100644 index 08a0e97..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.rpc; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; -import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; - -import org.apache.hadoop.ipc.VersionedProtocol; - -@InterfaceAudience.Internal -public interface RegistryService extends VersionedProtocol { - long versionID = 1L; - RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java deleted file mode 100644 index d9d0f3e..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.rpc; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; - -@InterfaceAudience.Internal -public class ServiceFactory { - - public static StoreService createStoreService(String host, int port) throws IOException { - InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); - return RPC.getProxy( - StoreService.class, StoreService.versionID, address, new Configuration()); - } - - public static RegistryService createRegistryService(String host, int port) throws IOException { - InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); - return RPC.getProxy( - RegistryService.class, RegistryService.versionID, address, new Configuration()); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java deleted file mode 100644 index 48dec79..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.rpc.model.BaseResponse; -import org.apache.carbondata.store.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; -import org.apache.carbondata.store.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.rpc.model.ShutdownResponse; - -import org.apache.hadoop.ipc.VersionedProtocol; - -@InterfaceAudience.Internal -public interface StoreService extends VersionedProtocol { - - long versionID = 1L; - - BaseResponse loadData(LoadDataRequest request); - - QueryResponse query(QueryRequest request); - - ShutdownResponse shutdown(ShutdownRequest request); -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java deleted file mode 100644 index 45e2dce..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.impl; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.datamap.DataMapChooser; -import org.apache.carbondata.core.datamap.DataMapDistributable; -import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; -import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.QueryModel; -import 
org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.CarbonRecordReader; -import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport; - -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * This is a special RecordReader that leverages FGDataMap before reading carbondata file - * and return CarbonRow object - */ -public class IndexedRecordReader extends CarbonRecordReader<CarbonRow> { - - private static final LogService LOG = - LogServiceFactory.getLogService(RequestHandler.class.getName()); - - private int queryId; - private CarbonTable table; - - public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) { - super(queryModel, new CarbonRowReadSupport()); - this.queryId = queryId; - this.table = table; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit; - List<CarbonInputSplit> splits = mbSplit.getAllSplits(); - List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits()); - queryModel.setTableBlockInfos(list); - - // prune the block with FGDataMap is there is one based on the filter condition - DataMapExprWrapper fgDataMap = chooseFGDataMap(table, - queryModel.getFilterExpressionResolverTree()); - if (fgDataMap != null) { - queryModel = prune(table, queryModel, mbSplit, fgDataMap); - } else { - List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splits); - 
queryModel.setTableBlockInfos(tableBlockInfoList); - } - - readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); - try { - carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); - } catch (QueryExecutionException e) { - throw new InterruptedException(e.getMessage()); - } - } - - private DataMapExprWrapper chooseFGDataMap( - CarbonTable table, - FilterResolverIntf filterInterface) { - DataMapChooser chooser = null; - try { - chooser = new DataMapChooser(table); - return chooser.chooseFGDataMap(filterInterface); - } catch (IOException e) { - LOG.error(e); - return null; - } - } - - /** - * If there is FGDataMap defined for this table and filter condition in the query, - * prune the splits by the DataMap and set the pruned split into the QueryModel and return - */ - private QueryModel prune(CarbonTable table, QueryModel queryModel, - CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException { - Objects.requireNonNull(datamap); - List<Segment> segments = new LinkedList<>(); - HashMap<String, Integer> uniqueSegments = new HashMap<>(); - LoadMetadataDetails[] loadMetadataDetails = - SegmentStatusManager.readLoadMetadata( - CarbonTablePath.getMetadataPath(table.getTablePath())); - for (CarbonInputSplit split : mbSplit.getAllSplits()) { - String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString(); - if (uniqueSegments.get(segmentId) == null) { - segments.add(Segment.toSegment(segmentId, - new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), - loadMetadataDetails))); - uniqueSegments.put(segmentId, 1); - } else { - uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1); - } - } - - List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments); - List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>(); - for (int i = 0; i < distributables.size(); i++) { - DataMapDistributable dataMapDistributable = 
distributables.get(i).getDistributable(); - prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null)); - } - - HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>(); - for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) { - pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet); - } - - List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); - List<TableBlockInfo> blockToRead = new LinkedList<>(); - for (TableBlockInfo block : blocks) { - if (pathToRead.keySet().contains(block.getFilePath())) { - // If not set this, it won't create FineGrainBlocklet object in - // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData - block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath()); - blockToRead.add(block); - } - } - LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned blocks: %d", queryId, - blockToRead.size())); - queryModel.setTableBlockInfos(blockToRead); - return queryModel; - } - - @Override public void close() throws IOException { - logStatistics(rowCount, queryModel.getStatisticsRecorder()); - // clear dictionary cache - Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping(); - if (null != columnToDictionaryMapping) { - for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) { - CarbonUtil.clearDictionaryCache(entry.getValue()); - } - } - - // close read support - readSupport.close(); - try { - queryExecutor.finish(); - } catch (QueryExecutionException e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java 
b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java deleted file mode 100644 index 03f9b2c..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.rpc.impl; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.master.Master; -import org.apache.carbondata.store.rpc.RegistryService; -import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; -import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; - -import org.apache.hadoop.ipc.ProtocolSignature; - -@InterfaceAudience.Internal -public class RegistryServiceImpl implements RegistryService { - - private Master master; - - public RegistryServiceImpl(Master master) { - this.master = master; - } - - @Override - public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException { - return master.addWorker(request); - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return versionID; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int clientMethodsHash) throws IOException { - return null; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java deleted file mode 100644 index 3b98019..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.impl; - -import java.io.IOException; -import java.util.Date; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; -import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.model.QueryModelBuilder; -import org.apache.carbondata.core.util.CarbonTaskInfo; -import org.apache.carbondata.core.util.ThreadLocalTaskInfo; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.CarbonRecordReader; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.processing.loading.DataLoadExecutor; -import 
org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; -import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.store.conf.StoreConf; -import org.apache.carbondata.store.rpc.model.BaseResponse; -import org.apache.carbondata.store.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; -import org.apache.carbondata.store.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.rpc.model.ShutdownResponse; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * It handles request from master. 
- */ -@InterfaceAudience.Internal -class RequestHandler { - - private StoreConf conf; - private Configuration hadoopConf; - - public RequestHandler(StoreConf conf, Configuration hadoopConf) { - this.conf = conf; - this.hadoopConf = hadoopConf; - } - - private static final LogService LOGGER = - LogServiceFactory.getLogService(RequestHandler.class.getName()); - - QueryResponse handleSearch(QueryRequest request) { - try { - LOGGER.info(String.format("[QueryId:%d] receive search request", request.getRequestId())); - List<CarbonRow> rows = handleRequest(request); - LOGGER.info(String.format("[QueryId:%d] sending success response", request.getRequestId())); - return createSuccessResponse(request, rows); - } catch (IOException e) { - LOGGER.error(e); - LOGGER.info(String.format("[QueryId:%d] sending failure response", request.getRequestId())); - return createFailureResponse(request, e); - } - } - - ShutdownResponse handleShutdown(ShutdownRequest request) { - LOGGER.info("Shutting down worker..."); - SearchModeDetailQueryExecutor.shutdownThreadPool(); - SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); - LOGGER.info("Worker shut down"); - return new ShutdownResponse(Status.SUCCESS.ordinal(), ""); - } - - /** - * Builds {@link QueryModel} and read data from files - */ - private List<CarbonRow> handleRequest(QueryRequest request) throws IOException { - CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo(); - carbonTaskInfo.setTaskId(System.nanoTime()); - ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo); - CarbonMultiBlockSplit mbSplit = request.getSplit(); - long limit = request.getLimit(); - TableInfo tableInfo = request.getTableInfo(); - CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo); - QueryModel queryModel = createQueryModel(table, request); - - LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", request.getRequestId(), - queryModel.toString(), mbSplit.getAllSplits().size())); - - // read all rows by the reader - List<CarbonRow> 
rows = new LinkedList<>(); - try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(request.getRequestId(), - table, queryModel)) { - reader.initialize(mbSplit, null); - - // loop to read required number of rows. - // By default, if user does not specify the limit value, limit is Long.MaxValue - long rowCount = 0; - while (reader.nextKeyValue() && rowCount < limit) { - rows.add(reader.getCurrentValue()); - rowCount++; - } - } catch (InterruptedException e) { - throw new IOException(e); - } - LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows", - request.getRequestId(), rows.size())); - return rows; - } - - private QueryModel createQueryModel(CarbonTable table, QueryRequest request) { - String[] projectColumns = request.getProjectColumns(); - Expression filter = null; - if (request.getFilterExpression() != null) { - filter = request.getFilterExpression(); - } - return new QueryModelBuilder(table).projectColumns(projectColumns).filterExpression(filter) - .build(); - } - - /** - * create a failure response - */ - private QueryResponse createFailureResponse(QueryRequest request, Throwable throwable) { - return new QueryResponse(request.getRequestId(), Status.FAILURE.ordinal(), - throwable.getMessage(), new Object[0][]); - } - - /** - * create a success response with result rows - */ - private QueryResponse createSuccessResponse(QueryRequest request, List<CarbonRow> rows) { - Iterator<CarbonRow> itor = rows.iterator(); - Object[][] output = new Object[rows.size()][]; - int i = 0; - while (itor.hasNext()) { - output[i++] = itor.next().getData(); - } - return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output); - } - - public BaseResponse handleLoadData(LoadDataRequest request) { - DataLoadExecutor executor = null; - try { - CarbonLoadModel model = request.getModel(); - - JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); - CarbonInputFormatUtil.createJobTrackerID(new Date()); - TaskID taskId = new 
TaskID(jobId, TaskType.MAP, 0); - TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0); - Configuration configuration = new Configuration(hadoopConf); - StoreUtil.configureCSVInputFormat(configuration, model); - configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath()); - // Set up the attempt context required to use in the output committer. - TaskAttemptContext hadoopAttemptContext = - new TaskAttemptContextImpl(configuration, taskAttemptId); - - CSVInputFormat format = new CSVInputFormat(); - List<InputSplit> splits = format.getSplits(hadoopAttemptContext); - - CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()]; - for (int index = 0; index < splits.size(); index++) { - readerIterators[index] = new CSVRecordReaderIterator( - format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index), - hadoopAttemptContext); - } - - executor = new DataLoadExecutor(); - executor.execute(model, conf.storeTempLocation(), readerIterators); - - return new BaseResponse(Status.SUCCESS.ordinal(), ""); - } catch (IOException e) { - LOGGER.error(e, "Failed to handle load data"); - return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); - } catch (InterruptedException e) { - LOGGER.error(e, "Interrupted handle load data "); - return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); - } catch (Exception e) { - LOGGER.error(e, "Failed to execute load data "); - return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); - } finally { - if (executor != null) { - executor.close(); - StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java 
b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java deleted file mode 100644 index 9bcd397..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.impl; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -/** - * Status of RPC response - */ -@InterfaceAudience.Internal -public enum Status { - SUCCESS, FAILURE -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/044a995a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java deleted file mode 100644 index ac3b199..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.rpc.impl; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.rpc.StoreService; -import org.apache.carbondata.store.rpc.model.BaseResponse; -import org.apache.carbondata.store.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; -import org.apache.carbondata.store.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.rpc.model.ShutdownResponse; -import org.apache.carbondata.store.worker.Worker; - -import org.apache.hadoop.ipc.ProtocolSignature; - -@InterfaceAudience.Internal -public class StoreServiceImpl implements StoreService { - - private Worker worker; - RequestHandler handler; - - public StoreServiceImpl(Worker worker) { - this.worker = worker; - this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf()); - } - - @Override - public BaseResponse loadData(LoadDataRequest request) { - return handler.handleLoadData(request); - } - - @Override - public QueryResponse query(QueryRequest request) { - return handler.handleSearch(request); - } - - @Override - public ShutdownResponse shutdown(ShutdownRequest request) { - return 
handler.handleShutdown(request); - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return versionID; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int clientMethodsHash) throws IOException { - return null; - } - - public Worker getWorker() { - return worker; - } - - public void setWorker(Worker worker) { - this.worker = worker; - } -}