http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java new file mode 100644 index 0000000..2650bbf --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.service.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class ShutdownResponse implements Serializable, Writable { + private int status; + private String message; + + public ShutdownResponse() { + } + + public ShutdownResponse(int status, String message) { + this.status = status; + this.message = message; + } + + public int getStatus() { + return status; + } + + public String getMessage() { + return message; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(status); + out.writeUTF(message); + } + + @Override + public void readFields(DataInput in) throws IOException { + status = in.readInt(); + message = in.readUTF(); + } +}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java new file mode 100644 index 0000000..d1b8f43 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.worker; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; +import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.sdk.store.util.StoreUtil; +import org.apache.carbondata.store.impl.DataOperation; +import org.apache.carbondata.store.impl.Status; +import org.apache.carbondata.store.impl.service.DataService; +import org.apache.carbondata.store.impl.service.model.BaseResponse; +import org.apache.carbondata.store.impl.service.model.LoadDataRequest; +import org.apache.carbondata.store.impl.service.model.ScanRequest; +import org.apache.carbondata.store.impl.service.model.ScanResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +@InterfaceAudience.Internal +public class DataServiceImpl implements DataService { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DataServiceImpl.class.getCanonicalName()); + + // temp location for loading (writing sort temp files) + private String[] storeTempLocation; + private Configuration hadoopConf; + + DataServiceImpl(Worker worker) { + this.hadoopConf = worker.getHadoopConf(); + this.storeTempLocation = worker.getConf().storeTempLocation(); + } + + @Override + public BaseResponse loadData(LoadDataRequest request) { + DataLoadExecutor executor = null; + try { + CarbonLoadModel model = request.getModel(); + + JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); + CarbonInputFormatUtil.createJobTrackerID(new Date()); + TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); + TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0); + Configuration configuration = new Configuration(hadoopConf); + StoreUtil.configureCSVInputFormat(configuration, model); + configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath()); + // Set up the attempt context required to use in the output committer. 
+ TaskAttemptContext hadoopAttemptContext = + new TaskAttemptContextImpl(configuration, taskAttemptId); + + CSVInputFormat format = new CSVInputFormat(); + List<InputSplit> splits = format.getSplits(hadoopAttemptContext); + + CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()]; + for (int index = 0; index < splits.size(); index++) { + readerIterators[index] = new CSVRecordReaderIterator( + format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index), + hadoopAttemptContext); + } + + executor = new DataLoadExecutor(); + executor.execute(model, storeTempLocation, readerIterators); + + return new BaseResponse(Status.SUCCESS.ordinal(), ""); + } catch (IOException e) { + LOGGER.error(e, "Failed to handle load data"); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } catch (InterruptedException e) { + LOGGER.error(e, "Interrupted handle load data "); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } catch (Exception e) { + LOGGER.error(e, "Failed to execute load data "); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } finally { + if (executor != null) { + executor.close(); + StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); + } + } + } + + @Override + public ScanResponse scan(ScanRequest scan) { + try { + LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId())); + List<CarbonRow> rows = DataOperation.scan(scan.getTableInfo(), scan); + LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId())); + return createSuccessResponse(scan, rows); + } catch (IOException e) { + LOGGER.error(e); + LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId())); + return createFailureResponse(scan, e); + } + } + + /** + * create a failure response + */ + private ScanResponse createFailureResponse(ScanRequest scan, Throwable throwable) { + 
return new ScanResponse(scan.getRequestId(), Status.FAILURE.ordinal(), + throwable.getMessage(), new Object[0][]); + } + + /** + * create a success response with result rows + */ + private ScanResponse createSuccessResponse(ScanRequest scan, List<CarbonRow> rows) { + Iterator<CarbonRow> itor = rows.iterator(); + Object[][] output = new Object[rows.size()][]; + int i = 0; + while (itor.hasNext()) { + output[i++] = itor.next().getData(); + } + return new ScanResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } + + @Override + public void close() throws IOException { + LOGGER.info("Shutting down worker..."); + SearchModeDetailQueryExecutor.shutdownThreadPool(); + SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); + LOGGER.info("Worker shut down"); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java deleted file mode 100644 index fd13b20..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.impl.worker; - -import java.io.IOException; -import java.util.Date; -import java.util.Iterator; -import java.util.List; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; -import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; -import org.apache.carbondata.core.util.ThreadLocalTaskInfo; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.processing.loading.DataLoadExecutor; -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; -import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.impl.CarbonStoreBase; -import org.apache.carbondata.store.impl.Status; -import org.apache.carbondata.store.impl.rpc.model.BaseResponse; -import 
org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.impl.rpc.model.QueryResponse; -import org.apache.carbondata.store.impl.rpc.model.Scan; -import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * It handles request from master. - */ -@InterfaceAudience.Internal -class RequestHandler { - - private StoreConf storeConf; - private Configuration hadoopConf; - - RequestHandler(StoreConf conf, Configuration hadoopConf) { - this.storeConf = conf; - this.hadoopConf = hadoopConf; - } - - private static final LogService LOGGER = - LogServiceFactory.getLogService(RequestHandler.class.getName()); - - QueryResponse handleScan(Scan scan) { - try { - LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId())); - CarbonTable table = CarbonTable.buildFromTableInfo(scan.getTableInfo()); - List<CarbonRow> rows = CarbonStoreBase.scan(table, scan); - LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId())); - return createSuccessResponse(scan, rows); - } catch (IOException e) { - LOGGER.error(e); - LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId())); - return createFailureResponse(scan, e); - } - } - - ShutdownResponse handleShutdown(ShutdownRequest request) { - LOGGER.info("Shutting down worker..."); - SearchModeDetailQueryExecutor.shutdownThreadPool(); 
- SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); - LOGGER.info("Worker shut down"); - return new ShutdownResponse(Status.SUCCESS.ordinal(), ""); - } - - /** - * create a failure response - */ - private QueryResponse createFailureResponse(Scan scan, Throwable throwable) { - return new QueryResponse(scan.getRequestId(), Status.FAILURE.ordinal(), - throwable.getMessage(), new Object[0][]); - } - - /** - * create a success response with result rows - */ - private QueryResponse createSuccessResponse(Scan scan, List<CarbonRow> rows) { - Iterator<CarbonRow> itor = rows.iterator(); - Object[][] output = new Object[rows.size()][]; - int i = 0; - while (itor.hasNext()) { - output[i++] = itor.next().getData(); - } - return new QueryResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output); - } - - public BaseResponse handleLoadData(LoadDataRequest request) { - DataLoadExecutor executor = null; - try { - CarbonLoadModel model = request.getModel(); - - JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); - CarbonInputFormatUtil.createJobTrackerID(new Date()); - TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); - TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0); - Configuration configuration = new Configuration(hadoopConf); - StoreUtil.configureCSVInputFormat(configuration, model); - configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath()); - // Set up the attempt context required to use in the output committer. 
- TaskAttemptContext hadoopAttemptContext = - new TaskAttemptContextImpl(configuration, taskAttemptId); - - CSVInputFormat format = new CSVInputFormat(); - List<InputSplit> splits = format.getSplits(hadoopAttemptContext); - - CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()]; - for (int index = 0; index < splits.size(); index++) { - readerIterators[index] = new CSVRecordReaderIterator( - format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index), - hadoopAttemptContext); - } - - executor = new DataLoadExecutor(); - executor.execute(model, storeConf.storeTempLocation(), readerIterators); - - return new BaseResponse(Status.SUCCESS.ordinal(), ""); - } catch (IOException e) { - LOGGER.error(e, "Failed to handle load data"); - return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); - } catch (InterruptedException e) { - LOGGER.error(e, "Interrupted handle load data "); - return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); - } catch (Exception e) { - LOGGER.error(e, "Failed to execute load data "); - return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); - } finally { - if (executor != null) { - executor.close(); - StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java deleted file mode 100644 index 26f252c..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.impl.worker; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.impl.rpc.StoreService; -import org.apache.carbondata.store.impl.rpc.model.BaseResponse; -import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.impl.rpc.model.QueryResponse; -import org.apache.carbondata.store.impl.rpc.model.Scan; -import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse; - -import org.apache.hadoop.ipc.ProtocolSignature; - -@InterfaceAudience.Internal -public class StoreServiceImpl implements StoreService { - - private Worker worker; - RequestHandler handler; - - public StoreServiceImpl(Worker worker) { - this.worker = worker; - this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf()); - } - - @Override - public BaseResponse loadData(LoadDataRequest request) { - return handler.handleLoadData(request); - } - - @Override - public QueryResponse query(Scan scan) { - return handler.handleScan(scan); - } - - @Override - public ShutdownResponse shutdown(ShutdownRequest request) { - return 
handler.handleShutdown(request); - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return versionID; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int clientMethodsHash) throws IOException { - return null; - } - - public Worker getWorker() { - return worker; - } - - public void setWorker(Worker worker) { - this.worker = worker; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java index a360e36..e3546d9 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java @@ -23,13 +23,13 @@ import java.net.BindException; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.impl.rpc.RegistryService; -import org.apache.carbondata.store.impl.rpc.ServiceFactory; -import org.apache.carbondata.store.impl.rpc.StoreService; -import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest; -import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse; -import org.apache.carbondata.store.util.StoreUtil; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.sdk.store.util.StoreUtil; +import org.apache.carbondata.store.impl.service.DataService; +import org.apache.carbondata.store.impl.service.RegistryService; +import 
org.apache.carbondata.store.impl.service.ServiceFactory; +import org.apache.carbondata.store.impl.service.model.RegisterWorkerRequest; +import org.apache.carbondata.store.impl.service.model.RegisterWorkerResponse; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; @@ -66,15 +66,15 @@ public class Worker { int coreNum = conf.workerCoreNum(); String host = conf.workerHost(); int port = conf.workerPort(); - StoreService queryService = new StoreServiceImpl(this); + DataService dataService = new DataServiceImpl(this); do { try { server = new RPC.Builder(hadoopConf) .setNumHandlers(coreNum) .setBindAddress(host) .setPort(port) - .setProtocol(StoreService.class) - .setInstance(queryService) + .setProtocol(DataService.class) + .setInstance(dataService) .build(); server.start(); @@ -116,9 +116,11 @@ public class Worker { } private void registerToMaster() throws IOException { - LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort()); + LOGGER.info("trying to register to master " + + conf.masterHost() + ":" + conf.registryServicePort()); if (registry == null) { - registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort()); + registry = ServiceFactory.createRegistryService( + conf.masterHost(), conf.registryServicePort()); } RegisterWorkerRequest request = new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java deleted file mode 100644 index 775669f..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.util; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.Map; -import java.util.Properties; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.memory.UnsafeMemoryManager; -import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.store.api.conf.StoreConf; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.PropertyConfigurator; - -@InterfaceAudience.Internal -public class 
StoreUtil { - - private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName()); - - public static void loadProperties(String filePath, StoreConf conf) { - InputStream input = null; - try { - input = new FileInputStream(filePath); - Properties prop = new Properties(); - prop.load(input); - for (Map.Entry<Object, Object> entry : prop.entrySet()) { - conf.conf(entry.getKey().toString(), entry.getValue().toString()); - } - LOGGER.audit("loaded properties: " + filePath); - } catch (IOException ex) { - LOGGER.error(ex, "Failed to load properties file " + filePath); - } finally { - if (input != null) { - try { - input.close(); - } catch (IOException e) { - LOGGER.error(e); - } - } - } - } - - public static void initLog4j(String propertiesFilePath) { - BasicConfigurator.configure(); - PropertyConfigurator.configure(propertiesFilePath); - } - - public static byte[] serialize(Object object) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - try { - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(object); - } catch (IOException e) { - LOGGER.error(e); - } - return baos.toByteArray(); - } - - public static Object deserialize(byte[] bytes) { - if (bytes == null) { - return null; - } - try { - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); - return ois.readObject(); - } catch (IOException e) { - LOGGER.error(e); - } catch (ClassNotFoundException e) { - LOGGER.error(e); - } - return null; - } - - public static void configureCSVInputFormat(Configuration configuration, - CarbonLoadModel carbonLoadModel) { - CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar()); - CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter()); - CSVInputFormat.setSkipEmptyLine(configuration, carbonLoadModel.getSkipEmptyLine()); - CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar()); - 
CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns()); - CSVInputFormat.setNumberOfColumns(configuration, - "" + carbonLoadModel.getCsvHeaderColumns().length); - - CSVInputFormat.setHeaderExtractionEnabled( - configuration, - carbonLoadModel.getCsvHeader() == null || - StringUtils.isEmpty(carbonLoadModel.getCsvHeader())); - - CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar()); - - CSVInputFormat.setReadBufferSize( - configuration, - CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CSV_READ_BUFFER_SIZE, - CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)); - } - - public static void clearUnsafeMemory(long taskId) { - UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId); - UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java index 2448660..7f80a33 100644 --- a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java +++ b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java @@ -27,14 +27,15 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.LiteralExpression; import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; -import org.apache.carbondata.store.api.CarbonStore; -import org.apache.carbondata.store.api.CarbonStoreFactory; -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.api.descriptor.LoadDescriptor; -import 
org.apache.carbondata.store.api.descriptor.SelectDescriptor; -import org.apache.carbondata.store.api.descriptor.TableDescriptor; -import org.apache.carbondata.store.api.descriptor.TableIdentifier; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.CarbonStore; +import org.apache.carbondata.sdk.store.CarbonStoreFactory; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.store.impl.master.Master; import org.apache.carbondata.store.impl.worker.Worker; import org.junit.After; @@ -50,24 +51,36 @@ public class DistributedCarbonStoreTest { private static CarbonStore store; @BeforeClass - public static void beforeAll() throws IOException, StoreException { + public static void beforeAll() throws IOException, CarbonException, InterruptedException { projectFolder = new File(DistributedCarbonStoreTest.class.getResource("/").getPath() + "../../../../").getCanonicalPath(); + String confFile = projectFolder + "/store/conf/store.conf"; StoreConf storeConf = new StoreConf(confFile); - store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf); - projectFolder = new File(LocalCarbonStoreTest.class.getResource("/").getPath() + "../../../../") - .getCanonicalPath(); + new Thread(() -> { + try { + Master.main(new String[]{"", confFile}); + } catch (InterruptedException | IOException e) { + throw new RuntimeException("failed to start master"); + } + }).start(); + Thread.sleep(1000); // start worker Worker worker = new Worker(storeConf); worker.start(); + + Thread.sleep(1000); + + store = 
CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf); } @AfterClass public static void afterAll() throws IOException { - store.close(); + if (store != null) { + store.close(); + } } @Before @@ -81,13 +94,13 @@ public class DistributedCarbonStoreTest { } @Test - public void testSelect() throws IOException, StoreException { + public void testSelect() throws CarbonException { TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default"); store.dropTable(tableIdentifier); - TableDescriptor table = TableDescriptor + TableDescriptor descriptor = TableDescriptor .builder() - .ifNotExists() .table(tableIdentifier) + .ifNotExists() .comment("first table") .column("shortField", DataTypes.SHORT, "short field") .column("intField", DataTypes.INT, "int field") @@ -101,7 +114,7 @@ public class DistributedCarbonStoreTest { .column("floatField", DataTypes.DOUBLE, "float field") .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField") .create(); - store.createTable(table); + store.createTable(descriptor); // load one segment LoadDescriptor load = LoadDescriptor @@ -114,26 +127,26 @@ public class DistributedCarbonStoreTest { store.loadData(load); // select row - SelectDescriptor select = SelectDescriptor + ScanDescriptor select = ScanDescriptor .builder() .table(tableIdentifier) - .select("intField", "stringField") + .select(new String[]{"intField", "stringField"}) .limit(5) .create(); - List<CarbonRow> result = store.select(select); + List<CarbonRow> result = store.scan(select); Assert.assertEquals(5, result.size()); // select row with filter - SelectDescriptor select2 = SelectDescriptor + ScanDescriptor select2 = ScanDescriptor .builder() .table(tableIdentifier) - .select("intField", "stringField") + .select(new String[]{"intField", "stringField"}) .filter(new EqualToExpression( new ColumnExpression("intField", DataTypes.INT), new LiteralExpression(11, DataTypes.INT))) .limit(5) .create(); - List<CarbonRow> result2 = 
store.select(select2); + List<CarbonRow> result2 = store.scan(select2); Assert.assertEquals(1, result2.size()); store.dropTable(tableIdentifier); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java index 420c8cf..8cf7e14 100644 --- a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java +++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java @@ -22,20 +22,19 @@ import java.io.IOException; import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.LiteralExpression; import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; -import org.apache.carbondata.store.api.CarbonStore; -import org.apache.carbondata.store.api.CarbonStoreFactory; -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.api.descriptor.LoadDescriptor; -import org.apache.carbondata.store.api.descriptor.SelectDescriptor; -import org.apache.carbondata.store.api.descriptor.TableDescriptor; -import org.apache.carbondata.store.api.descriptor.TableIdentifier; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import 
org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; +import org.apache.carbondata.sdk.store.CarbonStore; +import org.apache.carbondata.sdk.store.CarbonStoreFactory; +import org.apache.carbondata.sdk.store.conf.StoreConf; import org.junit.After; import org.junit.AfterClass; @@ -50,7 +49,7 @@ public class LocalCarbonStoreTest { private static CarbonStore store; @BeforeClass - public static void setup() throws IOException, StoreException { + public static void setup() throws IOException, CarbonException { StoreConf conf = new StoreConf("test", "./"); conf.conf(StoreConf.STORE_TEMP_LOCATION, "./temp"); store = CarbonStoreFactory.getLocalStore("LocalCarbonStoreTest", conf); @@ -74,13 +73,13 @@ public class LocalCarbonStoreTest { } @Test - public void testWriteAndReadFiles() throws IOException, StoreException { + public void testWriteAndReadFiles() throws IOException, CarbonException { TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default"); store.dropTable(tableIdentifier); - TableDescriptor table = TableDescriptor + TableDescriptor descriptor = TableDescriptor .builder() - .ifNotExists() .table(tableIdentifier) + .ifNotExists() .comment("first table") .column("shortField", DataTypes.SHORT, "short field") .column("intField", DataTypes.INT, "int field") @@ -94,7 +93,7 @@ public class LocalCarbonStoreTest { .column("floatField", DataTypes.DOUBLE, "float field") .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField") .create(); - store.createTable(table); + store.createTable(descriptor); // load one segment LoadDescriptor load = LoadDescriptor @@ -107,26 +106,26 @@ public class LocalCarbonStoreTest { store.loadData(load); // select row - SelectDescriptor select = SelectDescriptor + ScanDescriptor select = ScanDescriptor .builder() .table(tableIdentifier) - .select("intField", "stringField") + .select(new String[]{"intField", "stringField"}) .limit(5) .create(); - 
List<CarbonRow> result = store.select(select); + List<CarbonRow> result = store.scan(select); Assert.assertEquals(5, result.size()); // select row with filter - SelectDescriptor select2 = SelectDescriptor + ScanDescriptor select2 = ScanDescriptor .builder() .table(tableIdentifier) - .select("intField", "stringField") + .select(new String[]{"intField", "stringField"}) .filter(new EqualToExpression( new ColumnExpression("intField", DataTypes.INT), new LiteralExpression(11, DataTypes.INT))) .limit(5) .create(); - List<CarbonRow> result2 = store.select(select2); + List<CarbonRow> result2 = store.scan(select2); Assert.assertEquals(1, result2.size()); store.dropTable(tableIdentifier); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java index f73591c..dffc8a7 100644 --- a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java +++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java @@ -18,21 +18,11 @@ package org.apache.carbondata.store; import java.io.File; -import java.io.FileFilter; import java.io.IOException; -import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.sdk.file.CarbonWriter; -import org.apache.carbondata.sdk.file.CarbonWriterBuilder; -import org.apache.carbondata.sdk.file.Schema; -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.util.StoreUtil; - -import org.junit.Assert; public class TestUtil { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java index eaa4583..e1b039e 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java @@ -28,7 +28,7 @@ import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; import org.apache.carbondata.horizon.rest.model.view.LoadRequest; import org.apache.carbondata.horizon.rest.model.view.SelectRequest; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.exception.CarbonException; /** * Client to send REST request to Horizon service @@ -40,9 +40,9 @@ public interface HorizonClient extends Closeable { /** * Create a Table * @param create descriptor for create table operation - * @throws IOException if network or disk IO error occurs + * @throws CarbonException if network or disk IO error occurs */ - void createTable(CreateTableRequest create) throws IOException, StoreException; + void createTable(CreateTableRequest create) throws CarbonException; /** * Drop a Table, and remove all data in it @@ -56,7 +56,7 @@ public interface HorizonClient extends Closeable { * @param load descriptor for load operation * @throws IOException if network or disk IO error occurs */ - void loadData(LoadRequest load) throws IOException, StoreException; + void loadData(LoadRequest load) throws IOException, CarbonException; /** * Scan a Table and return matched rows @@ -64,7 +64,7 @@ public interface HorizonClient extends 
Closeable { * @return matched rows * @throws IOException if network or disk IO error occurs */ - List<CarbonRow> select(SelectRequest select) throws IOException, StoreException; + List<CarbonRow> select(SelectRequest select) throws IOException, CarbonException; /** * Executor a SQL statement http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java index 076df70..b24c8d2 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java @@ -31,7 +31,7 @@ import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; import org.apache.carbondata.horizon.rest.model.view.LoadRequest; import org.apache.carbondata.horizon.rest.model.view.SelectRequest; import org.apache.carbondata.horizon.rest.model.view.SelectResponse; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.exception.CarbonException; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; @@ -52,7 +52,7 @@ public class SimpleHorizonClient implements HorizonClient { } @Override - public void createTable(CreateTableRequest create) throws IOException, StoreException { + public void createTable(CreateTableRequest create) throws CarbonException { Objects.requireNonNull(create); restTemplate.postForEntity(serviceUri + "/table/create", create, String.class); } @@ -64,18 +64,18 @@ public class SimpleHorizonClient implements HorizonClient { } @Override - public void 
loadData(LoadRequest load) throws IOException, StoreException { + public void loadData(LoadRequest load) throws IOException, CarbonException { Objects.requireNonNull(load); restTemplate.postForEntity(serviceUri + "/table/load", load, String.class); } @Override - public List<CarbonRow> select(SelectRequest select) throws IOException, StoreException { + public List<CarbonRow> select(SelectRequest select) throws IOException, CarbonException { Objects.requireNonNull(select); ResponseEntity<SelectResponse> response = restTemplate.postForEntity(serviceUri + "/table/select", select, SelectResponse.class); - Object[][] rows = Objects.requireNonNull(response.getBody()).getRows(); - List<CarbonRow> output = new ArrayList<>(rows.length); + List<Object[]> rows = Objects.requireNonNull(response.getBody()).getRows(); + List<CarbonRow> output = new ArrayList<>(rows.size()); for (Object[] row : rows) { output.add(new CarbonRow(row)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java index a30b587..cffca07 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java @@ -17,7 +17,8 @@ package org.apache.carbondata.horizon.rest.controller; -import org.apache.carbondata.store.api.conf.StoreConf; + +import org.apache.carbondata.sdk.store.conf.StoreConf; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java index a273f54..d3d1bd7 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java @@ -18,7 +18,8 @@ package org.apache.carbondata.horizon.rest.controller; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -36,14 +37,17 @@ import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; import org.apache.carbondata.horizon.rest.model.view.LoadRequest; import org.apache.carbondata.horizon.rest.model.view.SelectRequest; import org.apache.carbondata.horizon.rest.model.view.SelectResponse; -import org.apache.carbondata.store.api.CarbonStore; -import org.apache.carbondata.store.api.CarbonStoreFactory; -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.api.descriptor.LoadDescriptor; -import org.apache.carbondata.store.api.descriptor.SelectDescriptor; -import org.apache.carbondata.store.api.descriptor.TableDescriptor; -import org.apache.carbondata.store.api.descriptor.TableIdentifier; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import 
org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; +import org.apache.carbondata.store.devapi.InternalCarbonStore; +import org.apache.carbondata.store.devapi.InternalCarbonStoreFactory; +import org.apache.carbondata.store.devapi.ResultBatch; +import org.apache.carbondata.store.devapi.ScanUnit; +import org.apache.carbondata.store.devapi.Scanner; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -59,15 +63,15 @@ public class HorizonController { private static LogService LOGGER = LogServiceFactory.getLogService(HorizonController.class.getName()); - private CarbonStore store; + private InternalCarbonStore store; - public HorizonController() throws StoreException { + public HorizonController() throws CarbonException { String storeFile = System.getProperty("carbonstore.conf.file"); StoreConf storeConf = new StoreConf(); try { storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath()) .conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost().getHostAddress()) - .conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort()) + .conf(StoreConf.STORE_PORT, CarbonProperties.getSearchMasterPort()) .conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost().getHostAddress()) .conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort()) .conf(StoreConf.WORKER_CORE_NUM, 2); @@ -76,13 +80,10 @@ public class HorizonController { storeConf.load(storeFile); } - } catch (UnknownHostException e) { - throw new StoreException(e); + store = InternalCarbonStoreFactory.getStore(storeConf); } catch (IOException e) { - throw new StoreException(e); + throw new CarbonException(e); } - - store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf); } @RequestMapping(value = "echo") @@ -92,7 +93,7 @@ public class HorizonController { @RequestMapping(value = "/table/create", produces = 
MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<String> createTable( - @RequestBody CreateTableRequest request) throws StoreException, IOException { + @RequestBody CreateTableRequest request) throws CarbonException { RequestValidator.validateTable(request); TableDescriptor tableDescriptor = request.convertToDto(); store.createTable(tableDescriptor); @@ -101,7 +102,7 @@ public class HorizonController { @RequestMapping(value = "/table/drop", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<String> dropTable( - @RequestBody DropTableRequest request) throws StoreException, IOException { + @RequestBody DropTableRequest request) throws CarbonException { RequestValidator.validateDrop(request); store.dropTable(new TableIdentifier(request.getTableName(), request.getDatabaseName())); return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK); @@ -109,7 +110,7 @@ public class HorizonController { @RequestMapping(value = "/table/load", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<String> load(@RequestBody LoadRequest request) - throws StoreException, IOException { + throws CarbonException, IOException { RequestValidator.validateLoad(request); LoadDescriptor loadDescriptor = request.convertToDto(); store.loadData(loadDescriptor); @@ -118,22 +119,29 @@ public class HorizonController { @RequestMapping(value = "/table/select", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<SelectResponse> select(@RequestBody SelectRequest request) - throws StoreException, IOException { + throws CarbonException { long start = System.currentTimeMillis(); RequestValidator.validateSelect(request); TableIdentifier table = new TableIdentifier(request.getTableName(), request.getDatabaseName()); - CarbonTable carbonTable = store.getTable(table); + CarbonTable carbonTable = store.getCarbonTable(table); Expression expression = Parser.parseFilter(request.getFilter(), carbonTable); - SelectDescriptor selectDescriptor = new 
SelectDescriptor( + Scanner<CarbonRow> scanner = store.newScanner(table); + List<ScanUnit> scanUnits = scanner.prune(table, expression); + ScanDescriptor scanDescriptor = new ScanDescriptor( table, request.getSelect(), expression, request.getLimit()); - List<CarbonRow> result = store.select(selectDescriptor); - Iterator<CarbonRow> iterator = result.iterator(); - Object[][] output = new Object[result.size()][]; - int i = 0; - while (iterator.hasNext()) { - output[i] = (iterator.next().getData()); - i++; + ArrayList<Object[]> output = new ArrayList<>(); + for (ScanUnit scanUnit : scanUnits) { + Iterator<? extends ResultBatch<CarbonRow>> iterator = scanner.scan( + scanUnit, scanDescriptor, new HashMap<String, String>()); + + while (iterator.hasNext()) { + ResultBatch<CarbonRow> rows = iterator.next(); + while (rows.hasNext()) { + output.add(rows.next().getData()); + } + } } + long end = System.currentTimeMillis(); LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " + request.getDatabaseName() + "." 
+ request.getTableName() + http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java index fbba57b..15a30cb 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java @@ -21,63 +21,63 @@ import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; import org.apache.carbondata.horizon.rest.model.view.DropTableRequest; import org.apache.carbondata.horizon.rest.model.view.LoadRequest; import org.apache.carbondata.horizon.rest.model.view.SelectRequest; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.exception.CarbonException; import org.apache.commons.lang.StringUtils; public class RequestValidator { - public static void validateSelect(SelectRequest request) throws StoreException { + public static void validateSelect(SelectRequest request) throws CarbonException { if (request == null) { - throw new StoreException("Select should not be null"); + throw new CarbonException("Select should not be null"); } if (StringUtils.isEmpty(request.getDatabaseName())) { - throw new StoreException("database name is invalid"); + throw new CarbonException("database name is invalid"); } if (StringUtils.isEmpty(request.getTableName())) { - throw new StoreException("table name is invalid"); + throw new CarbonException("table name is invalid"); } } - public static void validateTable(CreateTableRequest request) throws StoreException { + public static void 
validateTable(CreateTableRequest request) throws CarbonException { if (request == null) { - throw new StoreException("TableDescriptor should not be null"); + throw new CarbonException("TableDescriptor should not be null"); } if (StringUtils.isEmpty(request.getDatabaseName())) { - throw new StoreException("database name is invalid"); + throw new CarbonException("database name is invalid"); } if (StringUtils.isEmpty(request.getTableName())) { - throw new StoreException("table name is invalid"); + throw new CarbonException("table name is invalid"); } if (request.getFields() == null || request.getFields().length == 0) { - throw new StoreException("fields should not be empty"); + throw new CarbonException("fields should not be empty"); } } - public static void validateLoad(LoadRequest request) throws StoreException { + public static void validateLoad(LoadRequest request) throws CarbonException { if (request == null) { - throw new StoreException("LoadDescriptor should not be null"); + throw new CarbonException("LoadDescriptor should not be null"); } if (StringUtils.isEmpty(request.getDatabaseName())) { - throw new StoreException("database name is invalid"); + throw new CarbonException("database name is invalid"); } if (StringUtils.isEmpty(request.getTableName())) { - throw new StoreException("table name is invalid"); + throw new CarbonException("table name is invalid"); } if (StringUtils.isEmpty(request.getInputPath())) { - throw new StoreException("input path is invalid"); + throw new CarbonException("input path is invalid"); } } - public static void validateDrop(DropTableRequest request) throws StoreException { + public static void validateDrop(DropTableRequest request) throws CarbonException { if (request == null) { - throw new StoreException("DropTableRequest should not be null"); + throw new CarbonException("DropTableRequest should not be null"); } if (StringUtils.isEmpty(request.getDatabaseName())) { - throw new StoreException("database name is invalid"); + throw 
new CarbonException("database name is invalid"); } if (StringUtils.isEmpty(request.getTableName())) { - throw new StoreException("table name is invalid"); + throw new CarbonException("table name is invalid"); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java index cf59f7f..623abee 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java @@ -24,8 +24,8 @@ import java.util.Map; import org.apache.carbondata.sdk.file.Field; import org.apache.carbondata.sdk.file.Schema; -import org.apache.carbondata.store.api.descriptor.TableDescriptor; -import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; public class CreateTableRequest extends Request { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java index b809d9e..200b3a2 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java +++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java @@ -108,7 +108,7 @@ public class FieldRequest { field.setPrecision(precision); field.setScale(scale); field.setColumnComment(comment); - field.setChildren(new LinkedList<StructField>()); + field.setChildren(new LinkedList<Field>()); return field; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java index c91f5f5..dfe21f6 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java @@ -20,8 +20,8 @@ package org.apache.carbondata.horizon.rest.model.view; import java.util.HashMap; import java.util.Map; -import org.apache.carbondata.store.api.descriptor.LoadDescriptor; -import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; public class LoadRequest extends Request { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java index 6bf5c75..e4f8cf8 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java +++ 
b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java @@ -17,23 +17,22 @@ package org.apache.carbondata.horizon.rest.model.view; +import java.util.List; + public class SelectResponse extends Response { - private Object[][] rows; + private List<Object[]> rows; public SelectResponse() { } - public SelectResponse(SelectRequest request, String message, Object[][] rows) { + public SelectResponse(SelectRequest request, String message, List<Object[]> rows) { super(request, message); this.rows = rows; } - public Object[][] getRows() { + public List<Object[]> getRows() { return rows; } - public void setRows(Object[][] rows) { - this.rows = rows; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java index 91a9dba..59c4bd1 100644 --- a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java +++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java @@ -32,9 +32,9 @@ import org.apache.carbondata.horizon.rest.model.view.LoadRequest; import org.apache.carbondata.horizon.rest.model.view.SelectRequest; import org.apache.carbondata.horizon.rest.model.view.SelectResponse; import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.store.api.exception.CarbonException; import org.apache.carbondata.store.impl.worker.Worker; -import org.apache.carbondata.store.util.StoreUtil; +import org.apache.carbondata.sdk.store.util.StoreUtil; import org.junit.AfterClass; import org.junit.Assert; @@ -109,13 +109,13 @@ public class HorizonTest { SelectRequest select = createSelectRequest(5, null, "intField", 
"stringField"); SelectResponse result = restTemplate.postForObject(serviceUri + "/table/select", select, SelectResponse.class); - Assert.assertEquals(5, result.getRows().length); + Assert.assertEquals(5, result.getRows().size()); // select row with filter SelectRequest filter = createSelectRequest(5, "intField = 11", "intField", "stringField"); SelectResponse filterResult = restTemplate.postForObject(serviceUri + "/table/select", filter, SelectResponse.class); - Assert.assertEquals(1, filterResult.getRows().length); + Assert.assertEquals(1, filterResult.getRows().size()); request = createDropTableRequest(); response = restTemplate.postForObject(serviceUri + "/table/drop", request, String.class); @@ -173,7 +173,7 @@ public class HorizonTest { } @Test - public void testHorizonClient() throws IOException, StoreException { + public void testHorizonClient() throws IOException, CarbonException { HorizonClient client = new SimpleHorizonClient(serviceUri); DropTableRequest drop = createDropTableRequest(); client.dropTable(drop); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index aecf7e2..b16a290 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -51,8 +51,8 @@ <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index fdd1f5a..771896b 100644 --- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; @@ -30,6 +31,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDi import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; @@ -239,9 +241,9 @@ public class AvroCarbonWriter extends CarbonWriter { return new Field(FieldName, DataTypes.DOUBLE); case RECORD: // recursively get the sub fields - ArrayList<StructField> structSubFields = new ArrayList<>(); + ArrayList<Field> structSubFields = new ArrayList<>(); for (Schema.Field avroSubField : childSchema.getFields()) { - StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema()); + Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema()); if (structField != null) { structSubFields.add(structField); } @@ -249,9 +251,9 @@ public class AvroCarbonWriter extends CarbonWriter { return new Field(FieldName, "struct", structSubFields); case ARRAY: // recursively get the sub fields - ArrayList<StructField> arraySubField = new ArrayList<>(); + ArrayList<Field> arraySubField = new ArrayList<>(); // array will have only one sub field. 
- StructField structField = prepareSubFields("val", childSchema.getElementType()); + Field structField = prepareSubFields("val", childSchema.getElementType()); if (structField != null) { arraySubField.add(structField); return new Field(FieldName, "array", arraySubField); @@ -266,51 +268,51 @@ public class AvroCarbonWriter extends CarbonWriter { } } - private static StructField prepareSubFields(String FieldName, Schema childSchema) { + private static Field prepareSubFields(String FieldName, Schema childSchema) { Schema.Type type = childSchema.getType(); LogicalType logicalType = childSchema.getLogicalType(); switch (type) { case BOOLEAN: - return new StructField(FieldName, DataTypes.BOOLEAN); + return new Field(FieldName, DataTypes.BOOLEAN); case INT: if (logicalType instanceof LogicalTypes.Date) { - return new StructField(FieldName, DataTypes.DATE); + return new Field(FieldName, DataTypes.DATE); } else { LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema .getName()); - return new StructField(FieldName, DataTypes.INT); + return new Field(FieldName, DataTypes.INT); } case LONG: if (logicalType instanceof LogicalTypes.TimestampMillis || logicalType instanceof LogicalTypes.TimestampMicros) { - return new StructField(FieldName, DataTypes.TIMESTAMP); + return new Field(FieldName, DataTypes.TIMESTAMP); } else { LOGGER.warn("Unsupported logical type. 
Considering Data Type as LONG for " + childSchema .getName()); - return new StructField(FieldName, DataTypes.LONG); + return new Field(FieldName, DataTypes.LONG); } case DOUBLE: - return new StructField(FieldName, DataTypes.DOUBLE); + return new Field(FieldName, DataTypes.DOUBLE); case STRING: - return new StructField(FieldName, DataTypes.STRING); + return new Field(FieldName, DataTypes.STRING); case FLOAT: - return new StructField(FieldName, DataTypes.DOUBLE); + return new Field(FieldName, DataTypes.DOUBLE); case RECORD: // recursively get the sub fields - ArrayList<StructField> structSubFields = new ArrayList<>(); + ArrayList<Field> structSubFields = new ArrayList<>(); for (Schema.Field avroSubField : childSchema.getFields()) { - StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema()); + Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema()); if (structField != null) { structSubFields.add(structField); } } - return (new StructField(FieldName, DataTypes.createStructType(structSubFields))); + return (new Field(FieldName, createStructType(structSubFields))); case ARRAY: // recursively get the sub fields // array will have only one sub field. 
DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType()); if (subType != null) { - return (new StructField(FieldName, DataTypes.createArrayType(subType))); + return (new Field(FieldName, DataTypes.createArrayType(subType))); } else { return null; } @@ -322,6 +324,14 @@ public class AvroCarbonWriter extends CarbonWriter { } } + private static StructType createStructType(List<Field> fields) { + List<StructField> f = fields.stream().map(field -> + new StructField(field.getFieldName(), field.getDataType(), + createStructType(field.getChildren()).getFields()) + ).collect(Collectors.toList()); + return DataTypes.createStructType(f); + } + private static DataType getMappingDataTypeForArrayRecord(Schema childSchema) { LogicalType logicalType = childSchema.getLogicalType(); switch (childSchema.getType()) { @@ -360,14 +370,14 @@ public class AvroCarbonWriter extends CarbonWriter { return DataTypes.DOUBLE; case RECORD: // recursively get the sub fields - ArrayList<StructField> structSubFields = new ArrayList<>(); + ArrayList<Field> structSubFields = new ArrayList<>(); for (Schema.Field avroSubField : childSchema.getFields()) { - StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema()); + Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema()); if (structField != null) { structSubFields.add(structField); } } - return DataTypes.createStructType(structSubFields); + return createStructType(structSubFields); case ARRAY: // array will have only one sub field. 
DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index a9d725f..92ed0d8 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -513,8 +513,8 @@ public class CarbonWriterBuilder { } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) { // Loop through the inner columns and for a StructData List<StructField> structFieldsArray = - new ArrayList<StructField>(field.getChildren().size()); - for (StructField childFld : field.getChildren()) { + new ArrayList<>(field.getChildren().size()); + for (Field childFld : field.getChildren()) { structFieldsArray .add(new StructField(childFld.getFieldName(), childFld.getDataType())); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java index add10c1..0d70c3b 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java @@ -17,13 +17,13 @@ package org.apache.carbondata.sdk.file; +import java.io.Serializable; import java.util.LinkedList; import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import 
org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.DataTypeUtil; @@ -32,11 +32,11 @@ import org.apache.carbondata.core.util.DataTypeUtil; */ @InterfaceAudience.User @InterfaceStability.Unstable -public class Field { +public class Field implements Serializable { private String name; private DataType type; - private List<StructField> children; + private List<Field> children; private String parent; private String storeType = "columnnar"; private int schemaOrdinal = -1; @@ -54,11 +54,11 @@ public class Field { this(name, DataTypeUtil.valueOf(type)); } - public Field(String name, String type, List<StructField> fields) { + public Field(String name, String type, List<Field> fields) { this(name, DataTypeUtil.valueOf(type), fields); } - public Field(String name, DataType type, List<StructField> fields) { + public Field(String name, DataType type, List<Field> fields) { this.name = name; this.type = type; this.children = fields; @@ -91,11 +91,11 @@ public class Field { return type; } - public List<StructField> getChildren() { + public List<Field> getChildren() { return children; } - public void setChildren(List<StructField> children) { + public void setChildren(List<Field> children) { this.children = children; } @@ -150,4 +150,5 @@ public class Field { public void setColumnComment(String columnComment) { this.columnComment = columnComment; } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java index c9622e1..075ae71 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java 
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java @@ -18,6 +18,7 @@ package org.apache.carbondata.sdk.file; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -44,7 +45,7 @@ import org.apache.commons.lang.StringUtils; */ @InterfaceAudience.User @InterfaceStability.Unstable -public class Schema { +public class Schema implements Serializable { private Field[] fields; http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java new file mode 100644 index 0000000..0472b75 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.store; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +/** + * Public Interface of CarbonStore + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface CarbonStore extends Closeable { + + //////////////////////////////////////////////////////////////////// + ///// Metadata Operation ///// + //////////////////////////////////////////////////////////////////// + + /** + * Create a Table + * @param descriptor descriptor for create table operation + * @throws CarbonException if any error occurs + */ + void createTable(TableDescriptor descriptor) throws CarbonException; + + /** + * Drop a Table, and remove all data in it + * @param table table identifier + * @throws CarbonException if any error occurs + */ + void dropTable(TableIdentifier table) throws CarbonException; + + /** + * @return all tables created + * @throws CarbonException if any error occurs + */ + List<TableDescriptor> listTable() throws CarbonException; + + /** + * Return table descriptor by specified identifier + * @param table table identifier + * @return table descriptor + * @throws CarbonException if any error occurs + */ + TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException; + + /** + * Alter table operation + * @param table table identifier + * @param newTable new table descriptor to alter
to + * @throws CarbonException if any error occurs + */ + void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException; + + + //////////////////////////////////////////////////////////////////// + ///// Write Operation ///// + //////////////////////////////////////////////////////////////////// + + /** + * Trigger a Load into the table specified by load descriptor + * @param load descriptor for load operation + * @throws CarbonException if any error occurs + */ + void loadData(LoadDescriptor load) throws CarbonException; + + /** + * Return true if this table has a primary key defined when the table was created using + * {@link #createTable(TableDescriptor)} + * + * For such a table, upsert, delete and lookup are supported + * + * @return true if this table has a primary key. + */ + default boolean isPrimaryKeyDefined(TableIdentifier identifier) { + return false; + } + + /** + * Insert each row of a batch if its key does not exist, otherwise update the row + * @param row rows to be upserted + * @param schema schema of the input row (fields without the primary key) + * @throws CarbonException if any error occurs + */ + void upsert(Iterator<KeyedRow> row, StructType schema) throws CarbonException; + + /** + * Delete a batch of rows + * @param keys keys to be deleted + * @throws CarbonException if any error occurs + */ + void delete(Iterator<PrimaryKey> keys) throws CarbonException; + + + //////////////////////////////////////////////////////////////////// + ///// Read Operation ///// + //////////////////////////////////////////////////////////////////// + + /** + * Scan the specified table and return matched rows + * + * @param select descriptor for scan operation + * @return matched rows + * @throws CarbonException if any error occurs + */ + List<CarbonRow> scan(ScanDescriptor select) throws CarbonException; + + /** + * Look up and return the row with the specified primary key + * @param key key to look up + * @return matched row for the specified key + * @throws
CarbonException if any error occurs + */ + Row lookup(PrimaryKey key) throws CarbonException; + + /** + * Look up by filter expression and return a list of matched rows + * + * @param tableIdentifier table identifier + * @param filterExpression filter expression, like "col3 = 1" + * @return matched rows for the specified filter + * @throws CarbonException if any error occurs + */ + List<Row> lookup(TableIdentifier tableIdentifier, String filterExpression) throws CarbonException; +}