[GitHub] carbondata issue #3065: [HOTFIX] Optimize presto-guide
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3065 @zzcclp Can you verify Presto with the current master? The changes related to the Hive metastore are done now, so carbon now behaves as one of the Hive-supported formats in Presto. Please check and let us know your feedback. ---
[GitHub] carbondata issue #3068: [HOTFIX] Fixed NPE during query with Local Dictionar...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3068 LGTM ---
[GitHub] carbondata issue #3068: [HOTFIX] Fixed NPE during query with Local Dictionar...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3068 > why does one segment have some blocklet encoded with local dictionary and some without local dictionary ? It is because carbon generates the dictionary based on a column value count threshold, so once a column crosses that threshold it stops generating the dictionary. There are scenarios where some blocks/blocklets are within the threshold and some are not; that is why some blocks have a local dictionary and some don't. ---
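For illustration, a minimal sketch of that threshold-driven behaviour. The names and the threshold value below are assumptions for the sketch (CarbonData exposes the real limit as the LOCAL_DICTIONARY_THRESHOLD table property); this is not the actual encoder code.

```scala
// Illustrative sketch only: shows why some blocklets end up with a local
// dictionary and some without, when generation stops at a cardinality threshold.
object LocalDictionarySketch {

  // Hypothetical threshold value; the real limit comes from table properties.
  val threshold = 10000

  /** Returns Some(dictionary) if the blocklet's distinct values stay within the
    * threshold, otherwise None (plain encoding, no local dictionary). */
  def buildDictionary(blockletValues: Seq[String]): Option[Map[String, Int]] = {
    val distinct = scala.collection.mutable.LinkedHashMap.empty[String, Int]
    val it = blockletValues.iterator
    while (it.hasNext) {
      val v = it.next()
      if (!distinct.contains(v)) {
        if (distinct.size >= threshold) {
          // Threshold crossed: stop generating, so this blocklet is written
          // without a local dictionary even though earlier blocklets may have one.
          return None
        }
        distinct.put(v, distinct.size)
      }
    }
    Some(distinct.toMap)
  }
}
```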
[GitHub] carbondata issue #3066: [CARBONDATA-3244] Add benchmark for Change Data Capt...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3066 > i think the query performance of carbon_solution is lower than hive_solution's, because carbon_solution has more segment (insert generates a segment and update generates more segment) > Do we have some method to optimize this? Since we are updating the existing data, it creates extra files like delete-delta and incremental carbondata files. It may degrade query performance a little, but once compaction is done the performance will improve. ---
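For context, a hedged example of the flow being discussed, using the dw_order table from the benchmark: an UPDATE produces delete-delta/incremental files, and a compaction merges them back. The predicate values are made up; the UPDATE and ALTER TABLE ... COMPACT statements follow CarbonData's documented DML, but treat this as a sketch rather than part of the benchmark code.

```scala
// Sketch only: values are illustrative.
// Each UPDATE adds delete-delta and incremental carbondata files to the segment.
spark.sql("UPDATE dw_order SET (state) = (2) WHERE order_id = 'order1'")

// Compaction merges the accumulated segments/delta files,
// recovering most of the query performance lost to the extra files.
spark.sql("ALTER TABLE dw_order COMPACT 'MINOR'")
```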
[GitHub] carbondata pull request #3066: [CARBONDATA-3244] Add benchmark for Change Da...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3066#discussion_r247119761 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.benchmark + +import java.io.File +import java.sql.Date + +import org.apache.commons.lang3.time.DateUtils +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} + +/** + * Benchmark for Change Data Capture scenario. + * This test simulates updates to history table using CDC table. + * + * The benchmark shows performance of two update methods: + * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for hive warehouse. + * 2. carbon_solution, which uses CarbonData's update syntax to update the history table directly. + * + * When running in a 8-cores laptop, the benchmark shows: + * + * 1. test one + * History table 1M records, update 10K records everyday and insert 10K records everyday, + * simulated 3 days. + * hive_solution: total process time takes 13,516 ms + * carbon_solution: total process time takes 7,521 ms + * + * + * 2. test two + * History table 10M records, update 10K records everyday and insert 10K records everyday, + * simulated 3 days. 
+ * hive_solution: total process time takes 104,250 ms + * carbon_solution: total process time takes 17,384 ms + * + */ +object CDCBenchmark { + + // Schema for history table + // Table name: dw_order + // +-+---+-+ + // | Column name | Data type | Cardinality | + // +-+---+-+ + // | order_id| string| 10,000,000 | + // +-+---+-+ + // | customer_id | string| 10,000,000 | + // +-+---+-+ + // | start_date | date | NA | + // +-+---+-+ + // | end_date| date | NA | + // +-+---+-+ + // | state | int | 4 | + // +-+---+-+ + case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date, + state: Int) + + // Schema for CDC data which is used for update to history table every day + // Table name: ods_order + // +-+---+-+ + // | Column name | Data type | Cardinality | + // +-+---+-+ + // | order_id| string| 10,000,000 | + // +-+---+-+ + // | customer_id | string| 10,000,000 | + // +-+---+-+ + // | update_date | date | NA | + // +-+---+-+ + // | state | int | 4 | + // +-+---+-+ + case class CDC (order_id: String, customer_id: String, update_date: Date, state: Int) + + // number of records for first day + val numOrders = 1000 + + // number of records to update every day + val numUpdateOrdersDaily = 1 + + // number of new records to insert every day + val newNewOrdersDaily = 1 + + // number of days to simulate + val numDays = 3 + + // print eveyday result or not to console + val printDetail = false + + def generateDataForDay0( + sparkSession: SparkSession, + numOrders: Int = 100, + startDate: Date = Date.valueOf("2018-05-01")): DataFrame = { +import sparkSession.implicits._ +sparkSession.sparkContext.parallelize(1 to numOrders, 4) + .map { x => Order(s"order$x", s"customer$x", startDate, Date.valueOf("-01-01"), 1) +
[GitHub] carbondata issue #3060: [HOTFIX] Exclude filter doesn't work in presto carbo...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3060 LGTM ---
[GitHub] carbondata issue #3056: [CARBONDATA-3236] Fix for JVM Crash for insert into ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3056 LGTM ---
[GitHub] carbondata issue #3001: [CARBONDATA-3220] Support presto to read stream segm...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3001 LGTM ---
[GitHub] carbondata issue #3001: [CARBONDATA-3220] Support presto to read stream segm...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3001 @QiangCai Please don't add binary files :( . You are supposed to generate the files and execute the test. ---
[GitHub] carbondata issue #3055: [CARBONDATA-3237] Fix presto carbon issues in dictio...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3055 LGTM ---
[GitHub] carbondata issue #3029: [CARBONDATA-3200] No-Sort compaction
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3029 LGTM ---
[GitHub] carbondata pull request #3055: [CARBONDATA-3237] Fix presto carbon issues in...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3055#discussion_r246277431 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java --- @@ -142,5 +144,17 @@ public SliceStreamReader(int batchSize, DataType dataType, @Override public void reset() { builder = type.createBlockBuilder(null, batchSize); +this.isLocalDict = false; + } + + @Override public void putInt(int rowId, int value) { +Object data = DataTypeUtil --- End diff -- Doesn't overriding this directly cause a problem for the local dictionary? How do you handle the local dictionary here? ---
[GitHub] carbondata pull request #3055: [CARBONDATA-3237] Fix presto carbon issues in...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3055#discussion_r246277337 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java --- @@ -95,22 +105,14 @@ public SliceStreamReader(int batchSize, DataType dataType, dictOffsets[dictOffsets.length - 1] = size; dictionaryBlock = new VariableWidthBlock(dictionary.getDictionarySize(), Slices.wrappedBuffer(singleArrayDictValues), dictOffsets, Optional.of(nulls)); -values = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray(); +this.isLocalDict = true; } - @Override public void setBatchSize(int batchSize) { + --- End diff -- remove empty space ---
[GitHub] carbondata issue #3001: [CARBONDATA-3220] Support presto to read stream segm...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3001 @QiangCai Please try to add a test case for it; otherwise it will be easy to break in future commits. ---
[GitHub] carbondata issue #3001: [CARBONDATA-3220] Support presto to read stream segm...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3001 LGTM ---
[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2971 LGTM @QiangCai I feel it is better to keep it in table properties as it is not supposed to be changed for each load. We can discuss further and raise another PR if needed; I am merging this now. Thanks for working on it. ---
[GitHub] carbondata issue #3024: [CARBONDATA-3230] Add alter test case for datasource
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3024 LGTM ---
[GitHub] carbondata issue #3051: [CARBONDATA-3221] Fix the error of SDK don't support...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3051 LGTM ---
[GitHub] carbondata issue #3014: [CARBONDATA-3201] Added load level SORT_SCOPE
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3014 LGTM ---
[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2971 @QiangCai we should restrict changing that property via table properties. I am just explaining how we can do the compaction on the range column; since there are similarities with partitioning, I mentioned it here. I feel the range boundaries can be recalculated during compaction using the min/max of the range column, followed by a merge sort (a sketch follows below). ---
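A rough sketch of that boundary-recalculation idea. Nothing here is an existing CarbonData API; it only illustrates deriving new range boundaries from the per-segment min/max of the range column before merge-sorting each bucket.

```scala
// Hypothetical helper: given the (min, max) of the range column in each segment
// being compacted, derive evenly spaced boundaries for the compacted ranges.
def recomputeRangeBoundaries(segmentMinMax: Seq[(Long, Long)], numRanges: Int): Seq[Long] = {
  val globalMin = segmentMinMax.map(_._1).min
  val globalMax = segmentMinMax.map(_._2).max
  val step = math.max(1L, (globalMax - globalMin) / numRanges)
  // Each compaction task then merge-sorts only the rows of its bucket,
  // preserving a range-sorted layout without a full global re-sort.
  (1 until numRanges).map(i => globalMin + i * step)
}
```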
[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2971 @QiangCai My question is how the user can benefit from choosing a different range column for each load. I feel the range column should be at the table level, not at the load level. And regarding compaction: yes, currently the data becomes local-sort after compaction, but there is a way we can support range-column compaction, similar to how we do compaction for partitions. This work can be done in the future. But if you allow the user to choose the range column at each load level, then this type of compaction cannot be done. ---
[GitHub] carbondata issue #3039: [CARBONDATA-3217] Optimize implicit filter expressio...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3039 LGTM ---
[GitHub] carbondata pull request #3029: [CARBONDATA-3200] No-Sort compaction
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3029#discussion_r244959393 --- Diff: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java --- @@ -114,6 +113,31 @@ public void startFinalMerge() throws CarbonDataWriterException { startSorting(filesToMerge); } + /** + * Below method will be used to add in memory raw result iterator to priority queue. + * This will be called in case of compaction, when it is compacting sorted and unsorted + * both type of carbon data file + * This method will add sorted file's RawResultIterator to priority queue using + * InMemorySortTempChunkHolder as wrapper + * + * @param sortedRawResultMergerList + * @param segmentProperties + * @param noDicAndComplexColumns + * @throws CarbonSortKeyAndGroupByException + */ + public void addInMemoryRawResultIterator(List sortedRawResultMergerList, + SegmentProperties segmentProperties, CarbonColumn[] noDicAndComplexColumns, + DataType[] measureDataType) + throws CarbonSortKeyAndGroupByException { +for (RawResultIterator rawResultIterator : sortedRawResultMergerList) { + InMemorySortTempChunkHolder inMemorySortTempChunkHolder = + new InMemorySortTempChunkHolder(rawResultIterator, segmentProperties, + noDicAndComplexColumns, sortParameters, measureDataType); + inMemorySortTempChunkHolder.readRow(); --- End diff -- Don't need to check hasNext here before reading the row first time? ---
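To make the review question concrete, a minimal sketch (with placeholder types, not the real RawResultIterator/InMemorySortTempChunkHolder API) of guarding the first read with hasNext before enqueueing a holder:

```scala
// Sketch: only pre-read a row (and only enqueue the holder) when the iterator
// actually has one; otherwise readRow() on an empty iterator could fail or
// enqueue an empty holder into the merge queue.
trait RowIterator { def hasNext: Boolean; def next(): Array[AnyRef] }

class ChunkHolderSketch(it: RowIterator) {
  private var current: Array[AnyRef] = _
  def readRow(): Unit = { current = it.next() }
  def hasNext: Boolean = current != null || it.hasNext
}

def addToQueue(iters: Seq[RowIterator],
               queue: scala.collection.mutable.PriorityQueue[ChunkHolderSketch]): Unit = {
  iters.foreach { it =>
    if (it.hasNext) {            // guard before the first readRow()
      val holder = new ChunkHolderSketch(it)
      holder.readRow()
      queue.enqueue(holder)
    }
  }
}
```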
[GitHub] carbondata issue #3021: [CARBONDATA-3193] Cdh5.14.2 spark2.2.0 support
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3021 I will check on it. On Wed, 2 Jan 2019 at 10:37 PM, Chandrasekhar Saripaka < notificati...@github.com> wrote: > b0733ec > <https://github.com/apache/carbondata/commit/b0733ecbf380d7956dee57a9048dd7537620744e> -- Thanks & Regards, Ravi ---
[GitHub] carbondata pull request #3041: [CARBONDATA-3218] Fix schema refresh and wron...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3041#discussion_r244692501 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java --- @@ -134,23 +127,17 @@ */ public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location, Configuration config) { -if (!carbonCache.get().containsKey(table) || carbonCache.get().get(table) == null) { - updateSchemaTables(table, config); - parseCarbonMetadata(table, location, config); -} -if (carbonCache.get().containsKey(table)) { - return carbonCache.get().get(table); -} else { - return null; +updateSchemaTables(table, config); +CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table); +if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) { + return parseCarbonMetadata(table, location, config); } +return carbonTableCacheModel; } private void removeTableFromCache(SchemaTableName table) { --- End diff -- ok, removed the method ---
[GitHub] carbondata issue #2971: [CARBONDATA-3219] Support range partition the input ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2971 @QiangCai @jackylk Doesn't adding a `RANGE_COLUMN` at each load level create an issue? If the user selects a different range column for each load, how are you going to compact once you support it in the future? What is the background for specifying the range_column at load level instead of at create-table level? ---
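To make the two placements being debated concrete, a hedged example; the range_column property/option names, table, and path below are illustrative of the proposal and may not match the final syntax.

```scala
// Table level (argued for here): the range column is fixed once at create time,
// so all loads and any future range-aware compaction agree on the same column.
spark.sql(
  """CREATE TABLE sales (order_id STRING, amount INT, city STRING)
    |STORED AS carbondata
    |TBLPROPERTIES ('range_column' = 'order_id')
  """.stripMargin)

// Load level (the PR's approach): each load could name a different column,
// which would make range-based compaction across loads hard to define.
spark.sql(
  """LOAD DATA INPATH '/tmp/sales_day1.csv' INTO TABLE sales
    |OPTIONS ('range_column' = 'order_id')
  """.stripMargin)
```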
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681963 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag +import scala.util.hashing.byteswap32 + +import org.apache.spark.rdd.{PartitionPruningRDD, RDD} +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{CollectionsUtils, Utils} + +/** + * support data skew scenario + * copy from spark: RangePartiitoner + */ +class DataSkewRangePartitioner[K: Ordering : ClassTag, V]( --- End diff -- Please add the comment how it is different from spark's range partitioner and how you do skew partitioning ---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681720 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { +// initialize and prepare row counter +val sc = sparkSession.sparkContext +val modelBroadcast = sc.broadcast(model) +val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") +val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") +val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") +val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") +val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + +// 1. Input +hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) +val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => +DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter) + } + +// 2. Convert +val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) +val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) +DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + +// 3. Range partition by range_column +val configuration = DataLoadProcessBuilder.createConfiguration(model) +val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) +// convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] +val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) +// range partition by key +val numPartitions = getNumPartitions(configuration, model, convertRDD) +val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) +import scala.reflect.classTag +val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) +val rangeRDD = keyRDD + .partitionBy( +new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + +// 4. 
Sort and Write data +sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, +writeStepRowCounter, conf.value.value)) + +// Log the number of rows in each step +LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) +LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) +LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) +LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + +// Update status +if (partialSuccessAccum.value != 0) { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + + "Partial_Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + executionErrors.failureCauses = FailureCauses.BAD_RECORDS + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) +} else { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "&q
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681813 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala --- @@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql("SELECT * FROM carbon_localsort_once ORDER BY name")) } + test("Make sure the result is right and sorted in global level for range_sort") { --- End diff -- Please add some test cases with scalefactor as well. ---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681164 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { +// initialize and prepare row counter +val sc = sparkSession.sparkContext +val modelBroadcast = sc.broadcast(model) +val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") +val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") +val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") +val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") +val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + +// 1. Input +hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) +val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => +DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter) + } + +// 2. Convert +val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) +val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) +DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + +// 3. Range partition by range_column +val configuration = DataLoadProcessBuilder.createConfiguration(model) +val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) +// convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] +val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) +// range partition by key +val numPartitions = getNumPartitions(configuration, model, convertRDD) +val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) +import scala.reflect.classTag +val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) +val rangeRDD = keyRDD + .partitionBy( +new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + +// 4. 
Sort and Write data +sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, +writeStepRowCounter, conf.value.value)) + +// Log the number of rows in each step +LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) +LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) +LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) +LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + +// Update status +if (partialSuccessAccum.value != 0) { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + + "Partial_Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + executionErrors.failureCauses = FailureCauses.BAD_RECORDS + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) +} else { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "&q
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681062 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala --- @@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark { } } + def internalInputFunc( + rows: Iterator[InternalRow], + index: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + rowCounter: Accumulator[Int]): Iterator[CarbonRow] = { +val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) +val conf = DataLoadProcessBuilder.createConfiguration(model) +val rowParser = new RowParserImpl(conf.getDataFields, conf) +val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf) +TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + wrapException(e, model) +} + +new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { +var row : CarbonRow = null +val rawRow = + rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]] +if(isRawDataRequired) { + row = new CarbonRow(rowParser.parseRow(rawRow), rawRow) +} else { + row = new CarbonRow(rowParser.parseRow(rawRow)) +} +rowCounter.add(1) +row + } +} + } + + def internalSampleInputFunc( --- End diff -- Please unify `internalSampleInputFunc` and `internalInputFunc` ---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244680876 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( + sparkSession: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { +// initialize and prepare row counter +val sc = sparkSession.sparkContext +val modelBroadcast = sc.broadcast(model) +val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") +val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator") +val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator") +val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") +val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") + +// 1. Input +hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) +val inputRDD = CsvRDDHelper + .csvFileScanRDD(sparkSession, model, hadoopConf) + .mapPartitionsWithIndex { case (index, rows) => +DataLoadProcessorStepOnSpark + .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter) + } + +// 2. Convert +val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) +val convertRDD = inputRDD + .mapPartitionsWithIndex { case (index, rows) => + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) +DataLoadProcessorStepOnSpark + .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter) + } + .filter(_ != null) + +// 3. Range partition by range_column +val configuration = DataLoadProcessBuilder.createConfiguration(model) +val rangeColumnIndex = + indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields) +// convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)] +val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex)) +// range partition by key +val numPartitions = getNumPartitions(configuration, model, convertRDD) +val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn) +import scala.reflect.classTag +val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast) +val rangeRDD = keyRDD + .partitionBy( +new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object])) + .map(_._2) + +// 4. 
Sort and Write data +sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => + DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, +writeStepRowCounter, conf.value.value)) + +// Log the number of rows in each step +LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value) +LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value) +LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value) +LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value) + +// Update status +if (partialSuccessAccum.value != 0) { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + + "Partial_Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") + executionErrors.failureCauses = FailureCauses.BAD_RECORDS + Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) +} else { + val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success" + val loadMetadataDetails = new LoadMetadataDetails() + loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) + val executionErrors = new ExecutionErrors(FailureCauses.NONE, "&q
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244670461 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala --- @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark { Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors))) } } + + /** + * 1. range partition the whole input data + * 2. for each range, sort the data and writ it to CarbonData files + */ + def loadDataUsingRangeSort( --- End diff -- Code is duplicated with method `loadDataUsingGlobalSort` , please try to extract common code and reuse here. ---
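A rough sketch of the extraction being requested: the input and convert stages could live in one shared helper used by both loadDataUsingGlobalSort and loadDataUsingRangeSort. The helper name is hypothetical, and the sketch assumes it sits inside DataLoadProcessBuilderOnSpark and reuses that object's existing imports.

```scala
// Hypothetical shared helper inside DataLoadProcessBuilderOnSpark:
// builds the converted RDD (CSV scan + row conversion) that the global-sort
// and range-sort code paths currently each implement on their own.
private def buildConvertedRDD(
    sparkSession: SparkSession,
    model: CarbonLoadModel,
    hadoopConf: Configuration,
    modelBroadcast: Broadcast[CarbonLoadModel],
    inputStepRowCounter: Accumulator[Int],
    convertStepRowCounter: Accumulator[Int],
    partialSuccessAccum: Accumulator[Int]): RDD[CarbonRow] = {
  val sc = sparkSession.sparkContext
  val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
  // 1. Input: scan the CSV files and parse rows
  val inputRDD = CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    .mapPartitionsWithIndex { case (index, rows) =>
      DataLoadProcessorStepOnSpark
        .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    }
  // 2. Convert: apply the data-converter step and drop bad records
  inputRDD
    .mapPartitionsWithIndex { case (index, rows) =>
      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
      DataLoadProcessorStepOnSpark
        .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    }
    .filter(_ != null)
}
```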
[GitHub] carbondata issue #3028: [CARBONDATA-3205]Fix Get Local Dictionary for empty ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3028 LGTM ---
[GitHub] carbondata pull request #3041: [CARBONDATA-3218] Fix schema refresh and wron...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3041#discussion_r244665281 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java --- @@ -234,30 +219,26 @@ public TBase create() { wrapperTableInfo.setTransactionalTable(isTransactionalTable); + CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName()); // Step 4: Load metadata info into CarbonMetadata CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo); - cache.carbonTable = - CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()); - - // cache the table - carbonCache.get().put(table, cache); + cache.setCarbonTable( + CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName())); --- End diff -- It is not supposed be null as in above line only the table is loaded. But added the null check now. ---
[GitHub] carbondata pull request #3041: [WIP] Fix schema refresh and wrong query resu...
GitHub user ravipesala opened a pull request: https://github.com/apache/carbondata/pull/3041 [WIP] Fix schema refresh and wrong query result issues in presto. Problem: A schema updated in Spark is not reflected in Presto, which results in wrong query results in Presto. Solution: Refresh the schema in Presto whenever it changes in Spark, and also override the putNulls method in all Presto readers to handle null-data scenarios. Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ravipesala/incubator-carbondata presto-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/3041.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3041 commit de54ec7faa1365e044f7b34971135c52efa00255 Author: ravipesala Date: 2018-12-31T11:50:24Z Fix schema refresh issues in presto. ---
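A simplified sketch of the refresh idea in the description above, written in Scala for brevity; the real change lives in the Java CarbonTableReader, and every name here is a placeholder. The idea: cache the parsed table together with the schema's modification time and re-parse whenever that time moves.

```scala
// Placeholder sketch of schema-refresh-aware caching.
import scala.collection.concurrent.TrieMap

case class CachedTable(table: AnyRef, schemaModifiedTime: Long)

class TableCacheSketch(latestSchemaTime: String => Long,
                       parse: String => AnyRef) {
  private val cache = TrieMap.empty[String, CachedTable]

  def get(tableName: String): AnyRef = {
    val latest = latestSchemaTime(tableName)
    cache.get(tableName) match {
      case Some(c) if c.schemaModifiedTime == latest => c.table   // cache still valid
      case _ =>
        val parsed = parse(tableName)                             // schema changed in Spark
        cache.put(tableName, CachedTable(parsed, latest))
        parsed
    }
  }
}
```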
[GitHub] carbondata issue #3029: [CARBONDATA-3200] No-Sort compaction
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3029 @NamanRastogi Lots of code is duplicated here. Please try to unify it with the other compaction processor to avoid the duplication. ---
[GitHub] carbondata pull request #3029: [CARBONDATA-3200] No-Sort compaction
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3029#discussion_r244319031 --- Diff: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java --- @@ -126,17 +128,24 @@ public CarbonCompactionExecutor(Map segmentMapping, // for each segment get taskblock info TaskBlockInfo taskBlockInfo = taskMap.getValue(); Set taskBlockListMapping = taskBlockInfo.getTaskSet(); + // Check if block needs sorting or not + boolean sortingRequired = + CarbonCompactionUtil.isRestructured(listMetadata, carbonTable.getTableLastUpdatedTime()) + || !CarbonCompactionUtil.isSorted(taskBlockInfo); --- End diff -- Here we are reading each and every carbondata file footer, it will impact the compaction performance. I feel we should discuss and consider adding isSort flag also to the carbonindex file to simplify it ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244278709 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala --- @@ -157,7 +157,7 @@ case class CarbonCreateTableCommand( | tableName "$tableName", | dbName "$dbName", | tablePath "$tablePath", - | path "$tablePath", + | path "${FileFactory.addSchemeIfNotExists(tablePath)}", --- End diff -- ok ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244278266 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java --- @@ -369,6 +369,24 @@ public static boolean createNewLockFile(String filePath, FileType fileType) thro LOCAL, HDFS, ALLUXIO, VIEWFS, S3 } + public static String addSchemeIfNotExists(String filePath) { --- End diff -- ok ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244278116 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java --- @@ -17,62 +17,150 @@ package org.apache.carbondata.presto; -import javax.inject.Inject; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; -import org.apache.carbondata.presto.impl.CarbonTableConfig; import org.apache.carbondata.presto.impl.CarbonTableReader; +import com.facebook.presto.hive.CoercionPolicy; +import com.facebook.presto.hive.DirectoryLister; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.GenericHiveRecordCursorProvider; +import com.facebook.presto.hive.HadoopDirectoryLister; +import com.facebook.presto.hive.HdfsConfiguration; +import com.facebook.presto.hive.HdfsConfigurationUpdater; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveClientModule; +import com.facebook.presto.hive.HiveCoercionPolicy; +import com.facebook.presto.hive.HiveConnectorId; +import com.facebook.presto.hive.HiveEventClient; +import com.facebook.presto.hive.HiveFileWriterFactory; +import com.facebook.presto.hive.HiveHdfsConfiguration; +import com.facebook.presto.hive.HiveLocationService; +import com.facebook.presto.hive.HiveMetadataFactory; +import com.facebook.presto.hive.HiveNodePartitioningProvider; +import com.facebook.presto.hive.HivePageSinkProvider; +import com.facebook.presto.hive.HivePageSourceFactory; +import com.facebook.presto.hive.HivePartitionManager; +import com.facebook.presto.hive.HiveRecordCursorProvider; +import com.facebook.presto.hive.HiveSessionProperties; +import com.facebook.presto.hive.HiveSplitManager; +import com.facebook.presto.hive.HiveTableProperties; +import com.facebook.presto.hive.HiveTransactionManager; +import com.facebook.presto.hive.HiveTypeTranslator; +import com.facebook.presto.hive.HiveWriterStats; +import com.facebook.presto.hive.LocationService; +import com.facebook.presto.hive.NamenodeStats; +import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.hive.OrcFileWriterFactory; +import com.facebook.presto.hive.PartitionUpdate; +import com.facebook.presto.hive.RcFileFileWriterFactory; +import com.facebook.presto.hive.TableParameterCodec; +import com.facebook.presto.hive.TransactionalMetadata; +import com.facebook.presto.hive.TypeTranslator; +import com.facebook.presto.hive.orc.DwrfPageSourceFactory; +import com.facebook.presto.hive.orc.OrcPageSourceFactory; +import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; +import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider; +import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory; +import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider; +import com.facebook.presto.spi.connector.ConnectorPageSinkProvider; import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; import com.facebook.presto.spi.connector.ConnectorSplitManager; -import com.facebook.presto.spi.type.Type; -import com.facebook.presto.spi.type.TypeManager; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; import com.google.inject.Binder; -import com.google.inject.Module; import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.Multibinder; +import 
io.airlift.event.client.EventClient; -import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; -public class CarbondataModule implements Module { +import static org.weakref.jmx.ObjectNames.generatedNameOf; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class CarbondataModule extends HiveClientModule { --- End diff -- ok ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244276202 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java --- @@ -78,32 +72,33 @@ private static final String HIVE_DEFAULT_DYNAMIC_PARTITION = "__HIVE_DEFAULT_PARTITION__"; /** - * @param carbondataColumnHandle + * @param columnHandle * @return */ - private static DataType spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) { -Type colType = carbondataColumnHandle.getColumnType(); -if (colType == BooleanType.BOOLEAN) { + private static DataType spi2CarbondataTypeMapper(HiveColumnHandle columnHandle) { +HiveType colType = columnHandle.getHiveType(); +if (colType.equals(HiveType.HIVE_BOOLEAN)) { return DataTypes.BOOLEAN; -} else if (colType == SmallintType.SMALLINT) { +} else if (colType.equals(HiveType.HIVE_SHORT)) { return DataTypes.SHORT; -} else if (colType == IntegerType.INTEGER) { +} else if (colType.equals(HiveType.HIVE_INT)) { return DataTypes.INT; -} else if (colType == BigintType.BIGINT) { +} else if (colType.equals(HiveType.HIVE_LONG)) { return DataTypes.LONG; -} else if (colType == DoubleType.DOUBLE) { +} else if (colType.equals(HiveType.HIVE_DOUBLE)) { return DataTypes.DOUBLE; -} else if (colType == VarcharType.VARCHAR) { +} else if (colType.equals(HiveType.HIVE_STRING)) { return DataTypes.STRING; -} else if (colType == DateType.DATE) { +} else if (colType.equals(HiveType.HIVE_DATE)) { return DataTypes.DATE; -} else if (colType == TimestampType.TIMESTAMP) { +} else if (colType.equals(HiveType.HIVE_TIMESTAMP)) { return DataTypes.TIMESTAMP; -} else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), -carbondataColumnHandle.getScale( { - return DataTypes.createDecimalType(carbondataColumnHandle.getPrecision(), - carbondataColumnHandle.getScale()); -} else { +} +else if (colType.getTypeInfo() instanceof DecimalTypeInfo) { --- End diff -- ok ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244276046 --- Diff: integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala --- @@ -80,7 +80,7 @@ object CarbonDataStoreCreator { UUID.randomUUID().toString)) // val factFilePath: String = new File(dataFilePath).getCanonicalPath val storeDir: File = new File(absoluteTableIdentifier.getTablePath) - CarbonUtil.deleteFoldersAndFiles(storeDir) +// CarbonUtil.deleteFoldersAndFiles(storeDir) --- End diff -- ok ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244097233 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala --- @@ -72,69 +74,107 @@ object CarbonSessionExample { val path = s"$rootPath/examples/spark2/src/main/resources/data.csv" // scalastyle:off -spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE source - | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') - """.stripMargin) -// scalastyle:on - -spark.sql( - s""" - | SELECT charField, stringField, intField - | FROM source - | WHERE stringfield = 'spark' AND decimalField > 40 - """.stripMargin).show() - -spark.sql( - s""" - | SELECT * - | FROM source WHERE length(stringField) = 5 - """.stripMargin).show() - -spark.sql( - s""" - | SELECT * - | FROM source WHERE date_format(dateField, "-MM-dd") = "2015-07-23" - """.stripMargin).show() - -spark.sql("SELECT count(stringField) FROM source").show() - -spark.sql( - s""" - | SELECT sum(intField), stringField - | FROM source - | GROUP BY stringField - """.stripMargin).show() - -spark.sql( - s""" - | SELECT t1.*, t2.* - | FROM source t1, source t2 - | WHERE t1.stringField = t2.stringField - """.stripMargin).show() - -spark.sql( - s""" - | WITH t1 AS ( - | SELECT * FROM source - | UNION ALL - | SELECT * FROM source - | ) - | SELECT t1.*, t2.* - | FROM t1, source t2 - | WHERE t1.stringField = t2.stringField - """.stripMargin).show() - -spark.sql( - s""" - | SELECT * - | FROM source - | WHERE stringField = 'spark' and floatField > 2.8 - """.stripMargin).show() +//spark.sql( +// s""" +// | LOAD DATA LOCAL INPATH '$path' +// | INTO TABLE source +// | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') +// """.stripMargin) +//// scalastyle:on +// +//spark.sql( +// s""" +// | CREATE TABLE source_cs( +// | shortField SHORT, +// | intField INT, +// | bigintField LONG, +// | doubleField DOUBLE, +// | stringField STRING, +// | timestampField TIMESTAMP, +// | decimalField DECIMAL(18,2), +// | dateField DATE, +// | charField CHAR(5), +// | floatField FLOAT +// | ) +// | using carbon +// | location 'file://${ExampleUtils.storeLocation}' +// """.stripMargin) +// +//spark.sql("insert into source_cs select * from source") +// +//spark.sql( +// s""" +// | CREATE TABLE source_par( +// | shortField SHORT, +// | intField INT, +// | bigintField LONG, +// | doubleField DOUBLE, +// | stringField STRING, +// | timestampField TIMESTAMP, +// | decimalField DECIMAL(18,2), +// | dateField DATE, +// | charField CHAR(5), +// | floatField FLOAT +// | ) +// | using parquet +// """.stripMargin) +// +//spark.sql("insert into source_par select * from source") +//spark.sql( +// s""" +// | SELECT charField, stringField, intField +// | FROM source +// | WHERE stringfield = 'spark' AND decimalField > 40 +// """.stripMargin).show() +// +//spark.sql( +// s""" +// | SELECT * +// | FROM source WHERE length(stringField) = 5 +// """.stripMargin).show() +// +//spark.sql( +// s""" +// | SELECT * +// | FROM source WHERE date_format(dateField, "-MM-dd") = "2015-07-23" +// """.stripMargin).show() +// +//spark.sql("SELECT count(stringField) FROM source").show() +// +//spark.sql( +// s""" +// | SELECT sum(intField), stringField +// | FROM source +// | GROUP BY stringField +// """.stripMargin).show() +// +//spark.sql( +// s""" --- End diff -- Reverted ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244097300 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java --- @@ -17,69 +17,179 @@ package org.apache.carbondata.presto; +import java.lang.management.ManagementFactory; +import java.lang.reflect.*; import java.util.Map; +import java.util.Optional; +import java.util.Set; import static java.util.Objects.requireNonNull; -import com.facebook.presto.spi.ConnectorHandleResolver; +import org.apache.carbondata.presto.impl.CarbonTableConfig; + +import com.facebook.presto.hive.HiveConnector; +import com.facebook.presto.hive.HiveConnectorFactory; +import com.facebook.presto.hive.HiveMetadataFactory; +import com.facebook.presto.hive.HiveProcedureModule; +import com.facebook.presto.hive.HiveSchemaProperties; +import com.facebook.presto.hive.HiveSessionProperties; +import com.facebook.presto.hive.HiveStorageFormat; +import com.facebook.presto.hive.HiveTableProperties; +import com.facebook.presto.hive.HiveTransactionManager; +import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.hive.RebindSafeMBeanServer; +import com.facebook.presto.hive.authentication.HiveAuthenticationModule; +import com.facebook.presto.hive.metastore.HiveMetastoreModule; +import com.facebook.presto.hive.s3.HiveS3Module; +import com.facebook.presto.hive.security.HiveSecurityModule; +import com.facebook.presto.hive.security.PartitionsAwareAccessControl; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.PageIndexerFactory; +import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; -import com.facebook.presto.spi.connector.*; -import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorAccessControl; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider; +import com.facebook.presto.spi.connector.ConnectorPageSinkProvider; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider; import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider; import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager; -import com.google.common.base.Throwables; +import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider; +import com.facebook.presto.spi.procedure.Procedure; +import com.facebook.presto.spi.type.TypeManager; +import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.TypeLiteral; import io.airlift.bootstrap.Bootstrap; import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.event.client.EventModule; import io.airlift.json.JsonModule; +import io.airlift.units.DataSize; +import org.weakref.jmx.guice.MBeanModule; +import sun.reflect.ConstructorAccessor; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static io.airlift.configuration.ConfigBinder.configBinder; /** * Build Carbondata Connector * It will be called by CarbondataPlugin */ -public class CarbondataConnectorFactory implements 
ConnectorFactory { +public class CarbondataConnectorFactory extends HiveConnectorFactory { private final String name; --- End diff -- removed ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244097200 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java --- @@ -17,62 +17,150 @@ package org.apache.carbondata.presto; -import javax.inject.Inject; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; -import org.apache.carbondata.presto.impl.CarbonTableConfig; import org.apache.carbondata.presto.impl.CarbonTableReader; +import com.facebook.presto.hive.CoercionPolicy; +import com.facebook.presto.hive.DirectoryLister; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.GenericHiveRecordCursorProvider; +import com.facebook.presto.hive.HadoopDirectoryLister; +import com.facebook.presto.hive.HdfsConfiguration; +import com.facebook.presto.hive.HdfsConfigurationUpdater; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveClientModule; +import com.facebook.presto.hive.HiveCoercionPolicy; +import com.facebook.presto.hive.HiveConnectorId; +import com.facebook.presto.hive.HiveEventClient; +import com.facebook.presto.hive.HiveFileWriterFactory; +import com.facebook.presto.hive.HiveHdfsConfiguration; +import com.facebook.presto.hive.HiveLocationService; +import com.facebook.presto.hive.HiveMetadataFactory; +import com.facebook.presto.hive.HiveNodePartitioningProvider; +import com.facebook.presto.hive.HivePageSinkProvider; +import com.facebook.presto.hive.HivePageSourceFactory; +import com.facebook.presto.hive.HivePartitionManager; +import com.facebook.presto.hive.HiveRecordCursorProvider; +import com.facebook.presto.hive.HiveSessionProperties; +import com.facebook.presto.hive.HiveSplitManager; +import com.facebook.presto.hive.HiveTableProperties; +import com.facebook.presto.hive.HiveTransactionManager; +import com.facebook.presto.hive.HiveTypeTranslator; +import com.facebook.presto.hive.HiveWriterStats; +import com.facebook.presto.hive.LocationService; +import com.facebook.presto.hive.NamenodeStats; +import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.hive.OrcFileWriterFactory; +import com.facebook.presto.hive.PartitionUpdate; +import com.facebook.presto.hive.RcFileFileWriterFactory; +import com.facebook.presto.hive.TableParameterCodec; +import com.facebook.presto.hive.TransactionalMetadata; +import com.facebook.presto.hive.TypeTranslator; +import com.facebook.presto.hive.orc.DwrfPageSourceFactory; +import com.facebook.presto.hive.orc.OrcPageSourceFactory; +import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; +import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider; +import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory; +import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider; +import com.facebook.presto.spi.connector.ConnectorPageSinkProvider; import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; import com.facebook.presto.spi.connector.ConnectorSplitManager; -import com.facebook.presto.spi.type.Type; -import com.facebook.presto.spi.type.TypeManager; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; import com.google.inject.Binder; -import com.google.inject.Module; import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.Multibinder; +import 
io.airlift.event.client.EventClient; -import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; -public class CarbondataModule implements Module { +import static org.weakref.jmx.ObjectNames.generatedNameOf; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class CarbondataModule extends HiveClientModule { private final String connectorId; - private final TypeManager typeManager; - public CarbondataModule(String connectorId, TypeManager typeManager) { + public CarbondataModule(String connectorId) { +super(connectorId); this.connectorId = requireNonNull(connectorId, "connector id is null"); -this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244097091 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java --- @@ -113,7 +132,7 @@ private PrestoCarbonVectorizedRecordReader createReader(ConnectorSplit split, PrestoCarbonVectorizedRecordReader reader = new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel, (AbstractDetailQueryResultIterator) iterator, readSupport); - reader.setTaskId(carbondataSplit.getIndex()); + reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index"))); --- End diff -- ok, generalized ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244097070 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java --- @@ -43,63 +44,81 @@ import static org.apache.carbondata.presto.Types.checkType; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.HivePageSourceFactory; +import com.facebook.presto.hive.HivePageSourceProvider; +import com.facebook.presto.hive.HiveRecordCursorProvider; +import com.facebook.presto.hive.HiveSplit; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; -import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.type.TypeManager; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskAttemptContextImpl; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; - /** * Provider Class for Carbondata Page Source class. */ -public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider { +public class CarbondataPageSourceProvider extends HivePageSourceProvider { - private String connectorId; private CarbonTableReader carbonTableReader; private String queryId ; - - @Inject public CarbondataPageSourceProvider(CarbondataConnectorId connectorId, + private HdfsEnvironment hdfsEnvironment; + + @Inject public CarbondataPageSourceProvider( + HiveClientConfig hiveClientConfig, + HdfsEnvironment hdfsEnvironment, + Set cursorProviders, + Set pageSourceFactories, + TypeManager typeManager, CarbonTableReader carbonTableReader) { -this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); +super(hiveClientConfig, hdfsEnvironment, cursorProviders, pageSourceFactories, typeManager); this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null"); +this.hdfsEnvironment = hdfsEnvironment; } @Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns) { -this.queryId = ((CarbondataSplit)split).getQueryId(); +HiveSplit carbonSplit = +checkType(split, HiveSplit.class, "split is not class HiveSplit"); +if (carbonSplit.getSchema().getProperty("queryId") == null) { + return super.createPageSource(transactionHandle, session, split, columns); +} +this.queryId = carbonSplit.getSchema().getProperty("queryId"); +Configuration configuration = this.hdfsEnvironment.getConfiguration( +new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()), +new Path(carbonSplit.getSchema().getProperty("tablePath"))); +configuration = carbonTableReader.updateS3Properties(configuration); CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport(); PrestoCarbonVectorizedRecordReader carbonRecordReader 
= -createReader(split, columns, readSupport); +createReader(carbonSplit, columns, readSupport, configuration); return new CarbondataPageSource(carbonRecordReader, columns); } /** - * @param split + * @param carbonSplit * @param columns * @param readSupport --- End diff -- ok ---
[GitHub] carbondata pull request #3019: [CARBONDATA-3194] Integrating Carbon with Pre...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3019#discussion_r244097115 --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala --- @@ -72,69 +74,107 @@ object CarbonSessionExample { val path = s"$rootPath/examples/spark2/src/main/resources/data.csv" // scalastyle:off -spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE source - | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') - """.stripMargin) -// scalastyle:on - -spark.sql( - s""" - | SELECT charField, stringField, intField - | FROM source - | WHERE stringfield = 'spark' AND decimalField > 40 - """.stripMargin).show() - -spark.sql( - s""" - | SELECT * - | FROM source WHERE length(stringField) = 5 - """.stripMargin).show() - -spark.sql( - s""" - | SELECT * - | FROM source WHERE date_format(dateField, "-MM-dd") = "2015-07-23" - """.stripMargin).show() - -spark.sql("SELECT count(stringField) FROM source").show() - -spark.sql( - s""" - | SELECT sum(intField), stringField - | FROM source - | GROUP BY stringField - """.stripMargin).show() - -spark.sql( - s""" - | SELECT t1.*, t2.* - | FROM source t1, source t2 - | WHERE t1.stringField = t2.stringField - """.stripMargin).show() - -spark.sql( - s""" - | WITH t1 AS ( - | SELECT * FROM source - | UNION ALL - | SELECT * FROM source - | ) - | SELECT t1.*, t2.* - | FROM t1, source t2 - | WHERE t1.stringField = t2.stringField - """.stripMargin).show() - -spark.sql( - s""" - | SELECT * - | FROM source - | WHERE stringField = 'spark' and floatField > 2.8 - """.stripMargin).show() +//spark.sql( --- End diff -- done ---
[GitHub] carbondata issue #3024: [WIP] Add ALTER ADD COLUMNS test case for using parq...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3024 @xubo245 can you also add more tests here related to drop column, rename column, and change datatype of a column? ---
[GitHub] carbondata pull request #3001: [Presto][Streaming] support presto read strea...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3001#discussion_r244095423 --- Diff: integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala --- @@ -38,48 +38,16 @@ class PrestoAllDataTypeLocalDictTest extends FunSuiteLike with BeforeAndAfterAll private val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath - private val storePath = s"$rootPath/integration/presto/target/store" - private val systemPath = s"$rootPath/integration/presto/target/system" + private val storePath = rootPath + "/examples/spark2/target/store" --- End diff -- Why were all the existing test cases removed? ---
[GitHub] carbondata pull request #3001: [Presto][Streaming] support presto read strea...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3001#discussion_r244095320 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/stream/CarbondataStreamPageSource.java --- @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.stream; --- End diff -- We are not expecting any changes in the Presto implementation; changes should only be present in `PrestoCarbonVectorizedRecordReader` or `org.apache.carbondata.presto.CarbonVectorBatch`, because those are the reader classes. Why are changes needed here? ---
[GitHub] carbondata pull request #3001: [Presto][Streaming] support presto read strea...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3001#discussion_r244095288 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java --- @@ -86,4 +86,15 @@ public BooleanStreamReader(int batchSize, DataType dataType, Dictionary dictiona builder = type.createBlockBuilder(null, batchSize); } + @Override public void putObject(int rowId, Object value) { --- End diff -- It already has methods to handle put; why were these methods added again in all the readers? I don't think these changes are required in all the readers. ---
[GitHub] carbondata pull request #3001: [Presto][Streaming] support presto read strea...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3001#discussion_r244095118 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java --- @@ -79,13 +80,31 @@ @Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns) { -this.queryId = ((CarbondataSplit)split).getQueryId(); +CarbondataSplit carbondataSplit = (CarbondataSplit) split; --- End diff -- We are not expecting any changes in the Presto implementation; changes should only be present in `PrestoCarbonVectorizedRecordReader` or `org.apache.carbondata.presto.CarbonVectorBatch`, because those are the reader classes. Why are changes needed here? ---
[GitHub] carbondata pull request #3001: [Presto][Streaming] support presto read strea...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3001#discussion_r244093200 --- Diff: integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java --- @@ -95,6 +95,9 @@ private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType } else if (dataType == DataTypes.STRING) { return new SliceStreamReader(batchSize, field.getDataType(), dictionaryBlock); } else if (DataTypes.isDecimal(dataType)) { + if (dictionary != null && dataType instanceof DecimalType) { +return new DecimalSliceStreamReader(batchSize, (DecimalType) dataType, dictionary); + } --- End diff -- I am not sure why this change is required. Can you explain? ---
[GitHub] carbondata issue #3026: [WIP] Added support to compile carbon CDH spark dist...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3026 @qiuchenjian Please check the PR description for why carbon needs changes for Spark 2.2 CDH. ---
[GitHub] carbondata pull request #3026: [WIP] Added support to compile carbon CDH spa...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3026#discussion_r244090720 --- Diff: integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonDictionaryUtil.java --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Method; + +import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + +import org.apache.spark.sql.execution.vectorized.ColumnVector; + +/** + * This class uses the java reflection to create parquet dictionary class as CDH distribution uses + * twitter parquet instead of apache parquet. + */ +public class CarbonDictionaryUtil { --- End diff -- ok ---
[GitHub] carbondata pull request #3004: [CARBONDATA-3188] Create carbon table as hive...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3004#discussion_r244086681 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession} +import org.apache.spark.util.SparkUtil +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat + +class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + test("test create table and verify the hive table correctness with stored by") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |STORED BY 'carbondata' + """.stripMargin) + +verifyTable + +sql("DROP TABLE IF EXISTS source") + } + + private def verifyTable = { +val table = sqlContext.sparkSession.asInstanceOf[CarbonSession].sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient().getTable("default", "source") +assert(table.schema.fields.length == 3) +if (SparkUtil.isSparkVersionEqualTo("2.2")) { + assert(table.storage.locationUri.get.equals(new Path(s"file:$storeLocation/source").toUri)) +} + assert(table.storage.inputFormat.get.equals(classOf[CarbonFileInputFormat[_]].getName)) + } + + test("test create table and verify the hive table correctness with using carbondata") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |using carbondata + """.stripMargin) + +verifyTable + + +sql("DROP TABLE IF EXISTS source") + } + + test("test create table and verify the hive table correctness with using carbon") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |using carbon + """.stripMargin) + +verifyTable + --- End diff -- ok ---
[GitHub] carbondata pull request #3004: [CARBONDATA-3188] Create carbon table as hive...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3004#discussion_r244086693 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession} +import org.apache.spark.util.SparkUtil +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat + +class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + test("test create table and verify the hive table correctness with stored by") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |STORED BY 'carbondata' + """.stripMargin) + +verifyTable + +sql("DROP TABLE IF EXISTS source") + } + + private def verifyTable = { +val table = sqlContext.sparkSession.asInstanceOf[CarbonSession].sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient().getTable("default", "source") +assert(table.schema.fields.length == 3) --- End diff -- ok ---
[GitHub] carbondata pull request #3004: [CARBONDATA-3188] Create carbon table as hive...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3004#discussion_r244086704 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession} +import org.apache.spark.util.SparkUtil +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat + +class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + test("test create table and verify the hive table correctness with stored by") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |STORED BY 'carbondata' --- End diff -- ok ---
[GitHub] carbondata pull request #3004: [CARBONDATA-3188] Create carbon table as hive...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3004#discussion_r244086686 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession} +import org.apache.spark.util.SparkUtil +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat + +class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + test("test create table and verify the hive table correctness with stored by") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |STORED BY 'carbondata' + """.stripMargin) + +verifyTable + +sql("DROP TABLE IF EXISTS source") + } + + private def verifyTable = { +val table = sqlContext.sparkSession.asInstanceOf[CarbonSession].sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient().getTable("default", "source") +assert(table.schema.fields.length == 3) +if (SparkUtil.isSparkVersionEqualTo("2.2")) { + assert(table.storage.locationUri.get.equals(new Path(s"file:$storeLocation/source").toUri)) +} + assert(table.storage.inputFormat.get.equals(classOf[CarbonFileInputFormat[_]].getName)) + } + + test("test create table and verify the hive table correctness with using carbondata") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |using carbondata + """.stripMargin) + +verifyTable + + --- End diff -- ok ---
[GitHub] carbondata pull request #3004: [CARBONDATA-3188] Create carbon table as hive...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/3004#discussion_r244086671 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession} +import org.apache.spark.util.SparkUtil +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat + +class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + override def afterAll(): Unit = { +sql("DROP TABLE IF EXISTS source") + } + + test("test create table and verify the hive table correctness with stored by") { +sql("DROP TABLE IF EXISTS source") +sql( + s""" + |CREATE TABLE source (key INT, value string, col1 double) + |STORED BY 'carbondata' + """.stripMargin) + +verifyTable + +sql("DROP TABLE IF EXISTS source") + } + + private def verifyTable = { +val table = sqlContext.sparkSession.asInstanceOf[CarbonSession].sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient().getTable("default", "source") +assert(table.schema.fields.length == 3) +if (SparkUtil.isSparkVersionEqualTo("2.2")) { + assert(table.storage.locationUri.get.equals(new Path(s"file:$storeLocation/source").toUri)) +} + assert(table.storage.inputFormat.get.equals(classOf[CarbonFileInputFormat[_]].getName)) --- End diff -- OK, changed to CarbonTableInputFormat. ---
[GitHub] carbondata issue #3025: [WIP][CARBONDATA-3199]"show datamap" represents prea...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3025 @qiuchenjian Currently the pre-aggregate datamap does not follow the datamap interfaces, as it was implemented before the datamap framework. That is why not all datamap DDLs work with pre-aggregate. It is better to keep the pre-aggregate datamap untouched in this area until we decide whether to adopt the MV datamap or the pre-aggregate datamap. ---
[GitHub] carbondata issue #3019: [CARBONDATA-3194] Integrating Carbon with Presto usi...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3019 @chenliang613 If we are planning to contribute to Presto, then it would be under the presto-hive connector, just like ORC, Parquet, and other formats are supported under that connector. It would be much simpler and need less code than the current implementation. Right now we are overriding the HiveConnector to add our carbon format to it; if instead we changed the presto-hive connector itself to add the carbon format, adding carbon's read/write factories to it would be enough. CarbonData cannot be another individual connector on its own, as it lacks features like metadata and authentication that are readily available in the presto-hive connector. That is the reason we are overriding the HiveConnector. ---
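To illustrate the "adding of read/write factories" point, here is a minimal sketch of how a format-specific reader factory could be registered inside a presto-hive style Guice module. The Multibinder pattern mirrors the imports visible in the CarbondataModule diff above, but the module name and the idea of passing the factory class in are illustrative assumptions, not the actual PR code.

```java
// Sketch only: register an extra HivePageSourceFactory (e.g. a carbon reader
// factory) into the set the hive connector already maintains for ORC/Parquet/
// RCFile. The concrete carbon factory class is passed in because it is an
// assumption here, not part of the quoted diff.
import com.facebook.presto.hive.HivePageSourceFactory;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

public class CarbonFormatBindingSketch implements Module {
  private final Class<? extends HivePageSourceFactory> carbonFactoryClass;

  public CarbonFormatBindingSketch(Class<? extends HivePageSourceFactory> carbonFactoryClass) {
    this.carbonFactoryClass = carbonFactoryClass;
  }

  @Override public void configure(Binder binder) {
    // The hive connector keeps a Set<HivePageSourceFactory>; adding one more
    // binding is enough for the connector to consider the new format's reader.
    Multibinder<HivePageSourceFactory> pageSourceFactories =
        Multibinder.newSetBinder(binder, HivePageSourceFactory.class);
    pageSourceFactories.addBinding().to(carbonFactoryClass).in(Scopes.SINGLETON);
  }
}
```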
[GitHub] carbondata issue #3021: [CARBONDATA-3193] Cdh5.14.2 spark2.2.0 support
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3021 @chandrasaripaka Please check the PR https://github.com/apache/carbondata/pull/3026 ---
[GitHub] carbondata pull request #3026: [WIP] Added support to compile carbon CDH spa...
GitHub user ravipesala opened a pull request: https://github.com/apache/carbondata/pull/3026 [WIP] Added support to compile carbon CDH spark distribution Please use `spark-2.2-cdh` profile to compile cdh. example: ``` mvn -DskipTests -Pspark-2.2-cdh package ``` Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ravipesala/incubator-carbondata cdh-support Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/3026.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3026 commit 349e901c8f5c8d658859089ab3acaf6377107150 Author: ravipesala Date: 2018-12-26T16:27:23Z Added support to compile carbon CDH spark distribution ---
[GitHub] carbondata issue #3021: [CARBONDATA-3193] Cdh5.14.2 spark2.2.0 support
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3021 @chandrasaripaka, I got the issue, but creating many duplicate files may not be a good idea, as it will be difficult to maintain; I will try to do it with reflection. One more question: I cannot find the package `spark-hive-thriftserver_2.11` from Cloudera, and without this package we cannot run the carbon thrift server. Where can this package be found? Or is it ok if we don't run the carbon thrift server on the Cloudera distribution? ---
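For reference, a minimal sketch of the reflection approach mentioned here: resolve whichever Parquet fork is on the classpath at runtime instead of keeping duplicate source files per distribution. The class names below are the Apache and Twitter Parquet dictionary classes; the helper itself is illustrative and not the actual `CarbonDictionaryUtil` code from #3026.

```java
// Illustrative sketch of resolving a distribution-specific class at runtime.
// Apache Spark ships org.apache.parquet.*, while the CDH build uses the older
// Twitter fork under parquet.*; reflection avoids a compile-time dependency on
// either one. This is not the actual CarbonDictionaryUtil implementation.
import java.lang.reflect.Method;

public final class DistributionSpecificCallSketch {

  /** Return the Parquet Dictionary class of whichever fork is on the classpath. */
  public static Class<?> resolveParquetDictionaryClass() throws ClassNotFoundException {
    try {
      return Class.forName("org.apache.parquet.column.Dictionary");   // Apache Parquet
    } catch (ClassNotFoundException e) {
      return Class.forName("parquet.column.Dictionary");              // Twitter/CDH Parquet
    }
  }

  /** Invoke a no-arg method reflectively so neither fork is needed at compile time. */
  public static Object invoke(Object target, String methodName) throws Exception {
    Method method = target.getClass().getMethod(methodName);
    return method.invoke(target);
  }
}
```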
[GitHub] carbondata issue #3021: [CARBONDATA-3193] Cdh5.14.2 spark2.2.0 support
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3021 I went through this link earlier, but I cannot find a Spark 2.2 version in this distribution. I can find only Spark `1.6.0-cdh5.14.4` here. ---
[GitHub] carbondata issue #3021: [CARBONDATA-3193] Cdh5.14.2 spark2.2.0 support
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3021 @chandrasaripaka I can't find the Spark maven dependency for CDH5.14.2, but I am able to build with the CDH Spark versions `2.2.0-cdh6.0.1` and `2.2.0.cloudera3`. The only problem I found is that the Cloudera repo does not have the `spark-hive-thriftserver` jar, so related classes like `CarbonThriftServer` and `CarbonSQLCLIDriver` cannot compile. Apart from that, I am able to compile carbon with the `2.2.0-cdh6.0.1` and `2.2.0.cloudera3` versions. I am not sure why CDH does not include the `spark-hive-thriftserver` jar in their repo. Please send the repository for CDH5.14.2 so that I can verify this version as well. ---
[GitHub] carbondata issue #2897: [CARBONDATA-3080] Supporting local dictionary enable...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2897 LGTM ---
[GitHub] carbondata pull request #2897: [CARBONDATA-3080] Supporting local dictionary...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2897#discussion_r243810091 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java --- @@ -68,6 +67,10 @@ private AbstractDetailQueryResultIterator iterator; private QueryModel queryModel; + //This holds mapping of fetch index with respect to project col index. + // it is used when same col is used in projection many times.So need to fetch only that col. + private List projectionMapping = new ArrayList<>(); --- End diff -- Better to use an array for fast lookups instead of a list. ---
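A small sketch of the suggestion above, assuming the mapping stores column indexes; the class, field, and method names are illustrative, not the actual `CarbonVectorizedRecordReader` code.

```java
// Illustrative only: an int[] gives boxing-free, O(1) lookups compared with a
// boxed list. Names are assumptions for the sketch.
public class ProjectionMappingSketch {
  // position in the projection -> index of the column actually fetched
  private final int[] projectionMapping;

  public ProjectionMappingSketch(int[] projectionMapping) {
    this.projectionMapping = projectionMapping;
  }

  public int fetchIndexFor(int projectionIndex) {
    return projectionMapping[projectionIndex];   // direct array access, no unboxing
  }
}
```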
[GitHub] carbondata pull request #2897: [CARBONDATA-3080] Supporting local dictionary...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2897#discussion_r243810009 --- Diff: core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java --- @@ -173,6 +174,7 @@ public synchronized void freeMemoryAll(String taskId) { "Freeing offheap working memory of size %d. Current available memory is %d", occuppiedMemory, totalMemory - memoryUsed)); } +ThreadLocalTaskInfo.clearCarbonTaskInfo(); --- End diff -- Better to call it explicitly from the methods where we call `freeMemoryAll`; thread clearing should not be part of it. ---
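A sketch of the call-site pattern being suggested, using the names visible in the diff (`freeMemoryAll`, `ThreadLocalTaskInfo.clearCarbonTaskInfo`); the wrapping method, try/finally shape, and import paths are assumptions, not the actual CarbonData task-completion code.

```java
// Sketch: keep thread-local cleanup at the caller rather than inside
// UnsafeMemoryManager.freeMemoryAll. The method below is an assumed shape for
// a task-completion hook, not actual CarbonData code.
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;

public final class TaskCleanupSketch {
  public static void onTaskFinished(String taskId) {
    try {
      UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId);   // release working memory
    } finally {
      ThreadLocalTaskInfo.clearCarbonTaskInfo();            // clear the thread-local explicitly
    }
  }
}
```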
[GitHub] carbondata issue #3004: [CARBONDATA-3188] Create carbon table as hive unders...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/3004 @SteNicholas This PR does not directly make the Hive integration work, as we need to set the right serde, input format, and output format. We are planning to refactor the current Hive integration code to unify its input format and output format with the hadoop module's input/output formats. This PR is a basic step for both the Hive and Presto integrations. ---
[GitHub] carbondata pull request #3019: [WIP] Carbon Presto hive metastore
GitHub user ravipesala opened a pull request: https://github.com/apache/carbondata/pull/3019 [WIP] Carbon Presto hive metastore Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ravipesala/incubator-carbondata presto-hive-metastore Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/3019.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3019 commit 50f7a330e797bcdd71598fbf3145a71fa5df7215 Author: ravipesala Date: 2018-12-19T16:30:57Z Create carbon table as hive metastore table commit 8621a302ff2f26470b19d31b24a21ff03e4a2831 Author: ravipesala Date: 2018-12-19T15:49:41Z Added Hive Metastore to Carbon Presto ---
[GitHub] carbondata issue #2998: [CARBONDATA-3184]Fix DataLoad Failure with 'using ca...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2998 LGTM ---
[GitHub] carbondata pull request #2998: [CARBONDATA-3184]Fix DataLoad Failure with 'u...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2998#discussion_r243530847 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala --- @@ -331,7 +333,14 @@ object CarbonSource { properties, query) // updating params - val updatedFormat = storageFormat.copy(properties = map) + var updatedFormat: CatalogStorageFormat = null + // For Spark version 2.2 and above, check if catalog table locationUri is empty, then assign + // the value of tablepath to locationUri + if (SparkUtil.isSparkVersionXandAbove("2.2") && tableDesc.storage.locationUri.isEmpty) { --- End diff -- Please add it for 2.1 as well. ---
[GitHub] carbondata pull request #2998: [CARBONDATA-3184]Fix DataLoad Failure with 'u...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2998#discussion_r243489099 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala --- @@ -331,7 +333,17 @@ object CarbonSource { properties, query) // updating params - val updatedFormat = storageFormat.copy(properties = map) + var updatedFormat: CatalogStorageFormat = null + // check if catalog table location is empty or if differs from tablepath, then change the + // value of locationUri to tablepath + if (SparkUtil.isSparkVersionXandAbove("2.2") && tableDesc.storage.locationUri.isEmpty) { +updatedFormat = CarbonToSparkAdapater.getUpdatedStorageFormat(storageFormat, map, tablePath) + } else if (SparkUtil.isSparkVersionXandAbove("2.2") && + !tableDesc.storage.locationUri.toString.equalsIgnoreCase(tablePath)) { --- End diff -- Please check this equals check, because `locationUri` may come with a file protocol prefix like `file:/` or `hdfs:/`; in the case of a local file the `file:/` may not be present, so this equals check fails. ---
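One way to make that comparison tolerant of a missing scheme, sketched with the Hadoop `Path` API; this is an illustrative approach, not necessarily what the PR ended up doing.

```java
// Sketch: compare a catalog locationUri against a table path while tolerating
// "file:/..." vs "/..." differences. Illustrative only.
import java.net.URI;

import org.apache.hadoop.fs.Path;

public final class LocationCompareSketch {
  public static boolean sameLocation(URI locationUri, String tablePath) {
    URI left = new Path(locationUri).toUri();
    URI right = new Path(tablePath).toUri();
    // If either side carries no scheme (typical for a local path), fall back to
    // comparing just the path component; otherwise compare the full URIs.
    if (left.getScheme() == null || right.getScheme() == null) {
      return left.getPath().equals(right.getPath());
    }
    return left.equals(right);
  }
}
```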
[GitHub] carbondata pull request #3004: [WIP] Create carbon table as hive understanda...
GitHub user ravipesala opened a pull request: https://github.com/apache/carbondata/pull/3004 [WIP] Create carbon table as hive understandable metastore table needed by Presto and Hive Problem: A carbon table created in Spark currently creates the Hive table internally, but that table does not have much information such as the schema, input/output format, and location details, so other execution engines like Presto and Hive cannot read the table. Reason: Spark always checks the HiveSerde static map to see whether it is a Hive-supported table or not; since carbon is not registered in that map, Spark cannot create a Hive-understandable table. It just creates a table without schema and location and adds carbon's own schema as part of the properties. Solution: Add the carbon details to the HiveSerde static map as well so that Spark can create a Hive-understandable table. Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily: - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ravipesala/incubator-carbondata correct-carbon-table-creation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/3004.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3004 commit 5218664382353d8ec8f7b3303eec46e1796c15c1 Author: ravipesala Date: 2018-12-19T16:30:57Z Create carbon table as hive metastore table ---
[GitHub] carbondata issue #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA-3164] ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2966 LGTM ---
[GitHub] carbondata issue #2995: [CARBONDATA-3160] Compaction support with MAP data t...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2995 LGTM ---
[GitHub] carbondata issue #2985: [HOTFIX] Fixed Query performance issue
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2985 LGTM ---
[GitHub] carbondata pull request #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2966#discussion_r242417861 --- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java --- @@ -959,6 +959,14 @@ public static void setDataTypeConverter(DataTypeConverter converterLocal) { } } + /** + * As each load can have it's own time format. Reset the thread local for each load. + */ + public static void initializeFormatter() { --- End diff -- Better rename to `clearFormatter` ---
[GitHub] carbondata issue #2949: [CARBONDATA-3118] support parallel block pruning for...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2949 LGTM ---
[GitHub] carbondata pull request #2161: [CARBONDATA-2218] AlluxioCarbonFile while try...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2161#discussion_r241982006 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java --- @@ -94,14 +93,9 @@ public CarbonFile getParentFile() { public boolean renameForce(String changeToName) { FileSystem fs; try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { -((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changeToName), -org.apache.hadoop.fs.Options.Rename.OVERWRITE); -return true; - } else { -return false; - } + fs = fileStatus.getPath().getFileSystem(hadoopConf); + fs.delete(new Path(changeToName), true); + return fs.rename(fileStatus.getPath(), new Path(changeToName)); --- End diff -- You mean force rename is handled in Alluxio 1.7.1? Which Alluxio version are you using currently? ---
[GitHub] carbondata issue #2161: [CARBONDATA-2218] AlluxioCarbonFile while trying to ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2161 @chandrasaripaka you can add the CarbonFile test with the Alluxio Mini Cluster, but make sure the test does not take more than a few seconds to finish, as it impacts the build time. ---
[GitHub] carbondata pull request #2161: [CARBONDATA-2218] AlluxioCarbonFile while try...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2161#discussion_r241981844 --- Diff: core/src/main/java/org/apache/carbondata/core/locks/AlluxioFileLock.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.locks; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * This class is used to handle the S3 File locking. + * This is acheived using the concept of acquiring the data out stream using Append option. + */ +public class AlluxioFileLock extends AbstractCarbonLock { --- End diff -- @chandrasaripaka What I meant was we can use the same HDFSFileLock class or extended class of HDFSFileLock as I don't see much difference in code of `AlluxioFileLock` and `HDFSFileLock` ---
[GitHub] carbondata pull request #2161: [CARBONDATA-2218] AlluxioCarbonFile while try...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2161#discussion_r241977467 --- Diff: core/src/main/java/org/apache/carbondata/core/locks/AlluxioFileLock.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.locks; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.log4j.Logger; + +/** + * This class is used to handle the S3 File locking. + * This is acheived using the concept of acquiring the data out stream using Append option. + */ +public class AlluxioFileLock extends AbstractCarbonLock { --- End diff -- Why not use `HdfsFileLock` directly? This class looks exactly the same as `HdfsFileLock`. ---
[GitHub] carbondata pull request #2161: [CARBONDATA-2218] AlluxioCarbonFile while try...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2161#discussion_r241977202 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java --- @@ -43,7 +44,7 @@ * LOGGER */ private static final Logger LOGGER = - LogServiceFactory.getLogService(FileFactory.class.getName()); + LogServiceFactory.getLogService(FileFactory.class.getName()); --- End diff -- There are lots of unnecessary format changes, please remove those changes and keep only the changes required for this PR ---
[GitHub] carbondata pull request #2161: [CARBONDATA-2218] AlluxioCarbonFile while try...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2161#discussion_r241977147 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java --- @@ -94,14 +93,9 @@ public CarbonFile getParentFile() { public boolean renameForce(String changeToName) { FileSystem fs; try { - fs = fileStatus.getPath().getFileSystem(FileFactory.getConfiguration()); - if (fs instanceof DistributedFileSystem) { -((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changeToName), -org.apache.hadoop.fs.Options.Rename.OVERWRITE); -return true; - } else { -return false; - } + fs = fileStatus.getPath().getFileSystem(hadoopConf); + fs.delete(new Path(changeToName), true); + return fs.rename(fileStatus.getPath(), new Path(changeToName)); --- End diff -- It's a risky operation: if line 98 fails, then there is no way to get back the file that was deleted at line 97. I think we should find a way to do it in a single transaction. Any better way, @jackylk? ---
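One option for collapsing the delete-then-rename into a single call is Hadoop's `FileContext` API, which accepts `Options.Rename.OVERWRITE`; whether the Alluxio client implements that overwrite atomically would still need to be verified, so the sketch below is illustrative only.

```java
// Sketch: overwrite-rename in one FileContext call instead of a separate
// delete followed by rename. Atomicity still depends on the underlying file
// system (HDFS vs Alluxio), so this is illustrative, not a drop-in fix.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

public final class OverwriteRenameSketch {
  public static void renameForce(Path src, Path dst, Configuration conf) throws Exception {
    FileContext fc = FileContext.getFileContext(src.toUri(), conf);
    fc.rename(src, dst, Options.Rename.OVERWRITE);
  }
}
```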
[GitHub] carbondata pull request #2161: [CARBONDATA-2218] AlluxioCarbonFile while try...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2161#discussion_r241976975 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java --- @@ -550,12 +550,10 @@ public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory. if (null != fileStatus && fileStatus.isDirectory()) { --- End diff -- Now the `pathFilter` is not used here? It may impact the callers of this method. ---
[GitHub] carbondata issue #2979: [CARBONDATA-3153] Complex delimiters change
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2979 LGTM ---
[GitHub] carbondata pull request #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2966#discussion_r241437965 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java --- @@ -501,6 +502,17 @@ public CarbonLoadModel buildLoadModel(Schema carbonSchema) } // for the longstring field, change the datatype from string to varchar this.schema = updateSchemaFields(carbonSchema, longStringColumns); +if (sortColumns != null && sortColumns.length != 0) { + if (options == null || options.get("sort_scope") == null) { +// If sort_columns are specified and sort_scope is not specified, +// change sort scope to local_sort as now by default sort scope is no_sort. +if (options == null) { + options = new HashMap<>(); +} +//TODO: add in carbon property instead of load options --- End diff -- It should be a table property, not a carbon property. ---
[GitHub] carbondata pull request #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2966#discussion_r241437223 --- Diff: integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala --- @@ -523,7 +523,8 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl } } - test("describe formatted for default sort_columns pre and post alter") { + // after changing default sort_scope to no_sort, all dimensions are not selected for sorting. + ignore("describe formatted for default sort_columns pre and post alter") { --- End diff -- Instead of ignoring it, update the test case by specifying the sort_columns while creating the table. ---
[GitHub] carbondata pull request #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2966#discussion_r241436353 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala --- @@ -848,6 +848,19 @@ class TableNewProcessor(cm: TableModel) { tableSchema.getTableId, cm.databaseNameOp.getOrElse("default")) tablePropertiesMap.put("bad_record_path", badRecordsPath) +if (tablePropertiesMap.get("sort_columns") != null) { + val sortCol = tablePropertiesMap.get("sort_columns") + if ((!sortCol.trim.isEmpty) && tablePropertiesMap.get("sort_scope") == null) { +// If sort_scope is not specified, but sort_columns are present, set sort_scope as +// local_sort in carbon_properties (cannot add in table properties as if user sets carbon +// properties it won't be reflected as table properties is given higher priority) +if (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) == +null) { + CarbonProperties.getInstance() +.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "LOCAL_SORT") --- End diff -- I don't think we can add sort_scope directly to CarbonProperties; that changes it for the complete system. If sort_columns are present, then the default sort_scope of that table should become local_sort, not the sort scope of the complete system. ---
[GitHub] carbondata pull request #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2966#discussion_r241433863 --- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java --- @@ -61,7 +61,35 @@ } }; - private static final ThreadLocal dateformatter = new ThreadLocal() { + private static ThreadLocal timeStampformatterString = new ThreadLocal() { +@Override protected String initialValue() { + return CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT); +} + }; + + private static void updateTimeStamp() { --- End diff -- Can you describe how these changes are related to this PR? ---
[GitHub] carbondata pull request #2966: [CARBONDATA-3162][CARBONDATA-3163][CARBONDATA...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2966#discussion_r241432600 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java --- @@ -2265,7 +2265,8 @@ public static int compareValues(byte[] filterValue, byte[] minMaxBytes, defaultValue = FilterUtil .getMaskKey(key, currentBlockDimension, segmentProperties.getSortColumnsGenerator()); } else { - defaultValue = ByteUtil.toXorBytes(key); + defaultValue = FilterUtil + .getMaskKey(key, currentBlockDimension, segmentProperties.getDimensionKeyGenerator()); --- End diff -- The `if` and `else` branches have the same code; remove the if condition. ---
[GitHub] carbondata issue #2979: [CARBONDATA-3153] Complex delimiters change
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2979 retest this please ---
[GitHub] carbondata issue #2979: [CARBONDATA-3153] Complex delimiters change
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2979 LGTM ---
[GitHub] carbondata issue #2976: [CARBONDATA-2755][Complex DataType Enhancements] Com...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2976 retest this please ---
[GitHub] carbondata issue #2982: [CARBONDATA-3158] support presto-carbon to read sdk ...
Github user ravipesala commented on the issue: https://github.com/apache/carbondata/pull/2982 LGTM, just a minor comment. ---