Repository: carbondata Updated Branches: refs/heads/carbonstore a6027ae11 -> 9f10122af
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStoreFactory.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStoreFactory.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStoreFactory.java new file mode 100644 index 0000000..fcbd88d --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStoreFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +/** + * Factory class to create {@link CarbonStore} + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public class CarbonStoreFactory { + private static final Map<String, CarbonStore> distributedStore = new ConcurrentHashMap<>(); + private static final Map<String, CarbonStore> localStores = new ConcurrentHashMap<>(); + + private CarbonStoreFactory() { + } + + public static synchronized CarbonStore getDistributedStore( + String storeName, StoreConf storeConf) throws CarbonException { + if (distributedStore.containsKey(storeName)) { + return distributedStore.get(storeName); + } + + // create a new instance + try { + String className = "org.apache.carbondata.sdk.store.DistributedCarbonStore"; + CarbonStore store = createCarbonStore(storeConf, className); + distributedStore.put(storeName, store); + return store; + } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException | + InstantiationException e) { + throw new CarbonException(e); + } + } + + public static synchronized void removeDistributedStore(String storeName) throws IOException { + if (distributedStore.containsKey(storeName)) { + distributedStore.get(storeName).close(); + distributedStore.remove(storeName); + } + } + + public static synchronized CarbonStore getLocalStore(String storeName, StoreConf storeConf) + throws CarbonException { + if (localStores.containsKey(storeName)) { + return localStores.get(storeName); + } + + // create a new instance + try { + String className = "org.apache.carbondata.store.impl.LocalCarbonStore"; + CarbonStore store = createCarbonStore(storeConf, className); + localStores.put(storeName, store); + return store; + } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException | + InstantiationException e) { + throw new CarbonException(e); + } + } + + private static CarbonStore createCarbonStore(StoreConf storeConf, String className) + throws ClassNotFoundException, InstantiationException, IllegalAccessException, + InvocationTargetException { + Constructor[] constructor = Class.forName(className).getDeclaredConstructors(); + constructor[0].setAccessible(true); + return (CarbonStore) constructor[0].newInstance(storeConf); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/DistributedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/DistributedCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/DistributedCarbonStore.java new file mode 100644 index 0000000..7834909 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/DistributedCarbonStore.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; +import org.apache.carbondata.sdk.store.service.ServiceFactory; +import org.apache.carbondata.sdk.store.service.StoreService; + +/** + * A CarbonStore that leverage multiple servers via RPC calls (Master and Workers) + */ +@InterfaceAudience.User +public class DistributedCarbonStore implements CarbonStore { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DistributedCarbonStore.class.getCanonicalName()); + + protected StoreService storeService; + + public DistributedCarbonStore(StoreConf conf) throws IOException { + this.storeService = + ServiceFactory.createStoreService(conf.masterHost(), conf.storeServicePort()); + } + + @Override + public void createTable(TableDescriptor descriptor) throws CarbonException { + try { + storeService.createTable(descriptor); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + @Override + public void dropTable(TableIdentifier table) throws CarbonException { + try { + storeService.dropTable(table); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + @Override + public List<TableDescriptor> listTable() throws CarbonException { + return storeService.listTable(); + } + + @Override + public TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException { + TableInfo tableInfo = storeService.getTable(table); + // TODO: create TableDescriptor from table info + return null; + } + + @Override + public void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException { + storeService.alterTable(table, newTable); + } + + @Override + public void loadData(LoadDescriptor load) throws CarbonException { + storeService.loadData(load); + } + + @Override + public void upsert(Iterator<KeyedRow> row, StructType schema) throws CarbonException { + throw new UnsupportedOperationException(); + } + + @Override + public void delete(Iterator<PrimaryKey> keys) throws CarbonException { + throw new UnsupportedOperationException(); + } + + @Override + public List<CarbonRow> scan(ScanDescriptor select) throws CarbonException { + try { + return storeService.scan(select); + } catch (Exception e) { + System.out.println(e.getMessage()); + return null; + } + } + + @Override + public Row lookup(PrimaryKey key) throws CarbonException { + throw new UnsupportedOperationException(); + } + + @Override + public List<Row> lookup(TableIdentifier tableIdentifier, String filterExpression) + throws CarbonException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/KeyedRow.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/KeyedRow.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/KeyedRow.java new file mode 100644 index 0000000..bff02e0 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/KeyedRow.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; + +@InterfaceAudience.User +@InterfaceStability.Unstable +public class KeyedRow { + private PrimaryKey key; + private CarbonRow row; + + public void setKey(PrimaryKey key) { + this.key = key; + } + + public void setRow(CarbonRow row) { + this.row = row; + } + + public PrimaryKey getKey() { + return key; + } + + public CarbonRow getRow() { + return row; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/PrimaryKey.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/PrimaryKey.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/PrimaryKey.java new file mode 100644 index 0000000..e18c5ad --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/PrimaryKey.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.User +@InterfaceStability.Unstable +public class PrimaryKey { +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/Row.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/Row.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/Row.java new file mode 100644 index 0000000..97d2e14 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/Row.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface Row { + Object[] get(); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/conf/StoreConf.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/conf/StoreConf.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/conf/StoreConf.java new file mode 100644 index 0000000..c6c7cf6 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/conf/StoreConf.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.conf; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.sdk.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +public class StoreConf implements Serializable, Writable { + + public static final String SELECT_PROJECTION = "carbon.select.projection"; + public static final String SELECT_FILTER = "carbon.select.filter"; + public static final String SELECT_LIMIT = "carbon.select.limit"; + + public static final String SELECT_ID = "carbon.select.id"; + + public static final String WORKER_HOST = "carbon.worker.host"; + public static final String WORKER_PORT = "carbon.worker.port"; + public static final String WORKER_CORE_NUM = "carbon.worker.core.num"; + public static final String MASTER_HOST = "carbon.master.host"; + public static final String REGISTRY_PORT = "carbon.master.registry.port"; + public static final String PRUNE_PORT = "carbon.master.prune.port"; + public static final String STORE_PORT = "carbon.master.store.port"; + + public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location"; + public static final String STORE_LOCATION = "carbon.store.location"; + public static final String STORE_NAME = "carbon.store.name"; + + public static final String STORE_CONF_FILE = "carbon.store.confFile"; + + private Map<String, String> conf = new HashMap<>(); + + public StoreConf() { + String storeConfFile = System.getProperty(STORE_CONF_FILE); + if (storeConfFile != null) { + load(storeConfFile); + } + } + + public StoreConf(String storeName, String storeLocation) { + conf.put(STORE_NAME, storeName); + conf.put(STORE_LOCATION, storeLocation); + } + + public StoreConf(String confFilePath) { + load(confFilePath); + } + + public StoreConf conf(String key, String value) { + conf.put(key, value); + return this; + } + + public StoreConf conf(String key, int value) { + conf.put(key, "" + value); + return this; + } + + public void load(String filePath) { + StoreUtil.loadProperties(filePath, this); + } + + public void conf(StoreConf conf) { + this.conf.putAll(conf.conf); + } + + public Object conf(String key) { + return conf.get(key); + } + + public String[] projection() { + return stringArrayValue(SELECT_PROJECTION); + } + + public String filter() { + return stringValue(SELECT_FILTER); + } + + public int limit() { + return intValue(SELECT_LIMIT); + } + + public String masterHost() { + return stringValue(MASTER_HOST); + } + + public int registryServicePort() { + return intValue(REGISTRY_PORT); + } + + public int pruneServicePort() { + return intValue(PRUNE_PORT); + } + + public int storeServicePort() { + return intValue(STORE_PORT); + } + + public String workerHost() { + return stringValue(WORKER_HOST); + } + + public int workerPort() { + return intValue(WORKER_PORT); + } + + public int workerCoreNum() { + return intValue(WORKER_CORE_NUM); + } + + public String storeLocation() { + return stringValue(STORE_LOCATION); + } + + public String[] storeTempLocation() { + return stringArrayValue(STORE_TEMP_LOCATION); + } + + public String selectId() { + return stringValue(SELECT_ID); + } + + public Configuration newHadoopConf() { + Configuration hadoopConf = FileFactory.getConfiguration(); + for (Map.Entry<String, String> entry : conf.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key != null && value != null && key.startsWith("carbon.hadoop.")) { + hadoopConf.set(key.substring("carbon.hadoop.".length()), value); + } + } + return hadoopConf; + } + + private String stringValue(String key) { + Object obj = conf.get(key); + if (obj == null) { + return null; + } + return obj.toString(); + } + + private int intValue(String key) { + String value = conf.get(key); + if (value == null) { + return -1; + } + return Integer.parseInt(value); + } + + private String[] stringArrayValue(String key) { + String value = conf.get(key); + if (value == null) { + return null; + } + return value.split(",", -1); + } + + @Override public void write(DataOutput out) throws IOException { + Set<Map.Entry<String, String>> entries = conf.entrySet(); + WritableUtils.writeVInt(out, conf.size()); + for (Map.Entry<String, String> entry : entries) { + WritableUtils.writeString(out, entry.getKey()); + WritableUtils.writeString(out, entry.getValue()); + } + } + + @Override public void readFields(DataInput in) throws IOException { + if (conf == null) { + conf = new HashMap<>(); + } + + int size = WritableUtils.readVInt(in); + String key, value; + for (int i = 0; i < size; i++) { + key = WritableUtils.readString(in); + value = WritableUtils.readString(in); + conf.put(key, value); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/LoadDescriptor.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/LoadDescriptor.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/LoadDescriptor.java new file mode 100644 index 0000000..fc4308e --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/LoadDescriptor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.descriptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class LoadDescriptor implements Writable { + + private TableIdentifier table; + private String inputPath; + private Map<String, String> options; + private boolean isOverwrite; + + private LoadDescriptor() { + } + + public LoadDescriptor(TableIdentifier table, String inputPath, + Map<String, String> options, boolean isOverwrite) { + Objects.requireNonNull(table); + Objects.requireNonNull(inputPath); + this.table = table; + this.inputPath = inputPath; + this.options = options; + this.isOverwrite = isOverwrite; + } + + public TableIdentifier getTable() { + return table; + } + + public void setTable(TableIdentifier table) { + this.table = table; + } + + public String getInputPath() { + return inputPath; + } + + public void setInputPath(String inputPath) { + this.inputPath = inputPath; + } + + public Map<String, String> getOptions() { + return options; + } + + public void setOptions(Map<String, String> options) { + this.options = options; + } + + public boolean isOverwrite() { + return isOverwrite; + } + + public void setOverwrite(boolean overwrite) { + isOverwrite = overwrite; + } + + @Override + public void write(DataOutput out) throws IOException { + table.write(out); + out.writeUTF(inputPath); + out.writeInt(options.size()); + for (Map.Entry<String, String> entry : options.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + out.writeBoolean(isOverwrite); + } + + @Override + public void readFields(DataInput in) throws IOException { + table = new TableIdentifier(); + table.readFields(in); + inputPath = in.readUTF(); + int size = in.readInt(); + options = new HashMap<>(size); + for (int i = 0; i < size; i++) { + options.put(in.readUTF(), in.readUTF()); + } + isOverwrite = in.readBoolean(); + } + + public static class Builder { + private LoadDescriptor load; + private Map<String, String> options; + + private Builder() { + load = new LoadDescriptor(); + options = new HashMap<>(); + } + + public Builder table(TableIdentifier tableIdentifier) { + load.setTable(tableIdentifier); + return this; + } + + public Builder overwrite(boolean isOverwrite) { + load.setOverwrite(isOverwrite); + return this; + } + + public Builder inputPath(String inputPath) { + load.setInputPath(inputPath); + return this; + } + + public Builder options(String key, String value) { + options.put(key, value); + return this; + } + + public LoadDescriptor create() { + load.setOptions(options); + return load; + } + } + + public static Builder builder() { + return new Builder(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/ScanDescriptor.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/ScanDescriptor.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/ScanDescriptor.java new file mode 100644 index 0000000..f580fc5 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/ScanDescriptor.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.descriptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.ObjectSerializationUtil; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class ScanDescriptor implements Serializable, Writable { + + private TableIdentifier table; + private String[] projection; + private Expression filter; + private long limit = Long.MAX_VALUE; + + private ScanDescriptor() { + } + + public ScanDescriptor(TableIdentifier table, String[] projection, + Expression filter, long limit) { + Objects.requireNonNull(table); + Objects.requireNonNull(projection); + this.table = table; + this.projection = projection; + this.filter = filter; + this.limit = limit; + } + + public TableIdentifier getTableIdentifier() { + return table; + } + + public void setTableIdentifier(TableIdentifier table) { + this.table = table; + } + + public String[] getProjection() { + return projection; + } + + public void setProjection(String[] projection) { + this.projection = projection; + } + + public Expression getFilter() { + return filter; + } + + public void setFilter(Expression filter) { + this.filter = filter; + } + + public long getLimit() { + return limit; + } + + public void setLimit(long limit) { + this.limit = limit; + } + + @Override + public void write(DataOutput out) throws IOException { + table.write(out); + out.writeInt(projection.length); + for (String s : projection) { + out.writeUTF(s); + } + out.writeBoolean(filter != null); + if (filter != null) { + out.writeUTF(ObjectSerializationUtil.convertObjectToString(filter)); + } + out.writeLong(limit); + } + + @Override + public void readFields(DataInput in) throws IOException { + table = new TableIdentifier(); + table.readFields(in); + int size = in.readInt(); + projection = new String[size]; + for (int i = 0; i < size; i++) { + projection[i] = in.readUTF(); + } + if (in.readBoolean()) { + filter = (Expression) ObjectSerializationUtil.convertStringToObject(in.readUTF()); + } + limit = in.readLong(); + } + + public static class Builder { + private ScanDescriptor select; + + private Builder() { + select = new ScanDescriptor(); + } + + public Builder table(TableIdentifier tableIdentifier) { + select.setTableIdentifier(tableIdentifier); + return this; + } + + public Builder select(String[] columnNames) { + select.setProjection(columnNames); + return this; + } + + public Builder filter(Expression filter) { + select.setFilter(filter); + return this; + } + + public Builder limit(long limit) { + select.setLimit(limit); + return this; + } + + public ScanDescriptor create() { + return select; + } + } + + public static Builder builder() { + return new Builder(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableDescriptor.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableDescriptor.java new file mode 100644 index 0000000..bdd5948 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableDescriptor.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.descriptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.carbondata.sdk.file.Field; +import org.apache.carbondata.sdk.file.Schema; + +import org.apache.hadoop.io.Writable; + +public class TableDescriptor implements Writable { + + private TableIdentifier table; + private boolean ifNotExists; + private String tablePath; + private Schema schema; + private Map<String, String> properties; + private String comment; + + private TableDescriptor() { + } + + public TableDescriptor(TableIdentifier table, Schema schema, + Map<String, String> properties, String tablePath, String comment, boolean ifNotExists) { + Objects.requireNonNull(table); + Objects.requireNonNull(schema); + this.table = table; + this.ifNotExists = ifNotExists; + this.schema = schema; + this.properties = properties; + this.tablePath = tablePath; + this.comment = comment; + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + public void setIfNotExists(boolean ifNotExists) { + this.ifNotExists = ifNotExists; + } + + public TableIdentifier getTable() { + return table; + } + + public void setTable(TableIdentifier table) { + this.table = table; + } + + public Schema getSchema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + + public Map<String, String> getProperties() { + return properties; + } + + public void setProperties(Map<String, String> properties) { + this.properties = properties; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public void setTablePath(String tablePath) { + this.tablePath = tablePath; + } + + public String getTablePath() { + return tablePath; + } + + @Override + public void write(DataOutput out) throws IOException { + table.write(out); + out.writeBoolean(ifNotExists); + out.writeBoolean(tablePath != null); + if (tablePath != null) { + out.writeUTF(tablePath); + } + out.writeUTF(ObjectSerializationUtil.convertObjectToString(schema)); + out.writeInt(properties.size()); + for (Map.Entry<String, String> entry : properties.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + out.writeUTF(comment); + } + + @Override + public void readFields(DataInput in) throws IOException { + table = new TableIdentifier(); + table.readFields(in); + ifNotExists = in.readBoolean(); + if (in.readBoolean()) { + tablePath = in.readUTF(); + } + schema = (Schema) ObjectSerializationUtil.convertStringToObject(in.readUTF()); + int size = in.readInt(); + properties = new HashMap<>(size); + for (int i = 0; i < size; i++) { + properties.put(in.readUTF(), in.readUTF()); + } + comment = in.readUTF(); + } + + public static class Builder { + + private TableDescriptor table; + private List<Field> fields; + private Map<String, String> tblProperties; + + private Builder() { + table = new TableDescriptor(); + fields = new ArrayList<>(); + tblProperties = new HashMap<>(); + } + + public Builder ifNotExists() { + table.setIfNotExists(true); + return this; + } + + public Builder table(TableIdentifier tableId) { + table.setTable(tableId); + return this; + } + + public Builder tablePath(String tablePath) { + table.setTablePath(tablePath); + return this; + } + + public Builder comment(String tableComment) { + table.setComment(tableComment); + return this; + } + + public Builder column(String name, DataType dataType) { + fields.add(new Field(name, dataType)); + return this; + } + + public Builder column(String name, DataType dataType, String comment) { + Field field = new Field(name, dataType); + field.setColumnComment(comment); + fields.add(field); + return this; + } + + public Builder column(String name, DataType dataType, int precision, int scale, String comment) + { + Field field = new Field(name, dataType); + field.setColumnComment(comment); + field.setScale(scale); + field.setPrecision(precision); + fields.add(field); + return this; + } + + public Builder tblProperties(String key, String value) { + tblProperties.put(key, value); + return this; + } + + public TableDescriptor create() { + Field[] fieldArray = new Field[fields.size()]; + fieldArray = fields.toArray(fieldArray); + Schema schema = new Schema(fieldArray); + table.setSchema(schema); + table.setProperties(tblProperties); + return table; + } + } + + public static Builder builder() { + return new Builder(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableIdentifier.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableIdentifier.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableIdentifier.java new file mode 100644 index 0000000..c67a748 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/TableIdentifier.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.descriptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class TableIdentifier implements Serializable, Writable { + private String tableName; + private String databaseName; + + public TableIdentifier() { + } + + public TableIdentifier(String tableName, String databaseName) { + this.tableName = tableName; + this.databaseName = databaseName; + } + + public String getTableName() { + return tableName; + } + + public String getDatabaseName() { + return databaseName; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(databaseName); + out.writeUTF(tableName); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.databaseName = in.readUTF(); + this.tableName = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/CarbonException.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/CarbonException.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/CarbonException.java new file mode 100644 index 0000000..e4e0b10 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/CarbonException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.exception; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class CarbonException extends Exception { + + public CarbonException() { + super(); + } + + public CarbonException(String message) { + super(message); + } + + public CarbonException(Exception e) { + super(e); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/ExecutionTimeoutException.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/ExecutionTimeoutException.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/ExecutionTimeoutException.java new file mode 100644 index 0000000..3ddd440 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/ExecutionTimeoutException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.exception; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class ExecutionTimeoutException extends RuntimeException { + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/SchedulerException.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/SchedulerException.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/SchedulerException.java new file mode 100644 index 0000000..8a0309f --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/exception/SchedulerException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.exception; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class SchedulerException extends RuntimeException { + + public SchedulerException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/ServiceFactory.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/ServiceFactory.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/ServiceFactory.java new file mode 100644 index 0000000..925ed91 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/ServiceFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.service; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + +@InterfaceAudience.Internal +public class ServiceFactory { + + public static StoreService createStoreService(String host, int port) throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); + return RPC.getProxy(StoreService.class, StoreService.versionID, address, new Configuration()); + } + + public static void stopStoreService(StoreService service) { + RPC.stopProxy(service); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/StoreService.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/StoreService.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/StoreService.java new file mode 100644 index 0000000..7e54d2f --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/StoreService.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.service; + +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +import org.apache.hadoop.ipc.VersionedProtocol; + +@InterfaceAudience.Internal +public interface StoreService extends VersionedProtocol { + long versionID = 1L; + + void createTable(TableDescriptor descriptor) throws CarbonException; + + void dropTable(TableIdentifier table) throws CarbonException; + + TableInfo getTable(TableIdentifier table) throws CarbonException; + + List<TableDescriptor> listTable() throws CarbonException; + + TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException; + + void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException; + + void loadData(LoadDescriptor loadDescriptor) throws CarbonException; + + List<CarbonRow> scan(ScanDescriptor scanDescriptor) throws CarbonException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/util/StoreUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/util/StoreUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/util/StoreUtil.java new file mode 100644 index 0000000..ca5740e --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/util/StoreUtil.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.Properties; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; +import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.sdk.store.conf.StoreConf; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.PropertyConfigurator; + +@InterfaceAudience.Internal +public class StoreUtil { + + private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName()); + + public static void loadProperties(String filePath, StoreConf conf) { + InputStream input = null; + try { + input = new FileInputStream(filePath); + Properties prop = new Properties(); + prop.load(input); + for (Map.Entry<Object, Object> entry : prop.entrySet()) { + conf.conf(entry.getKey().toString(), entry.getValue().toString()); + } + LOGGER.audit("loaded properties: " + filePath); + } catch (IOException ex) { + LOGGER.error(ex, "Failed to load properties file " + filePath); + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + LOGGER.error(e); + } + } + } + } + + public static void initLog4j(String propertiesFilePath) { + BasicConfigurator.configure(); + PropertyConfigurator.configure(propertiesFilePath); + } + + public static byte[] serialize(Object object) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + try { + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + } catch (IOException e) { + LOGGER.error(e); + } + return baos.toByteArray(); + } + + public static Object deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + return ois.readObject(); + } catch (IOException e) { + LOGGER.error(e); + } catch (ClassNotFoundException e) { + LOGGER.error(e); + } + return null; + } + + public static void configureCSVInputFormat(Configuration configuration, + CarbonLoadModel carbonLoadModel) { + CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar()); + CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter()); + CSVInputFormat.setSkipEmptyLine(configuration, carbonLoadModel.getSkipEmptyLine()); + CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar()); + CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns()); + CSVInputFormat.setNumberOfColumns(configuration, + "" + carbonLoadModel.getCsvHeaderColumns().length); + + CSVInputFormat.setHeaderExtractionEnabled( + configuration, + carbonLoadModel.getCsvHeader() == null || + StringUtils.isEmpty(carbonLoadModel.getCsvHeader())); + + CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar()); + + CSVInputFormat.setReadBufferSize( + configuration, + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CSV_READ_BUFFER_SIZE, + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)); + } + + public static void clearUnsafeMemory(long taskId) { + UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId); + UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java index 4c3b996..1910e04 100644 --- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java @@ -26,7 +26,7 @@ import org.apache.carbondata.horizon.rest.model.validate.RequestValidator; import org.apache.carbondata.horizon.rest.model.view.SqlRequest; import org.apache.carbondata.horizon.rest.model.view.SqlResponse; import org.apache.carbondata.horizon.rest.sql.SparkSqlWrapper; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.store.api.exception.CarbonException; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; @@ -46,7 +46,7 @@ public class SqlHorizonController { LogServiceFactory.getLogService(SqlHorizonController.class.getName()); @RequestMapping(value = "/table/sql", produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity<SqlResponse> sql(@RequestBody SqlRequest request) throws StoreException { + public ResponseEntity<SqlResponse> sql(@RequestBody SqlRequest request) throws CarbonException { RequestValidator.validateSql(request); List<Row> rows; Dataset<Row> sqlDataFrame = null; @@ -56,10 +56,10 @@ public class SqlHorizonController { rows = sqlDataFrame.collectAsList(); } catch (AnalysisException e) { LOGGER.error(e); - throw new StoreException(e.getSimpleMessage()); + throw new CarbonException(e.getSimpleMessage()); } catch (Exception e) { LOGGER.error(e); - throw new StoreException(e.getMessage()); + throw new CarbonException(e.getMessage()); } final String[] fieldNames = sqlDataFrame.schema().fieldNames(); Object[][] responseData = new Object[0][]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java index 82e095a..dc82d6d 100644 --- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java @@ -18,18 +18,18 @@ package org.apache.carbondata.horizon.rest.model.validate; import org.apache.carbondata.horizon.rest.model.view.SqlRequest; -import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.sdk.store.exception.CarbonException; import org.apache.commons.lang.StringUtils; public class RequestValidator { - public static void validateSql(SqlRequest request) throws StoreException { + public static void validateSql(SqlRequest request) throws CarbonException { if (request == null) { - throw new StoreException("Select should not be null"); + throw new CarbonException("Select should not be null"); } if (StringUtils.isEmpty(request.getSqlStatement())) { - throw new StoreException("sql statement is invalid"); + throw new CarbonException("sql statement is invalid"); } } }