[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user sujith71955 closed the pull request at: https://github.com/apache/carbondata/pull/2366 ---
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r200321640
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---
@@ -163,10 +164,13 @@ class StreamHandoffRDD[K, V](
val model = format.createQueryModel(inputSplit, attemptContext)
val inputFormat = new CarbonStreamInputFormat
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
- .asInstanceOf[CarbonStreamRecordReader]
-streamReader.setVectorReader(false)
-streamReader.setQueryModel(model)
-streamReader.setUseRawRow(true)
+ .asInstanceOf[RecordReader[Void, Any]]
--- End diff --

Add these options to createRecordReader or to CarbonStreamInputFormat.
---
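One way to read the suggestion, sketched in Java under loud assumptions: let the input format configure the concrete reader before returning it, so the call site only sees the generic reader type and no longer needs a downcast. Every name below (StreamReaderOptions, SimpleRecordReader, StreamRecordReaderSketch, StreamInputFormatSketch) is a hypothetical stand-in for the Hadoop RecordReader and the Carbon classes, and the query-model argument is left out to keep it short.

```java
import java.util.Objects;

// Hypothetical options holder: the flags StreamHandoffRDD currently sets
// on the downcast CarbonStreamRecordReader.
final class StreamReaderOptions {
  final boolean vectorReader;
  final boolean useRawRow;

  StreamReaderOptions(boolean vectorReader, boolean useRawRow) {
    this.vectorReader = vectorReader;
    this.useRawRow = useRawRow;
  }
}

// Hypothetical stand-in for Hadoop's RecordReader<Void, Object>; callers only see this type.
interface SimpleRecordReader {
  boolean nextKeyValue();
  Object getCurrentValue();
}

// Hypothetical concrete reader; its setters are only used by the factory below.
final class StreamRecordReaderSketch implements SimpleRecordReader {
  private boolean vectorReader = true;
  private boolean useRawRow = false;

  void setVectorReader(boolean vectorReader) { this.vectorReader = vectorReader; }
  void setUseRawRow(boolean useRawRow) { this.useRawRow = useRawRow; }
  boolean isVectorReader() { return vectorReader; }
  boolean isUseRawRow() { return useRawRow; }

  @Override public boolean nextKeyValue() { return false; }   // no data in this sketch
  @Override public Object getCurrentValue() { return null; }
}

// Hypothetical factory: configuration happens here, so callers never cast.
final class StreamInputFormatSketch {
  SimpleRecordReader createRecordReader(StreamReaderOptions options) {
    Objects.requireNonNull(options, "options");
    StreamRecordReaderSketch reader = new StreamRecordReaderSketch();
    reader.setVectorReader(options.vectorReader);
    reader.setUseRawRow(options.useRawRow);
    return reader;
  }
}
```

In the real code the flags could also travel through the job configuration that createRecordReader already receives via its TaskAttemptContext; which route fits better is a design call for the PR.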
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r198717072
--- Diff: integration/spark2/pom.xml ---
@@ -243,6 +238,18 @@
2.11
2.11.8
+
--- End diff --

Don't refactor unnecessarily.
---
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r198715828
--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format
+ */
+public class CarbonStreamInputFormat extends FileInputFormat {
--- End diff --

Don't move this class to spark-common; leave it in the streaming module only.
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r197680925
--- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,
--- End diff --

In Spark 2.1, the CarbonAnalyzer class is part of CarbonSessionState.scala and its code differs from 2.2. The 2.2 and 2.3 code is now the same, and per the design it has to be copied to the 2.3 folder as well.
---
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r197435782
--- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonAnalyzer.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.CarbonReflectionUtils
+
+class CarbonAnalyzer(catalog: SessionCatalog,
--- End diff --

Move this class out of here and do the same for the remaining versions; it is the same for all versions.
---
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r197435289
--- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.Utils
+
+/** Physical plan node for scanning data from a batched relation. */
+case class BatchedDataSourceScanExec(
--- End diff --

This code should not be duplicated. I have raised PR https://github.com/apache/carbondata/pull/2400 to remove the dependency.
---
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196715002
--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---
@@ -71,15 +72,10 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.spark.memory.MemoryMode;
+import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
--- End diff --

Please find the task JIRA here: https://issues.apache.org/jira/browse/CARBONDATA-2619
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196655625
--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
do {
try {
LOG.info(s"starting registry-service on $hostAddress:$port")
- val config = RpcEnvConfig(
+ val config = RpcUtil.getRpcEnvConfig(
--- End diff --

After analyzing #2372, these changes are not required, so I have reverted them.
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196655227
--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -247,6 +252,32 @@ object CarbonReflectionUtils {
isFormatted
}
+
+ def getRowDataSourceScanExecObj(relation: LogicalRelation,
--- End diff --

Fixed.
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196655176
--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -247,6 +252,32 @@ object CarbonReflectionUtils {
isFormatted
}
+
--- End diff --

Fixed.
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196655245
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---
@@ -38,9 +39,17 @@
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.util.CarbonProperties
-/**
- * Carbon strategies for ddl commands
- */
+ /** Carbon strategies for ddl commands
--- End diff --

Fixed.
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196655128
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
@@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
// named expression list otherwise update the list and add it to set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
namedExpressionList +=
-Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+CarbonCompilerUtil.createAliasRef(expressions.head,
+ name + "_ sum",
+ NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
- alias.isGenerated)
+ Some(alias))
validExpressionsMap += AggExpToColumnMappingModel(sumExp)
}
// check with same expression already count is present then do not add to
// named expression list otherwise update the list and add it to set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
namedExpressionList +=
-Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
- alias.qualifier,
- Some(alias.metadata),
- alias.isGenerated)
+ CarbonCompilerUtil.createAliasRef(expressions.last, name + "_ count",
--- End diff --

Fixed; changed the name from CarbonCompilerUtil to CarbonToSparkAdapater.
---
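The point of such an adapter is that call sites like CarbonPreAggregateDataLoadingRules build an alias through one method, while the version-specific constructor (the removed lines still pass alias.isGenerated) stays inside the adapter. A minimal Java sketch of that idea follows. AliasSketch, AliasFactory and AliasAdapterSketch are hypothetical names, not the PR's classes, and the assumption that the isGenerated argument exists only before Spark 2.3 is read off this diff rather than verified against Spark.

```java
import java.util.Optional;

// Hypothetical, simplified alias: the real Spark Alias also carries an expression id,
// qualifier and metadata, and its constructor differs between Spark versions.
final class AliasSketch {
  final String child;
  final String name;
  // Present only when built through the pre-2.3 path (assumption taken from the diff).
  final Optional<Boolean> isGenerated;

  AliasSketch(String child, String name, Optional<Boolean> isGenerated) {
    this.child = child;
    this.name = name;
    this.isGenerated = isGenerated;
  }
}

// One creation method for every call site; implementations hide the version differences.
interface AliasFactory {
  AliasSketch createAlias(String child, String name, boolean isGenerated);
}

final class LegacyAliasFactory implements AliasFactory {
  @Override
  public AliasSketch createAlias(String child, String name, boolean isGenerated) {
    return new AliasSketch(child, name, Optional.of(isGenerated));
  }
}

final class NewAliasFactory implements AliasFactory {
  @Override
  public AliasSketch createAlias(String child, String name, boolean isGenerated) {
    // The newer constructor has no isGenerated parameter, so the flag is dropped here.
    return new AliasSketch(child, name, Optional.empty());
  }
}

final class AliasAdapterSketch {
  // Picks the factory once from the running version string, e.g. "2.2.1" or "2.3.0".
  static AliasFactory forVersion(String sparkVersion) {
    String[] parts = sparkVersion.split("[.-]");
    int major = Integer.parseInt(parts[0]);
    int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
    boolean newApi = major > 2 || (major == 2 && minor >= 3);
    return newApi ? new NewAliasFactory() : new LegacyAliasFactory();
  }
}
```

Because the factory is chosen once, both the `_ sum` and `_ count` branches in the rule would call the same createAlias method regardless of the Spark version on the classpath.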
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196655020
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala ---
@@ -87,7 +87,7 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach {
sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS ")
} catch {
case e: Exception =>
-assert(e.getMessage.contains("no viable alternative at input"))
+assert(true)
--- End diff --

Fixed; added an OR condition with the error message reported by Spark 2.3.0.
---
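The earlier revision replaced the check with assert(true), which can never fail. A small Java sketch of the "accept either message" check follows. The helper names are hypothetical, "no viable alternative at input" is the message from the original assertion, and the second fragment used in the test ("mismatched input") is only a placeholder for whatever Spark 2.3.0 actually reports. The sketch also fails when no exception is thrown at all, a case the try/catch visible in this hunk does not cover.

```java
import java.util.Arrays;

final class ErrorMessageAssert {
  // True when the exception message contains at least one of the accepted fragments.
  static boolean messageContainsAny(Throwable t, String... fragments) {
    String msg = t.getMessage();
    if (msg == null) {
      return false;
    }
    return Arrays.stream(fragments).anyMatch(msg::contains);
  }

  // Throws an AssertionError naming the actual message when no fragment matches.
  static void assertMessageContainsAny(Throwable t, String... fragments) {
    if (!messageContainsAny(t, fragments)) {
      throw new AssertionError("Unexpected message: " + t.getMessage()
          + ", expected one of " + Arrays.toString(fragments));
    }
  }

  // Runs the action and requires it to fail with one of the accepted messages.
  static void expectFailureWithAny(Runnable action, String... fragments) {
    try {
      action.run();
    } catch (RuntimeException e) {
      assertMessageContainsAny(e, fragments);
      return;
    }
    throw new AssertionError("Expected an exception but none was thrown");
  }
}
```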
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196654906
--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -140,6 +142,13 @@ object CarbonReflectionUtils {
relation,
expectedOutputAttributes,
catalogTable)._1.asInstanceOf[LogicalRelation]
+} else if (SPARK_VERSION.startsWith("2.3")) {
--- End diff --

Fixed; added a utility method for Spark version comparison in SparkUtil.scala.
---
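CarbonReflectionUtils picks version-specific classes and constructors at runtime. The Java sketch below shows the general reflection technique: construct an object by class name, choosing a public constructor whose parameter types accept the supplied arguments. ReflectionSketch and createInstance are hypothetical names; this is not the Carbon implementation, which is Scala and not shown in the diff.

```java
import java.lang.reflect.Constructor;

final class ReflectionSketch {
  // Creates an instance of className using the first public constructor whose
  // parameter count and types accept the given arguments.
  static Object createInstance(String className, Object... args)
      throws ReflectiveOperationException {
    Class<?> clazz = Class.forName(className);
    for (Constructor<?> ctor : clazz.getConstructors()) {
      Class<?>[] types = ctor.getParameterTypes();
      if (types.length != args.length) {
        continue;
      }
      boolean matches = true;
      for (int i = 0; i < types.length; i++) {
        if (args[i] == null) {
          // null can be passed to any reference parameter, never to a primitive one
          if (types[i].isPrimitive()) {
            matches = false;
            break;
          }
        } else if (!wrap(types[i]).isInstance(args[i])) {
          matches = false;
          break;
        }
      }
      if (matches) {
        return ctor.newInstance(args);
      }
    }
    throw new NoSuchMethodException("No matching public constructor on " + className);
  }

  // Maps primitive parameter types to their wrappers so boxed arguments match.
  private static Class<?> wrap(Class<?> c) {
    if (!c.isPrimitive()) return c;
    if (c == int.class) return Integer.class;
    if (c == long.class) return Long.class;
    if (c == boolean.class) return Boolean.class;
    if (c == double.class) return Double.class;
    if (c == float.class) return Float.class;
    if (c == short.class) return Short.class;
    if (c == byte.class) return Byte.class;
    if (c == char.class) return Character.class;
    return c;
  }
}
```

Matching on runtime argument types keeps call sites free of version checks, at the cost of losing compile-time checking; if several constructors accept the same arguments, whichever one reflection lists first wins.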
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196654926
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---
@@ -355,18 +362,19 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
private def getDataSourceScan(relation: LogicalRelation,
- output: Seq[Attribute],
- partitions: Seq[PartitionSpec],
- scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
- candidatePredicates: Seq[Expression],
- pushedFilters: Seq[Filter],
- metadata: Map[String, String],
- needDecoder: ArrayBuffer[AttributeReference],
- updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
+output: Seq[Attribute],
--- End diff --

Fixed.
---
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196654954
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala ---
@@ -149,8 +149,9 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
}
test("test sum*10 aggregation on big decimal column with high precision") {
-checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
- sql("select sum(salary)*10 from hiveBigDecimal"))
+val carbonSeq = sql("select sum(salary)*10 from carbonBigDecimal_2").collect
--- End diff --

Fixed.
---
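The diff replaces checkAnswer with collecting the rows, and the hunk stops there. If the intent is to tolerate representation differences in high-precision decimal results between the Carbon and Hive tables (an assumption, since the rest of the test is not shown), one common approach on the JVM is to compare BigDecimal values with compareTo, which ignores scale, rather than equals, which does not. DecimalResultCompare is a hypothetical helper.

```java
import java.math.BigDecimal;
import java.util.List;

final class DecimalResultCompare {
  // Compares two result columns value by value; 1.50 and 1.5 count as equal
  // because compareTo ignores scale, unlike BigDecimal.equals.
  static boolean sameValues(List<BigDecimal> carbon, List<BigDecimal> hive) {
    if (carbon.size() != hive.size()) {
      return false;
    }
    for (int i = 0; i < carbon.size(); i++) {
      BigDecimal a = carbon.get(i);
      BigDecimal b = hive.get(i);
      if (a == null || b == null) {
        // Two nulls match; a null against a value does not.
        if (a != b) {
          return false;
        }
        continue;
      }
      if (a.compareTo(b) != 0) {
        return false;
      }
    }
    return true;
  }
}
```

If the two engines also differ in precision (not just scale), an exact comparison would still fail, and rounding both sides to an agreed scale before comparing would be the next step.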
Github user sandeep-katta commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196654884
--- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---
@@ -65,7 +66,7 @@ object CarbonReflectionUtils {
className,
tableIdentifier,
tableAlias)._1.asInstanceOf[UnresolvedRelation]
-} else if (SPARK_VERSION.startsWith("2.2")) {
+} else if (SPARK_VERSION.startsWith("2.2") || SPARK_VERSION.startsWith("2.3")) {
--- End diff --

Fixed; added a utility method for Spark version comparison in SparkUtil.scala.
---
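A helper that compares versions numerically avoids chaining startsWith checks for every new release. The Java sketch below is hypothetical: its method names are not the ones added to SparkUtil.scala, and it only looks at the major and minor components.

```java
final class SparkVersionSketch {
  // Parses "major.minor[.patch][-suffix]" and returns {major, minor}.
  static int[] majorMinor(String version) {
    String[] parts = version.split("[.-]");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Unparseable version: " + version);
    }
    return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
  }

  // True when actual >= required, comparing major and minor numerically,
  // so "2.10" correctly sorts after "2.3" (a plain string comparison would not).
  static boolean isAtLeast(String actual, String required) {
    int[] a = majorMinor(actual);
    int[] r = majorMinor(required);
    return a[0] != r[0] ? a[0] > r[0] : a[1] >= r[1];
  }
}
```

With it, `SPARK_VERSION.startsWith("2.2") || SPARK_VERSION.startsWith("2.3")` could become `isAtLeast(SPARK_VERSION, "2.2")`, with the caveat that this also matches every later version, which may or may not be what the branch intends.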
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196650804
--- Diff: integration/spark-common/pom.xml ---
@@ -65,6 +65,11 @@
scalatest_${scala.binary.version}
provided
+
+ org.apache.zookeeper
--- End diff --

Not an intentional change, I guess :)
---
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196314387
--- Diff: store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---
@@ -81,7 +81,7 @@ class Master(sparkConf: SparkConf) {
do {
try {
LOG.info(s"starting registry-service on $hostAddress:$port")
- val config = RpcEnvConfig(
+ val config = RpcUtil.getRpcEnvConfig(
--- End diff --

RPC is refactored in #2372; this should be rebased after #2372 is merged.
---
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196314106 --- Diff: integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionState.scala --- @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _} +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{SQLConf, SessionState} +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} +import org.apache.spark.sql.parser.CarbonSparkSqlParser + +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonHiveSessionCatalog( +externalCatalog: HiveExternalCatalog, +globalTempViewManager: GlobalTempViewManager, +functionRegistry: FunctionRegistry, +sparkSession: SparkSession, +conf: SQLConf, +hadoopConf: Configuration, +parser: ParserInterface, +functionResourceLoader: FunctionResourceLoader) + extends HiveSessionCatalog ( +externalCatalog, +globalTempViewManager, +new HiveMetastoreCatalog(sparkSession), +functionRegistry, +conf, 
+hadoopConf, +parser, +functionResourceLoader + ) with CarbonSessionCatalog { + + private lazy val carbonEnv = { +val env = new CarbonEnv +env.init(sparkSession) +env + } + /** + * return's the carbonEnv instance + * @return + */ + override def getCarbonEnv() : CarbonEnv = { +carbonEnv + } + + // Initialize all listeners to the Operation bus. + CarbonEnv.initListeners() + + override def lookupRelation(name: TableIdentifier): LogicalPlan = { +val rtnRelation = super.lookupRelation(name) +val isRelationRefreshed = + CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession) +if (isRelationRefreshed) { + super.lookupRelation(name) +} else { + rtnRelation +} + } + + /** + * returns hive client from HiveExternalCatalog + * + * @return + */ + override def getClient(): org.apache.spark.sql.hive.client.HiveClient = { +sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog + .asInstanceOf[HiveExternalCatalog].client + } + + def alterTableRename(oldTableIdentifier: TableIdentifier, + newTableIdentifier: TableIdentifier, + newTablePath: String): Unit = { +getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " + + s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }") +
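lookupRelation in this diff resolves the relation, asks CarbonSessionUtil.refreshRelation whether the cached entry was stale, and looks it up again only if something was refreshed. A self-contained Java sketch of that lookup-then-refresh pattern follows. RefreshingCatalogSketch and its integer schema versions are hypothetical stand-ins for the Hive relation cache and the Carbon table schema.

```java
import java.util.HashMap;
import java.util.Map;

final class RefreshingCatalogSketch {
  // Cached "relations": the schema version each cached entry was built from.
  private final Map<String, Integer> cachedVersion = new HashMap<>();
  // Hypothetical source of truth, e.g. a schema file's modification stamp.
  private final Map<String, Integer> currentVersion = new HashMap<>();
  private int lookups = 0;

  void updateSchema(String table, int version) {
    currentVersion.put(table, version);
  }

  // Plays the role of super.lookupRelation: serves from cache, filling it on a miss.
  private Integer baseLookup(String table) {
    lookups++;
    return cachedVersion.computeIfAbsent(table, currentVersion::get);
  }

  // Plays the role of refreshRelation: drops a stale entry and reports whether it did.
  private boolean refreshIfStale(String table, Integer relationVersion) {
    Integer latest = currentVersion.get(table);
    if (latest != null && !latest.equals(relationVersion)) {
      cachedVersion.remove(table);
      return true;
    }
    return false;
  }

  // Same shape as the Scala override: look up, refresh if stale, look up again.
  Integer lookupRelation(String table) {
    Integer relation = baseLookup(table);
    return refreshIfStale(table, relation) ? baseLookup(table) : relation;
  }

  int lookupCount() {
    return lookups;
  }
}
```

The second lookup only happens on the stale path, so the common case costs a single cache hit plus the staleness check.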
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196313373 --- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +public class CarbonVectorProxy implements CarbonSparkVectorReader { + +private ColumnVector columnVector; +private ColumnarBatch columnarBatch; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + * + * @param memMode which represent the type onheap or offheap vector. + * @param rowNumrows number for vector reading + * @param structFileds, metadata related to current schema of table. + */ +public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { +columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum); +} + +public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { +columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum); +} + +/** + * Sets the number of rows in this batch. + */ +public void setNumRows(int numRows) { +columnarBatch.setNumRows(numRows); +} + +/** + * Returns the number of rows for read, including filtered rows. 
+ */ +public int numRows() { +return columnarBatch.capacity(); +} + +/** + * Called to close all the columns in this batch. It is not valid to access the data after + * calling this. This must be called at the end to clean up memory allocations. + */ +public void close() { +columnarBatch.close(); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public InternalRow getRow(int rowId) { +return columnarBatch.getRow(rowId); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public Object getColumnarBatch() { +return columnarBatch; +} + +/** + * Resets this column for writing. The currently stored values are no longer accessible. + */ +public void reset() { +columnarBatch.reset(); +} + + +public void putRowToColumnBatch(int rowId, Object value, int offset) { +this.columnVector = columnarBatch.column(offset); +org.apache.spark.sql.types.DataType t = columnVector.dataType(); +if (null == value) { +columnVector.putNull(rowId); +} else { +if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { +columnVector.putBoolean(rowId, (boolean) value); +}
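CarbonVectorProxy hides the version-specific ColumnarBatch/ColumnVector API behind a single write method, putRowToColumnBatch, which either stores a null or dispatches on the column's data type. The Java sketch below models that dispatch with plain arrays instead of Spark vectors. ColumnType, ColumnSketch and VectorProxySketch are hypothetical and cover only four types.

```java
// Hypothetical subset of column types handled by the sketch.
enum ColumnType { BOOLEAN, INT, LONG, STRING }

// Hypothetical stand-in for one Spark ColumnVector: a typed array plus a null mask.
final class ColumnSketch {
  final ColumnType type;
  final boolean[] isNull;
  final boolean[] booleans;
  final int[] ints;
  final long[] longs;
  final String[] strings;

  ColumnSketch(ColumnType type, int capacity) {
    this.type = type;
    this.isNull = new boolean[capacity];
    this.booleans = type == ColumnType.BOOLEAN ? new boolean[capacity] : null;
    this.ints = type == ColumnType.INT ? new int[capacity] : null;
    this.longs = type == ColumnType.LONG ? new long[capacity] : null;
    this.strings = type == ColumnType.STRING ? new String[capacity] : null;
  }
}

// Hypothetical proxy: callers write values through one method and never touch
// the underlying batch/vector classes directly.
final class VectorProxySketch {
  private final ColumnSketch[] columns;
  private int numRows;

  VectorProxySketch(ColumnType[] schema, int capacity) {
    columns = new ColumnSketch[schema.length];
    for (int i = 0; i < schema.length; i++) {
      columns[i] = new ColumnSketch(schema[i], capacity);
    }
  }

  // Writes one value at (rowId, column offset): null goes to the mask, anything else
  // is stored according to the column's declared type.
  void putRowToColumnBatch(int rowId, Object value, int offset) {
    ColumnSketch col = columns[offset];
    if (value == null) {
      col.isNull[rowId] = true;
      return;
    }
    col.isNull[rowId] = false;
    switch (col.type) {
      case BOOLEAN: col.booleans[rowId] = (Boolean) value; break;
      case INT: col.ints[rowId] = ((Number) value).intValue(); break;
      case LONG: col.longs[rowId] = ((Number) value).longValue(); break;
      case STRING: col.strings[rowId] = value.toString(); break;
      default: throw new IllegalStateException("Unsupported type " + col.type);
    }
  }

  void setNumRows(int numRows) { this.numRows = numRows; }
  int numRows() { return numRows; }
  ColumnSketch column(int offset) { return columns[offset]; }
}
```

Keeping the dispatch inside the proxy is what lets one reader implementation run against Spark versions whose vector APIs differ: only the proxy class needs a per-version copy.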
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196313120 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +public class CarbonVectorProxy implements CarbonSparkVectorReader { + +private ColumnVector columnVector; +private ColumnarBatch columnarBatch; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + * + * @param memMode which represent the type onheap or offheap vector. + * @param rowNumrows number for vector reading + * @param structFileds, metadata related to current schema of table. + */ +public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { +columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum); +} + +public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { +columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum); +} + +/** + * Sets the number of rows in this batch. + */ +public void setNumRows(int numRows) { +columnarBatch.setNumRows(numRows); +} + +/** + * Returns the number of rows for read, including filtered rows. 
+ */ +public int numRows() { +return columnarBatch.capacity(); +} + +/** + * Called to close all the columns in this batch. It is not valid to access the data after + * calling this. This must be called at the end to clean up memory allocations. + */ +public void close() { +columnarBatch.close(); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public InternalRow getRow(int rowId) { +return columnarBatch.getRow(rowId); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public Object getColumnarBatch() { +return columnarBatch; +} + +/** + * Resets this column for writing. The currently stored values are no longer accessible. + */ +public void reset() { +columnarBatch.reset(); +} + + +public void putRowToColumnBatch(int rowId, Object value, int offset) { +this.columnVector = columnarBatch.column(offset); +org.apache.spark.sql.types.DataType t = columnVector.dataType(); +if (null == value) { +columnVector.putNull(rowId); +} else { +if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { +columnVector.putBoolean(rowId, (boolean) value); +}
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196313170 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +public class CarbonVectorProxy implements CarbonSparkVectorReader { + +private ColumnVector columnVector; +private ColumnarBatch columnarBatch; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + * + * @param memMode which represent the type onheap or offheap vector. + * @param rowNumrows number for vector reading + * @param structFileds, metadata related to current schema of table. + */ +public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { +columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum); +} + +public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { +columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum); +} + +/** + * Sets the number of rows in this batch. + */ +public void setNumRows(int numRows) { +columnarBatch.setNumRows(numRows); +} + +/** + * Returns the number of rows for read, including filtered rows. 
+ */ +public int numRows() { +return columnarBatch.capacity(); +} + +/** + * Called to close all the columns in this batch. It is not valid to access the data after + * calling this. This must be called at the end to clean up memory allocations. + */ +public void close() { +columnarBatch.close(); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public InternalRow getRow(int rowId) { +return columnarBatch.getRow(rowId); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public Object getColumnarBatch() { +return columnarBatch; +} + +/** + * Resets this column for writing. The currently stored values are no longer accessible. + */ +public void reset() { +columnarBatch.reset(); +} + + +public void putRowToColumnBatch(int rowId, Object value, int offset) { +this.columnVector = columnarBatch.column(offset); +org.apache.spark.sql.types.DataType t = columnVector.dataType(); +if (null == value) { +columnVector.putNull(rowId); +} else { +if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { +columnVector.putBoolean(rowId, (boolean) value); +}
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196313064 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +public class CarbonVectorProxy implements CarbonSparkVectorReader { + +private ColumnVector columnVector; +private ColumnarBatch columnarBatch; + +/** + * Adapter class which handles the columnar vector reading of the carbondata + * based on the spark ColumnVector and ColumnarBatch API. This proxy class + * handles the complexity of spark 2.3 version related api changes since + * spark ColumnVector and ColumnarBatch interfaces are still evolving. + * + * @param memMode which represent the type onheap or offheap vector. + * @param rowNumrows number for vector reading + * @param structFileds, metadata related to current schema of table. + */ +public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) { +columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum); +} + +public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) { +columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum); +} + +/** + * Sets the number of rows in this batch. + */ +public void setNumRows(int numRows) { +columnarBatch.setNumRows(numRows); +} + +/** + * Returns the number of rows for read, including filtered rows. 
+ */ +public int numRows() { +return columnarBatch.capacity(); +} + +/** + * Called to close all the columns in this batch. It is not valid to access the data after + * calling this. This must be called at the end to clean up memory allocations. + */ +public void close() { +columnarBatch.close(); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public InternalRow getRow(int rowId) { +return columnarBatch.getRow(rowId); +} + +/** + * Returns the row in this batch at `rowId`. Returned row is reused across calls. + */ +public Object getColumnarBatch() { +return columnarBatch; +} + +/** + * Resets this column for writing. The currently stored values are no longer accessible. + */ +public void reset() { +columnarBatch.reset(); +} + + --- End diff -- remove extra empty line ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310953 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonVectorProxy.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import java.math.BigInteger; + +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +public class CarbonVectorProxy implements CarbonSparkVectorReader { --- End diff -- Add comment for this class and add annotation @InterfaceAudience.Internal ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310806 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala --- @@ -38,9 +39,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.util.CarbonProperties -/** - * Carbon strategies for ddl commands - */ + /** Carbon strategies for ddl commands --- End diff -- Move it to the next line, like ``` /** * Carbon ... */ ``` ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310531 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala --- @@ -247,6 +252,32 @@ object CarbonReflectionUtils { isFormatted } + + def getRowDataSourceScanExecObj(relation: LogicalRelation, --- End diff -- please make the indentation like: ``` def getRowDataSourceScanExecObj( relation: LogicalRelation, output: Seq[Attribute], pushedFilters: Seq[Filter]): RowDataSourceScanExec = { ``` ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310344 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala --- @@ -247,6 +252,32 @@ object CarbonReflectionUtils { isFormatted } + --- End diff -- remove empty line ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310355 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java --- @@ -71,15 +72,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.spark.memory.MemoryMode; +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.execution.vectorized.ColumnVector; -import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.CalendarIntervalType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.*; --- End diff -- Handled ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310194 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala --- @@ -127,7 +127,7 @@ class CarbonAppendableStreamSink( className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = fileLogPath, -isAppend = false) +false) --- End diff -- no need to modify this ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196310131 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java --- @@ -418,36 +412,47 @@ private boolean isScanRequired(BlockletHeader header) { } private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { +Constructor cons = null; // if filter is null and output projection is empty, use the row number of blocklet header -if (skipScanData) { - int rowNums = header.getBlocklet_info().getNum_rows(); - columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums); - columnarBatch.setNumRows(rowNums); - input.skipBlockletData(true); - return rowNums > 0; -} - -input.readBlockletData(header); -columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums()); int rowNum = 0; -if (null == filter) { - while (input.hasNext()) { -readRowFromStream(); -putRowToColumnBatch(rowNum++); +try { + String vectorReaderClassName = "org.apache.spark.sql.CarbonVectorProxy"; --- End diff -- Since you are using `CarbonVectorProxy`, can you remove the spark dependency in this stream module? ---
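The reflective construction started in this hunk (a `Constructor` plus the class name `org.apache.spark.sql.CarbonVectorProxy` held as a string) lets the caller avoid a compile-time reference to the Spark-version-specific class. A minimal sketch of that technique, assuming a hypothetical helper `newInstanceByName`; `java.lang.StringBuilder` stands in for the proxy class so the example runs without Spark on the classpath:

```java
import java.lang.reflect.Constructor;

public class ReflectiveLoadSketch {

  /**
   * Loads a class by its fully qualified name and invokes the public constructor
   * whose parameter types match paramTypes. The caller compiles only against
   * expectedType, never against the concrete (version-specific) class.
   * Hypothetical helper, not CarbonData code.
   */
  static <T> T newInstanceByName(String className, Class<T> expectedType,
      Class<?>[] paramTypes, Object... args) {
    try {
      Class<?> clazz = Class.forName(className);
      Constructor<?> cons = clazz.getConstructor(paramTypes);
      return expectedType.cast(cons.newInstance(args));
    } catch (ReflectiveOperationException e) {
      // Covers missing class, missing constructor, and constructor failures.
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    // StringBuilder(String) stands in for e.g. CarbonVectorProxy(MemoryMode, StructType, int).
    CharSequence cs = newInstanceByName("java.lang.StringBuilder", CharSequence.class,
        new Class<?>[] {String.class}, "carbon");
    System.out.println(cs.length()); // prints 6
  }
}
```

Failing fast with an `IllegalStateException` keeps a missing or mismatched proxy class from surfacing later as a confusing `NullPointerException` on `cons`.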
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196309854 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java --- @@ -115,7 +109,7 @@ // vectorized reader private StructType outputSchema; - private ColumnarBatch columnarBatch; + private CarbonSparkVectorReader vectorProxy; --- End diff -- Mainly because we cannot have any common APIs: the package of ColumnVector and ColumnarBatch itself has changed, so we will not be able to extract it. ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196309518 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java --- @@ -115,7 +109,7 @@ // vectorized reader private StructType outputSchema; - private ColumnarBatch columnarBatch; + private CarbonSparkVectorReader vectorProxy; --- End diff -- I will remove this interface; since we are moving CarbonStreamRecordReader to spark2, this interface will not be required. ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196307942 --- Diff: integration/spark-common/pom.xml --- @@ -65,6 +65,11 @@ scalatest_${scala.binary.version} provided + + org.apache.zookeeper --- End diff -- Why is this introduced? ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r196307852 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala --- @@ -87,7 +87,7 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS ") } catch { case e: Exception => -assert(e.getMessage.contains("no viable alternative at input")) +assert(true) --- End diff -- Use `intercept` to intercept and assert the exception message ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195980220 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java --- @@ -71,15 +72,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.spark.memory.MemoryMode; +import org.apache.carbondata.spark.vectorreader.CarbonSparkVectorReader; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.execution.vectorized.ColumnVector; -import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.CalendarIntervalType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.*; --- End diff -- Add another sub-issue and PR for refactoring the Spark and Hadoop dependencies of CarbonStreaming: 1) Move CarbonStreamRecordReader.java, Spark2 and CarbonStreamInputFormat.java to Carbon-hadoop and instantiate CarbonStreamRecordReader using reflection. 2) Take out the dependency of Spark on Carbon-streaming. ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195976603 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala --- @@ -126,9 +126,9 @@ object CarbonReflectionUtils { } def getLogicalRelation(relation: BaseRelation, - expectedOutputAttributes: Seq[Attribute], - catalogTable: Option[CatalogTable], - isStreaming:Boolean): LogicalRelation = { + expectedOutputAttributes: Seq[Attribute], --- End diff -- Wrong indentation; use carbon/dev/intellijFormatter ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195975666 --- Diff: pom.xml --- @@ -582,6 +582,59 @@ + --- End diff -- Check whether there is a way to make the profile common for 2.2 and 2.3 and pass only the version-specific details through overrides or parameters. ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195972758 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -1787,20 +1839,23 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) // named expression list otherwise update the list and add it to set if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) { namedExpressionList += -Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId, +CarbonCompilerUtil.createAliasRef(expressions.head, + name + "_ sum", + NamedExpression.newExprId, alias.qualifier, Some(alias.metadata), - alias.isGenerated) + Some(alias)) validExpressionsMap += AggExpToColumnMappingModel(sumExp) } // check with same expression already count is present then do not add to // named expression list otherwise update the list and add it to set if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) { namedExpressionList += -Alias(expressions.last, name + "_ count")(NamedExpression.newExprId, - alias.qualifier, - Some(alias.metadata), - alias.isGenerated) + CarbonCompilerUtil.createAliasRef(expressions.last, name + "_ count", --- End diff -- Rename CarbonCompilerUtil, as the name suggests a Carbon compiler. ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195971582 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala --- @@ -87,7 +87,7 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS ") } catch { case e: Exception => -assert(e.getMessage.contains("no viable alternative at input")) +assert(true) --- End diff -- Add an OR condition on the message ---
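The two suggestions on this test (ScalaTest's `intercept`, plus an OR condition on the message) amount to: require that the statement throws, keep the exception, and accept any one of several known message fragments instead of `assert(true)`. A rough Java analogue for illustration only; `intercept` and `messageContainsAny` below are hypothetical helpers, not ScalaTest's API, and the second fragment is a placeholder because the message reported under Spark 2.3 is not given in this thread:

```java
import java.util.Arrays;

public class InterceptSketch {

  /**
   * Runs body and returns the exception it throws. Fails with an AssertionError
   * if nothing is thrown, or if the thrown exception is of another type.
   */
  static <E extends Throwable> E intercept(Class<E> expected, Runnable body) {
    try {
      body.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError("Expected " + expected.getName() + " but got " + t, t);
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  /** The OR condition: true if the message contains any of the accepted fragments. */
  static boolean messageContainsAny(Throwable t, String... fragments) {
    String message = String.valueOf(t.getMessage());
    return Arrays.stream(fragments).anyMatch(message::contains);
  }

  public static void main(String[] args) {
    // Simulated parser failure; in the suite the body would be the sql("CREATE TABLE ...") call.
    RuntimeException e = intercept(RuntimeException.class, () -> {
      throw new IllegalArgumentException("no viable alternative at input 'STORED AS'");
    });
    // Second fragment: placeholder for the Spark 2.3 parser message.
    System.out.println(messageContainsAny(e, "no viable alternative at input",
        "PLACEHOLDER_SPARK_2_3_FRAGMENT")); // prints true
  }
}
```

Unlike a bare `try`/`catch`, this fails the test when no exception is thrown at all.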
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195971259 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala --- @@ -149,8 +149,9 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll { } test("test sum*10 aggregation on big decimal column with high precision") { -checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"), - sql("select sum(salary)*10 from hiveBigDecimal")) +val carbonSeq = sql("select sum(salary)*10 from carbonBigDecimal_2").collect --- End diff -- Better to change the test case to "select cast(sum(salary)*10 as double) from carbonBigDecimal_2" ---
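The thread does not say why the original comparison fails on Spark 2.3; one plausible explanation (an assumption here) is that the two queries produce decimals whose precision or scale differ. The sketch below uses `java.math.BigDecimal`, which also carries a scale, to show why comparing after a conversion to double is less sensitive to that. It is an analogy to `cast(... as double)`, not a model of Spark's decimal type:

```java
import java.math.BigDecimal;

public class DecimalCompareSketch {
  public static void main(String[] args) {
    // Same numeric value, different scale (4 vs 2 fractional digits).
    BigDecimal wideScale = new BigDecimal("12345.6700");
    BigDecimal narrowScale = new BigDecimal("12345.67");

    // equals() compares value AND scale, so this prints false.
    System.out.println(wideScale.equals(narrowScale));
    // compareTo() compares the numeric value only, so this prints true.
    System.out.println(wideScale.compareTo(narrowScale) == 0);
    // After conversion to double the scale no longer matters (prints true),
    // analogous to comparing cast(sum(salary)*10 as double) results.
    System.out.println(wideScale.doubleValue() == narrowScale.doubleValue());
  }
}
```

The trade-off is that a double keeps only about 15 to 17 significant digits, so the cast also hides differences beyond that precision.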
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195970266 --- Diff: examples/spark2/pom.xml --- @@ -204,5 +204,35 @@ true + + spark-2.3 + +2.3.0 +2.11 +2.11.8 + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + + + org.apache.carbondata + carbondata-core + ${project.version} + +
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195969282 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -355,18 +362,19 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } private def getDataSourceScan(relation: LogicalRelation, - output: Seq[Attribute], - partitions: Seq[PartitionSpec], - scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], -ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow], - candidatePredicates: Seq[Expression], - pushedFilters: Seq[Filter], - metadata: Map[String, String], - needDecoder: ArrayBuffer[AttributeReference], - updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = { +output: Seq[Attribute], --- End diff -- Keep previous indentation ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195968598 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala --- @@ -140,6 +142,13 @@ object CarbonReflectionUtils { relation, expectedOutputAttributes, catalogTable)._1.asInstanceOf[LogicalRelation] +} else if (SPARK_VERSION.startsWith("2.3")) { --- End diff -- Add a function such as SPARK_VERSION.above, so that we can write SPARK_VERSION.above(2.3) and need not change the code in every place for each new version ---
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195968507 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala --- @@ -65,7 +66,7 @@ object CarbonReflectionUtils { className, tableIdentifier, tableAlias)._1.asInstanceOf[UnresolvedRelation] -} else if (SPARK_VERSION.startsWith("2.2")) { +} else if (SPARK_VERSION.startsWith("2.2") || SPARK_VERSION.startsWith("2.3")) { --- End diff -- Add a function SPARK_VERSION.above(2.2), so that we need not change the code in every place for each new version ---
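A version helper of the kind suggested here compares numeric major/minor components instead of matching string prefixes, so a single "2.2 or above" check keeps working when a new Spark release appears. A minimal Java sketch; `isAtLeast` is a hypothetical name (the reviewer proposes a `SPARK_VERSION.above(...)` method in the Scala code), it is inclusive to match the `startsWith("2.2") || startsWith("2.3")` check it would replace, and it looks only at the first two numeric components:

```java
public final class SparkVersionSketch {
  private SparkVersionSketch() {}

  /** Parses "major.minor[.patch][-suffix]" into {major, minor}. */
  static int[] majorMinor(String version) {
    String[] parts = version.split("[.-]");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Unexpected version string: " + version);
    }
    return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
  }

  /** True if version is the same as or newer than minimum, comparing major.minor only. */
  static boolean isAtLeast(String version, String minimum) {
    int[] v = majorMinor(version);
    int[] m = majorMinor(minimum);
    return v[0] != m[0] ? v[0] > m[0] : v[1] >= m[1];
  }

  public static void main(String[] args) {
    System.out.println(isAtLeast("2.3.0", "2.2")); // true
    System.out.println(isAtLeast("2.1.0", "2.2")); // false
    System.out.println(isAtLeast("3.0.1", "2.2")); // true
  }
}
```

Comparing integers rather than strings also avoids the pitfall where a lexicographic comparison would rank "2.10" below "2.3".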
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2366#discussion_r195968291 --- Diff: integration/spark-common/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java --- @@ -115,7 +109,7 @@ // vectorized reader private StructType outputSchema; - private ColumnarBatch columnarBatch; + private CarbonSparkVectorReader vectorProxy; --- End diff -- The CarbonSparkVectorReader interface is not required. Create an abstract class holding the common code. ---
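The alternative proposed here, an abstract base class that owns the version-independent logic and leaves only the Spark-specific writes to subclasses, could look roughly like the sketch below. All class and method names are hypothetical, and plain Java stand-ins replace Spark's `ColumnarBatch`/`ColumnVector` so it compiles on its own; in the PR, each subclass would wrap the vector API of one Spark version:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: null handling and type dispatch live here once. */
abstract class AbstractVectorProxySketch {
  /** Version-specific primitive writes, implemented once per Spark version. */
  protected abstract void putNull(int rowId, int column);
  protected abstract void putInt(int rowId, int column, int value);
  protected abstract void putString(int rowId, int column, String value);

  /** Common code shared by all versions: not duplicated in each subclass. */
  public final void putRowToColumnBatch(int rowId, Object value, int column) {
    if (value == null) {
      putNull(rowId, column);
    } else if (value instanceof Integer) {
      putInt(rowId, column, (Integer) value);
    } else if (value instanceof String) {
      putString(rowId, column, (String) value);
    } else {
      throw new UnsupportedOperationException("Unsupported type: " + value.getClass());
    }
  }
}

/** Stand-in for one version-specific subclass; it only records the calls it receives. */
class RecordingVectorProxySketch extends AbstractVectorProxySketch {
  final List<String> calls = new ArrayList<>();

  @Override protected void putNull(int rowId, int column) {
    calls.add("null@" + rowId + "," + column);
  }

  @Override protected void putInt(int rowId, int column, int value) {
    calls.add("int " + value);
  }

  @Override protected void putString(int rowId, int column, String value) {
    calls.add("str " + value);
  }
}

public class AbstractProxyDemo {
  public static void main(String[] args) {
    RecordingVectorProxySketch proxy = new RecordingVectorProxySketch();
    proxy.putRowToColumnBatch(0, 42, 0);
    proxy.putRowToColumnBatch(0, "carbon", 1);
    proxy.putRowToColumnBatch(1, null, 0);
    System.out.println(proxy.calls); // [int 42, str carbon, null@1,0]
  }
}
```

Making `putRowToColumnBatch` final keeps the shared dispatch in one place, so a new Spark version only needs a subclass with the handful of primitive writes.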
[GitHub] carbondata pull request #2366: [CARBONDATA-2532][Integration] Carbon to supp...
GitHub user sujith71955 opened a pull request: https://github.com/apache/carbondata/pull/2366 [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version ``` ## What changes were proposed in this pull request? In this PR, in order to hide the compatibility issues of the columnar vector APIs from the existing common classes, I introduced an interface for proxy vector readers; these proxy vector readers take care of the compatibility issues across different Spark versions. The ColumnVector and ColumnarBatch interface compatibility issues have been addressed in this PR. The changes relate to the following modifications made to the Spark interfaces. Highlights: a) This is a refactoring of the ColumnVector hierarchy and related classes. b) make ColumnVector read-only c) introduce WritableColumnVector with write interface d) remove ReadOnlyColumnVector e) Fixed Spark-Carbon integration API compatibility issues - by Sandi f) Corrected the test cases based on the Spark 2.3.0 behaviour change g) Excluded the net.jpountz.lz4:lz4 dependency from the pom.xml files, since Spark 2.3.0 replaced it with org.lz4; it is removed from the test classpath of spark2, spark-common-test and spark2-examples ## How was this patch tested? Manual testing, and existing test-case execution ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/sujith71955/incubator-carbondata spark-2.3_carbon_spark_2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2366.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2366 commit 9d68b4270c46b99d8d7985069ce633a60c04ba87 Author: sujith71955 Date: 2018-05-24T05:51:50Z [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version ## What changes were proposed in this pull request?
The ColumnVector and ColumnarBatch interface compatibility issues have been addressed in this PR. The changes relate to the following modifications made to the Spark interfaces: a) This is a refactoring of the ColumnVector hierarchy and related classes. b) make ColumnVector read-only c) introduce WritableColumnVector with write interface d) remove ReadOnlyColumnVector. In this PR, in order to hide the compatibility issues of the columnar vector APIs from the existing common classes, I introduced an interface for proxy vector readers; these proxy vector readers take care of the compatibility issues across different Spark versions. ## How was this patch tested? Manual testing, and existing test-case execution commit 8e62ac800c39ee308466f70d574ce1892dde9436 Author: sujith71955 Date: 2018-06-08T07:27:39Z ``` ## What changes were proposed in this pull request? In this PR, in order to hide the compatibility issues of the columnar vector APIs from the existing common classes, I introduced an interface for proxy vector readers; these proxy vector readers take care of the compatibility issues across different Spark versions.
The ColumnVector and ColumnarBatch interface compatibility issues have been addressed in this PR. The changes relate to the following modifications made to the Spark interfaces. Highlights: a) This is a refactoring of the ColumnVector hierarchy and related classes. b) make ColumnVector read-only c) introduce WritableColumnVector with write interface d) remove ReadOnlyColumnVector e) Fixed Spark-Carbon integration API compatibility issues - by Sandi f) Corrected the test cases based on the Spark 2.3.0 behaviour change g) Excluded the net.jpountz.lz4:lz4 dependency from the pom.xml files, since Spark 2.3.0 replaced it with org.lz4; it is removed from the test classpath of spark2, spark-common-test and spark2-examples ## How was this patch tested? Manual testing, and existing test-case execution