http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
--
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 8d394db..e69de29 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.{Date, List, Locale}
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl,
TaskAttemptContextImpl}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
PartitionedFile}
-import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants,
CarbonLoadOptionConstants}
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil,
LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil,
DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-
-/**
- * the util object of data loading
- */
-object DataLoadingUtil {
-
- val LOGGER: LogService =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * get data loading options and initialise default value
- */
- def getDataLoadingOptions(
- carbonProperty: CarbonProperties,
- options: immutable.Map[String, String]): mutable.Map[String, String] = {
-val optionsFinal = scala.collection.mutable.Map[String, String]()
-optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
-optionsFinal.put("escapechar",
- CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
-optionsFinal.put(
- "serialization_null_format",
-
[CARBONDATA-1997] Add CarbonWriter SDK API
Added a new module called store-sdk, and added a CarbonWriter API; it can be
used to write CarbonData files to a specified folder, without Spark and Hadoop
dependencies. Users can use this API in any environment.
This closes #1967
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5fccdabf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5fccdabf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5fccdabf
Branch: refs/heads/carbonstore
Commit: 5fccdabfc1cc4656d75e51867dcfcb250c505c91
Parents: fc31be7
Author: Jacky Li
Authored: Sat Feb 10 19:44:23 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:13 2018 +0800
--
.../org/apache/carbondata/common/Strings.java | 40
.../apache/carbondata/common/StringsSuite.java | 53 +
.../core/metadata/schema/table/CarbonTable.java | 7 +
.../schema/table/CarbonTableBuilder.java| 72 +++
.../core/metadata/schema/table/TableSchema.java | 7 +
.../schema/table/TableSchemaBuilder.java| 107 ++
.../schema/table/CarbonTableBuilderSuite.java | 86
.../metadata/schema/table/CarbonTableTest.java | 12 +-
.../schema/table/TableSchemaBuilderSuite.java | 56 ++
.../carbondata/spark/util/DataLoadingUtil.scala | 45 +
pom.xml | 7 +
store/sdk/pom.xml | 130 +
.../carbondata/sdk/file/CSVCarbonWriter.java| 89 +
.../carbondata/sdk/file/CarbonWriter.java | 51 +
.../sdk/file/CarbonWriterBuilder.java | 194 +++
.../org/apache/carbondata/sdk/file/Field.java | 74 +++
.../org/apache/carbondata/sdk/file/Schema.java | 74 +++
.../sdk/file/CSVCarbonWriterSuite.java | 127
18 files changed, 1225 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fccdabf/common/src/main/java/org/apache/carbondata/common/Strings.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java
b/common/src/main/java/org/apache/carbondata/common/Strings.java
new file mode 100644
index 000..23288dd
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common;
+
+import java.util.Objects;
+
+public class Strings {
+
+ /**
+ * Provide same function as mkString in Scala.
+ * This is added to avoid JDK 8 dependency.
+ */
+ public static String mkString(String[] strings, String delimeter) {
+Objects.requireNonNull(strings);
+Objects.requireNonNull(delimeter);
+StringBuilder builder = new StringBuilder();
+for (int i = 0; i < strings.length; i++) {
+ builder.append(strings[i]);
+ if (i != strings.length - 1) {
+builder.append(delimeter);
+ }
+}
+return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fccdabf/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
--
diff --git
a/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
new file mode 100644
index 000..65da32b
--- /dev/null
+++ b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
[CARBONDATA-1968] Add external table support
This PR adds support for creating an external table from existing CarbonData
files, using Hive syntax.
CREATE EXTERNAL TABLE tableName STORED BY 'carbondata' LOCATION 'path'
This closes #1749
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c75ab73
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c75ab73
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c75ab73
Branch: refs/heads/carbonstore
Commit: 0c75ab7359ad89a16f749e84bd42416523d5255a
Parents: 5663e91
Author: Jacky Li
Authored: Tue Jan 2 23:46:14 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:30:31 2018 +0800
--
.../core/metadata/schema/table/CarbonTable.java | 9 ++
.../createTable/TestCreateExternalTable.scala | 91
.../TestDataWithDicExcludeAndInclude.scala | 10 ---
.../command/table/CarbonDropTableCommand.scala | 5 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 64 +-
5 files changed, 147 insertions(+), 32 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c75ab73/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 09ff440..6036569 100644
---
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -785,6 +785,15 @@ public class CarbonTable implements Serializable {
&& !tableInfo.getParentRelationIdentifiers().isEmpty();
}
+ /**
+ * Return true if this is an external table (table with property
"_external"="true", this is
+ * an internal table property set during table creation)
+ */
+ public boolean isExternalTable() {
+String external =
tableInfo.getFactTable().getTableProperties().get("_external");
+return external != null && external.equalsIgnoreCase("true");
+ }
+
public long size() throws IOException {
Map dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
Long dataSize =
dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c75ab73/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
--
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
new file mode 100644
index 000..67370eb
--- /dev/null
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll {
+
+ var originDataPath: String = _
+
+ override def beforeAll(): Unit = {
+sql("DROP TABLE IF EXISTS origin")
+// create carbon table and insert data
+sql("CREATE TABLE origin(key INT, value STRING) STORED BY 'carbondata'")
+sql("INSERT INTO origin select 100,'spark'")
+sql("INSERT INTO origin select 200,'hive'")
+originDataPath = s"$storeLocation/origin"
+ }
+
+ override def afterAll(): Unit = {
+sql("DROP TABLE IF EXISTS origin")
+ }
+
+ test("create
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index de97e82..540607d 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import
org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import
org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
@@ -58,7 +58,7 @@ import org.apache.carbondata.core.scan.filter.intf.RowImpl;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
import org.apache.carbondata.core.util.BitSetGroup;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -68,20 +68,20 @@ public class RowLevelFilterExecuterImpl implements
FilterExecuter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(RowLevelFilterExecuterImpl.class.getName());
- protected List dimColEvaluatorInfoList;
- protected List msrColEvalutorInfoList;
+ List dimColEvaluatorInfoList;
+ List msrColEvalutorInfoList;
protected Expression exp;
protected AbsoluteTableIdentifier tableIdentifier;
protected SegmentProperties segmentProperties;
/**
* it has index at which given dimension is stored in file
*/
- protected int[] dimensionBlocksIndex;
+ int[] dimensionChunkIndex;
/**
* it has index at which given measure is stored in file
*/
- protected int[] measureBlocksIndex;
+ int[] measureChunkIndex;
private Map complexDimensionInfoMap;
@@ -89,18 +89,18 @@ public class RowLevelFilterExecuterImpl implements
FilterExecuter {
* flag to check whether the filter dimension is present in current block
list of dimensions.
* Applicable for restructure scenarios
*/
- protected boolean[] isDimensionPresentInCurrentBlock;
+ boolean[] isDimensionPresentInCurrentBlock;
/**
* flag to check whether the filter measure is present in current block list
of measures.
* Applicable for restructure scenarios
*/
- protected boolean[] isMeasurePresentInCurrentBlock;
+ boolean[] isMeasurePresentInCurrentBlock;
/**
* is dimension column data is natural sorted
*/
- protected boolean isNaturalSorted;
+ boolean isNaturalSorted;
/**
* date direct dictionary generator
@@ -124,10 +124,10 @@ public class RowLevelFilterExecuterImpl implements
FilterExecuter {
}
if (this.dimColEvaluatorInfoList.size() > 0) {
this.isDimensionPresentInCurrentBlock = new
boolean[dimColEvaluatorInfoList.size()];
- this.dimensionBlocksIndex = new int[dimColEvaluatorInfoList.size()];
+ this.dimensionChunkIndex = new int[dimColEvaluatorInfoList.size()];
} else {
this.isDimensionPresentInCurrentBlock = new boolean[]{false};
- this.dimensionBlocksIndex = new int[]{0};
+ this.dimensionChunkIndex = new int[]{0};
}
if (null == msrColEvalutorInfoList) {
this.msrColEvalutorInfoList = new
ArrayList(20);
@@ -136,10 +136,10 @@ public class RowLevelFilterExecuterImpl implements
FilterExecuter {
}
if (this.msrColEvalutorInfoList.size() > 0) {
this.isMeasurePresentInCurrentBlock = new
boolean[msrColEvalutorInfoList.size()];
- this.measureBlocksIndex = new int[msrColEvalutorInfoList.size()];
+ this.measureChunkIndex = new int[msrColEvalutorInfoList.size()];
} else {
this.isMeasurePresentInCurrentBlock = new boolean[]{false};
- this.measureBlocksIndex = new int[] {0};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
--
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 000..fbb93b6
--- /dev/null
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+ private CarbonTable table;
+
+ public CarbonLoadModelBuilder(CarbonTable table) {
+this.table = table;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @return a new CarbonLoadModel instance
+ */
+ public CarbonLoadModel build(
+ Map options) throws InvalidLoadOptionException,
IOException {
+Map optionsFinal =
LoadOption.fillOptionWithDefaultValue(options);
+optionsFinal.put("sort_scope", "no_sort");
+if (!options.containsKey("fileheader")) {
+ List csvHeader =
table.getCreateOrderColumn(table.getTableName());
+ String[] columns = new String[csvHeader.size()];
+ for (int i = 0; i < columns.length; i++) {
+columns[i] = csvHeader.get(i).getColName();
+ }
+ optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+}
+CarbonLoadModel model = new CarbonLoadModel();
+
+// we have provided 'fileheader', so it hadoopConf can be null
+build(options, optionsFinal, model, null);
+
+// set default values
+
model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options,
"onepass", "false")));
+model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost",
null));
+try {
+
model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options,
"dictport", "-1")));
+} catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+}
+return model;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options that populated with default values for
optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf hadoopConf is needed to read CSV header if there
'fileheader' is not set in
+ * user provided load options
+ */
+ public void build(
+ Map options,
+ Map optionsFinal,
+ CarbonLoadModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
--
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index f605b22..000
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.loading.DataField;
-import
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
-import
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
-import
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be
merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data
separately and write to
- * temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends
AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(
-
UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private BucketingInfo bucketingInfo;
-
- public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[]
inputDataFields,
- BucketingInfo bucketingInfo) {
-this.bucketingInfo = bucketingInfo;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
-this.sortParameters = sortParameters;
- }
-
- @Override public Iterator[] sort(Iterator[]
iterators)
- throws CarbonDataLoadingException {
-UnsafeSortDataRows[] sortDataRows = new
UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
-UnsafeIntermediateMerger[] intermediateFileMergers =
-new UnsafeIntermediateMerger[sortDataRows.length];
-int inMemoryChunkSizeInMB =
CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-inMemoryChunkSizeInMB = inMemoryChunkSizeInMB /
bucketingInfo.getNumberOfBuckets();
-if (inMemoryChunkSizeInMB < 5) {
- inMemoryChunkSizeInMB = 5;
-}
-try {
- for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-SortParameters parameters = sortParameters.getCopy();
-parameters.setPartitionID(i + "");
-setTempLocation(parameters);
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 69f5ceb..22d1df1 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -43,10 +43,9 @@ import
org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode;
import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -64,8 +63,8 @@ import
org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -121,7 +120,6 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
queryProperties.queryStatisticsRecorder =
CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
-QueryUtil.resolveQueryModel(queryModel);
QueryStatistic queryStatistic = new QueryStatistic();
// sort the block info
// so block will be loaded in sorted order this will be required for
@@ -168,12 +166,12 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR,
System.currentTimeMillis());
queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
// calculating the total number of aggeragted columns
-int measureCount = queryModel.getQueryMeasures().size();
+int measureCount = queryModel.getProjectionMeasures().size();
int currentIndex = 0;
DataType[] dataTypes = new DataType[measureCount];
-for (QueryMeasure carbonMeasure : queryModel.getQueryMeasures()) {
+for (ProjectionMeasure carbonMeasure : queryModel.getProjectionMeasures())
{
// adding the data type and aggregation type of all the measure this
// can be used
// to select the aggregator
@@ -198,9 +196,11 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
queryStatistic = new QueryStatistic();
// dictionary column unique column id to dictionary mapping
// which will be used to get column actual data
-queryProperties.columnToDictionayMapping = QueryUtil
-.getDimensionDictionaryDetail(queryModel.getQueryDimension(),
-queryProperties.complexFilterDimension,
queryModel.getAbsoluteTableIdentifier(),
+queryProperties.columnToDictionayMapping =
+QueryUtil.getDimensionDictionaryDetail(
+queryModel.getProjectionDimensions(),
+queryProperties.complexFilterDimension,
+queryModel.getAbsoluteTableIdentifier(),
tableProvider);
queryStatistic
.addStatistics(QueryStatisticsConstants.LOAD_DICTIONARY,
System.currentTimeMillis());
@@ -263,8 +263,8 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
// and query will be executed based on that infos
for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i);
- BlockletDataRefNodeWrapper dataRefNode =
- (BlockletDataRefNodeWrapper) abstractIndex.getDataRefNode();
+ BlockletDataRefNode dataRefNode =
+ (BlockletDataRefNode) abstractIndex.getDataRefNode();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
index 8c8d08f..a689d8e 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeFixedLengthDimensionDataChunkStore.java
@@ -124,22 +124,22 @@ public class UnsafeFixedLengthDimensionDataChunkStore
/**
* to compare the two byte array
*
- * @param indexindex of first byte array
+ * @param rowIdindex of first byte array
* @param compareValue value of to be compared
* @return compare result
*/
- @Override public int compareTo(int index, byte[] compareValue) {
+ @Override public int compareTo(int rowId, byte[] compareValue) {
// based on index we need to calculate the actual position in memory block
-index = index * columnValueSize;
+rowId = rowId * columnValueSize;
int compareResult = 0;
for (int i = 0; i < compareValue.length; i++) {
compareResult = (CarbonUnsafe.getUnsafe()
- .getByte(dataPageMemoryBlock.getBaseObject(),
dataPageMemoryBlock.getBaseOffset() + index)
+ .getByte(dataPageMemoryBlock.getBaseObject(),
dataPageMemoryBlock.getBaseOffset() + rowId)
& 0xff) - (compareValue[i] & 0xff);
if (compareResult != 0) {
break;
}
- index++;
+ rowId++;
}
return compareResult;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
index 36b2bd8..e1eb378 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
@@ -189,11 +189,11 @@ public class UnsafeVariableLengthDimesionDataChunkStore
/**
* to compare the two byte array
*
- * @param index index of first byte array
+ * @param rowId index of first byte array
* @param compareValue value of to be compared
* @return compare result
*/
- @Override public int compareTo(int index, byte[] compareValue) {
+ @Override public int compareTo(int rowId, byte[] compareValue) {
// now to get the row from memory block we need to do following thing
// 1. first get the current offset
// 2. if it's not a last row- get the next row offset
@@ -201,13 +201,13 @@ public class UnsafeVariableLengthDimesionDataChunkStore
// else subtract the current row offset
// with complete data length get the offset of set of data
int currentDataOffset =
CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
-dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets +
((long)index
+dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets +
((long) rowId
* CarbonCommonConstants.INT_SIZE_IN_BYTE * 1L));
short length = 0;
// calculating the length of data
-if (index < numberOfRows - 1) {
+if (rowId < numberOfRows - 1) {
int OffsetOfNextdata =
CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
- dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets +
((index + 1)
+ dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets +
((rowId + 1)
* CarbonCommonConstants.INT_SIZE_IN_BYTE));
length = (short) (OffsetOfNextdata - (currentDataOffset
+ CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnGroupModel.java
index 74d268a..e2a4161 100644
---
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy is block-number based and it will
suffer from a skewed-data problem if the sizes of the input files differ a lot.
We introduced a size-based block allocation strategy to optimize data
loading performance in skewed-data scenarios.
This closes #1808
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe8ab4c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe8ab4c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe8ab4c
Branch: refs/heads/carbonstore
Commit: 8fe8ab4c078de0ccd218f8ba41352896aebd5202
Parents: 28b5720
Author: xuchuanyin
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:13 2018 +0800
--
.../constants/CarbonLoadOptionConstants.java| 10 +
.../core/datastore/block/TableBlockInfo.java| 29 ++
.../carbondata/core/util/CarbonProperties.java | 11 +
docs/useful-tips-on-carbondata.md | 1 +
.../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +-
.../spark/sql/hive/DistributionUtil.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala| 18 +-
.../merger/NodeMultiBlockRelation.java | 40 ++
.../processing/util/CarbonLoaderUtil.java | 480 ---
.../processing/util/CarbonLoaderUtilTest.java | 125 +
10 files changed, 545 insertions(+), 175 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
+ /**
+ * enable block size based block allocation while loading data. By default,
carbondata assigns
+ * blocks to node based on block number. If this option is set to `true`,
carbondata will
+ * consider block size first and make sure that all the nodes will process
almost equal size of
+ * data. This option is especially useful when you encounter skewed data.
+ */
+ @CarbonProperty
+ public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+ = "carbon.load.skewedDataOptimization.enabled";
+ public static final String
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable,
Serializable {
private String dataMapWriterPath;
+ /**
+ * comparator to sort by block size in descending order.
+ * Since each line is not exactly the same, the size of a InputSplit may
differs,
+ * so we allow some deviation for these splits.
+ */
+ public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+ new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+ long diff =
+ ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo)
o2).getBlockLength();
+ return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+ };
+
public TableBlockInfo(String filePath, long blockOffset, String segmentId,
String[] locations, long blockLength, ColumnarFormatVersion version,
String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy is block-number based and it will
suffer from a skewed-data problem if the sizes of the input files differ a lot.
We introduced a size-based block allocation strategy to optimize data
loading performance in skewed-data scenarios.
This closes #1808
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fdd5d0f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fdd5d0f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fdd5d0f
Branch: refs/heads/carbonstore
Commit: 3fdd5d0f567e8d07cc502202ced7d490fa85e2ad
Parents: 0bb4aed
Author: xuchuanyin
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:12 2018 +0800
--
.../constants/CarbonLoadOptionConstants.java| 10 +
.../core/datastore/block/TableBlockInfo.java| 29 ++
.../carbondata/core/util/CarbonProperties.java | 11 +
docs/useful-tips-on-carbondata.md | 1 +
.../CarbonIndexFileMergeTestCase.scala | 4 -
.../StandardPartitionTableLoadingTestCase.scala | 2 +-
.../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +-
.../spark/sql/hive/DistributionUtil.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala| 18 +-
.../merger/NodeMultiBlockRelation.java | 40 ++
.../processing/util/CarbonLoaderUtil.java | 494 ---
.../processing/util/CarbonLoaderUtilTest.java | 125 +
12 files changed, 552 insertions(+), 188 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
+ /**
+ * enable block size based block allocation while loading data. By default,
carbondata assigns
+ * blocks to node based on block number. If this option is set to `true`,
carbondata will
+ * consider block size first and make sure that all the nodes will process
almost equal size of
+ * data. This option is especially useful when you encounter skewed data.
+ */
+ @CarbonProperty
+ public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+ = "carbon.load.skewedDataOptimization.enabled";
+ public static final String
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable,
Serializable {
private String dataMapWriterPath;
+ /**
+ * comparator to sort by block size in descending order.
+ * Since each line is not exactly the same, the size of a InputSplit may
differs,
+ * so we allow some deviation for these splits.
+ */
+ public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+ new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+ long diff =
+ ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo)
o2).getBlockLength();
+ return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+ };
+
public TableBlockInfo(String filePath, long blockOffset, String segmentId,
String[] locations, long blockLength,
Revert "[CARBONDATA-2023][DataLoad] Add size base block allocation in data
loading"
This reverts commit 6dd8b038fc898dbf48ad30adfc870c19eb38e3d0.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1d85e916
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1d85e916
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1d85e916
Branch: refs/heads/carbonstore
Commit: 1d85e916f6a0f070960555fb18ee4cd8acbfa315
Parents: 6216294
Author: Jacky Li
Authored: Sat Feb 10 10:34:59 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:13 2018 +0800
--
.../constants/CarbonLoadOptionConstants.java| 10 -
.../core/datastore/block/TableBlockInfo.java| 29 --
.../carbondata/core/util/CarbonProperties.java | 11 -
docs/useful-tips-on-carbondata.md | 1 -
.../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +-
.../spark/sql/hive/DistributionUtil.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala| 18 +-
.../merger/NodeMultiBlockRelation.java | 40 --
.../processing/util/CarbonLoaderUtil.java | 494 +++
.../processing/util/CarbonLoaderUtilTest.java | 125 -
10 files changed, 183 insertions(+), 551 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1d85e916/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..bcfeba0 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,14 +114,4 @@ public final class CarbonLoadOptionConstants {
*/
public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
- /**
- * enable block size based block allocation while loading data. By default,
carbondata assigns
- * blocks to node based on block number. If this option is set to `true`,
carbondata will
- * consider block size first and make sure that all the nodes will process
almost equal size of
- * data. This option is especially useful when you encounter skewed data.
- */
- @CarbonProperty
- public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
- = "carbon.load.skewedDataOptimization.enabled";
- public static final String
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1d85e916/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index c0cebe0..a7bfdba 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.core.datastore.block;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
@@ -100,20 +98,6 @@ public class TableBlockInfo implements Distributable,
Serializable {
private String dataMapWriterPath;
- /**
- * comparator to sort by block size in descending order.
- * Since each line is not exactly the same, the size of a InputSplit may
differs,
- * so we allow some deviation for these splits.
- */
- public static final Comparator DATA_SIZE_DESC_COMPARATOR =
- new Comparator() {
-@Override public int compare(Distributable o1, Distributable o2) {
- long diff =
- ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo)
o2).getBlockLength();
- return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
-}
- };
-
public TableBlockInfo(String filePath, long blockOffset, String segmentId,
String[] locations, long blockLength, ColumnarFormatVersion version,
String[] deletedDeltaFilePath) {
@@ -450,17 +434,4 @@ public class TableBlockInfo implements Distributable,
Serializable {
public void setDataMapWriterPath(String dataMapWriterPath) {
this.dataMapWriterPath = dataMapWriterPath;
}
-
- @Override
- public String toString() {
-final StringBuilder sb = new StringBuilder("TableBlockInfo{");
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
new file mode 100644
index 000..fde4e55
--- /dev/null
+++
b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
+
+/**
+ * This abstract class provides a skeletal implementation of the
+ * Block iterator.
+ */
+public class DataBlockIterator extends CarbonIterator> {
+
+ /**
+ * iterator which will be used to iterate over blocklets
+ */
+ private BlockletIterator blockletIterator;
+
+ /**
+ * result collector which will be used to aggregate the scanned result
+ */
+ private ScannedResultCollector scannerResultAggregator;
+
+ /**
+ * processor which will be used to process the block processing can be
+ * filter processing or non filter processing
+ */
+ private BlockletScanner blockletScanner;
+
+ /**
+ * batch size of result
+ */
+ private int batchSize;
+
+ private ExecutorService executorService;
+
+ private Future future;
+
+ private Future futureIo;
+
+ private BlockletScannedResult scannedResult;
+
+ private BlockExecutionInfo blockExecutionInfo;
+
+ private FileReader fileReader;
+
+ private AtomicBoolean nextBlock;
+
+ private AtomicBoolean nextRead;
+
+ public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader
fileReader,
+ int batchSize, QueryStatisticsModel queryStatisticsModel,
ExecutorService executorService) {
+this.blockExecutionInfo = blockExecutionInfo;
+this.fileReader = fileReader;
+blockletIterator = new
BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+blockExecutionInfo.getNumberOfBlockToScan());
+if (blockExecutionInfo.getFilterExecuterTree() != null) {
+ blockletScanner = new BlockletFilterScanner(blockExecutionInfo,
queryStatisticsModel);
+} else {
+ blockletScanner = new BlockletFullScanner(blockExecutionInfo,
queryStatisticsModel);
+}
+this.scannerResultAggregator =
+ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+this.batchSize = batchSize;
+this.executorService = executorService;
+this.nextBlock = new AtomicBoolean(false);
+this.nextRead = new AtomicBoolean(false);
+ }
+
+ @Override
+ public List next() {
+List collectedResult = null;
+if (updateScanner()) {
+ collectedResult =
this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize);
+ while (collectedResult.size() < batchSize && updateScanner()) {
+List data = this.scannerResultAggregator
+
[CARBONDATA-2186] Add InterfaceAudience.Internal to annotate internal interface
This closes #1986
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/623a1f93
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/623a1f93
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/623a1f93
Branch: refs/heads/carbonstore
Commit: 623a1f93bf50bbbf665d98d71fe2190a4742
Parents: 503e0d9
Author: Jacky Li
Authored: Tue Feb 20 11:16:53 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:14 2018 +0800
--
.../java/org/apache/carbondata/common/Maps.java | 2 +-
.../org/apache/carbondata/common/Strings.java| 2 +-
.../common/annotations/InterfaceAudience.java| 19 ++-
.../common/annotations/InterfaceStability.java | 2 +-
.../loading/model/CarbonLoadModelBuilder.java| 2 +-
.../processing/loading/model/LoadOption.java | 2 +-
.../carbondata/sdk/file/CSVCarbonWriter.java | 4 +---
7 files changed, 20 insertions(+), 13 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/623a1f93/common/src/main/java/org/apache/carbondata/common/Maps.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Maps.java
b/common/src/main/java/org/apache/carbondata/common/Maps.java
index 14fc329..4e76192 100644
--- a/common/src/main/java/org/apache/carbondata/common/Maps.java
+++ b/common/src/main/java/org/apache/carbondata/common/Maps.java
@@ -21,7 +21,7 @@ import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
-@InterfaceAudience.Developer
+@InterfaceAudience.Internal
public class Maps {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/623a1f93/common/src/main/java/org/apache/carbondata/common/Strings.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java
b/common/src/main/java/org/apache/carbondata/common/Strings.java
index 08fdc3c..23c7f9f 100644
--- a/common/src/main/java/org/apache/carbondata/common/Strings.java
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -21,7 +21,7 @@ import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
-@InterfaceAudience.Developer
+@InterfaceAudience.Internal
public class Strings {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/623a1f93/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
--
diff --git
a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
index fa9729d..8d214ff 100644
---
a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
+++
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
@@ -25,10 +25,10 @@ import java.lang.annotation.RetentionPolicy;
* This annotation is ported and modified from Apache Hadoop project.
*
* Annotation to inform users of a package, class or method's intended
audience.
- * Currently the audience can be {@link User}, {@link Developer}
+ * Currently the audience can be {@link User}, {@link Developer}, {@link
Internal}
*
* Public classes that are not marked with this annotation must be
- * considered by default as {@link Developer}.
+ * considered by default as {@link Internal}.
*
* External applications must only use classes that are marked {@link User}.
*
@@ -47,12 +47,21 @@ public class InterfaceAudience {
public @interface User { }
/**
- * Intended only for developers to extend interface for CarbonData project
- * For example, new Datamap implementations.
+ * Intended for developers to develop extension for Apache CarbonData project
+ * For example, "Index DataMap" to add a new index implementation, etc
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
- public @interface Developer { }
+ public @interface Developer {
+String[] value();
+ }
+
+ /**
+ * Intended only for internal usage within Apache CarbonData project.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Internal { }
private InterfaceAudience() { } // Audience can't exist on its own
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/623a1f93/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
--
diff --git
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
deleted file mode 100644
index 6629d31..000
---
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.chunk.impl;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
-import
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
-
-/**
- * This class is gives access to fixed length dimension data chunk store
- */
-public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk {
-
- /**
- * Constructor
- *
- * @param dataChunkdata chunk
- * @param invertedIndexinverted index
- * @param invertedIndexReverse reverse inverted index
- * @param numberOfRows number of rows
- * @param columnValueSize size of each column value
- */
- public FixedLengthDimensionDataChunk(byte[] dataChunk, int[] invertedIndex,
- int[] invertedIndexReverse, int numberOfRows, int columnValueSize) {
-long totalSize = null != invertedIndex ?
-dataChunk.length + (2 * numberOfRows *
CarbonCommonConstants.INT_SIZE_IN_BYTE) :
-dataChunk.length;
-dataChunkStore = DimensionChunkStoreFactory.INSTANCE
-.getDimensionChunkStore(columnValueSize, null != invertedIndex,
numberOfRows, totalSize,
-DimensionStoreType.FIXEDLENGTH);
-dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
- }
-
- /**
- * Below method will be used to fill the data based on offset and row id
- *
- * @param data data to filed
- * @param offset offset from which data need to be filed
- * @param indexrow id of the chunk
- * @param keyStructureInfo define the structure of the key
- * @return how many bytes was copied
- */
- @Override public int fillChunkData(byte[] data, int offset, int index,
- KeyStructureInfo keyStructureInfo) {
-dataChunkStore.fillRow(index, data, offset);
-return dataChunkStore.getColumnValueSize();
- }
-
- /**
- * Converts to column dictionary integer value
- *
- * @param rowId
- * @param columnIndex
- * @param row
- * @param restructuringInfo
- * @return
- */
- @Override public int fillConvertedChunkData(int rowId, int columnIndex,
int[] row,
- KeyStructureInfo restructuringInfo) {
-row[columnIndex] = dataChunkStore.getSurrogate(rowId);
-return columnIndex + 1;
- }
-
- /**
- * Fill the data to vector
- *
- * @param vectorInfo
- * @param column
- * @param restructuringInfo
- * @return next column index
- */
- @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo,
int column,
- KeyStructureInfo restructuringInfo) {
-ColumnVectorInfo columnVectorInfo = vectorInfo[column];
-int offset = columnVectorInfo.offset;
-int vectorOffset = columnVectorInfo.vectorOffset;
-int len = columnVectorInfo.size + offset;
-CarbonColumnVector vector = columnVectorInfo.vector;
-for (int j = offset; j < len; j++) {
- int dict = dataChunkStore.getSurrogate(j);
- if (columnVectorInfo.directDictionaryGenerator == null) {
-vector.putInt(vectorOffset++, dict);
- } else {
-Object
[CARBONDATA-1543] Supported DataMap chooser and expression for supporting
multiple datamaps in single query
This PR supports 3 features.
1.Load datamaps from the DataMapSchema which are created through DDL.
2.DataMap Chooser: It chooses the datamap out of available datamaps based on
simple logic. Like if there is filter condition on column1 then for supposing 2
datamaps(1. column1 2. column1+column2) are supporting this column then we
choose the datamap which has fewer columns that is the first datamap.
3.Expression support: Based on the filter expressions we convert them to the
possible DataMap expressions and do apply expression on it.
For example, there are 2 datamaps available on table1
Datamap1 : column1
Datamap2 : column2
Query: select * from table1 where column1 ='a' and column2 =b
For the above query, we create datamap expression as
AndDataMapExpression(Datamap1, DataMap2). So for the above query both the
datamaps are included and the output of them will be applied AND condition to
improve the performance
This closes #1510
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c7a9f15e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c7a9f15e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c7a9f15e
Branch: refs/heads/datamap-rebase1
Commit: c7a9f15e2daa0207862aa28c44c51cc7cc081bac
Parents: 8104735
Author: Ravindra Pesala
Authored: Tue Nov 21 15:49:11 2017 +0530
Committer: Jacky Li
Committed: Sun Mar 4 22:00:18 2018 +0800
--
.../exceptions/MetadataProcessException.java| 37 +++
.../carbondata/core/datamap/DataMapChooser.java | 284 +++
.../core/datamap/DataMapDistributable.java | 21 +-
.../core/datamap/DataMapStoreManager.java | 148 +++---
.../carbondata/core/datamap/TableDataMap.java | 23 +-
.../core/datamap/dev/DataMapFactory.java| 3 +-
.../datamap/dev/expr/AndDataMapExprWrapper.java | 99 +++
.../dev/expr/DataMapDistributableWrapper.java | 56
.../datamap/dev/expr/DataMapExprWrapper.java| 79 ++
.../dev/expr/DataMapExprWrapperImpl.java| 88 ++
.../datamap/dev/expr/OrDataMapExprWrapper.java | 96 +++
.../carbondata/core/datastore/TableSpec.java| 14 +-
.../carbondata/core/indexstore/Blocklet.java| 20 ++
.../core/indexstore/ExtendedBlocklet.java | 33 ++-
.../core/indexstore/FineGrainBlocklet.java | 8 +
.../blockletindex/BlockletDataMap.java | 2 -
.../blockletindex/BlockletDataMapFactory.java | 9 +-
.../conditional/StartsWithExpression.java | 72 +
.../scan/filter/FilterExpressionProcessor.java | 20 +-
.../core/scan/filter/intf/ExpressionType.java | 6 +-
.../statusmanager/SegmentStatusManager.java | 5 +-
.../datamap/examples/MinMaxDataMapFactory.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 32 ++-
.../carbondata/hadoop/api/DataMapJob.java | 2 +-
.../hadoop/api/DistributableDataMapFormat.java | 32 ++-
.../preaggregate/TestPreAggCreateCommand.scala | 6 +-
.../timeseries/TestTimeSeriesCreateTable.scala | 5 +-
...CompactionSupportGlobalSortBigFileTest.scala | 2 +-
.../testsuite/dataload/TestLoadDataFrame.scala | 24 +-
.../testsuite/datamap/CGDataMapTestCase.scala | 52 ++--
.../testsuite/datamap/DataMapWriterSuite.scala | 17 +-
.../testsuite/datamap/FGDataMapTestCase.scala | 68 +++--
.../testsuite/datamap/TestDataMapCommand.scala | 72 ++---
.../iud/InsertOverwriteConcurrentTest.scala | 0
.../carbondata/spark/rdd/SparkDataMapJob.scala | 6 +-
.../org/apache/spark/sql/CarbonSource.scala | 8 +-
.../spark/sql/SparkUnknownExpression.scala | 6 +-
.../datamap/CarbonCreateDataMapCommand.scala| 91 +++---
.../datamap/CarbonDropDataMapCommand.scala | 3 +-
.../CreatePreAggregateTableCommand.scala| 29 +-
.../preaaggregate/PreAggregateUtil.scala| 12 +-
.../strategy/CarbonLateDecodeStrategy.scala | 2 +-
.../spark/sql/optimizer/CarbonFilters.scala | 20 +-
.../datamap/DataMapWriterListener.java | 6 +-
.../loading/DataLoadProcessBuilder.java | 2 +-
.../store/CarbonFactDataHandlerModel.java | 12 +-
46 files changed, 1316 insertions(+), 319 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c7a9f15e/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
--
diff --git
a/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
b/common/src/main/java/org/apache/carbondata/common/exceptions/MetadataProcessException.java
new file mode 100644
index
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1134431d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
--
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
deleted file mode 100644
index a05a8c2..000
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{File, FilenameFilter}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.MetadataProcessException
-import
org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException,
NoSuchDataMapException}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-class TestIndexDataMapCommand extends QueryTest with BeforeAndAfterAll {
-
- val testData = s"$resourcesPath/sample.csv"
-
- override def beforeAll {
-sql("drop table if exists datamaptest")
-sql("drop table if exists datamapshowtest")
-sql("drop table if exists uniqdata")
-sql("create table datamaptest (a string, b string, c string) stored by
'carbondata'")
- }
-
- val newClass = "org.apache.spark.sql.CarbonSource"
-
- test("test datamap create: don't support using non-exist class") {
-intercept[MetadataProcessException] {
- sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
-}
- }
-
- test("test datamap create with dmproperties: don't support using non-exist
class") {
-intercept[MetadataProcessException] {
- sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass'
DMPROPERTIES('key'='value')")
-}
- }
-
- test("test datamap create with existing name: don't support using non-exist
class") {
-intercept[MetadataProcessException] {
- sql(
-s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass'
DMPROPERTIES('key'='value')")
-}
- }
-
- test("test datamap create with preagg") {
-sql("drop datamap if exists datamap3 on table datamaptest")
-sql(
- "create datamap datamap3 on table datamaptest using 'preaggregate' as
select count(a) from datamaptest")
-val table = CarbonMetadata.getInstance().getCarbonTable("default",
"datamaptest")
-assert(table != null)
-val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-assert(dataMapSchemaList.size() == 1)
-assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
-
assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
- }
-
- test("check hivemetastore after drop datamap") {
-try {
- CarbonProperties.getInstance()
-.addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- "true")
- sql("drop table if exists hiveMetaStoreTable")
- sql("create table hiveMetaStoreTable (a string, b string, c string)
stored by 'carbondata'")
-
- sql(
-"create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable
using 'preaggregate' as select count(a) from hiveMetaStoreTable")
- checkExistence(sql("show datamap on table hiveMetaStoreTable"), true,
"datamap_hiveMetaStoreTable")
-
- sql("drop datamap datamap_hiveMetaStoreTable on table
hiveMetaStoreTable")
- checkExistence(sql("show datamap on table hiveMetaStoreTable"), false,
"datamap_hiveMetaStoreTable")
-
-} finally {
- sql("drop table hiveMetaStoreTable")
- CarbonProperties.getInstance()
-.addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1134431d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
deleted file mode 100644
index 34e11ac..000
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
+++ /dev/null
@@ -1,971 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
-import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
-import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
-import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.xerial.snappy.Snappy;
-
-/**
- * Datamap implementation for blocklet.
- */
-public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap
implements Cacheable {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName());
-
- private static int KEY_INDEX = 0;
-
- private static int MIN_VALUES_INDEX = 1;
-
- private static int MAX_VALUES_INDEX = 2;
-
- private static int ROW_COUNT_INDEX = 3;
-
- private static int FILE_PATH_INDEX = 4;
-
- private
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
--
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
new file mode 100644
index 000..4c0e637
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import
org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema,
CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus,
SegmentStatusManager}
+
+/**
+ * Below helper class will be used to create pre-aggregate table
+ * and updating the parent table about the child table information
+ * It will be either success or nothing happen in case of failure:
+ * 1. failed to create pre aggregate table.
+ * 2. failed to update main table
+ *
+ */
+case class PreAggregateTableHelper(
+var parentTable: CarbonTable,
+dataMapName: String,
+dataMapClassName: String,
+dataMapProperties: java.util.Map[String, String],
+queryString: String,
+timeSeriesFunction: Option[String] = None,
+ifNotExistsSet: Boolean = false) {
+
+ var loadCommand: CarbonLoadDataCommand = _
+
+ def initMeta(sparkSession: SparkSession): Seq[Row] = {
+val dmProperties = dataMapProperties.asScala
+val updatedQuery = new
CarbonSpark2SqlParser().addPreAggFunction(queryString)
+val df = sparkSession.sql(updatedQuery)
+val fieldRelationMap =
PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+ df.logicalPlan, queryString)
+val fields = fieldRelationMap.keySet.toSeq
+val tableProperties = mutable.Map[String, String]()
+dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
+val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
+if (!parentTable.getTableName.equalsIgnoreCase(selectTable.getTableName)) {
+ throw new MalformedDataMapCommandException(
+"Parent table name is different in select and create")
+}
+var neworder = Seq[String]()
+val parentOrder =
parentTable.getSortColumns(parentTable.getTableName).asScala
+parentOrder.foreach(parentcol =>
+ fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
+ parentcol.equals(fieldRelationMap(col).
+ columnTableRelationList.get(0).parentColumnName))
+.map(cols => neworder :+= cols.column)
+)
+tableProperties.put(CarbonCommonConstants.SORT_COLUMNS,
neworder.mkString(","))
+tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
+ getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
+ .LOAD_SORT_SCOPE_DEFAULT))
+tableProperties
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
new file mode 100644
index 000..34e11ac
--- /dev/null
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexDataMap.java
@@ -0,0 +1,971 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainIndexDataMap;
+import org.apache.carbondata.core.datastore.IndexKey;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.xerial.snappy.Snappy;
+
+/**
+ * Datamap implementation for blocklet.
+ */
+public class BlockletIndexDataMap extends AbstractCoarseGrainIndexDataMap
implements Cacheable {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletIndexDataMap.class.getName());
+
+ private static int KEY_INDEX = 0;
+
+ private static int MIN_VALUES_INDEX = 1;
+
+ private static int MAX_VALUES_INDEX = 2;
+
+ private static int ROW_COUNT_INDEX = 3;
+
+ private static int FILE_PATH_INDEX = 4;
+
+ private static
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1134431d/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
deleted file mode 100644
index 229e5bf..000
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/FineGrainBlocklet.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-import org.apache.carbondata.core.util.BitSetGroup;
-
-/**
- * FineGrainBlocklet
- */
-public class FineGrainBlocklet extends Blocklet implements Serializable {
-
- private List pages;
-
- public FineGrainBlocklet(String blockId, String blockletId, List
pages) {
-super(blockId, blockletId);
-this.pages = pages;
- }
-
- // For serialization purpose
- public FineGrainBlocklet() {
-
- }
-
- public List getPages() {
-return pages;
- }
-
- public static class Page implements Writable,Serializable {
-
-private int pageId;
-
-private int[] rowId;
-
-public BitSet getBitSet() {
- BitSet bitSet =
- new
BitSet(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
- for (int row : rowId) {
-bitSet.set(row);
- }
- return bitSet;
-}
-
-@Override public void write(DataOutput out) throws IOException {
- out.writeInt(pageId);
- out.writeInt(rowId.length);
- for (int i = 0; i < rowId.length; i++) {
-out.writeInt(rowId[i]);
- }
-}
-
-@Override public void readFields(DataInput in) throws IOException {
- pageId = in.readInt();
- int length = in.readInt();
- rowId = new int[length];
- for (int i = 0; i < length; i++) {
-rowId[i] = in.readInt();
- }
-}
-
-public void setPageId(int pageId) {
- this.pageId = pageId;
-}
-
-public void setRowId(int[] rowId) {
- this.rowId = rowId;
-}
- }
-
- public BitSetGroup getBitSetGroup(int numberOfPages) {
-BitSetGroup bitSetGroup = new BitSetGroup(numberOfPages);
-for (int i = 0; i < pages.size(); i++) {
- bitSetGroup.setBitSet(pages.get(i).getBitSet(), pages.get(i).pageId);
-}
-return bitSetGroup;
- }
-
- @Override public void write(DataOutput out) throws IOException {
-super.write(out);
-int size = pages.size();
-out.writeInt(size);
-for (Page page : pages) {
- page.write(out);
-}
- }
-
- @Override public void readFields(DataInput in) throws IOException {
-super.readFields(in);
-int size = in.readInt();
-pages = new ArrayList<>(size);
-for (int i = 0; i < size; i++) {
- Page page = new Page();
- page.readFields(in);
- pages.add(page);
-}
- }
-
- @Override public boolean equals(Object o) {
-return super.equals(o);
- }
-
- @Override public int hashCode() {
-return super.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1134431d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
new file mode 100644
index 000..3ca9c5a
--- /dev/null
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -0,0 +1,971 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
deleted file mode 100644
index 90178b1..000
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ /dev/null
@@ -1,971 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.indexstore.blockletindex;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import
org.apache.carbondata.core.datamap.dev.cgdatamap.AbstractCoarseGrainDataMap;
-import org.apache.carbondata.core.datastore.IndexKey;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.indexstore.BlockMetaInfo;
-import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
-import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
-import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.xerial.snappy.Snappy;
-
-/**
- * Datamap implementation for blocklet.
- */
-public class BlockletDataMap extends AbstractCoarseGrainDataMap implements
Cacheable {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BlockletDataMap.class.getName());
-
- private static int KEY_INDEX = 0;
-
- private static int MIN_VALUES_INDEX = 1;
-
- private static int MAX_VALUES_INDEX = 2;
-
- private static int ROW_COUNT_INDEX = 3;
-
- private static int FILE_PATH_INDEX = 4;
-
- private static int PAGE_COUNT_INDEX = 5;
-
-
[CARBONDATA-2206] Fixed lucene datamap evaluation issue in executor
In case of MatchExpression it should return same bitset from
RowLevelFilterExecuterImpl
This closes #2010
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8049185c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8049185c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8049185c
Branch: refs/heads/datamap-rebase1
Commit: 8049185cbdb001588be5c5d97f73bac035d16d6c
Parents: c0133aa
Author: ravipesala
Authored: Tue Feb 27 19:14:15 2018 +0530
Committer: Jacky Li
Committed: Sun Mar 4 22:45:17 2018 +0800
--
.../carbondata/core/datamap/DataMapChooser.java | 3 +-
.../core/scan/expression/MatchExpression.java | 57
.../executer/RowLevelFilterExecuterImpl.java| 4 ++
.../lucene/LuceneFineGrainDataMapSuite.scala| 8 ++-
.../datamap/expression/MatchExpression.java | 56 ---
.../spark/sql/optimizer/CarbonFilters.scala | 3 +-
6 files changed, 67 insertions(+), 64 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8049185c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index c8c971d..f9214a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -228,7 +228,8 @@ public class DataMapChooser {
private boolean contains(DataMapMeta mapMeta, List
columnExpressions,
Set expressionTypes) {
-if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH)) {
+if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH) &&
+expressionTypes.contains(ExpressionType.TEXT_MATCH)) {
// TODO: fix it with right logic
return true;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8049185c/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
b/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
new file mode 100644
index 000..3677b51
--- /dev/null
+++
b/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.expression;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import
org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+@InterfaceAudience.Internal
+public class MatchExpression extends Expression {
+ private String queryString;
+
+ public MatchExpression(String queryString) {
+this.queryString = queryString;
+ }
+
+ @Override
+ public ExpressionResult evaluate(RowIntf value)
+ throws FilterUnsupportedException, FilterIllegalMemberException {
+return new ExpressionResult(DataTypes.BOOLEAN,true);
+ }
+
+ @Override
+ public ExpressionType getFilterExpressionType() {
+return ExpressionType.TEXT_MATCH;
+ }
+
+ @Override
+ public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+
+ }
+
+ @Override
+ public String getString() {
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/docs/datamap-developer-guide.md
--
diff --git a/docs/datamap-developer-guide.md b/docs/datamap-developer-guide.md
new file mode 100644
index 000..31afd34
--- /dev/null
+++ b/docs/datamap-developer-guide.md
@@ -0,0 +1,16 @@
+# DataMap Developer Guide
+
+### Introduction
+DataMap is a data structure that can be used to accelerate certain query of
the table. Different DataMap can be implemented by developers.
+Currently, there are 2 types of DataMap supported:
+1. IndexDataMap: DataMap that leveraging index to accelerate filter query
+2. MVDataMap: DataMap that leveraging Materialized View to accelerate olap
style query, like SPJG query (select, predicate, join, groupby)
+
+### DataMap provider
+When user issues `CREATE DATAMAP dm ON TABLE main USING 'provider'`, the
corresponding DataMapProvider implementation will be created and initialized.
+Currently, the provider string can be:
+1. preaggregate: one type of MVDataMap that do pre-aggregate of single table
+2. timeseries: one type of MVDataMap that do pre-aggregate based on time
dimension of the table
+3. class name IndexDataMapFactory implementation: Developer can implement new
type of IndexDataMap by extending IndexDataMapFactory
+
+When user issues `DROP DATAMAP dm ON TABLE main`, the corresponding
DataMapProvider interface will be called.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
--
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 3bc4547..007ba2f 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -43,8 +43,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
-import
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import
org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -756,7 +755,7 @@ public class CarbonTableInputFormat extends
FileInputFormat {
DistributableDataMapFormat datamapDstr =
new DistributableDataMapFormat(absoluteTableIdentifier,
dataMapExprWrapper,
segmentIds, partitionsToPrune,
- BlockletDataMapFactory.class.getName());
+ BlockletIndexDataMapFactory.class.getName());
prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
// Apply expression on the blocklets.
prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
--
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index b1962c1..f208c92 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -286,7 +286,7 @@ class TestPreAggCreateCommand extends QueryTest with
BeforeAndAfterAll {
| GROUP BY dob,name
""".stripMargin)
}
-assert(e.getMessage.contains(s"$timeSeries keyword missing"))
+assert(e.getMessage.contains("Only 'path' dmproperty is allowed for this
datamap"))
sql("DROP TABLE IF EXISTS maintabletime")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
--
diff --git
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
--
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 000..fbb93b6
--- /dev/null
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+ private CarbonTable table;
+
+ public CarbonLoadModelBuilder(CarbonTable table) {
+this.table = table;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @return a new CarbonLoadModel instance
+ */
+ public CarbonLoadModel build(
+ Map options) throws InvalidLoadOptionException,
IOException {
+Map optionsFinal =
LoadOption.fillOptionWithDefaultValue(options);
+optionsFinal.put("sort_scope", "no_sort");
+if (!options.containsKey("fileheader")) {
+ List csvHeader =
table.getCreateOrderColumn(table.getTableName());
+ String[] columns = new String[csvHeader.size()];
+ for (int i = 0; i < columns.length; i++) {
+columns[i] = csvHeader.get(i).getColName();
+ }
+ optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+}
+CarbonLoadModel model = new CarbonLoadModel();
+
+// we have provided 'fileheader', so hadoopConf can be null
+build(options, optionsFinal, model, null);
+
+// set default values
+
model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options,
"onepass", "false")));
+model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost",
null));
+try {
+
model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options,
"dictport", "-1")));
+} catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+}
+return model;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options that populated with default values for
optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf hadoopConf is needed to read the CSV header if
'fileheader' is not set in
+ * user provided load options
+ */
+ public void build(
+ Map options,
+ Map optionsFinal,
+ CarbonLoadModel
[CARBONDATA-1997] Add CarbonWriter SDK API
Added a new module called store-sdk, and added a CarbonWriter API, it can be
used to write Carbondata files to a specified folder, without Spark and Hadoop
dependency. User can use this API in any environment.
This closes #1967
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5fccdabf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5fccdabf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5fccdabf
Branch: refs/heads/carbonstore-rebase5
Commit: 5fccdabfc1cc4656d75e51867dcfcb250c505c91
Parents: fc31be7
Author: Jacky Li
Authored: Sat Feb 10 19:44:23 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:13 2018 +0800
--
.../org/apache/carbondata/common/Strings.java | 40
.../apache/carbondata/common/StringsSuite.java | 53 +
.../core/metadata/schema/table/CarbonTable.java | 7 +
.../schema/table/CarbonTableBuilder.java| 72 +++
.../core/metadata/schema/table/TableSchema.java | 7 +
.../schema/table/TableSchemaBuilder.java| 107 ++
.../schema/table/CarbonTableBuilderSuite.java | 86
.../metadata/schema/table/CarbonTableTest.java | 12 +-
.../schema/table/TableSchemaBuilderSuite.java | 56 ++
.../carbondata/spark/util/DataLoadingUtil.scala | 45 +
pom.xml | 7 +
store/sdk/pom.xml | 130 +
.../carbondata/sdk/file/CSVCarbonWriter.java| 89 +
.../carbondata/sdk/file/CarbonWriter.java | 51 +
.../sdk/file/CarbonWriterBuilder.java | 194 +++
.../org/apache/carbondata/sdk/file/Field.java | 74 +++
.../org/apache/carbondata/sdk/file/Schema.java | 74 +++
.../sdk/file/CSVCarbonWriterSuite.java | 127
18 files changed, 1225 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fccdabf/common/src/main/java/org/apache/carbondata/common/Strings.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java
b/common/src/main/java/org/apache/carbondata/common/Strings.java
new file mode 100644
index 000..23288dd
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common;
+
+import java.util.Objects;
+
+public class Strings {
+
+ /**
+ * Provide same function as mkString in Scala.
+ * This is added to avoid JDK 8 dependency.
+ */
+ public static String mkString(String[] strings, String delimeter) {
+Objects.requireNonNull(strings);
+Objects.requireNonNull(delimeter);
+StringBuilder builder = new StringBuilder();
+for (int i = 0; i < strings.length; i++) {
+ builder.append(strings[i]);
+ if (i != strings.length - 1) {
+builder.append(delimeter);
+ }
+}
+return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fccdabf/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
--
diff --git
a/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
new file mode 100644
index 000..65da32b
--- /dev/null
+++ b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the
Revert "[CARBONDATA-2023][DataLoad] Add size base block allocation in data
loading"
This reverts commit 6dd8b038fc898dbf48ad30adfc870c19eb38e3d0.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1d85e916
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1d85e916
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1d85e916
Branch: refs/heads/carbonstore-rebase5
Commit: 1d85e916f6a0f070960555fb18ee4cd8acbfa315
Parents: 6216294
Author: Jacky Li
Authored: Sat Feb 10 10:34:59 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:32:13 2018 +0800
--
.../constants/CarbonLoadOptionConstants.java| 10 -
.../core/datastore/block/TableBlockInfo.java| 29 --
.../carbondata/core/util/CarbonProperties.java | 11 -
docs/useful-tips-on-carbondata.md | 1 -
.../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +-
.../spark/sql/hive/DistributionUtil.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala| 18 +-
.../merger/NodeMultiBlockRelation.java | 40 --
.../processing/util/CarbonLoaderUtil.java | 494 +++
.../processing/util/CarbonLoaderUtilTest.java | 125 -
10 files changed, 183 insertions(+), 551 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1d85e916/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..bcfeba0 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,14 +114,4 @@ public final class CarbonLoadOptionConstants {
*/
public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
- /**
- * enable block size based block allocation while loading data. By default,
carbondata assigns
- * blocks to node based on block number. If this option is set to `true`,
carbondata will
- * consider block size first and make sure that all the nodes will process
almost equal size of
- * data. This option is especially useful when you encounter skewed data.
- */
- @CarbonProperty
- public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
- = "carbon.load.skewedDataOptimization.enabled";
- public static final String
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1d85e916/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index c0cebe0..a7bfdba 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.core.datastore.block;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
@@ -100,20 +98,6 @@ public class TableBlockInfo implements Distributable,
Serializable {
private String dataMapWriterPath;
- /**
- * comparator to sort by block size in descending order.
- * Since each line is not exactly the same, the size of a InputSplit may
differs,
- * so we allow some deviation for these splits.
- */
- public static final Comparator DATA_SIZE_DESC_COMPARATOR =
- new Comparator() {
-@Override public int compare(Distributable o1, Distributable o2) {
- long diff =
- ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo)
o2).getBlockLength();
- return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
-}
- };
-
public TableBlockInfo(String filePath, long blockOffset, String segmentId,
String[] locations, long blockLength, ColumnarFormatVersion version,
String[] deletedDeltaFilePath) {
@@ -450,17 +434,4 @@ public class TableBlockInfo implements Distributable,
Serializable {
public void setDataMapWriterPath(String dataMapWriterPath) {
this.dataMapWriterPath = dataMapWriterPath;
}
-
- @Override
- public String toString() {
-final StringBuilder sb = new StringBuilder("TableBlockInfo{");
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
--
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index f605b22..000
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.loading.DataField;
-import
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
-import
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
-import
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be
merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data
separately and write to
- * temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends
AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(
-
UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private BucketingInfo bucketingInfo;
-
- public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[]
inputDataFields,
- BucketingInfo bucketingInfo) {
-this.bucketingInfo = bucketingInfo;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
-this.sortParameters = sortParameters;
- }
-
- @Override public Iterator[] sort(Iterator[]
iterators)
- throws CarbonDataLoadingException {
-UnsafeSortDataRows[] sortDataRows = new
UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
-UnsafeIntermediateMerger[] intermediateFileMergers =
-new UnsafeIntermediateMerger[sortDataRows.length];
-int inMemoryChunkSizeInMB =
CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-inMemoryChunkSizeInMB = inMemoryChunkSizeInMB /
bucketingInfo.getNumberOfBuckets();
-if (inMemoryChunkSizeInMB < 5) {
- inMemoryChunkSizeInMB = 5;
-}
-try {
- for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-SortParameters parameters = sortParameters.getCopy();
-parameters.setPartitionID(i + "");
-setTempLocation(parameters);
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55c4e438/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
deleted file mode 100644
index 6629d31..000
---
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.chunk.impl;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
-import
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
-
-/**
- * This class is gives access to fixed length dimension data chunk store
- */
-public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk {
-
- /**
- * Constructor
- *
- * @param dataChunkdata chunk
- * @param invertedIndexinverted index
- * @param invertedIndexReverse reverse inverted index
- * @param numberOfRows number of rows
- * @param columnValueSize size of each column value
- */
- public FixedLengthDimensionDataChunk(byte[] dataChunk, int[] invertedIndex,
- int[] invertedIndexReverse, int numberOfRows, int columnValueSize) {
-long totalSize = null != invertedIndex ?
-dataChunk.length + (2 * numberOfRows *
CarbonCommonConstants.INT_SIZE_IN_BYTE) :
-dataChunk.length;
-dataChunkStore = DimensionChunkStoreFactory.INSTANCE
-.getDimensionChunkStore(columnValueSize, null != invertedIndex,
numberOfRows, totalSize,
-DimensionStoreType.FIXEDLENGTH);
-dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
- }
-
- /**
- * Below method will be used to fill the data based on offset and row id
- *
- * @param data data to filed
- * @param offset offset from which data need to be filed
- * @param indexrow id of the chunk
- * @param keyStructureInfo define the structure of the key
- * @return how many bytes was copied
- */
- @Override public int fillChunkData(byte[] data, int offset, int index,
- KeyStructureInfo keyStructureInfo) {
-dataChunkStore.fillRow(index, data, offset);
-return dataChunkStore.getColumnValueSize();
- }
-
- /**
- * Converts to column dictionary integer value
- *
- * @param rowId
- * @param columnIndex
- * @param row
- * @param restructuringInfo
- * @return
- */
- @Override public int fillConvertedChunkData(int rowId, int columnIndex,
int[] row,
- KeyStructureInfo restructuringInfo) {
-row[columnIndex] = dataChunkStore.getSurrogate(rowId);
-return columnIndex + 1;
- }
-
- /**
- * Fill the data to vector
- *
- * @param vectorInfo
- * @param column
- * @param restructuringInfo
- * @return next column index
- */
- @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo,
int column,
- KeyStructureInfo restructuringInfo) {
-ColumnVectorInfo columnVectorInfo = vectorInfo[column];
-int offset = columnVectorInfo.offset;
-int vectorOffset = columnVectorInfo.vectorOffset;
-int len = columnVectorInfo.size + offset;
-CarbonColumnVector vector = columnVectorInfo.vector;
-for (int j = offset; j < len; j++) {
- int dict = dataChunkStore.getSurrogate(j);
- if (columnVectorInfo.directDictionaryGenerator == null) {
-vector.putInt(vectorOffset++, dict);
- } else {
-Object
[CARBONDATA-2156] Add interface annotation
InterfaceAudience and InterfaceStability annotation should be added for user
and developer
1.InetfaceAudience can be User and Developer
2.InterfaceStability can be Stable, Evolving, Unstable
This closes #1968
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/68c16bb5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/68c16bb5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/68c16bb5
Branch: refs/heads/carbonstore-rebase5
Commit: 68c16bb5e26001abdae6d521742e0dfa1fc808d9
Parents: bfdf3e3
Author: Jacky Li
Authored: Sun Feb 11 10:12:10 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:04:49 2018 +0800
--
.../common/annotations/InterfaceAudience.java | 58
.../common/annotations/InterfaceStability.java | 69
2 files changed, 127 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68c16bb5/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
--
diff --git
a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
new file mode 100644
index 000..fa9729d
--- /dev/null
+++
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * This annotation is ported and modified from Apache Hadoop project.
+ *
+ * Annotation to inform users of a package, class or method's intended
audience.
+ * Currently the audience can be {@link User}, {@link Developer}
+ *
+ * Public classes that are not marked with this annotation must be
+ * considered by default as {@link Developer}.
+ *
+ * External applications must only use classes that are marked {@link User}.
+ *
+ * Methods may have a different annotation that it is more restrictive
+ * compared to the audience classification of the class. Example: A class
+ * might be {@link User}, but a method may be {@link Developer}
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class InterfaceAudience {
+ /**
+ * Intended for use by any project or application.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface User { }
+
+ /**
+ * Intended only for developers to extend interface for CarbonData project
+ * For example, new Datamap implementations.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Developer { }
+
+ private InterfaceAudience() { } // Audience can't exist on its own
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68c16bb5/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
--
diff --git
a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
new file mode 100644
index 000..b8e5e52
--- /dev/null
+++
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by
[CARBONDATA-2186] Add InterfaceAudience.Internal to annotate internal interface
This closes #1986
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8996cd4a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8996cd4a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8996cd4a
Branch: refs/heads/carbonstore-rebase5
Commit: 8996cd4a412ddb4bcded85a7320b106d20692c52
Parents: d32c0cf
Author: Jacky Li
Authored: Tue Feb 20 11:16:53 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:08:30 2018 +0800
--
.../java/org/apache/carbondata/common/Maps.java | 2 +-
.../org/apache/carbondata/common/Strings.java| 2 +-
.../common/annotations/InterfaceAudience.java| 19 ++-
.../common/annotations/InterfaceStability.java | 2 +-
.../loading/model/CarbonLoadModelBuilder.java| 2 +-
.../processing/loading/model/LoadOption.java | 2 +-
.../carbondata/sdk/file/CSVCarbonWriter.java | 4 +---
7 files changed, 20 insertions(+), 13 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8996cd4a/common/src/main/java/org/apache/carbondata/common/Maps.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Maps.java
b/common/src/main/java/org/apache/carbondata/common/Maps.java
index 14fc329..4e76192 100644
--- a/common/src/main/java/org/apache/carbondata/common/Maps.java
+++ b/common/src/main/java/org/apache/carbondata/common/Maps.java
@@ -21,7 +21,7 @@ import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
-@InterfaceAudience.Developer
+@InterfaceAudience.Internal
public class Maps {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8996cd4a/common/src/main/java/org/apache/carbondata/common/Strings.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java
b/common/src/main/java/org/apache/carbondata/common/Strings.java
index 08fdc3c..23c7f9f 100644
--- a/common/src/main/java/org/apache/carbondata/common/Strings.java
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -21,7 +21,7 @@ import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
-@InterfaceAudience.Developer
+@InterfaceAudience.Internal
public class Strings {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8996cd4a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
--
diff --git
a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
index fa9729d..8d214ff 100644
---
a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
+++
b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceAudience.java
@@ -25,10 +25,10 @@ import java.lang.annotation.RetentionPolicy;
* This annotation is ported and modified from Apache Hadoop project.
*
* Annotation to inform users of a package, class or method's intended
audience.
- * Currently the audience can be {@link User}, {@link Developer}
+ * Currently the audience can be {@link User}, {@link Developer}, {@link
Internal}
*
* Public classes that are not marked with this annotation must be
- * considered by default as {@link Developer}.
+ * considered by default as {@link Internal}.
*
* External applications must only use classes that are marked {@link User}.
*
@@ -47,12 +47,21 @@ public class InterfaceAudience {
public @interface User { }
/**
- * Intended only for developers to extend interface for CarbonData project
- * For example, new Datamap implementations.
+ * Intended for developers to develop extension for Apache CarbonData project
+ * For example, "Index DataMap" to add a new index implementation, etc
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
- public @interface Developer { }
+ public @interface Developer {
+String[] value();
+ }
+
+ /**
+ * Intended only for internal usage within Apache CarbonData project.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Internal { }
private InterfaceAudience() { } // Audience can't exist on its own
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8996cd4a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
--
diff --git
[CARBONDATA-1997] Add CarbonWriter SDK API
Added a new module called store-sdk, and added a CarbonWriter API, it can be
used to write Carbondata files to a specified folder, without Spark and Hadoop
dependency. User can use this API in any environment.
This closes #1967
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9508633
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9508633
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9508633
Branch: refs/heads/carbonstore-rebase5
Commit: a95086333e665752347b67812a663eac5e8ca8c7
Parents: 68c16bb
Author: Jacky Li
Authored: Sat Feb 10 19:44:23 2018 +0800
Committer: Jacky Li
Committed: Sun Mar 4 20:04:49 2018 +0800
--
.../org/apache/carbondata/common/Strings.java | 40
.../apache/carbondata/common/StringsSuite.java | 53 +
.../core/metadata/schema/table/CarbonTable.java | 7 +
.../schema/table/CarbonTableBuilder.java| 72 +++
.../core/metadata/schema/table/TableSchema.java | 7 +
.../schema/table/TableSchemaBuilder.java| 107 ++
.../schema/table/CarbonTableBuilderSuite.java | 86
.../metadata/schema/table/CarbonTableTest.java | 12 +-
.../schema/table/TableSchemaBuilderSuite.java | 56 ++
.../carbondata/spark/util/DataLoadingUtil.scala | 45 +
pom.xml | 7 +
store/sdk/pom.xml | 130 +
.../carbondata/sdk/file/CSVCarbonWriter.java| 89 +
.../carbondata/sdk/file/CarbonWriter.java | 51 +
.../sdk/file/CarbonWriterBuilder.java | 194 +++
.../org/apache/carbondata/sdk/file/Field.java | 74 +++
.../org/apache/carbondata/sdk/file/Schema.java | 74 +++
.../sdk/file/CSVCarbonWriterSuite.java | 127
18 files changed, 1225 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9508633/common/src/main/java/org/apache/carbondata/common/Strings.java
--
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java
b/common/src/main/java/org/apache/carbondata/common/Strings.java
new file mode 100644
index 000..23288dd
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common;
+
+import java.util.Objects;
+
+public class Strings {
+
+ /**
+ * Provide same function as mkString in Scala.
+ * This is added to avoid JDK 8 dependency.
+ */
+ public static String mkString(String[] strings, String delimeter) {
+Objects.requireNonNull(strings);
+Objects.requireNonNull(delimeter);
+StringBuilder builder = new StringBuilder();
+for (int i = 0; i < strings.length; i++) {
+ builder.append(strings[i]);
+ if (i != strings.length - 1) {
+builder.append(delimeter);
+ }
+}
+return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9508633/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
--
diff --git
a/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
new file mode 100644
index 000..65da32b
--- /dev/null
+++ b/common/src/test/java/org/apache/carbondata/common/StringsSuite.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the
http://git-wip-us.apache.org/repos/asf/carbondata/blob/83df87dd/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
--
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 000..fbb93b6
--- /dev/null
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+ private CarbonTable table;
+
+ public CarbonLoadModelBuilder(CarbonTable table) {
+this.table = table;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @return a new CarbonLoadModel instance
+ */
+ public CarbonLoadModel build(
+ Map options) throws InvalidLoadOptionException,
IOException {
+Map optionsFinal =
LoadOption.fillOptionWithDefaultValue(options);
+optionsFinal.put("sort_scope", "no_sort");
+if (!options.containsKey("fileheader")) {
+ List csvHeader =
table.getCreateOrderColumn(table.getTableName());
+ String[] columns = new String[csvHeader.size()];
+ for (int i = 0; i < columns.length; i++) {
+columns[i] = csvHeader.get(i).getColName();
+ }
+ optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+}
+CarbonLoadModel model = new CarbonLoadModel();
+
+// we have provided 'fileheader', so it hadoopConf can be null
+build(options, optionsFinal, model, null);
+
+// set default values
+
model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options,
"onepass", "false")));
+model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost",
null));
+try {
+
model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options,
"dictport", "-1")));
+} catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+}
+return model;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options that populated with default values for
optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf hadoopConf is needed to read CSV header if there
'fileheader' is not set in
+ * user provided load options
+ */
+ public void build(
+ Map options,
+ Map optionsFinal,
+ CarbonLoadModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/92c9f224/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
--
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 69f5ceb..22d1df1 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -43,10 +43,9 @@ import
org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
-import
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode;
import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -64,8 +63,8 @@ import
org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -121,7 +120,6 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
queryProperties.queryStatisticsRecorder =
CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
-QueryUtil.resolveQueryModel(queryModel);
QueryStatistic queryStatistic = new QueryStatistic();
// sort the block info
// so block will be loaded in sorted order this will be required for
@@ -168,12 +166,12 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR,
System.currentTimeMillis());
queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
// calculating the total number of aggeragted columns
-int measureCount = queryModel.getQueryMeasures().size();
+int measureCount = queryModel.getProjectionMeasures().size();
int currentIndex = 0;
DataType[] dataTypes = new DataType[measureCount];
-for (QueryMeasure carbonMeasure : queryModel.getQueryMeasures()) {
+for (ProjectionMeasure carbonMeasure : queryModel.getProjectionMeasures())
{
// adding the data type and aggregation type of all the measure this
// can be used
// to select the aggregator
@@ -198,9 +196,11 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
queryStatistic = new QueryStatistic();
// dictionary column unique column id to dictionary mapping
// which will be used to get column actual data
-queryProperties.columnToDictionayMapping = QueryUtil
-.getDimensionDictionaryDetail(queryModel.getQueryDimension(),
-queryProperties.complexFilterDimension,
queryModel.getAbsoluteTableIdentifier(),
+queryProperties.columnToDictionayMapping =
+QueryUtil.getDimensionDictionaryDetail(
+queryModel.getProjectionDimensions(),
+queryProperties.complexFilterDimension,
+queryModel.getAbsoluteTableIdentifier(),
tableProvider);
queryStatistic
.addStatistics(QueryStatisticsConstants.LOAD_DICTIONARY,
System.currentTimeMillis());
@@ -263,8 +263,8 @@ public abstract class AbstractQueryExecutor implements
QueryExecutor {
// and query will be executed based on that infos
for (int i = 0; i < queryProperties.dataBlocks.size(); i++) {
AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i);
- BlockletDataRefNodeWrapper dataRefNode =
- (BlockletDataRefNodeWrapper) abstractIndex.getDataRefNode();
+ BlockletDataRefNode dataRefNode =
+ (BlockletDataRefNode) abstractIndex.getDataRefNode();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dcfe73b8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
--
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 8d394db..e69de29 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.{Date, List, Locale}
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl,
TaskAttemptContextImpl}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
PartitionedFile}
-import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants,
CarbonLoadOptionConstants}
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil,
LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil,
DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-
-/**
- * the util object of data loading
- */
-object DataLoadingUtil {
-
- val LOGGER: LogService =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * get data loading options and initialise default value
- */
- def getDataLoadingOptions(
- carbonProperty: CarbonProperties,
- options: immutable.Map[String, String]): mutable.Map[String, String] = {
-val optionsFinal = scala.collection.mutable.Map[String, String]()
-optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
-optionsFinal.put("escapechar",
- CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
-optionsFinal.put(
- "serialization_null_format",
-