[CARBONDATA-2690][CarbonStore] implement RESTful API: create table, load data and select
This PR adds: 1.basic framework rewrite the carbon store's Master, Worker and Scheduler code in Java 2.RESTful API support create a table by using file meta store support load data to a table in single work support select data with a filter This closes #2440 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4437920a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4437920a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4437920a Branch: refs/heads/carbonstore Commit: 4437920a3174921d6397c4f596ff941e2cd0faa0 Parents: d9b40bf Author: QiangCai <qiang...@qq.com> Authored: Tue Jul 3 20:21:18 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Jul 18 10:10:15 2018 +0800 ---------------------------------------------------------------------- .../schema/table/CarbonTableBuilder.java | 9 +- .../schema/table/TableSchemaBuilder.java | 27 +- .../expression/conditional/ListExpression.java | 8 +- .../conditional/NotEqualsExpression.java | 7 +- .../scan/expression/logical/OrExpression.java | 2 +- .../expression/logical/RangeExpression.java | 2 +- .../carbondata/core/scan/model/QueryModel.java | 3 +- dev/findbugs-exclude.xml | 4 + dev/javastyle-suppressions.xml | 2 + .../carbondata/hadoop/CarbonRecordReader.java | 1 + .../hadoop/util/CarbonInputFormatUtil.java | 20 +- .../preaggregate/TestPreAggregateDrop.scala | 3 +- .../TestPreAggregateTableSelection.scala | 8 +- .../carbondata/store/SparkCarbonStore.scala | 40 +- pom.xml | 6 + .../loading/model/CarbonLoadModelBuilder.java | 16 +- .../processing/util/CarbonLoaderUtil.java | 2 +- store/conf/log4j.properties | 10 + store/conf/store.conf | 10 + store/core/pom.xml | 8 +- .../apache/carbondata/store/conf/StoreConf.java | 185 +++ .../exception/ExecutionTimeoutException.java | 22 + .../store/exception/StoreException.java | 29 + .../store/exception/WorkerTooBusyException.java | 26 + 
.../apache/carbondata/store/master/Master.java | 522 ++++++++ .../carbondata/store/rpc/QueryService.java | 33 - .../carbondata/store/rpc/RegistryService.java | 4 +- .../carbondata/store/rpc/ServiceFactory.java | 4 +- .../carbondata/store/rpc/StoreService.java | 40 + .../store/rpc/impl/IndexedRecordReader.java | 22 + .../store/rpc/impl/QueryServiceImpl.java | 56 - .../store/rpc/impl/RegistryServiceImpl.java | 4 +- .../store/rpc/impl/RequestHandler.java | 105 +- .../store/rpc/impl/StoreServiceImpl.java | 78 ++ .../store/rpc/model/BaseResponse.java | 69 ++ .../store/rpc/model/LoadDataRequest.java | 60 + .../store/rpc/model/QueryResponse.java | 21 +- .../store/rpc/model/RegisterWorkerRequest.java | 4 + .../carbondata/store/scheduler/Schedulable.java | 74 ++ .../carbondata/store/scheduler/Scheduler.java | 136 +++ .../apache/carbondata/store/util/StoreUtil.java | 132 +++ .../apache/carbondata/store/worker/Worker.java | 166 +++ .../org/apache/carbondata/store/Master.scala | 283 ----- .../org/apache/carbondata/store/Scheduler.scala | 147 --- .../org/apache/carbondata/store/Worker.scala | 113 -- .../carbondata/store/SchedulerSuite.scala | 155 --- store/horizon/pom.xml | 95 ++ store/horizon/src/main/anltr/Expression.g4 | 163 +++ .../horizon/antlr/ANTLRNoCaseStringStream.java | 38 + .../carbondata/horizon/antlr/FilterVisitor.java | 215 ++++ .../horizon/antlr/gen/Expression.tokens | 51 + .../antlr/gen/ExpressionBaseVisitor.java | 168 +++ .../horizon/antlr/gen/ExpressionLexer.java | 228 ++++ .../horizon/antlr/gen/ExpressionLexer.tokens | 51 + .../horizon/antlr/gen/ExpressionParser.java | 1117 ++++++++++++++++++ .../horizon/antlr/gen/ExpressionVisitor.java | 162 +++ .../horizon/rest/controller/Horizon.java | 36 + .../rest/controller/HorizonController.java | 92 ++ .../rest/model/descriptor/LoadDescriptor.java | 81 ++ .../rest/model/descriptor/SelectDescriptor.java | 88 ++ .../rest/model/descriptor/TableDescriptor.java | 90 ++ .../rest/model/validate/RequestValidator.java | 70 ++ 
.../rest/model/view/CreateTableRequest.java | 174 +++ .../horizon/rest/model/view/FieldRequest.java | 114 ++ .../horizon/rest/model/view/LoadRequest.java | 132 +++ .../horizon/rest/model/view/SelectRequest.java | 130 ++ .../horizon/rest/model/view/SelectResponse.java | 49 + .../horizon/rest/service/HorizonService.java | 162 +++ .../carbondata/horizon/FilterParseTest.java | 161 +++ .../apache/carbondata/horizon/HorizonTest.java | 153 +++ store/horizon/src/test/resources/data1.csv | 11 + .../carbondata/sdk/file/CSVCarbonWriter.java | 4 +- .../sdk/file/CarbonWriterBuilder.java | 2 +- .../org/apache/carbondata/sdk/file/Schema.java | 67 ++ 74 files changed, 5717 insertions(+), 865 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java index e1d2162..cd27cf1 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java @@ -59,12 +59,11 @@ public class CarbonTableBuilder { return this; } - public CarbonTable build() { + public TableInfo buildTableInfo() { Objects.requireNonNull(tablePath, "tablePath should not be null"); Objects.requireNonNull(tableSchema, "tableSchema should not be null"); Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null"); - TableInfo tableInfo = new TableInfo(); tableInfo.setDatabaseName(databaseName); tableInfo.setTableUniqueName(databaseName + "_" + tableName); @@ -73,6 +72,10 @@ public class CarbonTableBuilder { 
tableInfo.setTransactionalTable(isTransactionalTable); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0)); - return CarbonTable.buildFromTableInfo(tableInfo); + return tableInfo; + } + + public CarbonTable build() { + return CarbonTable.buildFromTableInfo(buildTableInfo()); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 4780e60..6f575a4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -54,6 +54,8 @@ public class TableSchemaBuilder { private List<ColumnSchema> measures = new LinkedList<>(); + private Map<String, String> tableProperties; + private int blockSize; private int blockletSize; @@ -62,6 +64,14 @@ public class TableSchemaBuilder { private boolean isLocalDictionaryEnabled; private String localDictionaryThreshold; + public TableSchemaBuilder properties(Map<String, String> tableProperties) { + if (tableProperties == null) { + throw new IllegalArgumentException("blockSize should not be null"); + } + this.tableProperties = tableProperties; + return this; + } + public TableSchemaBuilder blockSize(int blockSize) { if (blockSize <= 0) { throw new IllegalArgumentException("blockSize should be greater than 0"); @@ -111,22 +121,25 @@ public class TableSchemaBuilder { allColumns.addAll(measures); schema.setListOfColumns(allColumns); - Map<String, String> property = new HashMap<>(); + if (tableProperties == null) { + tableProperties = new HashMap<>(); + } if 
(blockSize > 0) { - property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize)); + tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize)); } if (blockletSize > 0) { - property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize)); + tableProperties.put( + CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize)); } // Adding local dictionary, applicable only for String(dictionary exclude) if (isLocalDictionaryEnabled) { - property.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, + tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, String.valueOf(isLocalDictionaryEnabled)); String localdictionaryThreshold = localDictionaryThreshold.equalsIgnoreCase("0") ? CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT : localDictionaryThreshold; - property.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, localdictionaryThreshold); + tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, localdictionaryThreshold); for (int index = 0; index < allColumns.size(); index++) { ColumnSchema colSchema = allColumns.get(index); if (colSchema.getDataType() == DataTypes.STRING @@ -136,8 +149,8 @@ public class TableSchemaBuilder { } } } - if (property.size() != 0) { - schema.setTableProperties(property); + if (tableProperties.size() != 0) { + schema.setTableProperties(tableProperties); } return schema; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java index 32b5028..a686543 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java @@ -69,8 +69,14 @@ public class ListExpression extends Expression { public String getStatement() { StringBuffer value = new StringBuffer(); value.append("("); + boolean start = false; for (Expression expr : children) { - value.append(expr.getString()).append(";"); + if (start) { + value.append(", "); + } else { + start = true; + } + value.append(expr.getStatement()); } value.append(')'); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java index eae8019..438337c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java @@ -100,6 +100,11 @@ public class NotEqualsExpression extends BinaryConditionalExpression { @Override public String getStatement() { - return left.getStatement() + " <> " + right.getStatement(); + if (isNotNull) { + return left.getStatement() + " is not " + right.getStatement(); + } else { + return left.getStatement() + " <> " + right.getStatement(); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java index 148081a..5b7dda0 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java @@ -59,6 +59,6 @@ public class OrExpression extends BinaryLogicalExpression { @Override public String getStatement() { - return "(" + left.getString() + " or " + right.getString() + ")"; + return "(" + left.getStatement() + " or " + right.getStatement() + ")"; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java index b49b6f9..f411722 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java @@ -65,7 +65,7 @@ public class RangeExpression extends BinaryConditionalExpression { @Override public String getStatement() { - return left.getStatement() + " between " + right.getStatement(); + return left.getStatement() + " and " + right.getStatement(); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java index b15ce02..72666e3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -376,6 +376,7 @@ public class QueryModel { return String.format("scan on table %s.%s, %d projection columns with filter (%s)", table.getDatabaseName(), table.getTableName(), projection.getDimensions().size() + projection.getMeasures().size(), - filterExpressionResolverTree.getFilterExpression().toString()); + filterExpressionResolverTree == null ? "" : + filterExpressionResolverTree.getFilterExpression().toString()); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/dev/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml index 63b6bd5..e3f0523 100644 --- a/dev/findbugs-exclude.xml +++ b/dev/findbugs-exclude.xml @@ -21,6 +21,10 @@ </Match> <Match> + <Package name="org.apache.carbondata.horizon.antlr.gen"/> + </Match> + + <Match> <Source name="~.*\.scala" /> </Match> http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/dev/javastyle-suppressions.xml ---------------------------------------------------------------------- diff --git a/dev/javastyle-suppressions.xml b/dev/javastyle-suppressions.xml index 9780dcb..bb4e6f1 100644 --- a/dev/javastyle-suppressions.xml +++ b/dev/javastyle-suppressions.xml @@ -32,4 +32,6 @@ files="org/apache/carbondata/processing/newflow/sort/unsafe/sort/TimSort.java"/> <suppress checks=".*" files="org/apache/carbondata/core/memory/HeapMemoryAllocator"/> + <suppress checks=".*" + files="org/apache/carbondata/horizon/antlr/gen"/> </suppressions> http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index 6c796b5..2e3d7aa 
100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -129,6 +129,7 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> { DataMapStoreManager.getInstance().clearDataMaps( queryModel.getTable().getAbsoluteTableIdentifier()); } + // close read support readSupport.close(); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index af7397b..3076cd7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -92,6 +92,19 @@ public class CarbonInputFormatUtil { Expression filterExpression, List<PartitionSpec> partitionNames, DataMapJob dataMapJob) throws IOException, InvalidConfigurationException { + return createCarbonTableInputFormat(job, carbonTable, projectionColumns, filterExpression, + partitionNames, dataMapJob, false); + } + + + public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat( + Job job, + CarbonTable carbonTable, + String[] projectionColumns, + Expression filterExpression, + List<PartitionSpec> partitionNames, + DataMapJob dataMapJob, + boolean isSearchMode) throws IOException, InvalidConfigurationException { Configuration conf = job.getConfiguration(); CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo()); CarbonInputFormat.setDatabaseName(conf, carbonTable.getTableInfo().getDatabaseName()); @@ -103,7 +116,7 @@ public class CarbonInputFormatUtil { .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable()); 
CarbonProjection columnProjection = new CarbonProjection(projectionColumns); return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(), - filterExpression, columnProjection, dataMapJob); + filterExpression, columnProjection, dataMapJob, isSearchMode); } private static <V> CarbonTableInputFormat<V> createInputFormat( @@ -111,7 +124,8 @@ public class CarbonInputFormatUtil { AbsoluteTableIdentifier identifier, Expression filterExpression, CarbonProjection columnProjection, - DataMapJob dataMapJob) throws InvalidConfigurationException, IOException { + DataMapJob dataMapJob, + boolean isSearchMode) throws InvalidConfigurationException, IOException { CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>(); CarbonInputFormat.setTablePath( conf, @@ -121,7 +135,7 @@ public class CarbonInputFormatUtil { CarbonInputFormat.setColumnProjection(conf, columnProjection); if (dataMapJob != null) { DataMapUtil.setDataMapJob(conf, dataMapJob); - } else { + } else if (!isSearchMode) { setDataMapJobIfConfigured(conf); } // when validate segments is disabled in thread local update it to CarbonTableInputFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala index f73a587..2a9fdcd 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala @@ 
-91,8 +91,7 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll { sql( "create datamap preagg2 on table maintable using 'preaggregate' as select" + " a,sum(c) from maintable group by a") - sql("drop table if exists maintable") - checkExistence(sql("show tables"), false, "maintable_preagg1", "maintable", "maintable_preagg2") + checkExistence(sql("show tables").select("database", "tableName"), false, "defaultmaintable_preagg1", "defaultmaintable", "defaultmaintable_preagg2") } test("drop datamap with 'if exists' when datamap not exists") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index ebb2491..9e4a0d0 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -16,8 +16,6 @@ */ package org.apache.carbondata.integration.spark.testsuite.preaggregate -import java.io.File - import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation @@ -416,7 +414,7 @@ class TestPreAggregateTableSelection extends SparkQueryTest with BeforeAndAfterA | - agg1 (preaggregate) |Table Scan on maintable_agg1 | - total blocklets: 1 - | - filter: (maintable_name <> null and 
maintable_name = a) + | - filter: (maintable_name is not null and maintable_name = a) | - pruned by Main DataMap | - skipped blocklets: 1 |""".stripMargin)(rows(0).getString(0)) @@ -432,14 +430,14 @@ class TestPreAggregateTableSelection extends SparkQueryTest with BeforeAndAfterA """ |Table Scan on maintable | - total blocklets: 1 - | - filter: ((id <> null and id < 3) and name <> null) + | - filter: ((id is not null and id < 3) and name is not null) | - pruned by Main DataMap | - skipped blocklets: 0""".stripMargin)) assert(rows(0).getString(0).contains( """ |Table Scan on maintableavg | - total blocklets: 1 - | - filter: name <> null + | - filter: name is not null | - pruned by Main DataMap | - skipped blocklets: 0""".stripMargin)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala index d99081d..a4124a2 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala @@ -22,7 +22,7 @@ import java.net.InetAddress import scala.collection.JavaConverters._ -import org.apache.spark.{CarbonInputMetrics, SparkConf} +import org.apache.spark.{CarbonInputMetrics, SparkConf, SparkEnv} import org.apache.spark.sql.CarbonSession._ import org.apache.spark.sql.SparkSession @@ -34,6 +34,10 @@ import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.CarbonProjection import org.apache.carbondata.spark.rdd.CarbonScanRDD +import org.apache.carbondata.spark.util.Util +import org.apache.carbondata.store.conf.StoreConf 
+import org.apache.carbondata.store.master.Master +import org.apache.carbondata.store.worker.Worker /** * A CarbonStore implementation that uses Spark as underlying compute engine @@ -110,7 +114,11 @@ class SparkCarbonStore extends MetaCachedCarbonStore { def startSearchMode(): Unit = { LOG.info("Starting search mode master") - master = new Master() + val conf = new StoreConf() + conf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress) + conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort) + conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath) + master = Master.getInstance(conf) master.startService() startAllWorkers() } @@ -154,7 +162,33 @@ class SparkCarbonStore extends MetaCachedCarbonStore { val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors) .mapPartitions { f => // start worker - Worker.init(masterIp, CarbonProperties.getSearchMasterPort) + val conf = new StoreConf() + conf.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost.getHostAddress) + conf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort) + conf.conf(StoreConf.WORKER_CORE_NUM, 2) + conf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath) + conf.conf(StoreConf.MASTER_HOST, masterIp) + conf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort) + + var storeLocation: String = null + val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") + if (carbonUseLocalDir.equalsIgnoreCase("true")) { + + val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations.mkString(",") + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + conf.conf(StoreConf.STORE_TEMP_LOCATION, storeLocation) + + val worker = new Worker(conf) + worker.start() new 
Iterator[Int] { override def hasNext: Boolean = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index aeb87d3..b1cf347 100644 --- a/pom.xml +++ b/pom.xml @@ -585,6 +585,12 @@ </build> </profile> <profile> + <id>horizon</id> + <modules> + <module>store/horizon</module> + </modules> + </profile> + <profile> <id>include-all</id> </profile> <profile> http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 0b60fa5..1ff7752 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -51,10 +51,20 @@ public class CarbonLoadModelBuilder { private CarbonTable table; + private String inputPath; + public CarbonLoadModelBuilder(CarbonTable table) { this.table = table; } + public String getInputPath() { + return inputPath; + } + + public void setInputPath(String inputPath) { + this.inputPath = inputPath; + } + /** * build CarbonLoadModel for data loading * @param options Load options from user input @@ -65,7 +75,9 @@ public class CarbonLoadModelBuilder { throws InvalidLoadOptionException, IOException { Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options); - if (!options.containsKey("fileheader")) { + if (inputPath == null && + !options.containsKey("header") && + !options.containsKey("fileheader")) { List<CarbonColumn> csvHeader = 
table.getCreateOrderColumn(table.getTableName()); String[] columns = new String[csvHeader.size()]; for (int i = 0; i < columns.length; i++) { @@ -74,10 +86,12 @@ public class CarbonLoadModelBuilder { optionsFinal.put("fileheader", Strings.mkString(columns, ",")); } optionsFinal.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options, table)); + CarbonLoadModel model = new CarbonLoadModel(); model.setCarbonTransactionalTable(table.isTransactionalTable()); model.setFactTimeStamp(UUID); model.setTaskNo(taskNo); + model.setFactFilePath(inputPath); // we have provided 'fileheader', so it hadoopConf can be null build(options, optionsFinal, model, null); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 8a0c2b6..0ff71c7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -577,7 +577,7 @@ public final class CarbonLoaderUtil { * @param noOfNodesInput -1 if number of nodes has to be decided * based on block location information * @param blockAssignmentStrategy strategy used to assign blocks - * @param loadMinSize the property load_min_size_inmb specified by the user + * @param expectedMinSizePerNode expected minimum size per node * @return a map that maps node to blocks */ public static Map<String, List<Distributable>> nodeBlockMapping( http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/store/conf/log4j.properties b/store/conf/log4j.properties new 
file mode 100755 index 0000000..101ef37 --- /dev/null +++ b/store/conf/log4j.properties @@ -0,0 +1,10 @@ +log4j.rootLogger=INFO, C1, F1 +log4j.appender.C1=org.apache.log4j.ConsoleAppender +log4j.appender.C1.layout=org.apache.log4j.PatternLayout +log4j.appender.C1.layout.ConversionPattern=%d %p %m%n + +log4j.appender.F1=org.apache.log4j.DailyRollingFileAppender +log4j.appender.F1.file=${log.path} +log4j.appender.F1.append=true +log4j.appender.F1.layout=org.apache.log4j.PatternLayout +log4j.appender.F1.layout.ConversionPattern=%d %p %m%n http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/conf/store.conf ---------------------------------------------------------------------- diff --git a/store/conf/store.conf b/store/conf/store.conf new file mode 100644 index 0000000..7f18076 --- /dev/null +++ b/store/conf/store.conf @@ -0,0 +1,10 @@ +# worker +carbon.worker.host=127.0.0.1 +carbon.worker.port=10021 +carbon.worker.core.num=2 +carbon.store.temp.location=/tmp/carbon.store.temp + +# worker and master +carbon.master.host=127.0.0.1 +carbon.master.port=10020 +carbon.store.location=/tmp/carbon.store \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/pom.xml ---------------------------------------------------------------------- diff --git a/store/core/pom.xml b/store/core/pom.xml index 0bee84f..6b2703e 100644 --- a/store/core/pom.xml +++ b/store/core/pom.xml @@ -16,19 +16,14 @@ <properties> <dev.path>${basedir}/../../dev</dev.path> + <spring.version>2.0.2.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-hadoop</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-store-sdk</artifactId> <version>${project.version}</version> - <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> @@ -43,7 +38,6 @@ </dependencies> 
<build> - <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java new file mode 100644 index 0000000..da2a697 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.carbondata.store.conf;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.store.util.StoreUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Configuration holder for the carbon store Master and Worker.
 *
 * <p>A {@code StoreConf} is a flat string-to-string property map that can be
 * populated programmatically via {@link #conf(String, String)}, or loaded from
 * a properties file (see {@code store/conf/store.conf}) via {@link #load(String)}.
 * It is both {@link Serializable} and Hadoop {@link Writable} so that it can be
 * shipped from Master to Worker over RPC.
 */
public class StoreConf implements Serializable, Writable {

  // keys used by select (query) requests
  public static final String SELECT_PROJECTION = "carbon.select.projection";
  public static final String SELECT_FILTER = "carbon.select.filter";
  public static final String SELECT_LIMIT = "carbon.select.limit";

  public static final String SELECT_ID = "carbon.select.id";

  // keys describing the worker and master endpoints
  public static final String WORKER_HOST = "carbon.worker.host";
  public static final String WORKER_PORT = "carbon.worker.port";
  public static final String WORKER_CORE_NUM = "carbon.worker.core.num";
  public static final String MASTER_HOST = "carbon.master.host";
  public static final String MASTER_PORT = "carbon.master.port";

  // keys describing store file locations
  public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location";
  public static final String STORE_LOCATION = "carbon.store.location";

  private Map<String, String> conf = new HashMap<>();

  public StoreConf() {
  }

  /**
   * Creates a configuration pre-populated from a properties file.
   *
   * @param filePath path of the properties file to load
   */
  public StoreConf(String filePath) {
    load(filePath);
  }

  /**
   * Sets a string property.
   *
   * @return this, for fluent chaining
   */
  public StoreConf conf(String key, String value) {
    conf.put(key, value);
    return this;
  }

  /**
   * Sets an integer property (stored as its decimal string form).
   *
   * @return this, for fluent chaining
   */
  public StoreConf conf(String key, int value) {
    conf.put(key, String.valueOf(value));
    return this;
  }

  /**
   * Loads all properties from the given file into this configuration,
   * overwriting any existing values with the same keys.
   */
  public void load(String filePath) {
    StoreUtil.loadProperties(filePath, this);
  }

  /**
   * Copies all properties from another configuration into this one.
   */
  public void conf(StoreConf conf) {
    this.conf.putAll(conf.conf);
  }

  /**
   * Returns the raw value of a property, or null if absent.
   */
  public Object conf(String key) {
    return conf.get(key);
  }

  /** Returns the projection column names, or null if not set. */
  public String[] projection() {
    return stringArrayValue(SELECT_PROJECTION);
  }

  /** Returns the filter expression string, or null if not set. */
  public String filter() {
    return stringValue(SELECT_FILTER);
  }

  /** Returns the row limit, or -1 if not set. */
  public int limit() {
    return intValue(SELECT_LIMIT);
  }

  public String masterHost() {
    return stringValue(MASTER_HOST);
  }

  public int masterPort() {
    return intValue(MASTER_PORT);
  }

  public String workerHost() {
    return stringValue(WORKER_HOST);
  }

  public int workerPort() {
    return intValue(WORKER_PORT);
  }

  public int workerCoreNum() {
    return intValue(WORKER_CORE_NUM);
  }

  public String storeLocation() {
    return stringValue(STORE_LOCATION);
  }

  public String[] storeTempLocation() {
    return stringArrayValue(STORE_TEMP_LOCATION);
  }

  public String selectId() {
    return stringValue(SELECT_ID);
  }

  /**
   * Builds a Hadoop {@link Configuration} from this store configuration.
   * Every property whose key starts with {@code carbon.hadoop.} is copied into
   * the Hadoop conf with that prefix stripped; all other properties are ignored.
   */
  public Configuration newHadoopConf() {
    Configuration hadoopConf = FileFactory.getConfiguration();
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();
      if (key != null && value != null && key.startsWith("carbon.hadoop.")) {
        hadoopConf.set(key.substring("carbon.hadoop.".length()), value);
      }
    }
    return hadoopConf;
  }

  // The map is Map<String, String>, so the value can be returned directly;
  // no Object round-trip or toString() call is needed.
  private String stringValue(String key) {
    return conf.get(key);
  }

  // Returns -1 when the key is absent. NOTE(review): a malformed (non-numeric)
  // value still throws NumberFormatException, surfacing bad config eagerly.
  private int intValue(String key) {
    String value = conf.get(key);
    if (value == null) {
      return -1;
    }
    return Integer.parseInt(value);
  }

  // Splits a comma-separated value; the -1 limit keeps trailing empty fields.
  private String[] stringArrayValue(String key) {
    String value = conf.get(key);
    if (value == null) {
      return null;
    }
    return value.split(",", -1);
  }

  @Override public void write(DataOutput out) throws IOException {
    Set<Map.Entry<String, String>> entries = conf.entrySet();
    WritableUtils.writeVInt(out, conf.size());
    for (Map.Entry<String, String> entry : entries) {
      WritableUtils.writeString(out, entry.getKey());
      WritableUtils.writeString(out, entry.getValue());
    }
  }

  @Override public void readFields(DataInput in) throws IOException {
    // defensive: after Java deserialization the field could be null
    if (conf == null) {
      conf = new HashMap<>();
    }

    int size = WritableUtils.readVInt(in);
    String key, value;
    for (int i = 0; i < size; i++) {
      key = WritableUtils.readString(in);
      value = WritableUtils.readString(in);
      conf.put(key, value);
    }
  }
}
package org.apache.carbondata.store.exception;

/**
 * Thrown when a query RPC to a worker does not return within the configured
 * query timeout (see {@code Master.search}, which converts a
 * {@code TimeoutException} from the response future into this exception).
 *
 * <p>Unchecked because a timeout is an operational failure the immediate
 * caller usually cannot recover from.
 */
public class ExecutionTimeoutException extends RuntimeException {

  private static final long serialVersionUID = 1L;

  public ExecutionTimeoutException() {
    super();
  }

  /**
   * @param message description of which request timed out and after how long
   */
  public ExecutionTimeoutException(String message) {
    super(message);
  }

  /**
   * @param message description of which request timed out
   * @param cause the underlying timeout, preserved for diagnostics
   */
  public ExecutionTimeoutException(String message, Throwable cause) {
    super(message, cause);
  }
}
package org.apache.carbondata.store.exception;

/**
 * Checked exception for recoverable carbon store failures, e.g. a table
 * schema that cannot be read from the store location (see
 * {@code Master.getTable}).
 */
public class StoreException extends Exception {

  private static final long serialVersionUID = 1L;

  public StoreException() {
    super();
  }

  public StoreException(String message) {
    super(message);
  }

  /**
   * Preserves the underlying cause instead of forcing callers to drop it.
   *
   * @param message description of the failure
   * @param cause the original exception
   */
  public StoreException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param cause the original exception to wrap
   */
  public StoreException(Throwable cause) {
    super(cause);
  }
}
package org.apache.carbondata.store.exception;

/**
 * Thrown by the scheduler when every registered worker is already running at
 * its configured core capacity and cannot accept another request.
 *
 * <p>Unchecked: callers are expected to retry or fail the request rather than
 * handle this inline.
 */
public class WorkerTooBusyException extends RuntimeException {

  private static final long serialVersionUID = 1L;

  /**
   * @param message description of the overloaded worker(s)
   */
  public WorkerTooBusyException(String message) {
    super(message);
  }

  /**
   * @param message description of the overloaded worker(s)
   * @param cause underlying failure, preserved for diagnostics
   */
  public WorkerTooBusyException(String message, Throwable cause) {
    super(message, cause);
  }

}
+ */ + +package org.apache.carbondata.store.master; + +import java.io.File; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.net.BindException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.Distributable; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.locks.CarbonLockUtil; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; 
+import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.ThriftWriter; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; +import org.apache.carbondata.store.conf.StoreConf; +import org.apache.carbondata.store.exception.ExecutionTimeoutException; +import org.apache.carbondata.store.exception.StoreException; +import org.apache.carbondata.store.rpc.RegistryService; +import org.apache.carbondata.store.rpc.ServiceFactory; +import org.apache.carbondata.store.rpc.StoreService; +import org.apache.carbondata.store.rpc.impl.RegistryServiceImpl; +import org.apache.carbondata.store.rpc.impl.Status; +import org.apache.carbondata.store.rpc.model.BaseResponse; +import org.apache.carbondata.store.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; +import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; +import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; +import org.apache.carbondata.store.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.scheduler.Schedulable; +import org.apache.carbondata.store.scheduler.Scheduler; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; + +/** + * Master of CarbonSearch. + * It provides a Registry service for worker to register. 
+ * And it provides search API to fire RPC call to workers. + */ + +public class Master { + + private static Master instance = null; + + private static LogService LOGGER = LogServiceFactory.getLogService(Master.class.getName()); + + private Map<String, SoftReference<CarbonTable>> cacheTables; + + // worker host address map to EndpointRef + private StoreConf conf; + private Configuration hadoopConf; + private Random random = new Random(); + private RPC.Server registryServer = null; + private Scheduler scheduler = new Scheduler(); + + private Master(StoreConf conf) { + cacheTables = new HashMap<>(); + this.conf = conf; + this.hadoopConf = this.conf.newHadoopConf(); + } + + /** + * start service and listen on port passed in constructor + */ + public void startService() throws IOException { + if (registryServer == null) { + + BindException exception; + // we will try to create service at worse case 100 times + int numTry = 100; + String host = conf.masterHost(); + int port = conf.masterPort(); + LOGGER.info("building registry-service on " + host + ":" + port); + + RegistryService registryService = new RegistryServiceImpl(this); + do { + try { + registryServer = new RPC.Builder(hadoopConf).setBindAddress(host).setPort(port) + .setProtocol(RegistryService.class).setInstance(registryService).build(); + + registryServer.start(); + numTry = 0; + exception = null; + } catch (BindException e) { + // port is occupied, increase the port number and try again + exception = e; + LOGGER.error(e, "start registry-service failed"); + port = port + 1; + numTry = numTry - 1; + } + } while (numTry > 0); + if (exception != null) { + // we have tried many times, but still failed to find an available port + throw exception; + } + LOGGER.info("registry-service started"); + } else { + LOGGER.info("Search mode master has already started"); + } + } + + public void stopService() throws InterruptedException { + if (registryServer != null) { + registryServer.stop(); + registryServer.join(); + 
registryServer = null; + } + } + + public void stopAllWorkers() throws IOException { + for (Schedulable worker : getWorkers()) { + try { + worker.service.shutdown(new ShutdownRequest("user")); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + scheduler.removeWorker(worker.getAddress()); + } + } + + /** + * A new searcher is trying to register, add it to the map and connect to this searcher + */ + public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) throws IOException { + LOGGER.info( + "Receive Register request from worker " + request.getHostAddress() + ":" + request.getPort() + + " with " + request.getCores() + " cores"); + String workerId = UUID.randomUUID().toString(); + String workerAddress = request.getHostAddress(); + int workerPort = request.getPort(); + LOGGER.info( + "connecting to worker " + request.getHostAddress() + ":" + request.getPort() + ", workerId " + + workerId); + + StoreService searchService = ServiceFactory.createStoreService(workerAddress, workerPort); + scheduler.addWorker( + new Schedulable(workerId, workerAddress, workerPort, request.getCores(), searchService)); + LOGGER.info("worker " + request + " registered"); + return new RegisterWorkerResponse(workerId); + } + + private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit) + throws IOException { + // in case of RPC success, collect all rows in response message + if (result.getQueryId() != queryId) { + throw new IOException( + "queryId in response does not match request: " + result.getQueryId() + " != " + queryId); + } + if (result.getStatus() != Status.SUCCESS.ordinal()) { + throw new IOException("failure in worker: " + result.getMessage()); + } + int rowCount = 0; + Object[][] rows = result.getRows(); + for (Object[] row : rows) { + output.add(new CarbonRow(row)); + rowCount++; + if (rowCount >= globalLimit) { + break; + } + } + LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + 
rowCount); + return rowCount; + } + + private void onFailure(Throwable e) throws IOException { + throw new IOException("exception in worker: " + e.getMessage()); + } + + private void onTimeout() { + throw new ExecutionTimeoutException(); + } + + public String getTableFolder(String database, String tableName) { + return conf.storeLocation() + File.separator + database + File.separator + tableName; + } + + public CarbonTable getTable(String database, String tableName) throws StoreException { + String tablePath = getTableFolder(database, tableName); + CarbonTable carbonTable; + SoftReference<CarbonTable> reference = cacheTables.get(tablePath); + if (reference != null) { + carbonTable = reference.get(); + if (carbonTable != null) { + return carbonTable; + } + } + + try { + org.apache.carbondata.format.TableInfo tableInfo = + CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath)); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", ""); + tableInfo1.setTablePath(tablePath); + carbonTable = CarbonTable.buildFromTableInfo(tableInfo1); + cacheTables.put(tablePath, new SoftReference<>(carbonTable)); + return carbonTable; + } catch (IOException e) { + String message = "Failed to get table from " + tablePath; + LOGGER.error(e, message); + throw new StoreException(message); + } + } + + public boolean createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException { + AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier(); + boolean tableExists = FileFactory.isFileExist(identifier.getTablePath()); + if (tableExists) { + if (ifNotExists) { + return true; + } else { + throw new IOException( + "car't create table " + tableInfo.getDatabaseName() + "." 
+ tableInfo.getFactTable() + .getTableName() + ", because it already exists"); + } + } + + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + String databaseName = tableInfo.getDatabaseName(); + String tableName = tableInfo.getFactTable().getTableName(); + org.apache.carbondata.format.TableInfo thriftTableInfo = + schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName); + + String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); + String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); + FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath); + try { + if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { + boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType); + if (!isDirCreated) { + throw new IOException("Failed to create the metadata directory " + schemaMetadataPath); + } + } + ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); + thriftWriter.open(FileWriteOperation.OVERWRITE); + thriftWriter.write(thriftTableInfo); + thriftWriter.close(); + return true; + } catch (IOException e) { + LOGGER.error(e, "Failed to handle create table"); + throw e; + } + } + + private void openSegment(CarbonLoadModel loadModel, boolean isOverwriteTable) throws IOException { + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, isOverwriteTable); + } catch (IOException e) { + LOGGER.error(e, "Failed to handle load data"); + throw e; + } + } + + private void closeSegment(CarbonLoadModel loadModel) throws IOException { + try { + CarbonLoaderUtil.updateTableStatusForFailure(loadModel, ""); + } catch (IOException e) { + LOGGER.error(e, "Failed to close segment"); + throw e; + } + } + + private void commitSegment(CarbonLoadModel loadModel) throws IOException { + CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); + String segmentId = loadModel.getSegmentId(); + String 
segmentFileName = SegmentFileStore + .writeSegmentFile(carbonTable, segmentId, String.valueOf(loadModel.getFactTimeStamp())); + + AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier(); + String tablePath = absoluteTableIdentifier.getTablePath(); + String metadataPath = CarbonTablePath.getMetadataPath(tablePath); + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); + + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + try { + if (carbonLock.lockWithRetries(retryCount, maxTimeout)) { + LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation"); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = + SegmentStatusManager.readLoadMetadata(metadataPath); + LoadMetadataDetails loadMetadataDetails = null; + for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) { + // if the segments is in the list of marked for delete then update the status. 
+ if (segmentId.equals(detail.getLoadName())) { + loadMetadataDetails = detail; + detail.setSegmentFile(segmentFileName); + break; + } + } + if (loadMetadataDetails == null) { + throw new IOException("can not find segment: " + segmentId); + } + + CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonLoaderUtil + .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, segmentId, carbonTable); + + SegmentStatusManager + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + } else { + LOGGER.error( + "Not able to acquire the lock for Table status updation for table path " + tablePath); + } + } finally { + if (carbonLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + tablePath); + } else { + LOGGER.error( + "Unable to unlock Table lock for table" + tablePath + " during table status updation"); + } + } + } + + public boolean loadData(CarbonLoadModel loadModel, boolean isOverwrite) throws IOException { + Schedulable worker = scheduler.pickNexWorker(); + try { + if (loadModel.getFactTimeStamp() == 0) { + loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime()); + } + openSegment(loadModel, isOverwrite); + LoadDataRequest request = new LoadDataRequest(loadModel); + BaseResponse response = scheduler.sendRequest(worker, request); + if (Status.SUCCESS.ordinal() == response.getStatus()) { + commitSegment(loadModel); + return true; + } else { + closeSegment(loadModel); + throw new IOException(response.getMessage()); + } + } finally { + worker.workload.decrementAndGet(); + } + } + + /** + * Execute search by firing RPC call to worker, return the result rows + * + * @param table table to search + * @param columns projection column names + * @param filter filter expression + * @param globalLimit max number of rows required in Master + * @param localLimit max number of rows required in Worker + * @return CarbonRow array + */ + public 
CarbonRow[] search(CarbonTable table, String[] columns, Expression filter, + long globalLimit, long localLimit) throws IOException { + Objects.requireNonNull(table); + Objects.requireNonNull(columns); + if (globalLimit < 0 || localLimit < 0) { + throw new IllegalArgumentException("limit should be positive"); + } + + int queryId = random.nextInt(); + + List<CarbonRow> output = new ArrayList<>(); + + // prune data and get a mapping of worker hostname to list of blocks, + // then add these blocks to the QueryRequest and fire the RPC call + Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter); + Set<Map.Entry<String, List<Distributable>>> entries = nodeBlockMapping.entrySet(); + List<Future<QueryResponse>> futures = new ArrayList<>(entries.size()); + List<Schedulable> workers = new ArrayList<>(entries.size()); + for (Map.Entry<String, List<Distributable>> entry : entries) { + CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey()); + QueryRequest request = + new QueryRequest(queryId, split, table.getTableInfo(), columns, filter, localLimit); + + // Find an Endpoind and send the request to it + // This RPC is non-blocking so that we do not need to wait before send to next worker + Schedulable worker = scheduler.pickWorker(entry.getKey()); + workers.add(worker); + futures.add(scheduler.sendRequestAsync(worker, request)); + } + + int rowCount = 0; + int length = futures.size(); + for (int i = 0; i < length; i++) { + Future<QueryResponse> future = futures.get(i); + Schedulable worker = workers.get(i); + if (rowCount < globalLimit) { + // wait for worker + QueryResponse response = null; + try { + response = future + .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS); + } catch (ExecutionException | InterruptedException e) { + onFailure(e); + } catch (TimeoutException t) { + onTimeout(); + } finally { + worker.workload.decrementAndGet(); + } + LOGGER.info("[QueryId: " + queryId 
+ "] receive search response from worker " + worker); + rowCount += onSuccess(queryId, response, output, globalLimit); + } + } + CarbonRow[] rows = new CarbonRow[output.size()]; + return output.toArray(rows); + } + + /** + * Prune data by using CarbonInputFormat.getSplit + * Return a mapping of host address to list of block + */ + private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns, + Expression filter) throws IOException { + JobConf jobConf = new JobConf(new Configuration()); + Job job = new Job(jobConf); + CarbonTableInputFormat format; + try { + format = CarbonInputFormatUtil + .createCarbonTableInputFormat(job, table, columns, filter, null, null, true); + } catch (InvalidConfigurationException e) { + throw new IOException(e.getMessage()); + } + + // We will do FG pruning in reader side, so don't do it here + CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false); + List<InputSplit> splits = format.getSplits(job); + List<Distributable> blockInfos = new ArrayList<>(splits.size()); + for (InputSplit split : splits) { + blockInfos.add((Distributable) split); + } + return CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, getWorkerAddresses(), + CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null); + } + + /** + * return hostname of all workers + */ + public List<Schedulable> getWorkers() { + return scheduler.getAllWorkers(); + } + + private List<String> getWorkerAddresses() { + return scheduler.getAllWorkerAddresses(); + } + + public static synchronized Master getInstance(StoreConf conf) { + if (instance == null) { + instance = new Master(conf); + } + return instance; + } + + public static void main(String[] args) throws InterruptedException { + if (args.length != 2) { + System.err.println("Usage: Master <log4j file> <properties file>"); + return; + } + + StoreUtil.initLog4j(args[0]); + StoreConf conf = new StoreConf(args[1]); + Master master = getInstance(conf); + master.stopService(); + } + +} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java deleted file mode 100644 index faaa746..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.rpc; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; -import org.apache.carbondata.store.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.rpc.model.ShutdownResponse; - -import org.apache.hadoop.ipc.VersionedProtocol; - -@InterfaceAudience.Internal -public interface QueryService extends VersionedProtocol { - long versionID = 1L; - QueryResponse query(QueryRequest request); - ShutdownResponse shutdown(ShutdownRequest request); -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java index 4d17686..08a0e97 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java @@ -17,6 +17,8 @@ package org.apache.carbondata.store.rpc; +import java.io.IOException; + import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; @@ -26,5 +28,5 @@ import org.apache.hadoop.ipc.VersionedProtocol; @InterfaceAudience.Internal public interface RegistryService extends VersionedProtocol { long versionID = 1L; - RegisterWorkerResponse registerWorker(RegisterWorkerRequest request); + RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java 
---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java index a50ab8b..d9d0f3e 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java @@ -29,10 +29,10 @@ import org.apache.hadoop.ipc.RPC; @InterfaceAudience.Internal public class ServiceFactory { - public static QueryService createSearchService(String host, int port) throws IOException { + public static StoreService createStoreService(String host, int port) throws IOException { InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); return RPC.getProxy( - QueryService.class, QueryService.versionID, address, new Configuration()); + StoreService.class, StoreService.versionID, address, new Configuration()); } public static RegistryService createRegistryService(String host, int port) throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java new file mode 100644 index 0000000..48dec79 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/StoreService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.rpc.model.BaseResponse; +import org.apache.carbondata.store.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; +import org.apache.carbondata.store.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.rpc.model.ShutdownResponse; + +import org.apache.hadoop.ipc.VersionedProtocol; + +@InterfaceAudience.Internal +public interface StoreService extends VersionedProtocol { + + long versionID = 1L; + + BaseResponse loadData(LoadDataRequest request); + + QueryResponse query(QueryRequest request); + + ShutdownResponse shutdown(ShutdownRequest request); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java index 2c768d1..45e2dce 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java +++ 
b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java @@ -21,10 +21,12 @@ import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Objects; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.datamap.DataMapChooser; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.Segment; @@ -41,6 +43,7 @@ import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; @@ -158,4 +161,23 @@ public class IndexedRecordReader extends CarbonRecordReader<CarbonRow> { queryModel.setTableBlockInfos(blockToRead); return queryModel; } + + @Override public void close() throws IOException { + logStatistics(rowCount, queryModel.getStatisticsRecorder()); + // clear dictionary cache + Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping(); + if (null != columnToDictionaryMapping) { + for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) { + CarbonUtil.clearDictionaryCache(entry.getValue()); + } + } + + // close read support + readSupport.close(); + try { + queryExecutor.finish(); + } catch (QueryExecutionException e) { + throw new IOException(e); + } + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java deleted file mode 100644 index b191331..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.rpc.impl; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.rpc.QueryService; -import org.apache.carbondata.store.rpc.model.QueryRequest; -import org.apache.carbondata.store.rpc.model.QueryResponse; -import org.apache.carbondata.store.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.rpc.model.ShutdownResponse; - -import org.apache.hadoop.ipc.ProtocolSignature; - -@InterfaceAudience.Internal -public class QueryServiceImpl implements QueryService { - - @Override - public QueryResponse query(QueryRequest request) { - RequestHandler handler = new RequestHandler(); - return handler.handleSearch(request); - } - - @Override - public ShutdownResponse shutdown(ShutdownRequest request) { - RequestHandler handler = new RequestHandler(); - return handler.handleShutdown(request); - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return versionID; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int clientMethodsHash) throws IOException { - return null; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java index 12f48ba..03f9b2c 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java @@ -20,7 +20,7 @@ package org.apache.carbondata.store.rpc.impl; import java.io.IOException; import 
org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.Master; +import org.apache.carbondata.store.master.Master; import org.apache.carbondata.store.rpc.RegistryService; import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; @@ -37,7 +37,7 @@ public class RegistryServiceImpl implements RegistryService { } @Override - public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) { + public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException { return master.addWorker(request); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java index 29ee546..3b98019 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java @@ -18,10 +18,12 @@ package org.apache.carbondata.store.rpc.impl; import java.io.IOException; +import java.util.Date; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -37,10 +39,29 @@ import org.apache.carbondata.core.util.CarbonTaskInfo; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; import org.apache.carbondata.hadoop.CarbonRecordReader; +import 
org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.store.conf.StoreConf; +import org.apache.carbondata.store.rpc.model.BaseResponse; +import org.apache.carbondata.store.rpc.model.LoadDataRequest; import org.apache.carbondata.store.rpc.model.QueryRequest; import org.apache.carbondata.store.rpc.model.QueryResponse; import org.apache.carbondata.store.rpc.model.ShutdownRequest; import org.apache.carbondata.store.rpc.model.ShutdownResponse; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; /** * It handles request from master. 
@@ -48,27 +69,35 @@ import org.apache.carbondata.store.rpc.model.ShutdownResponse; @InterfaceAudience.Internal class RequestHandler { - private static final LogService LOG = + private StoreConf conf; + private Configuration hadoopConf; + + public RequestHandler(StoreConf conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + } + + private static final LogService LOGGER = LogServiceFactory.getLogService(RequestHandler.class.getName()); QueryResponse handleSearch(QueryRequest request) { try { - LOG.info(String.format("[QueryId:%d] receive search request", request.getRequestId())); + LOGGER.info(String.format("[QueryId:%d] receive search request", request.getRequestId())); List<CarbonRow> rows = handleRequest(request); - LOG.info(String.format("[QueryId:%d] sending success response", request.getRequestId())); + LOGGER.info(String.format("[QueryId:%d] sending success response", request.getRequestId())); return createSuccessResponse(request, rows); } catch (IOException e) { - LOG.error(e); - LOG.info(String.format("[QueryId:%d] sending failure response", request.getRequestId())); + LOGGER.error(e); + LOGGER.info(String.format("[QueryId:%d] sending failure response", request.getRequestId())); return createFailureResponse(request, e); } } ShutdownResponse handleShutdown(ShutdownRequest request) { - LOG.info("Shutting down worker..."); + LOGGER.info("Shutting down worker..."); SearchModeDetailQueryExecutor.shutdownThreadPool(); SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); - LOG.info("Worker shut down"); + LOGGER.info("Worker shut down"); return new ShutdownResponse(Status.SUCCESS.ordinal(), ""); } @@ -85,13 +114,13 @@ class RequestHandler { CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo); QueryModel queryModel = createQueryModel(table, request); - LOG.info(String.format("[QueryId:%d] %s, number of block: %d", - request.getRequestId(), queryModel.toString(), mbSplit.getAllSplits().size())); + 
LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", request.getRequestId(), + queryModel.toString(), mbSplit.getAllSplits().size())); // read all rows by the reader List<CarbonRow> rows = new LinkedList<>(); - try (CarbonRecordReader<CarbonRow> reader = - new IndexedRecordReader(request.getRequestId(), table, queryModel)) { + try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(request.getRequestId(), + table, queryModel)) { reader.initialize(mbSplit, null); // loop to read required number of rows. @@ -104,22 +133,18 @@ class RequestHandler { } catch (InterruptedException e) { throw new IOException(e); } - LOG.info(String.format("[QueryId:%d] scan completed, return %d rows", + LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows", request.getRequestId(), rows.size())); return rows; } - - private QueryModel createQueryModel(CarbonTable table, QueryRequest request) { String[] projectColumns = request.getProjectColumns(); Expression filter = null; if (request.getFilterExpression() != null) { filter = request.getFilterExpression(); } - return new QueryModelBuilder(table) - .projectColumns(projectColumns) - .filterExpression(filter) + return new QueryModelBuilder(table).projectColumns(projectColumns).filterExpression(filter) .build(); } @@ -144,4 +169,50 @@ class RequestHandler { return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output); } + public BaseResponse handleLoadData(LoadDataRequest request) { + DataLoadExecutor executor = null; + try { + CarbonLoadModel model = request.getModel(); + + JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); + CarbonInputFormatUtil.createJobTrackerID(new Date()); + TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); + TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0); + Configuration configuration = new Configuration(hadoopConf); + StoreUtil.configureCSVInputFormat(configuration, model); + configuration.set(FileInputFormat.INPUT_DIR, 
model.getFactFilePath()); + // Set up the attempt context required to use in the output committer. + TaskAttemptContext hadoopAttemptContext = + new TaskAttemptContextImpl(configuration, taskAttemptId); + + CSVInputFormat format = new CSVInputFormat(); + List<InputSplit> splits = format.getSplits(hadoopAttemptContext); + + CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()]; + for (int index = 0; index < splits.size(); index++) { + readerIterators[index] = new CSVRecordReaderIterator( + format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index), + hadoopAttemptContext); + } + + executor = new DataLoadExecutor(); + executor.execute(model, conf.storeTempLocation(), readerIterators); + + return new BaseResponse(Status.SUCCESS.ordinal(), ""); + } catch (IOException e) { + LOGGER.error(e, "Failed to handle load data"); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } catch (InterruptedException e) { + LOGGER.error(e, "Interrupted handle load data "); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } catch (Exception e) { + LOGGER.error(e, "Failed to execute load data "); + return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage()); + } finally { + if (executor != null) { + executor.close(); + StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); + } + } + } }