[GitHub] [carbondata] jackylk commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo
jackylk commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#issuecomment-586140082 > validateAllAggregateTablePresent removed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#issuecomment-586139194 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1984/
[GitHub] [carbondata] jackylk commented on a change in pull request #3619: [HOTFIX] Remove unused fields in TableInfo
jackylk commented on a change in pull request #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#discussion_r379289159 ## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ## @@ -110,28 +97,6 @@ case class CarbonDropTableCommand( CarbonEnv.getInstance(sparkSession).carbonMetaStore.dropTable(identifier)(sparkSession) - if (carbonTable.hasDataMapSchema) { -// drop all child tables - val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList - -childDropCommands = childSchemas.asScala - .filter(_.getRelationIdentifier != null) - .map { childSchema => -val childTable = - CarbonEnv.getCarbonTable( -TableIdentifier(childSchema.getRelationIdentifier.getTableName, - Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession) -val dropCommand = CarbonDropTableCommand( - ifExistsSet = true, - Some(childSchema.getRelationIdentifier.getDatabaseName), - childSchema.getRelationIdentifier.getTableName, - dropChildTable = true -) -dropCommand.carbonTable = childTable -dropCommand - } -childDropCommands.foreach(_.processMetadata(sparkSession)) - } val indexDatamapSchemas = Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379278711 ## File path: datamap/mv/core/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala ## @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, _} + Review comment: Before this change, it was not compliant with the code style
[GitHub] [carbondata] jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379278689 ## File path: datamap/mv/core/src/test/scala/org/apache/carbondata/mv/plans/IsSPJGHSuite.scala ## @@ -20,6 +20,7 @@ package org.apache.carbondata.mv.plans import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation + Review comment: Before this change, it was not compliant with the code style
[GitHub] [carbondata] jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379278332 ## File path: datamap/mv/core/src/main/spark2.3/org/apache/carbondata/mv/extension/MVRules.scala ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.extension + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.hive.CarbonMVRules + +class MVRules( Review comment: They are not the same :( Function `batches` is changed to `defaultBatches` in spark 2.4 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
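The rename jackylk points out is why the module keeps per-version source trees (`src/main/spark2.3` vs. `spark2.4`): a subclass can only override the hook name its compile-time Spark version declares. A minimal sketch of that situation, using hypothetical stand-in classes rather than Spark's real `Optimizer` (all names below are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for Spark 2.3's Optimizer: the extension hook is batches().
abstract class OptimizerV23 {
    abstract List<String> batches();
}

// Stand-in for Spark 2.4's Optimizer: the hook was renamed to defaultBatches().
abstract class OptimizerV24 {
    abstract List<String> defaultBatches();
}

// One subclass per version-specific source tree; the rule list is the same,
// only the overridden method name differs.
class MVRulesFor23 extends OptimizerV23 {
    @Override List<String> batches() { return Arrays.asList("MVRewrite"); }
}

class MVRulesFor24 extends OptimizerV24 {
    @Override List<String> defaultBatches() { return Arrays.asList("MVRewrite"); }
}

public class VersionShimDemo {
    public static void main(String[] args) {
        System.out.println(new MVRulesFor23().batches());        // [MVRewrite]
        System.out.println(new MVRulesFor24().defaultBatches()); // [MVRewrite]
    }
}
```

Because the override is checked at compile time, a single class cannot serve both versions, which is what rules out merging the two `MVRules` files.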
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3619: [HOTFIX] Remove unused fields in TableInfo
akashrn5 commented on a change in pull request #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#discussion_r379276591 ## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ## @@ -110,28 +97,6 @@ case class CarbonDropTableCommand( CarbonEnv.getInstance(sparkSession).carbonMetaStore.dropTable(identifier)(sparkSession) - if (carbonTable.hasDataMapSchema) { -// drop all child tables - val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList - -childDropCommands = childSchemas.asScala - .filter(_.getRelationIdentifier != null) - .map { childSchema => -val childTable = - CarbonEnv.getCarbonTable( -TableIdentifier(childSchema.getRelationIdentifier.getTableName, - Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession) -val dropCommand = CarbonDropTableCommand( - ifExistsSet = true, - Some(childSchema.getRelationIdentifier.getDatabaseName), - childSchema.getRelationIdentifier.getTableName, - dropChildTable = true -) -dropCommand.carbonTable = childTable -dropCommand - } -childDropCommands.foreach(_.processMetadata(sparkSession)) - } val indexDatamapSchemas = Review comment: can you please rename this to just `datamapSchemas`? Since it will also contain MV schemas, `indexDatamapSchemas` gives the wrong meaning
[GitHub] [carbondata] akashrn5 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo
akashrn5 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#issuecomment-586126436 please also help to remove parentRelationIdentifiers from TableInfoTest.java, and validateAllAggregateTablePresent and registerAggregates from the RefreshCarbonTableCommand class
[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax
Indhumathi27 commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax URL: https://github.com/apache/carbondata/pull/3609#discussion_r379273541 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, SqlLexical, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = 
carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + protected val ALTER: Regex = carbonKeyWord("ALTER") + protected val COMPACT: Regex = carbonKeyWord("COMPACT") + protected val IN: Regex = carbonKeyWord("IN") + protected val SEGMENT: Regex = carbonKeyWord("SEGMENT") + protected val ID: Regex = carbonKeyWord("ID") + protected val WHERE: Regex = carbonKeyWord("WHERE") + + /** + * This will convert key word to regular expression. + */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) +
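The `carbonKeyWord` helper quoted in the MVParser excerpt above simply prefixes the keyword with the `(?i)` inline flag so that SQL keywords match case-insensitively. The same technique shown standalone with `java.util.regex` (the class name here is illustrative, not from CarbonData):

```java
import java.util.regex.Pattern;

public class KeywordDemo {
    // Mirrors the parser's carbonKeyWord helper: "(?i)" turns on
    // case-insensitive matching for the whole keyword pattern.
    public static Pattern carbonKeyWord(String key) {
        return Pattern.compile("(?i)" + key);
    }

    public static void main(String[] args) {
        Pattern select = carbonKeyWord("SELECT");
        System.out.println(select.matcher("select").matches()); // true
        System.out.println(select.matcher("SeLeCt").matches()); // true
        System.out.println(select.matcher("insert").matches()); // false
    }
}
```

In the Scala parser, the implicit `regexToParser` then lifts each such `Regex` into a `Parser[String]` that accepts any identifier token matching it.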
[GitHub] [carbondata] jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379272983 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.{SqlLexical, TableIdentifier} +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.extension.command.{CreateMaterializedViewCommand, DropMaterializedViewCommand, RebuildMaterializedViewCommand, ShowMaterializedViewCommand} +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
jackylk commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379272959 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.{SqlLexical, TableIdentifier} +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.extension.command.{CreateMaterializedViewCommand, DropMaterializedViewCommand, RebuildMaterializedViewCommand, ShowMaterializedViewCommand} +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + + /** + * This will convert key word to regular expression. 
+ */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) + } +} + } + + private lazy val start: Parser[LogicalPlan] = mvCommand + + private lazy val mvCommand: Parser[LogicalPlan] = +createMV | dropMV | showMV | rebuildMV Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Inf
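The reflection trick in the quoted parser — scanning the subclass's public members for those returning `Keyword`, so reserved words need not be listed twice — can be sketched in Java (all names below are hypothetical stand-ins for the Scala originals):

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ReservedWordsDemo {
    // Stand-in for the parser's Keyword wrapper.
    public static class Keyword {
        final String str;
        Keyword(String s) { str = s; }
        String normalize() { return str.toLowerCase(); }
    }

    // Keyword-returning members, discovered by reflection below; adding a
    // new keyword method automatically registers it as a reserved word.
    public Keyword SELECT() { return new Keyword("SELECT"); }
    public Keyword CREATE() { return new Keyword("CREATE"); }

    public List<String> reservedWords() throws Exception {
        List<String> words = new ArrayList<>();
        for (Method m : getClass().getMethods()) {
            if (m.getReturnType() == Keyword.class) {  // filter by return type
                words.add(((Keyword) m.invoke(this)).normalize());
            }
        }
        return words;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new ReservedWordsDemo().reservedWords().size()); // 2
    }
}
```

This is also why the Scala version makes `reservedWords` lazy: the reflective scan must not run during the superclass constructor, before the subclass's keyword fields exist.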
[GitHub] [carbondata] jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax
jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax URL: https://github.com/apache/carbondata/pull/3609#discussion_r379272244 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, SqlLexical, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = 
carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + protected val ALTER: Regex = carbonKeyWord("ALTER") + protected val COMPACT: Regex = carbonKeyWord("COMPACT") + protected val IN: Regex = carbonKeyWord("IN") + protected val SEGMENT: Regex = carbonKeyWord("SEGMENT") + protected val ID: Regex = carbonKeyWord("ID") + protected val WHERE: Regex = carbonKeyWord("WHERE") + + /** + * This will convert key word to regular expression. + */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) +
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#issuecomment-586117728 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/281/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379267683

## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {

       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(" Writing to temp file ** ");
       }
-      intermediateFileMerger.startMergingIfPossible();
       Object[][] recordHolderListLocal = recordHolderList;
-      try {
-        semaphore.acquire();
-        dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
-      } catch (InterruptedException e) {
-        LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e);
-        throw new CarbonSortKeyAndGroupByException(e);
-      }
+      handlePreviousPage(recordHolderListLocal);
       // create the new holder Array
       this.recordHolderList = new Object[this.sortBufferSize][];
       this.entryCount = 0;
     }
     recordHolderList[entryCount++] = row;
   }

-  /**
-   * This method will be used to add new row
-   *
-   * @param rowBatch new rowBatch
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
   public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
-    synchronized (addRowsLock) {
-      int sizeLeft = 0;
-      if (entryCount + size >= sortBufferSize) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug(" Writing to temp file ** ");
-        }
-        intermediateFileMerger.startMergingIfPossible();
-        Object[][] recordHolderListLocal = recordHolderList;
-        sizeLeft = sortBufferSize - entryCount;
-        if (sizeLeft > 0) {
-          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
-        }
-        try {
-          semaphore.acquire();
-          dataSorterAndWriterExecutorService
-              .execute(new DataSorterAndWriter(recordHolderListLocal));
-        } catch (Exception e) {
-          LOGGER.error(
-              "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e);
-          throw new CarbonSortKeyAndGroupByException(e);
-        }
-        // create the new holder Array
-        this.recordHolderList = new Object[this.sortBufferSize][];
-        this.entryCount = 0;
-        size = size - sizeLeft;
-        if (size == 0) {
-          return;
-        }
+    int sizeLeft = 0;
+    if (entryCount + size >= sortBufferSize) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(" Writing to temp file ** ");
       }
-      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
-      entryCount += size;
+      Object[][] recordHolderListLocal = recordHolderList;
+      sizeLeft = sortBufferSize - entryCount;
+      if (sizeLeft > 0) {
+        System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+      }
+      handlePreviousPage(recordHolderListLocal);
+      // create the new holder Array
+      this.recordHolderList = new Object[this.sortBufferSize][];
+      this.entryCount = 0;
+      size = size - sizeLeft;
+      if (size == 0) {
+        return;
+      }
+    }
+    System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
+    entryCount += size;
   }
+
+  /**
+   * sort and write data
+   * @param recordHolderArray
+   */
+  private void handlePreviousPage(Object[][] recordHolderArray)
+      throws CarbonSortKeyAndGroupByException {
+    try {
+      long startTime = System.currentTimeMillis();
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(recordHolderArray,
+            new NewRowComparator(parameters.getNoDictionarySortColumn(),
+                parameters.getNoDictDataType()));
+      } else {
+        Arrays.sort(recordHolderArray,
+            new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
+      }
+
+      // create a new file and choose folder randomly every time
+      String[] tmpFileLocation = parameters.getTempFileLocation();
+      String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
+      File sortTempFile = new File(
+          locationChosen + File.separator + parameters.getTableName()
+              + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+              + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+      writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile);
+      // add sort temp filename to and arrayList. When the list size reaches
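The rewritten `addRowBatch` above fills the current page, hands a full page to `handlePreviousPage`, and copies the remainder into a fresh page. A minimal standalone sketch of that spill logic (hypothetical names, `Int` rows instead of `Object[]` rows, a list standing in for the sort-and-write step):

```scala
object RowBufferDemo {
  val sortBufferSize = 4
  var buffer = new Array[Int](sortBufferSize)
  var entryCount = 0
  var spilled: List[Seq[Int]] = Nil // stands in for handlePreviousPage

  // Simplified from SortDataRows.addRowBatch: fill the page,
  // spill it when full, then copy the remainder into a fresh page.
  def addRowBatch(batch: Array[Int]): Unit = {
    var size = batch.length
    var sizeLeft = 0
    if (entryCount + size >= sortBufferSize) {
      sizeLeft = sortBufferSize - entryCount
      if (sizeLeft > 0) {
        System.arraycopy(batch, 0, buffer, entryCount, sizeLeft)
      }
      spilled = buffer.toSeq :: spilled // "sort and write" the full page
      buffer = new Array[Int](sortBufferSize)
      entryCount = 0
      size -= sizeLeft
      if (size == 0) {
        return
      }
    }
    System.arraycopy(batch, sizeLeft, buffer, entryCount, size)
    entryCount += size
  }

  def main(args: Array[String]): Unit = {
    addRowBatch(Array(1, 2, 3))
    addRowBatch(Array(4, 5, 6)) // fills the page with 4, spills, keeps 5 and 6
    assert(spilled == List(Seq(1, 2, 3, 4)))
    assert(entryCount == 2)
  }
}
```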
[GitHub] [carbondata] kunal642 commented on a change in pull request #3611: [CARBONDATA-3692] Support NoneCompression during loading data.
kunal642 commented on a change in pull request #3611: [CARBONDATA-3692] Support NoneCompression during loading data. URL: https://github.com/apache/carbondata/pull/3611#discussion_r379267345

## File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
## @@ -272,6 +272,79 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with

   }
 }

+  test("test current none compressor on legacy store with snappy") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy")
+    createTable()
+    loadData()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none")
+    loadData()
+    checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16)))

Review comment: Can you change select count(*) to select * so that the actual data is validated?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
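The review point is that a row-count assertion cannot detect wrong values after decompression. A toy, framework-free illustration of why (not CarbonData code):

```scala
object CountVsDataDemo {
  def main(args: Array[String]): Unit = {
    val expected = Seq(1, 2, 3)
    val corrupted = Seq(1, 2, 99)           // same count, wrong data
    assert(corrupted.size == expected.size)  // a count(*)-style check still passes
    assert(corrupted != expected)            // a select *-style comparison catches it
  }
}
```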
[GitHub] [carbondata] kunal642 commented on a change in pull request #3611: [CARBONDATA-3692] Support NoneCompression during loading data.
kunal642 commented on a change in pull request #3611: [CARBONDATA-3692] Support NoneCompression during loading data. URL: https://github.com/apache/carbondata/pull/3611#discussion_r379266803 ## File path: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala ## @@ -272,6 +272,79 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with } } + test("test current none compressor on legacy store with snappy") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "snappy") +createTable() +loadData() + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, "none") +loadData() +checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(16))) Review comment: Can you change select count(*) to select * so that actual data is validated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379266709

## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
## @@ -72,20 +67,11 @@

   public void addFileToMerge(File sortTempFile) {
     // intermediate merging of sort temp files will be triggered
     synchronized (lockObject) {
       procFiles.add(sortTempFile);
-    }
-  }
-
-  public void startMergingIfPossible() {
-    File[] fileList;
-    if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
-      synchronized (lockObject) {
-        fileList = procFiles.toArray(new File[procFiles.size()]);
-        this.procFiles = new ArrayList();
+      if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
+        File[] fileList = procFiles.toArray(new File[procFiles.size()]);
+        this.procFiles = new ArrayList<>();
+        startIntermediateMerging(fileList);

Review comment: that will cause ClassCastException

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
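The refactor above folds `startMergingIfPossible` into `addFileToMerge`, so the threshold check and the batch hand-off happen under the same lock. A standalone sketch of that pattern (hypothetical names, `String`s instead of `File`s, a list standing in for `startIntermediateMerging`):

```scala
object MergeTriggerDemo {
  private val lockObject = new Object
  var procFiles: List[String] = Nil           // files awaiting merge
  var mergedBatches: List[List[String]] = Nil // stands in for startIntermediateMerging

  // Accumulate under the lock and hand off a batch once the threshold is hit,
  // so no other thread can observe a half-reset accumulator.
  def addFileToMerge(sortTempFile: String, threshold: Int): Unit = lockObject.synchronized {
    procFiles = sortTempFile :: procFiles
    if (procFiles.length >= threshold) {
      mergedBatches = procFiles :: mergedBatches
      procFiles = Nil
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 5).foreach(i => addFileToMerge(s"sorttemp_$i", 3))
    assert(mergedBatches.length == 1)      // one batch of three handed off
    assert(mergedBatches.head.size == 3)
    assert(procFiles.length == 2)          // two files still pending
  }
}
```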
[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
Indhumathi27 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379264939

## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
## @@ -290,4 +301,71 @@ public boolean isTimeSeries() {

   public void setTimeSeries(boolean timeSeries) {
     isTimeSeries = timeSeries;
   }
+
+  public boolean supportIncrementalBuild() {
+    String prop = getProperties().get(DataMapProperty.FULL_REFRESH);
+    return prop == null || prop.equalsIgnoreCase("false");
+  }
+
+  public String getPropertiesAsString() {
+    String[] properties = getProperties().entrySet().stream()
+        // ignore internal used property
+        .filter(p ->
+            !p.getKey().equalsIgnoreCase(DataMapProperty.DEFERRED_REBUILD) &&
+            !p.getKey().equalsIgnoreCase(DataMapProperty.CHILD_SELECT_QUERY) &&

Review comment: `DataMapProperty.CHILD_SELECT_QUERY` and `DataMapProperty.QUERY_TYPE` were used for preaggregate. Please remove them.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
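`getPropertiesAsString` above serializes the user-visible properties while filtering internal-use keys. A simplified standalone sketch with a plain `Map` (`PropsDemo` and its key set are illustrative stand-ins, not the real `DataMapProperty` constants):

```scala
object PropsDemo {
  // Stand-in for the internal-use keys filtered out in getPropertiesAsString
  val internalKeys = Set("deferred_rebuild", "child_select_query", "query_type")

  def propertiesAsString(props: Map[String, String]): String =
    props.toSeq
      .filterNot { case (k, _) => internalKeys.contains(k.toLowerCase) }
      .map { case (k, v) => s"'$k'='$v'" }
      .sorted
      .mkString(",")

  def main(args: Array[String]): Unit = {
    val s = propertiesAsString(Map("full_refresh" -> "false", "CHILD_SELECT_QUERY" -> "q"))
    assert(s == "'full_refresh'='false'") // internal key dropped, user key kept
  }
}
```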
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#discussion_r379262676

## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
## @@ -834,4 +843,179 @@ object CommonUtil {

     displaySize
   }
+
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType: ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType: StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType: MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>

Review comment: No, we cannot send it to the 'other' branch. **The 'other' branch will give a UTF8 string, and we need a Java String object here.** Added a comment now.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
Indhumathi27 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r379252982 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/command/ShowMaterializedViewCommand.scala ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension.command + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.command.{Checker, DataCommand} +import org.apache.spark.sql.types.{BooleanType, StringType} + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.mv.extension.MVDataMapProvider + +/** + * Show Materialized View Command implementation + * + */ +case class ShowMaterializedViewCommand(tableIdentifier: Option[TableIdentifier]) + extends DataCommand { + + override def output: Seq[Attribute] = { +Seq( + AttributeReference("Name", StringType, nullable = false)(), + AttributeReference("Associated Table", StringType, nullable = false)(), + AttributeReference("Refresh", StringType, nullable = false)(), + AttributeReference("Incremental", BooleanType, nullable = false)(), + AttributeReference("Properties", StringType, nullable = false)(), + AttributeReference("Status", StringType, nullable = false)(), + AttributeReference("Sync Info", StringType, nullable = false)()) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { +convertToRow(getAllMVSchema(sparkSession)) + } + + /** + * get all datamaps for this table, including preagg, index datamaps and mv Review comment: please change the description only for mv This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax
Indhumathi27 commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax URL: https://github.com/apache/carbondata/pull/3609#discussion_r379260606 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, SqlLexical, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = 
carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + protected val ALTER: Regex = carbonKeyWord("ALTER") + protected val COMPACT: Regex = carbonKeyWord("COMPACT") + protected val IN: Regex = carbonKeyWord("IN") + protected val SEGMENT: Regex = carbonKeyWord("SEGMENT") + protected val ID: Regex = carbonKeyWord("ID") + protected val WHERE: Regex = carbonKeyWord("WHERE") + + /** + * This will convert key word to regular expression. + */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) +
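The `carbonKeyWord` helper in the diff above builds a case-insensitive keyword matcher by prefixing the pattern with `(?i)`, and `regexToParser` then accepts an identifier token only when `unapplySeq` yields a full match. A standalone sketch of just the regex behaviour (`KeywordDemo` is a hypothetical name, not part of the PR):

```scala
import scala.util.matching.Regex

object KeywordDemo {
  // Same idea as MVParser.carbonKeyWord: "(?i)" makes the pattern ignore case
  def carbonKeyWord(keys: String): Regex = ("(?i)" + keys).r

  def main(args: Array[String]): Unit = {
    val SELECT = carbonKeyWord("SELECT")
    // unapplySeq succeeds only on a full, case-insensitive match
    assert(SELECT.unapplySeq("select").isDefined)
    assert(SELECT.unapplySeq("SeLeCt").isDefined)
    assert(SELECT.unapplySeq("selects").isEmpty)
  }
}
```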
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378968659 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala ## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average} +import org.apache.spark.sql.catalyst.plans.logical.{Join, Limit, LogicalPlan} +import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType} +import org.apache.spark.util.{DataMapUtil, PartitionUtils} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan} +import org.apache.carbondata.mv.plans.util.SQLBuilder +import org.apache.carbondata.mv.rewrite.{SummaryDatasetCatalog, Utils} +import org.apache.carbondata.mv.timeseries.{TimeSeriesFunction, 
TimeSeriesUtil} +import org.apache.carbondata.spark.util.CommonUtil + +/** + * Utility for MV datamap operations. + */ +object MVHelper { + + def createMVDataMap( + sparkSession: SparkSession, + dataMapSchema: DataMapSchema, + queryString: String, + ifNotExistsSet: Boolean = false): Unit = { +val dmProperties = dataMapSchema.getProperties.asScala +if (dmProperties.contains("streaming") && dmProperties("streaming").equalsIgnoreCase("true")) { + throw new MalformedCarbonCommandException( +s"Materialized view does not support streaming" + ) +} +val mvUtil = new MVUtil +mvUtil.validateDMProperty(dmProperties) +val logicalPlan = Utils.dropDummyFunc( + MVParser.getMVPlan(queryString, sparkSession)) +// if there is limit in MV ctas query string, throw exception, as its not a valid usecase +logicalPlan match { + case Limit(_, _) => +throw new MalformedCarbonCommandException("Materialized view does not support the query " + + "with limit") + case _ => +} +val selectTables = getTables(logicalPlan) +if (selectTables.isEmpty) { + throw new MalformedCarbonCommandException( +s"Non-Carbon table does not support creating MV datamap") +} +val modularPlan = validateMVQuery(sparkSession, logicalPlan) +val updatedQueryWithDb = modularPlan.asCompactSQL +val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery( + logicalPlan, + dataMapSchema) +val fullRebuild = isFullReload(logicalPlan) +var counter = 0 +// the ctas query can have duplicate columns, so we should take distinct and create fields, +// so that it won't fail during create mv table +val fields = logicalPlan.output.map { attr => +
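`createMVDataMap` above first rejects MVs on streaming tables by inspecting the datamap properties. A minimal standalone sketch of that guard (`MVValidationDemo` and the local exception class are hypothetical stand-ins for the real types):

```scala
object MVValidationDemo {
  class MalformedCarbonCommandException(msg: String) extends RuntimeException(msg)

  // Mirrors the streaming-property guard at the start of MVHelper.createMVDataMap
  def validateNotStreaming(dmProperties: Map[String, String]): Unit = {
    if (dmProperties.get("streaming").exists(_.equalsIgnoreCase("true"))) {
      throw new MalformedCarbonCommandException(
        "Materialized view does not support streaming")
    }
  }

  def main(args: Array[String]): Unit = {
    validateNotStreaming(Map("full_refresh" -> "true")) // passes
    val rejected =
      try { validateNotStreaming(Map("streaming" -> "TRUE")); false }
      catch { case _: MalformedCarbonCommandException => true }
    assert(rejected) // case-insensitive "true" is rejected
  }
}
```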
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378976973

## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/command/RebuildMaterializedViewCommand.scala
## @@ -0,0 +1,71 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.extension.command
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+
+import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _}
+
+/**
+ * Refresh Materialized View Command implementation
+ * This command refresh the MV table incrementally and make it synchronized with the main
+ * table. After sync, MV state is changed to enabled.
+ */
+case class RebuildMaterializedViewCommand(
+    mvName: String) extends DataCommand {
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    import scala.collection.JavaConverters._
+    val schemas = DataMapStoreManager.getInstance().getAllDataMapSchemas
+    val schemaOption = schemas.asScala.find(p => p.getDataMapName.equalsIgnoreCase(mvName))
+    if (schemaOption.isEmpty) {
+      throw new MalformedMaterializedViewException(s"Materialized view $mvName does not exist")
+    }
+    val schema = schemaOption.get

Review comment: rename to `datamapSchema`

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
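`processData` above looks up the datamap schema by name and fails fast when it is absent. That find-or-throw pattern can be sketched standalone (`MVSchema` and `FindSchemaDemo` are hypothetical stand-ins for the real classes):

```scala
object FindSchemaDemo {
  final case class MVSchema(name: String)
  class MalformedMVException(msg: String) extends RuntimeException(msg)

  // Case-insensitive lookup, throwing when the MV does not exist,
  // analogous to RebuildMaterializedViewCommand.processData
  def findSchema(schemas: Seq[MVSchema], mvName: String): MVSchema =
    schemas.find(_.name.equalsIgnoreCase(mvName))
      .getOrElse(throw new MalformedMVException(
        s"Materialized view $mvName does not exist"))

  def main(args: Array[String]): Unit = {
    val all = Seq(MVSchema("mv1"), MVSchema("agg_sales"))
    assert(findSchema(all, "AGG_SALES").name == "agg_sales")
    val failed =
      try { findSchema(all, "missing"); false }
      catch { case _: MalformedMVException => true }
    assert(failed)
  }
}
```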
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378946277 ## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ## @@ -290,4 +301,71 @@ public boolean isTimeSeries() { public void setTimeSeries(boolean timeSeries) { isTimeSeries = timeSeries; } + + public boolean supportIncrementalBuild() { Review comment: instead of `supportIncrementalBuild` how about `needIncrementalBuild` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378981536 ## File path: datamap/mv/core/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala ## @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, _} + Review comment: same as above for imports This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378942524 ## File path: common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedMaterializedViewException.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common.exceptions.sql; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +/** + * This exception will be thrown when MV related SQL statement is invalid + */ +@InterfaceAudience.User +@InterfaceStability.Stable +public class MalformedMaterializedViewException extends MalformedCarbonCommandException { + /** Review comment: please give one line space This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378965526 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVExtension.scala ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.extension + +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, SQLConf} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.carbondata.mv.rewrite.MVUdf +import org.apache.carbondata.mv.timeseries.TimeSeriesFunction + +// Materialized View extension for Apache Spark +// +// Following SQL command are added: +// 1. CREATE MATERIALIZED VIEW +// 2. DROP MATERIALIZED VIEW +// 3. SHOW MATERIALIZED VIEW +// 4. REFRESH MATERIALIZED VIEW +// +// Following optimizer rules are added: +// 1. 
Rewrite SQL statement by matching existing MV and +// select the lowest cost MV +// +class MVExtension extends (SparkSessionExtensions => Unit) { + + override def apply(extensions: SparkSessionExtensions): Unit = { +// MV parser +extensions.injectParser( + (sparkSession: SparkSession, parser: ParserInterface) => +new MVExtensionSqlParser(new SQLConf, sparkSession, parser)) + +// MV optimizer rules +extensions.injectPostHocResolutionRule( + (session: SparkSession) => OptimizerRule(session) ) + } +} + +case class OptimizerRule(session: SparkSession) extends Rule[LogicalPlan] { + self => + + var initialized = false + + override def apply(plan: LogicalPlan): LogicalPlan = { +if (!initialized) { + self.synchronized { +if (!initialized) { + initialized = true + + addMVUdf(session) + + val sessionState = session.sessionState + val field = sessionState.getClass.getDeclaredField("optimizer") + field.setAccessible(true) + field.set(sessionState, +new MVRules(session, sessionState.catalog, sessionState.optimizer)) +} + } +} +plan + } + + private def addMVUdf(sparkSession: SparkSession) = { +// added for handling MV table creation. when user will fire create ddl for Review comment: now we do not have preaggregate anymore, so can we modify this comment, or better, remove it if not required
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379251181 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) Review comment: better to refactor when we have chance, else it will be skipped actually
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378981200 ## File path: datamap/mv/core/src/test/scala/org/apache/carbondata/mv/plans/IsSPJGHSuite.scala ## @@ -20,6 +20,7 @@ package org.apache.carbondata.mv.plans import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation + Review comment: revert the change if not required
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378973665 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/command/CreateMaterializedViewCommand.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension.command + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.command._ + +import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.events._ +import org.apache.carbondata.mv.extension.MVDataMapProvider + +/** + * Create Materialized View Command implementation + * It will create the MV table, load the MV table (if deferred rebuild is false), + * and register the MV schema in [[DataMapStoreManager]] + */ +case class CreateMaterializedViewCommand( +mvName: String, +properties: Map[String, String], +queryString: Option[String], +ifNotExistsSet: Boolean = false, +deferredRebuild: Boolean = false) + extends AtomicRunnableCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + private var dataMapProvider: DataMapProvider = _ + private var dataMapSchema: DataMapSchema = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + +setAuditInfo(Map("mvName" -> mvName) ++ properties) + +dataMapSchema = new DataMapSchema(mvName, MVDataMapProvider.MV_PROVIDER_NAME) +val property = properties.map(x => (x._1.trim, x._2.trim)).asJava +val javaMap = new java.util.HashMap[String, String](property) +javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString) +dataMapSchema.setProperties(javaMap) + +dataMapProvider = DataMapManager.get.getDataMapProvider(null, dataMapSchema, 
sparkSession) +if (DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala + .exists(_.getDataMapName.equalsIgnoreCase(dataMapSchema.getDataMapName))) { + if (!ifNotExistsSet) { +throw new MalformedMaterializedViewException( Review comment: can use s" " instead of `+`
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378956132 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVAnalyzerRule.scala ## @@ -56,7 +56,7 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { // first check if any mv UDF is applied it is present is in plan // then call is from create MV so no need to transform the query plan // TODO Add different UDF name - case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase(CarbonEnv.MV_SKIP_RULE_UDF) => + case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase(MVUdf.MV_SKIP_RULE_UDF) => Review comment: can we rename to `MvUDF`
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378948275 ## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ## @@ -290,4 +301,71 @@ public boolean isTimeSeries() { public void setTimeSeries(boolean timeSeries) { isTimeSeries = timeSeries; } + + public boolean supportIncrementalBuild() { +String prop = getProperties().get(DataMapProperty.FULL_REFRESH); +return prop == null || prop.equalsIgnoreCase("false"); + } + + public String getPropertiesAsString() { +String[] properties = getProperties().entrySet().stream() +// ignore internal used property +.filter(p -> +!p.getKey().equalsIgnoreCase(DataMapProperty.DEFERRED_REBUILD) && +!p.getKey().equalsIgnoreCase(DataMapProperty.CHILD_SELECT_QUERY) && +!p.getKey().equalsIgnoreCase(DataMapProperty.QUERY_TYPE) && +!p.getKey().equalsIgnoreCase(DataMapProperty.FULL_REFRESH)) +.map(p -> "'" + p.getKey() + "'='" + p.getValue() + "'") +.sorted() +.toArray(String[]::new); +return Strings.mkString(properties, ","); + } + + public String getTable() { +return relationIdentifier.getDatabaseName() + "." + relationIdentifier.getTableName(); Review comment: use point constants from CarbonCommonConstants This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
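The review comment above asks `getTable` to use the shared dot constant from `CarbonCommonConstants` rather than a `"."` literal. A minimal Java sketch of the suggested shape, where the constant name `POINT` is an assumption standing in for the real field:

```java
public class UniqueTableName {
    // Stand-in for the separator constant in CarbonCommonConstants;
    // the actual constant name (POINT here) is hypothetical.
    static final String POINT = ".";

    // Builds the "dbName.tableName" key from the shared constant instead
    // of a string literal, as the review comment suggests.
    static String of(String databaseName, String tableName) {
        return databaseName + POINT + tableName;
    }

    public static void main(String[] args) {
        System.out.println(of("default", "sales_mv"));
    }
}
```

Using the shared constant keeps every "db.table" key in the codebase built the same way, so a future change to the separator happens in exactly one place.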
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378971907 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/command/CreateMaterializedViewCommand.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension.command + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.command._ + +import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.events._ +import org.apache.carbondata.mv.extension.MVDataMapProvider + +/** + * Create Materialized View Command implementation + * It will create the MV table, load the MV table (if deferred rebuild is false), + * and register the MV schema in [[DataMapStoreManager]] + */ +case class CreateMaterializedViewCommand( +mvName: String, +properties: Map[String, String], +queryString: Option[String], +ifNotExistsSet: Boolean = false, +deferredRebuild: Boolean = false) + extends AtomicRunnableCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + private var dataMapProvider: DataMapProvider = _ + private var dataMapSchema: DataMapSchema = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + +setAuditInfo(Map("mvName" -> mvName) ++ properties) + +dataMapSchema = new DataMapSchema(mvName, MVDataMapProvider.MV_PROVIDER_NAME) +val property = properties.map(x => (x._1.trim, x._2.trim)).asJava Review comment: rename to datamap properties This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378962286 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVExtension.scala ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.extension + +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, SQLConf} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.carbondata.mv.rewrite.MVUdf +import org.apache.carbondata.mv.timeseries.TimeSeriesFunction + +// Materialized View extension for Apache Spark +// +// Following SQL command are added: +// 1. CREATE MATERIALIZED VIEW +// 2. DROP MATERIALIZED VIEW +// 3. SHOW MATERIALIZED VIEW +// 4. REFRESH MATERIALIZED VIEW +// +// Following optimizer rules are added: +// 1. 
Rewrite SQL statement by matching existing MV and +// select the lowest cost MV +// +class MVExtension extends (SparkSessionExtensions => Unit) { + + override def apply(extensions: SparkSessionExtensions): Unit = { +// MV parser +extensions.injectParser( + (sparkSession: SparkSession, parser: ParserInterface) => +new MVExtensionSqlParser(new SQLConf, sparkSession, parser)) + +// MV optimizer rules +extensions.injectPostHocResolutionRule( + (session: SparkSession) => OptimizerRule(session) ) + } +} + +case class OptimizerRule(session: SparkSession) extends Rule[LogicalPlan] { Review comment: can we rename it as `MVOptimizerRule` to be more specific
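Aside from the naming question, the `OptimizerRule` quoted above uses a double-checked, synchronized one-time initialization so that the optimizer swap runs only once per session. The pattern in isolation, as a Java sketch with hypothetical names (the counter stands in for the expensive optimizer replacement):

```java
public class OneTimeInit {
    // volatile makes the unsynchronized fast-path read safe in Java;
    // the Scala original relies on the synchronized block instead.
    private volatile boolean initialized = false;
    private int swaps = 0; // stands in for swapping in the MV optimizer

    // Double-checked locking: the fast path skips the lock once initialized;
    // the inner re-check guards against two threads racing past the first test.
    void apply() {
        if (!initialized) {
            synchronized (this) {
                if (!initialized) {
                    swaps++;
                    initialized = true;
                }
            }
        }
    }

    int swapCount() {
        return swaps;
    }

    public static void main(String[] args) {
        OneTimeInit rule = new OneTimeInit();
        for (int i = 0; i < 5; i++) {
            rule.apply(); // every logical plan pass triggers apply()
        }
        System.out.println(rule.swapCount());
    }
}
```

However many plans pass through the rule, the setup body executes exactly once, which is why the real rule can afford reflection (`getDeclaredField("optimizer")`) inside it.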
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378948860 ## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ## @@ -290,4 +301,71 @@ public boolean isTimeSeries() { public void setTimeSeries(boolean timeSeries) { isTimeSeries = timeSeries; } + + public boolean supportIncrementalBuild() { +String prop = getProperties().get(DataMapProperty.FULL_REFRESH); +return prop == null || prop.equalsIgnoreCase("false"); + } + + public String getPropertiesAsString() { +String[] properties = getProperties().entrySet().stream() +// ignore internal used property +.filter(p -> +!p.getKey().equalsIgnoreCase(DataMapProperty.DEFERRED_REBUILD) && +!p.getKey().equalsIgnoreCase(DataMapProperty.CHILD_SELECT_QUERY) && +!p.getKey().equalsIgnoreCase(DataMapProperty.QUERY_TYPE) && +!p.getKey().equalsIgnoreCase(DataMapProperty.FULL_REFRESH)) +.map(p -> "'" + p.getKey() + "'='" + p.getValue() + "'") +.sorted() +.toArray(String[]::new); +return Strings.mkString(properties, ","); + } + + public String getTable() { Review comment: i think can be renamed to `getUniqueTableName`
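The `getPropertiesAsString` body quoted in the diff above filters internal-only keys before rendering the rest as sorted `'key'='value'` pairs. A self-contained Java sketch of the same stream shape, with a placeholder internal key name and `Collectors.joining` in place of carbondata's `Strings.mkString`:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PropertiesAsString {
    // Renders properties as sorted, comma-separated 'key'='value' pairs,
    // skipping keys reserved for internal use. "_internal.deferred_rebuild"
    // is a hypothetical placeholder for the real DataMapProperty constants.
    static String render(Map<String, String> properties) {
        return properties.entrySet().stream()
            .filter(p -> !p.getKey().equalsIgnoreCase("_internal.deferred_rebuild"))
            .map(p -> "'" + p.getKey() + "'='" + p.getValue() + "'")
            .sorted()
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, String> props = new TreeMap<>();
        props.put("full_refresh", "false");
        props.put("_internal.deferred_rebuild", "true");
        System.out.println(render(props));
    }
}
```

Filtering before mapping keeps the internal bookkeeping properties out of anything user-facing (such as SHOW MATERIALIZED VIEW output) without mutating the underlying map.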
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378981003 ## File path: datamap/mv/core/src/main/spark2.3/org/apache/carbondata/mv/extension/MVRules.scala ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.extension + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.hive.CarbonMVRules + +class MVRules( Review comment: since two classes MVRules in 2.3 and 2.4 are same, can we keep in common place itself, instead of two classes? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378961450 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVExtension.scala ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.extension + +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, SQLConf} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.carbondata.mv.rewrite.MVUdf +import org.apache.carbondata.mv.timeseries.TimeSeriesFunction + +// Materialized View extension for Apache Spark Review comment: please use Java Doc comment style. like /** * * **/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378968380 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala ## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier} +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average} +import org.apache.spark.sql.catalyst.plans.logical.{Join, Limit, LogicalPlan} +import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType} +import org.apache.spark.util.{DataMapUtil, PartitionUtils} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan} +import org.apache.carbondata.mv.plans.util.SQLBuilder +import org.apache.carbondata.mv.rewrite.{SummaryDatasetCatalog, Utils} +import org.apache.carbondata.mv.timeseries.{TimeSeriesFunction, 
TimeSeriesUtil} +import org.apache.carbondata.spark.util.CommonUtil + +/** + * Utility for MV datamap operations. + */ +object MVHelper { + + def createMVDataMap( + sparkSession: SparkSession, + dataMapSchema: DataMapSchema, + queryString: String, + ifNotExistsSet: Boolean = false): Unit = { +val dmProperties = dataMapSchema.getProperties.asScala +if (dmProperties.contains("streaming") && dmProperties("streaming").equalsIgnoreCase("true")) { + throw new MalformedCarbonCommandException( +s"Materialized view does not support streaming" + ) +} +val mvUtil = new MVUtil +mvUtil.validateDMProperty(dmProperties) +val logicalPlan = Utils.dropDummyFunc( + MVParser.getMVPlan(queryString, sparkSession)) +// if there is limit in MV ctas query string, throw exception, as its not a valid usecase +logicalPlan match { + case Limit(_, _) => +throw new MalformedCarbonCommandException("Materialized view does not support the query " + + "with limit") + case _ => +} +val selectTables = getTables(logicalPlan) +if (selectTables.isEmpty) { + throw new MalformedCarbonCommandException( +s"Non-Carbon table does not support creating MV datamap") +} +val modularPlan = validateMVQuery(sparkSession, logicalPlan) +val updatedQueryWithDb = modularPlan.asCompactSQL +val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery( + logicalPlan, + dataMapSchema) +val fullRebuild = isFullReload(logicalPlan) +var counter = 0 +// the ctas query can have duplicate columns, so we should take distinct and create fields, +// so that it won't fail during create mv table +val fields = logicalPlan.output.map { attr => +
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378960831 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala ## @@ -207,3 +207,7 @@ class MVDataMapProvider( override def supportRebuild(): Boolean = true } + +object MVDataMapProvider { Review comment: No need to create this object right, we already have `DataMapClassProvider`
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378973214 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/command/CreateMaterializedViewCommand.scala ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension.command + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.command._ + +import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.events._ +import org.apache.carbondata.mv.extension.MVDataMapProvider + +/** + * Create Materialized View Command implementation + * It will create the MV table, load the MV table (if deferred rebuild is false), + * and register the MV schema in [[DataMapStoreManager]] + */ +case class CreateMaterializedViewCommand( +mvName: String, +properties: Map[String, String], +queryString: Option[String], +ifNotExistsSet: Boolean = false, +deferredRebuild: Boolean = false) + extends AtomicRunnableCommand { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + private var dataMapProvider: DataMapProvider = _ + private var dataMapSchema: DataMapSchema = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + +setAuditInfo(Map("mvName" -> mvName) ++ properties) + +dataMapSchema = new DataMapSchema(mvName, MVDataMapProvider.MV_PROVIDER_NAME) +val property = properties.map(x => (x._1.trim, x._2.trim)).asJava +val javaMap = new java.util.HashMap[String, String](property) Review comment: no need to create one more map; `property` is already a Java map, so it can be put in the same map and set on the dataMapSchema directly. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
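The redundant-copy point in the review above can be shown with a small standalone Java sketch (illustrative names only; the property key below is hypothetical, and in the Scala code under review `properties.map(...).asJava` already yields a `java.util.Map`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: wrapping an existing mutable java.util.Map in another HashMap just
// makes a redundant copy; the map you already hold can be extended in place
// and passed on directly.
public class MapCopyDemo {
  static Map<String, String> withoutCopy(Map<String, String> property) {
    // hypothetical extra property added before handing the map on
    property.put("deferred_rebuild", "false");
    return property; // same instance, no second map allocated
  }
}
```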
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378963805 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala ## @@ -17,7 +17,7 @@ package org.apache.carbondata.mv.rewrite -import org.apache.spark.Logging +import org.apache.spark.internal.Logging Review comment: revert the change if not required This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
akashrn5 commented on a change in pull request #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#discussion_r378966804 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVExtensionSqlParser.scala ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import org.apache.spark.sql.{CarbonEnv, CarbonUtils, SparkSession} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.CarbonException + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Parser for Materialized View related command + */ +class MVExtensionSqlParser( +conf: SQLConf, +sparkSession: SparkSession, +initialParser: ParserInterface +) extends SparkSqlParser(conf) { + + val parser = new MVParser + + override def parsePlan(sqlText: String): LogicalPlan = { +parser.synchronized { + CarbonEnv.getInstance(sparkSession) +} +CarbonUtils.updateSessionInfoToCurrentThread(sparkSession) +try { + val plan = parser.parse(sqlText) + plan +} catch { + case ce: MalformedCarbonCommandException => +throw ce + case ex: Throwable => +try { + val parsedPlan = initialParser.parsePlan(sqlText) + CarbonScalaUtil.cleanParserThreadLocals + parsedPlan +} catch { + case mce: MalformedCarbonCommandException => +throw mce + case e: Throwable => +e.printStackTrace(System.err) Review comment: no need to print the complete stack trace, I think. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
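A common alternative to printing the stack trace in the fallback branch is to chain the primary parser's failure onto the fallback failure and rethrow, leaving logging to the caller. A minimal sketch of the try-primary-then-fallback pattern (illustrative classes only, not CarbonData's actual parser API):

```java
import java.util.function.Function;

// Sketch: try a primary parser first; on failure, fall back to a secondary
// parser. If both fail, rethrow the fallback failure with the primary
// failure attached as a suppressed exception instead of printing it.
public class FallbackParser {
  private final Function<String, String> primary;
  private final Function<String, String> fallback;

  public FallbackParser(Function<String, String> primary,
                        Function<String, String> fallback) {
    this.primary = primary;
    this.fallback = fallback;
  }

  public String parse(String sql) {
    try {
      return primary.apply(sql);
    } catch (RuntimeException primaryFailure) {
      try {
        return fallback.apply(sql);
      } catch (RuntimeException fallbackFailure) {
        // both causes are preserved for the caller's logger
        fallbackFailure.addSuppressed(primaryFailure);
        throw fallbackFailure;
      }
    }
  }
}
```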
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379252776 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches
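The comparator-selection step in the quoted `handlePreviousPage` can be sketched as follows. This is a minimal stand-in: the two comparators below compare only the first column and merely illustrate the no-dictionary vs. surrogate-key paths taken by `NewRowComparator` / `NewRowComparatorForNormalDims`; they are not CarbonData's actual implementations.

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch: pick a row comparator based on whether no-dictionary sort columns
// exist, then sort the in-memory page before it is written to a sort temp file.
public class PageSorter {
  static void sortPage(Object[][] page, boolean hasNoDictSortColumns) {
    Comparator<Object[]> cmp;
    if (hasNoDictSortColumns) {
      // no-dictionary path: raw values (strings here, byte[] in CarbonData)
      cmp = (a, b) -> ((String) a[0]).compareTo((String) b[0]);
    } else {
      // dictionary path: surrogate-key integers
      cmp = (a, b) -> Integer.compare((Integer) a[0], (Integer) b[0]);
    }
    Arrays.sort(page, cmp);
  }
}
```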
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r37925 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379251181 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) Review comment: better to refactor when we have the chance. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250891 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ## @@ -72,20 +67,11 @@ public void addFileToMerge(File sortTempFile) { // intermediate merging of sort temp files will be triggered synchronized (lockObject) { procFiles.add(sortTempFile); -} - } - - public void startMergingIfPossible() { -File[] fileList; -if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { - synchronized (lockObject) { -fileList = procFiles.toArray(new File[procFiles.size()]); -this.procFiles = new ArrayList(); + if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { +File[] fileList = procFiles.toArray(new File[procFiles.size()]); +this.procFiles = new ArrayList<>(); +startIntermediateMerging(fileList); Review comment: yes, that's why I gave the comment with the type cast as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250976 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() Review comment: yes, I saw the original code, but since you are doing refactoring in this PR, better to change it now. This is an automated message from the Apache Git Service.
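The temp-file naming scheme discussed above can be sketched roughly as follows. The `SORT_TEMP_FILE_EXT` constant and the use of `ThreadLocalRandom` (to avoid allocating a new `Random` per call) are assumptions for illustration, not necessarily what the PR does:

```java
import java.io.File;
import java.util.concurrent.ThreadLocalRandom;

// Sketch: pick one of the configured temp folders at random and build a
// unique sort temp file name from the table name, the range id, and
// System.nanoTime().
public class SortTempFiles {
  static final String SORT_TEMP_FILE_EXT = ".sorttemp"; // stand-in constant

  static File newSortTempFile(String[] tmpFileLocations, String tableName, int rangeId) {
    // choose a folder randomly every time to spread I/O across disks
    String chosen = tmpFileLocations[ThreadLocalRandom.current().nextInt(tmpFileLocations.length)];
    return new File(chosen + File.separator + tableName
        + '_' + rangeId + '_' + System.nanoTime() + SORT_TEMP_FILE_EXT);
  }
}
```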
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250819 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** Review comment: I suggest improving the comment to describe what exactly the method does and how it is done, for the better understanding of any new developer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379250672 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { Review comment: yes, but it is better to rename it; the name will be more meaningful. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
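The buffering logic in the quoted `addRowBatch` can be sketched as a small standalone class. Field names are simplified stand-ins, and `flush` takes the place of the sort-and-write step; like the original, the sketch assumes an incoming batch is no larger than the sort buffer:

```java
// Sketch: when an incoming batch would overflow the sort buffer, fill the
// remaining slots, hand the full page off for sorting/writing, then copy the
// leftover rows into a fresh buffer.
public class RowBuffer {
  Object[][] holder;
  int entryCount;
  int pagesFlushed;
  final int sortBufferSize;

  RowBuffer(int sortBufferSize) {
    this.sortBufferSize = sortBufferSize;
    this.holder = new Object[sortBufferSize][];
  }

  void addRowBatch(Object[][] batch, int size) {
    int offset = 0;
    if (entryCount + size >= sortBufferSize) {
      int sizeLeft = sortBufferSize - entryCount;     // free slots in the current page
      System.arraycopy(batch, 0, holder, entryCount, sizeLeft);
      flush(holder);                                  // sort + write in the real code
      holder = new Object[sortBufferSize][];          // start a fresh page
      entryCount = 0;
      offset = sizeLeft;
      size -= sizeLeft;
      if (size == 0) {
        return;
      }
    }
    System.arraycopy(batch, offset, holder, entryCount, size);
    entryCount += size;
  }

  void flush(Object[][] page) {
    pagesFlushed++; // stand-in for handlePreviousPage(page)
  }
}
```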
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#issuecomment-586090371 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1983/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#issuecomment-586073909 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/280/
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379218383 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ## @@ -101,6 +87,10 @@ private void startIntermediateMerging(File[] intermediateFiles) { + '_' + parameters.getRangeId() + '_' + System.nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sumitting request for intermediate merging no of files: " Review comment: ok
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379217256 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ## @@ -72,20 +67,11 @@ public void addFileToMerge(File sortTempFile) { // intermediate merging of sort temp files will be triggered synchronized (lockObject) { procFiles.add(sortTempFile); -} - } - - public void startMergingIfPossible() { -File[] fileList; -if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { - synchronized (lockObject) { -fileList = procFiles.toArray(new File[procFiles.size()]); -this.procFiles = new ArrayList(); + if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { +File[] fileList = procFiles.toArray(new File[procFiles.size()]); +this.procFiles = new ArrayList<>(); +startIntermediateMerging(fileList); Review comment: `procFiles.toArray()` returns `Object[]`, type mismatch
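The reviewer's point about `toArray()` is a standard Java pitfall: the no-argument overload returns an array whose runtime type is `Object[]`, which cannot be cast to `File[]` even when every element is a `File`. A small demonstration (with `String` standing in for `File`):

```java
import java.util.ArrayList;
import java.util.List;

// Why the typed toArray overload matters: List.toArray() returns Object[],
// so a downcast to a more specific array type fails at runtime.
public class ToArrayDemo {
  public static void main(String[] args) {
    List<String> items = new ArrayList<>();
    items.add("a.sorttemp");

    Object[] untyped = items.toArray();             // runtime type: Object[]
    System.out.println(untyped.getClass().getSimpleName());

    String[] typed = items.toArray(new String[0]);  // runtime type: String[]
    System.out.println(typed.getClass().getSimpleName());

    // (String[]) items.toArray() would throw ClassCastException here.
  }
}
```

This is why `procFiles.toArray(new File[procFiles.size()])` must keep its typed argument, and the suggested `(File[]) procFiles.toArray()` would fail.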
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379216889 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches
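The `handlePreviousPage` body quoted above does two things: sort the in-memory page with a row comparator, then pick one of the configured temp directories at random to spread I/O. A minimal sketch of that shape, with illustrative names in place of CarbonData's comparators and parameters:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;

// Sketch of the synchronous sort step: order the page by its first
// (sort) column, mirroring the role of NewRowComparator, then choose a
// temp location at random. Names and data are illustrative.
public class PageSortSketch {
  public static void main(String[] args) {
    Object[][] page = { {2, "b"}, {1, "a"}, {3, "c"} };

    // compare rows on the first column, ascending
    Arrays.sort(page, Comparator.comparingInt(row -> (Integer) row[0]));
    System.out.println(page[0][1]); // a

    // choose a folder randomly every time, as the quoted code does
    String[] tmpFileLocation = { "/tmp/sort1", "/tmp/sort2" };
    String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
    System.out.println(locationChosen);
  }
}
```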
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379216281 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() Review comment: moved from `DataSorterAndWriter`
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379216009 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { Review comment: this only removes the synchronized block from the original code
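The `addRowBatch` logic under discussion splits an incoming batch across the sort-buffer boundary: fill the remaining slots, flush the full buffer, then copy the leftover rows into a fresh buffer. A simplified, self-contained sketch of that split (omitting the `size == 0` early return and the actual sort/write):

```java
// Sketch of addRowBatch's buffer split, with illustrative data.
public class BatchSplitSketch {
  public static void main(String[] args) {
    int sortBufferSize = 4;
    Object[][] buffer = new Object[sortBufferSize][];
    int entryCount = 2;                          // buffer already holds 2 rows
    Object[][] batch = new Object[5][];          // incoming batch of 5 rows
    for (int i = 0; i < batch.length; i++) {
      batch[i] = new Object[] { i };
    }
    int size = batch.length;
    int sizeLeft = 0;

    if (entryCount + size >= sortBufferSize) {
      sizeLeft = sortBufferSize - entryCount;    // rows that still fit: 2
      if (sizeLeft > 0) {
        System.arraycopy(batch, 0, buffer, entryCount, sizeLeft);
      }
      // ... sort and write the now-full buffer here (handlePreviousPage) ...
      buffer = new Object[sortBufferSize][];     // fresh buffer
      entryCount = 0;
      size = size - sizeLeft;                    // 3 rows remain
    }
    System.arraycopy(batch, sizeLeft, buffer, entryCount, size);
    entryCount += size;

    System.out.println(entryCount);              // 3
  }
}
```

The reviewer's point is that this control flow is unchanged from the original; only the `synchronized (addRowsLock)` wrapper was dropped, which is safe once each `SortDataRows` instance is fed by a single thread.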
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379215697 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** Review comment: When the method name is meaningful, a comment is not necessary. Keep the code clean.
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379215166 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java ## @@ -80,29 +80,25 @@ public void initialize(SortParameters sortParameters) { public Iterator[] sort(Iterator[] iterators) throws CarbonDataLoadingException { int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); -UnsafeSortDataRows sortDataRow = -new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB); +UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[iterators.length]; Review comment: please check original code Line 91. As I said above, we can limit the pool size. The existing parameter `sortParameters.getNumberOfCores()` can be used
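The reviewer's suggestion is to cap concurrency with a fixed-size pool driven by a configured core count rather than by the number of input iterators. A sketch of that idea (the core count here is a stand-in for `sortParameters.getNumberOfCores()`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: submit one task per iterator, but bound actual parallelism with
// a fixed thread pool sized by the configured core count. Excess tasks
// simply queue inside the pool.
public class CappedPoolSketch {
  public static void main(String[] args) throws InterruptedException {
    int numberOfCores = 2;      // stand-in for sortParameters.getNumberOfCores()
    int iteratorCount = 8;      // tasks may outnumber threads safely
    ExecutorService pool = Executors.newFixedThreadPool(numberOfCores);
    AtomicInteger done = new AtomicInteger();
    for (int i = 0; i < iteratorCount; i++) {
      pool.execute(done::incrementAndGet);   // real code would sort one iterator
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(done.get()); // 8
  }
}
```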
[GitHub] [carbondata] kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
kevinjmh commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r379214468 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) Review comment: this just modifies one parameter of the original code
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3614: [CARBONDATA-3693] Separate Index command from DataMap command
CarbonDataQA1 commented on issue #3614: [CARBONDATA-3693] Separate Index command from DataMap command URL: https://github.com/apache/carbondata/pull/3614#issuecomment-585843181 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1982/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3614: [CARBONDATA-3693] Separate Index command from DataMap command
CarbonDataQA1 commented on issue #3614: [CARBONDATA-3693] Separate Index command from DataMap command URL: https://github.com/apache/carbondata/pull/3614#issuecomment-585816220 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/279/
[GitHub] [carbondata] asfgit closed pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
asfgit closed pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608
[GitHub] [carbondata] QiangCai commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
QiangCai commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585814883 LGTM
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585810276 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1981/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
CarbonDataQA1 commented on issue #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#issuecomment-585779269 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1980/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo
CarbonDataQA1 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#issuecomment-585768857 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1979/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585768043 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/278/
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378874531 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java ## @@ -80,29 +80,25 @@ public void initialize(SortParameters sortParameters) { public Iterator[] sort(Iterator[] iterators) throws CarbonDataLoadingException { int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); -UnsafeSortDataRows sortDataRow = -new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB); +UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[iterators.length]; Review comment: I think the number of threads depends on the iterators, so the work will be parallelized based on them, right? Or are you planning to define a separate property for it?
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378832453 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() Review comment: Do not hardcode the underscore; use the constant from CarbonCommonConstants. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
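The refactor quoted above replaces the semaphore-guarded executor hand-off with a synchronous handlePreviousPage that sorts the filled page on the caller's thread and spills it to a randomly chosen temp directory. Below is a minimal, hypothetical Java sketch of that sort-and-spill pattern; the class and method names are illustrative, not CarbonData's actual code.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;

// Hypothetical sketch of the synchronous sort-and-spill pattern: sort the
// filled page in place, then write it to a temp file whose directory is
// picked at random so spill I/O spreads across configured locations.
public class SortAndSpillSketch {

  // Sorts the page in place with the supplied row comparator
  // (stand-in for NewRowComparator / NewRowComparatorForNormalDims).
  static void sortPage(Object[][] page, Comparator<Object[]> comparator) {
    Arrays.sort(page, comparator);
  }

  // Builds the spill file name: <dir>/<table>_<rangeId>_<nanoTime><ext>,
  // choosing the directory randomly from the configured temp locations.
  static File chooseSortTempFile(String[] tempDirs, String tableName,
      int rangeId, String ext) {
    String dir = tempDirs[new Random().nextInt(tempDirs.length)];
    return new File(dir + File.separator + tableName
        + '_' + rangeId + '_' + System.nanoTime() + ext);
  }

  public static void main(String[] args) {
    Object[][] page = { {3, "c"}, {1, "a"}, {2, "b"} };
    sortPage(page, Comparator.comparingInt(r -> (Integer) r[0]));
    System.out.println(page[0][1]); // prints "a"
    File f = chooseSortTempFile(new String[]{"/tmp"}, "t1", 0, ".sorttemp");
    System.out.println(f.getName().endsWith(".sorttemp")); // prints "true"
  }
}
```

The ".sorttemp" extension here is a placeholder for the constant (CarbonCommonConstants.SORT_TEMP_FILE_EXT) the reviewer asks for.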
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378832145 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ## @@ -72,20 +67,11 @@ public void addFileToMerge(File sortTempFile) { // intermediate merging of sort temp files will be triggered synchronized (lockObject) { procFiles.add(sortTempFile); -} - } - - public void startMergingIfPossible() { -File[] fileList; -if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { - synchronized (lockObject) { -fileList = procFiles.toArray(new File[procFiles.size()]); -this.procFiles = new ArrayList(); + if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { +File[] fileList = procFiles.toArray(new File[procFiles.size()]); +this.procFiles = new ArrayList<>(); +startIntermediateMerging(fileList); Review comment: you can pass (File[]) procFiles.toArray() directly
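The change under review folds the merge trigger into addFileToMerge, so the size check and the list swap happen under the same lock that guards the add, and the separate startMergingIfPossible entry point goes away. A hedged sketch of that pattern follows; the threshold field and the recording list are illustrative stand-ins for the real SortParameters and merge submission.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the add-and-trigger logic: once the number of
// pending sort temp files reaches the merge threshold, the pending list is
// swapped out and handed to the intermediate merger, all under one lock.
public class IntermediateMergeTrigger {
  private final Object lockObject = new Object();
  private final int mergeThreshold; // stand-in for getNumberOfIntermediateFileToBeMerged()
  private List<File> procFiles = new ArrayList<>();
  // Batches handed to the (not shown) merger, recorded here for illustration.
  final List<File[]> mergedBatches = new ArrayList<>();

  IntermediateMergeTrigger(int mergeThreshold) {
    this.mergeThreshold = mergeThreshold;
  }

  void addFileToMerge(File sortTempFile) {
    synchronized (lockObject) {
      procFiles.add(sortTempFile);
      if (procFiles.size() >= mergeThreshold) {
        // Snapshot the pending files and reset the list atomically.
        File[] fileList = procFiles.toArray(new File[0]);
        procFiles = new ArrayList<>();
        startIntermediateMerging(fileList);
      }
    }
  }

  private void startIntermediateMerging(File[] files) {
    mergedBatches.add(files); // stand-in for submitting a merge task
  }
}
```

Because the check now runs inside the same synchronized block as the add, there is no window where another thread can observe the list between the size test and the swap.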
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378823627 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { Review comment: I think the size parameter is confusing and should be renamed; since it is an index being passed in, we could perhaps name it sortDataRowIndex
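For context on the parameter being discussed: addRowBatch splits an incoming batch whenever it would overflow the sort buffer, topping up the current page with the first sizeLeft rows, flushing the full page, and starting a new page with the remainder. A simplified, hypothetical sketch of just that arithmetic (the real flush to handlePreviousPage is replaced by a counter):

```java
// Hypothetical sketch of the batch-splitting arithmetic in addRowBatch:
// fill the remaining capacity of the current page, flush it, then copy the
// leftover rows into a fresh page.
public class BatchSplitSketch {
  Object[][] recordHolderList;
  int entryCount;
  final int sortBufferSize;
  int flushes; // counts simulated spills (stand-in for handlePreviousPage)

  BatchSplitSketch(int sortBufferSize) {
    this.sortBufferSize = sortBufferSize;
    this.recordHolderList = new Object[sortBufferSize][];
  }

  void addRowBatch(Object[][] rowBatch, int size) {
    int sizeLeft = 0;
    if (entryCount + size >= sortBufferSize) {
      // Top up the current page with as many rows as still fit.
      sizeLeft = sortBufferSize - entryCount;
      if (sizeLeft > 0) {
        System.arraycopy(rowBatch, 0, recordHolderList, entryCount, sizeLeft);
      }
      flushes++; // the real code sorts and writes the page here
      recordHolderList = new Object[sortBufferSize][];
      entryCount = 0;
      size -= sizeLeft;
      if (size == 0) {
        return;
      }
    }
    // Copy the remainder (or the whole batch, if nothing overflowed).
    System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
    entryCount += size;
  }
}
```

With a buffer of 4, adding a batch of 6 rows flushes once and leaves 2 rows in the new page, which matches the sizeLeft bookkeeping in the diff.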
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378824300 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** Review comment: I suggest not removing the comments; improve them instead wherever they don't convey meaningful information. Please update the comments rather than removing them, in all places
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378832595 ## File path: processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ## @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(" Writing to temp file ** "); } - intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - try { -semaphore.acquire(); -dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); - } catch (InterruptedException e) { -LOGGER.error("exception occurred while trying to acquire a semaphore lock: ", e); -throw new CarbonSortKeyAndGroupByException(e); - } + handlePreviousPage(recordHolderListLocal); // create the new holder Array this.recordHolderList = new Object[this.sortBufferSize][]; this.entryCount = 0; } recordHolderList[entryCount++] = row; } - /** - * This method will be used to add new row - * - * @param rowBatch new rowBatch - * @throws CarbonSortKeyAndGroupByException problem while writing - */ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file -synchronized (addRowsLock) { - int sizeLeft = 0; - if (entryCount + size >= sortBufferSize) { -if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" Writing to temp file ** "); -} -intermediateFileMerger.startMergingIfPossible(); -Object[][] recordHolderListLocal = recordHolderList; -sizeLeft = sortBufferSize - entryCount; -if (sizeLeft > 0) { - System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); -} -try { - semaphore.acquire(); - dataSorterAndWriterExecutorService - .execute(new DataSorterAndWriter(recordHolderListLocal)); -} catch 
(Exception e) { - LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); - throw new CarbonSortKeyAndGroupByException(e); -} -// create the new holder Array -this.recordHolderList = new Object[this.sortBufferSize][]; -this.entryCount = 0; -size = size - sizeLeft; -if (size == 0) { - return; -} +int sizeLeft = 0; +if (entryCount + size >= sortBufferSize) { + if (LOGGER.isDebugEnabled()) { +LOGGER.debug(" Writing to temp file ** "); } - System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); - entryCount += size; + Object[][] recordHolderListLocal = recordHolderList; + sizeLeft = sortBufferSize - entryCount; + if (sizeLeft > 0) { +System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); + } + handlePreviousPage(recordHolderListLocal); + // create the new holder Array + this.recordHolderList = new Object[this.sortBufferSize][]; + this.entryCount = 0; + size = size - sizeLeft; + if (size == 0) { +return; + } +} +System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size); +entryCount += size; + } + + /** + * sort and write data + * @param recordHolderArray + */ + private void handlePreviousPage(Object[][] recordHolderArray) + throws CarbonSortKeyAndGroupByException { +try { + long startTime = System.currentTimeMillis(); + if (parameters.getNumberOfNoDictSortColumns() > 0) { +Arrays.sort(recordHolderArray, +new NewRowComparator(parameters.getNoDictionarySortColumn(), +parameters.getNoDictDataType())); + } else { +Arrays.sort(recordHolderArray, +new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); + } + + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; + File sortTempFile = new File( + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + 
System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); + // add sort temp filename to and arrayList. When the list size reaches
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance
akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] Optimize local sort performance URL: https://github.com/apache/carbondata/pull/3603#discussion_r378871132 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ## @@ -154,11 +155,13 @@ public void close() { /** * Below method will be used to process data to next step */ - private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) Review comment: I think there are duplicate code fragments in the unsafe and safe paths; please check and refactor.
[jira] [Created] (CARBONDATA-3701) Apache CarbonData should provide a Python interface to support the deep learning framework MXNet to read and write data from/to CarbonData
Bo Xu created CARBONDATA-3701: - Summary: Apache CarbonData should provide a Python interface to support the deep learning framework MXNet to read and write data from/to CarbonData Key: CARBONDATA-3701 URL: https://issues.apache.org/jira/browse/CARBONDATA-3701 Project: CarbonData Issue Type: Sub-task Reporter: Bo Xu Assignee: Bo Xu Nowadays AI model training is getting more and more popular. Currently, many AI frameworks use raw data files or row-format data files for model training, which cannot provide the projection, filtering, and fast-scan capability of a columnar store. So, if CarbonData supports AI frameworks, it can speed up model training by increasing IO throughput, and give AI developers more flexible training-set selection. AI compute engine integration: MXNet integration: new Python API in pycarbon to support MXNet reading data from CarbonData files for model training -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command
CarbonDataQA1 commented on issue #3612: [CARBONDATA-3694] Separate Materialized View command from DataMap command URL: https://github.com/apache/carbondata/pull/3612#issuecomment-585756614 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/277/
[GitHub] [carbondata] jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax
jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax URL: https://github.com/apache/carbondata/pull/3609#discussion_r378854282 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, SqlLexical, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = 
carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + protected val ALTER: Regex = carbonKeyWord("ALTER") + protected val COMPACT: Regex = carbonKeyWord("COMPACT") + protected val IN: Regex = carbonKeyWord("IN") + protected val SEGMENT: Regex = carbonKeyWord("SEGMENT") + protected val ID: Regex = carbonKeyWord("ID") + protected val WHERE: Regex = carbonKeyWord("WHERE") + + /** + * This will convert key word to regular expression. + */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) +
[GitHub] [carbondata] jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax
jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax URL: https://github.com/apache/carbondata/pull/3609#discussion_r378853809 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, SqlLexical, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = 
carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + protected val ALTER: Regex = carbonKeyWord("ALTER") + protected val COMPACT: Regex = carbonKeyWord("COMPACT") + protected val IN: Regex = carbonKeyWord("IN") + protected val SEGMENT: Regex = carbonKeyWord("SEGMENT") + protected val ID: Regex = carbonKeyWord("ID") + protected val WHERE: Regex = carbonKeyWord("WHERE") + + /** + * This will convert key word to regular expression. + */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) +
[GitHub] [carbondata] jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax
jackylk commented on a change in pull request #3609: [CARBONDATA-3689] Support independent MV extension and MV syntax URL: https://github.com/apache/carbondata/pull/3609#discussion_r378853691 ## File path: datamap/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVParser.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.mv.extension + +import scala.language.implicitConversions +import scala.util.matching.Regex +import scala.util.parsing.combinator.PackratParsers +import scala.util.parsing.combinator.syntactical.StandardTokenParsers + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.{CarbonParserUtil, SqlLexical, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand} +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.hive.CarbonMVRules +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.mv.rewrite.MVUdf + +class MVParser extends StandardTokenParsers with PackratParsers { + + // Keywords used in this parser + protected val SELECT: Regex = carbonKeyWord("SELECT") + protected val CREATE: Regex = carbonKeyWord("CREATE") + protected val MATERIALIZED: Regex = carbonKeyWord("MATERIALIZED") + protected val VIEW: Regex = carbonKeyWord("VIEW") + protected val VIEWS: Regex = carbonKeyWord("VIEWS") + protected val AS: Regex = carbonKeyWord("AS") + protected val DROP: Regex = carbonKeyWord("DROP") + protected val SHOW: Regex = carbonKeyWord("SHOW") + protected val IF: Regex = carbonKeyWord("IF") + protected val EXISTS: Regex = carbonKeyWord("EXISTS") + protected val NOT: Regex = carbonKeyWord("NOT") + protected val MVPROPERTIES: Regex = carbonKeyWord("MVPROPERTIES") + protected val WITH: Regex = carbonKeyWord("WITH") + protected val DEFERRED: Regex = carbonKeyWord("DEFERRED") + protected val REBUILD: Regex = carbonKeyWord("REBUILD") + protected val ON: Regex = 
carbonKeyWord("ON") + protected val TABLE: Regex = carbonKeyWord("TABLE") + protected val ALTER: Regex = carbonKeyWord("ALTER") + protected val COMPACT: Regex = carbonKeyWord("COMPACT") + protected val IN: Regex = carbonKeyWord("IN") + protected val SEGMENT: Regex = carbonKeyWord("SEGMENT") + protected val ID: Regex = carbonKeyWord("ID") + protected val WHERE: Regex = carbonKeyWord("WHERE") + + /** + * This will convert key word to regular expression. + */ + private def carbonKeyWord(keys: String): Regex = { +("(?i)" + keys).r + } + + implicit def regexToParser(regex: Regex): Parser[String] = { +import lexical.Identifier +acceptMatch( + s"identifier matching regex ${ regex }", + { case Identifier(str) if regex.unapplySeq(str).isDefined => str } +) + } + + // By default, use Reflection to find the reserved words defined in the sub class. + // NOTICE, Since the Keyword properties defined by sub class, we couldn't call this + // method during the parent class instantiation, because the sub class instance + // isn't created yet. + protected lazy val reservedWords: Seq[String] = + this +.getClass +.getMethods +.filter(_.getReturnType == classOf[Keyword]) +.map(_.invoke(this).asInstanceOf[Keyword].normalize) + + // Set the keywords as empty by default, will change that later. + override val lexical = new SqlLexical + + protected case class Keyword(str: String) { +def normalize: String = lexical.normalizeKeyword(str) +def parser: Parser[String] = normalize + } + + def parse(input: String): LogicalPlan = { +synchronized { + phrase(start)(new lexical.Scanner(input)) match { +case Success(plan, _) => + plan +case failureOrError => + CarbonException.analysisException(failureOrError.toString) +
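The carbonKeyWord helper quoted above turns each SQL keyword into a case-insensitive regular expression by prefixing the inline flag (?i). A tiny Java illustration of the same trick follows; it is not CarbonData's parser itself, which builds Scala parser combinators on top of these patterns.

```java
import java.util.regex.Pattern;

// Minimal illustration of case-insensitive keyword matching via the
// embedded flag expression "(?i)", mirroring MVParser's carbonKeyWord.
public class KeywordRegexSketch {

  // Compiles a keyword into a case-insensitive pattern.
  static Pattern carbonKeyWord(String keyword) {
    return Pattern.compile("(?i)" + keyword);
  }

  public static void main(String[] args) {
    Pattern create = carbonKeyWord("CREATE");
    System.out.println(create.matcher("create").matches());  // prints "true"
    System.out.println(create.matcher("CrEaTe").matches());  // prints "true"
    System.out.println(create.matcher("created").matches()); // prints "false"
  }
}
```

Because matches() requires the whole token to match, an identifier like "created" is not mistaken for the keyword, which is why the parser applies these patterns to already-tokenized identifiers.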
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585748604 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1978/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo
CarbonDataQA1 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#issuecomment-585745007 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/276/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585739133 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1977/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
CarbonDataQA1 commented on issue #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#issuecomment-585720917 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/275/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow
CarbonDataQA1 commented on issue #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#issuecomment-585718233 Build Success with Spark 2.4.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.4/274/
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811905

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ##

@@ -609,6 +613,137 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
   }

+  protected lazy val indexCommands: Parser[LogicalPlan] =
+    showIndexes | createIndexTable | dropIndexTable | registerIndexes | rebuildIndex
+
+  protected lazy val createIndexTable: Parser[LogicalPlan] =
+    CREATE ~> INDEX ~> ident ~ (ON ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    ("(" ~> repsep(ident, ",") <~ ")") ~ (AS ~> stringLit) ~
+    (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case indexTableName ~ table ~ cols ~ indexStoreType ~ tblProp =>
+
+        if (!("carbondata".equalsIgnoreCase(indexStoreType) ||
+            "org.apache.carbondata.format".equalsIgnoreCase(indexStoreType))) {

Review comment: done
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811891

## File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ##

@@ -621,6 +624,52 @@ public boolean accept(CarbonFile file) {
     }
   }

+  /**
+   * this is the clean up added specifically for SI table, because after we merge the data files
+   * inside the secondary index table, we need to delete the stale carbondata files.
+   * refer {@link org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD}
+   */
+  private static void cleanUpDataFilesAfterSmallFIlesMergeForSI(CarbonTable table,

Review comment: done
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811841

## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ##

@@ -2341,4 +2347,72 @@ private CarbonCommonConstants() {
    * Default first day of week
    */
   public static final String CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT = "SUNDAY";
+
+  @CarbonProperty
+  public static final String CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER =
+      "carbon.infilter.subquery.pushdown.enable";
+
+  /**
+   * CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER_DEFAULT
+   */
+  public static final String CARBON_PUSH_LEFTSEMIEXIST_JOIN_AS_IN_FILTER_DEFAULT = "false";
+
+  /**
+   * key to get broadcast record size from properties
+   */
+  @CarbonProperty
+  public static final String BROADCAST_RECORD_SIZE = "broadcast.record.size";
+
+  /**
+   * default broadcast record size
+   */
+  public static final String DEFAULT_BROADCAST_RECORD_SIZE = "100";
+
+  /**
+   * to enable SI lookup partial string
+   */
+  @CarbonProperty
+  public static final String ENABLE_SI_LOOKUP_PARTIALSTRING = "carbon.si.lookup.partialstring";
+
+  /**
+   * default value of ENABLE_SI_LOOKUP_PARTIALSTRING
+   */
+  public static final String ENABLE_SI_LOOKUP_PARTIALSTRING_DEFAULT = "true";
+
+  /**
+   * configuration for launching the number of threads during secondary index creation
+   */
+  @CarbonProperty
+  public static final String CARBON_SECONDARY_INDEX_CREATION_THREADS =
+      "carbon.secondary.index.creation.threads";
+
+  /**
+   * default value configuration for launching the number of threads during secondary
+   * index creation
+   */
+  public static final String CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT = "1";
+
+  /**
+   * max value configuration for launching the number of threads during secondary
+   * index creation
+   */
+  public static final int CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX = 50;
+
+  /**
+   * Enable SI segment Compaction / merge small files
+   */
+  @CarbonProperty
+  public static final String CARBON_SI_SEGMENT_MERGE = "carbon.si.segment.merge";
+
+  /**
+   * Default value for SI segment Compaction / merge small files.
+   * Setting this to true degrades LOAD performance.
+   * When the number of small files increases for SI segments (this can happen as the number of
+   * columns will be less and we store the position id and reference columns), the user can either
+   * set this to true, which merges the data files for upcoming loads, or run the SI rebuild
+   * command, which does this job for all segments. (REBUILD INDEX )
+   */
+  public static final String DEFAULT_CARBON_SI_SEGMENT_MERGE = "false";

Review comment: done
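[Editor's note] The constants above describe a bounded thread-count property (default "1", max 50). The following is a hedged, self-contained sketch of how such a bounded property could be resolved; it is not CarbonData code, and only the default/max values are taken from the diff above.

```java
// Illustrative sketch: resolve carbon.secondary.index.creation.threads-style
// configuration. Default (1) and max (50) come from the constants above;
// the helper itself is a stand-in, not the real CarbonData implementation.
public final class SiThreadConfig {
    public static final int DEFAULT_THREADS = 1;  // CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT
    public static final int MAX_THREADS = 50;     // CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX

    // Parse the user-supplied value; fall back to the default when the value
    // is missing, non-numeric, or outside the allowed [1, 50] range.
    public static int resolveThreads(String configured) {
        if (configured == null) {
            return DEFAULT_THREADS;
        }
        try {
            int n = Integer.parseInt(configured.trim());
            return (n >= 1 && n <= MAX_THREADS) ? n : DEFAULT_THREADS;
        } catch (NumberFormatException e) {
            return DEFAULT_THREADS;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveThreads("8"));    // valid value is kept
        System.out.println(resolveThreads("500"));  // above max, falls back to default
        System.out.println(resolveThreads("abc"));  // non-numeric, falls back to default
    }
}
```

Falling back to the default (rather than clamping to the max) keeps bad configuration from silently launching 50 threads; the real implementation may choose differently.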
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811931

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ##

@@ -609,6 +613,137 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
   }

+  protected lazy val indexCommands: Parser[LogicalPlan] =
+    showIndexes | createIndexTable | dropIndexTable | registerIndexes | rebuildIndex
+
+  protected lazy val createIndexTable: Parser[LogicalPlan] =
+    CREATE ~> INDEX ~> ident ~ (ON ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    ("(" ~> repsep(ident, ",") <~ ")") ~ (AS ~> stringLit) ~
+    (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case indexTableName ~ table ~ cols ~ indexStoreType ~ tblProp =>
+
+        if (!("carbondata".equalsIgnoreCase(indexStoreType) ||
+            "org.apache.carbondata.format".equalsIgnoreCase(indexStoreType))) {
+          sys.error("Not a carbon format request")
+        }
+
+        val (dbName, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+
+        val tableProperties = if (tblProp.isDefined) {
+          val tblProps = tblProp.get.map(f => f._1 -> f._2)
+          scala.collection.mutable.Map(tblProps: _*)
+        } else {
+          scala.collection.mutable.Map.empty[String, String]
+        }
+        // validate the tableBlockSize from table properties
+        CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE)
+        // validate for supported table properties
+        validateTableProperties(tableProperties)
+        // validate column_meta_cache property if defined
+        val tableColumns: List[String] = cols.map(f => f.toLowerCase)
+        validateColumnMetaCacheAndCacheLevelProeprties(dbName,
+          indexTableName.toLowerCase,
+          tableColumns,
+          tableProperties)
+        validateColumnCompressorProperty(tableProperties
+          .getOrElse(CarbonCommonConstants.COMPRESSOR, null))
+        val indexTableModel = SecondaryIndex(dbName,
+          tableName.toLowerCase,
+          tableColumns,
+          indexTableName.toLowerCase)
+        CreateIndexTable(indexTableModel, tableProperties)
+    }
+
+  private def validateColumnMetaCacheAndCacheLevelProeprties(dbName: Option[String],
+      tableName: String,
+      tableColumns: Seq[String],
+      tableProperties: scala.collection.mutable.Map[String, String]): Unit = {
+    // validate column_meta_cache property
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
+      CommonUtil.validateColumnMetaCacheFields(
+        dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+        tableName,
+        tableColumns,
+        tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).get,
+        tableProperties)
+    }
+    // validate cache_level property
+    if (tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).isDefined) {
+      CommonUtil.validateCacheLevel(
+        tableProperties.get(CarbonCommonConstants.CACHE_LEVEL).get,
+        tableProperties)
+    }
+  }
+
+  private def validateColumnCompressorProperty(columnCompressor: String): Unit = {
+    // Add validation for column compressor when creating index table
+    try {
+      if (null != columnCompressor) {
+        CompressorFactory.getInstance().getCompressor(columnCompressor)
+      }
+    } catch {
+      case ex: UnsupportedOperationException =>
+        throw new InvalidConfigurationException(ex.getMessage)
+    }
+  }
+
+  /**
+   * this method validates if index table properties contain other than supported ones
+   *
+   * @param tableProperties
+   */
+  private def validateTableProperties(tableProperties: scala.collection.mutable.Map[String,
+      String]) = {
+    val supportedPropertiesForIndexTable = Seq("TABLE_BLOCKSIZE",
+      "COLUMN_META_CACHE",
+      "CACHE_LEVEL",
+      CarbonCommonConstants.COMPRESSOR.toUpperCase)
+    tableProperties.foreach { property =>
+      if (!supportedPropertiesForIndexTable.contains(property._1.toUpperCase)) {
+        val errorMessage = "Unsupported Table property in index creation: " + property._1.toString
+        throw new MalformedCarbonCommandException(errorMessage)
+      }
+    }
+  }
+
+  protected lazy val dropIndexTable: Parser[LogicalPlan] =
+    DROP ~> INDEX ~> opt(IF ~> EXISTS) ~ ident ~ (ON ~> (ident <~ ".").? ~ ident) <~ opt(";") ^^ {
+      case ifexist ~ indexTableName ~ table =>
+        val (dbName, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+        DropIndexCommand(ifexist.isDefined, dbName, indexTableName.toLowerCase,
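[Editor's note] The whitelist check that validateTableProperties performs above can be sketched self-contained. This is a hedged Java mimic, not the real parser code: the property-key set mirrors the Scala Seq above (the string "carbon.column.compressor" is an assumption for CarbonCommonConstants.COMPRESSOR), and IllegalArgumentException stands in for MalformedCarbonCommandException.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the TBLPROPERTIES whitelist check in the CREATE INDEX parser:
// any key outside the supported set is rejected with the same message shape.
public final class IndexTablePropertyCheck {
    // "CARBON.COLUMN.COMPRESSOR" is an assumed upper-cased value of
    // CarbonCommonConstants.COMPRESSOR, included for illustration only.
    public static final Set<String> SUPPORTED = new HashSet<>(Arrays.asList(
        "TABLE_BLOCKSIZE", "COLUMN_META_CACHE", "CACHE_LEVEL", "CARBON.COLUMN.COMPRESSOR"));

    // Reject any property key that is not on the whitelist (case-insensitive).
    public static void validate(Map<String, String> tableProperties) {
        for (String key : tableProperties.keySet()) {
            if (!SUPPORTED.contains(key.toUpperCase())) {
                throw new IllegalArgumentException(
                    "Unsupported Table property in index creation: " + key);
            }
        }
    }
}
```

A whitelist (rather than a blacklist) means new table properties added elsewhere in the system stay rejected for index tables until they are explicitly supported.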
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table.
akashrn5 commented on a change in pull request #3608: [CARBONDATA-3680][alpha-feature]Support Secondary Index feature on carbon table. URL: https://github.com/apache/carbondata/pull/3608#discussion_r378811867

## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java ##

@@ -61,7 +61,8 @@ public DiskBasedDMSchemaStorageProvider(String storePath) {
     this.storePath = CarbonUtil.checkAndAppendHDFSUrl(storePath);
-    this.mdtFilePath = storePath + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile";
+    this.mdtFilePath = CarbonUtil.checkAndAppendHDFSUrl(

Review comment: done
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo
CarbonDataQA1 commented on issue #3619: [HOTFIX] Remove unused fields in TableInfo URL: https://github.com/apache/carbondata/pull/3619#issuecomment-585712697 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1975/
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801875

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ##

@@ -17,85 +17,517 @@

 package org.apache.spark.sql.execution.command.management

-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-    relation: CarbonDatasourceHadoopRelation,
-    child: LogicalPlan,
-    overwrite: Boolean,
-    partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+ * insert into without df, by just using logical plan
+ */
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),

Review comment: done
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801536

## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ##

@@ -834,4 +843,179 @@ object CommonUtil {
     displaySize
   }

+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType: ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType: StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType: MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>

Review comment: it was from base code. yes. done like that
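[Editor's note] The loop above walks the schema once and extracts each field with a type-specific accessor, leaving nulls untouched. A hedged, self-contained Java mimic of that dispatch pattern (the Spark InternalRow/DataType types are replaced by stand-ins; this is not CarbonData or Spark code):

```java
import java.math.BigDecimal;
import java.util.List;

// Sketch of per-field type dispatch: a row is modeled as Object[] plus a
// parallel list of declared types, and each value is normalized according
// to its type, mirroring the while-loop over fieldTypes in the diff above.
public final class RowConverter {
    public enum FieldType { STRING, DECIMAL, OTHER }

    public static Object[] convert(Object[] raw, List<FieldType> types) {
        Object[] out = new Object[types.size()];
        for (int i = 0; i < types.size(); i++) {
            if (raw[i] == null) {
                continue;                       // null fields stay null
            }
            switch (types.get(i)) {
                case STRING:
                    out[i] = raw[i].toString(); // strings pass through as String
                    break;
                case DECIMAL:
                    // widen to BigDecimal, as toJavaBigDecimal does above
                    out[i] = new BigDecimal(raw[i].toString());
                    break;
                default:
                    out[i] = raw[i];            // other values pass through unchanged
            }
        }
        return out;
    }
}
```

The point of the single-pass, type-indexed loop is that the schema is resolved once per row rather than per value, which matters on the hot path of an insert.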
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801973

## File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala ##

@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+ * insert into with df, doesn't use logical plan
+ */
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),

Review comment: done
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801595

## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ##

@@ -140,6 +141,89 @@ object CarbonScalaUtil {
     }
   }

+  /**
+   * Converts incoming value to String after converting data as per the data type.
+   *
+   * @param value           Input value to convert
+   * @param dataType        Datatype to convert and then convert to String
+   * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
+   * @param dateFormat      DataFormat to convert in case of DateType datatype
+   * @return converted String
+   */
+  def convertStaticPartitionToValues(
+      value: String,
+      dataType: DataType,
+      timeStampFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat): AnyRef = {
+    val defaultValue = value != null && value.equalsIgnoreCase(hiveDefaultPartition)
+    try {
+      dataType match {
+        case TimestampType if timeStampFormat != null =>
+          val formattedString =
+            if (defaultValue) {
+              timeStampFormat.format(new Date())
+            } else {
+              timeStampFormat.format(DateTimeUtils.stringToTime(value))
+            }
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(formattedString,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType))
+          convertedValue
+        case DateType if dateFormat != null =>
+          val formattedString =
+            if (defaultValue) {
+              dateFormat.format(new Date())
+            } else {
+              dateFormat.format(DateTimeUtils.stringToTime(value))
+            }
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(formattedString,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType))
+          val date = generateDictionaryKey(convertedValue.asInstanceOf[Long])
+          date.asInstanceOf[AnyRef]
+        case BinaryType =>
+          // TODO: decode required? currently it is working
+          ByteUtil.toBytes(value)
+        case _ =>
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(value,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))
+          if (convertedValue == null) {
+            if (defaultValue) {
+              dataType match {
+                case BooleanType =>
+                  return false.asInstanceOf[AnyRef]
+                case _ =>
+                  return 0.asInstanceOf[AnyRef]
+              }
+            }
+            throw new MalformedCarbonCommandException(
+              s"Value $value with datatype $dataType on static partition is not correct")
+          }
+          convertedValue
+      }
+    } catch {
+      case e: Exception =>
+        throw new MalformedCarbonCommandException(
+          s"Value $value with datatype $dataType on static partition is not correct")
+    }
+  }

+  def generateDictionaryKey(timeValue: Long): Int = {
+    if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE ||
+        timeValue > DateDirectDictionaryGenerator.MAX_VALUE) {
+      if (LOGGER.isDebugEnabled) {
+        LOGGER.debug("Value for date type column is not in valid range. Value considered as null.")
+      }
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+    }
+    Math.floor(timeValue.toDouble / DateDirectDictionaryGenerator.MILLIS_PER_DAY).toInt +
+      DateDirectDictionaryGenerator.cutOffDate
+  }

Review comment: no change
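[Editor's note] The arithmetic in generateDictionaryKey above (floor the millisecond timestamp to whole days, then shift by a cut-off offset) can be sketched self-contained. This is an illustration only: MILLIS_PER_DAY is standard, but CUT_OFF_DATE, NULL_VALUE, and the valid millisecond range are assumed placeholder values, not the real DateDirectDictionaryGenerator constants.

```java
// Sketch of the date-dictionary key arithmetic: epoch millis -> epoch days
// (floored, so pre-1970 values round toward earlier days) + cut-off offset.
public final class DateKey {
    public static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
    public static final int CUT_OFF_DATE = 719162; // assumed offset, illustration only
    public static final int NULL_VALUE = 1;        // assumed null marker, illustration only
    public static final long MIN_MILLIS = -62135596800000L; // assumed range bound
    public static final long MAX_MILLIS = 253402214400000L; // assumed range bound

    public static int toKey(long millis) {
        if (millis < MIN_MILLIS || millis > MAX_MILLIS) {
            return NULL_VALUE; // out-of-range dates are stored as null
        }
        // Math.floor keeps negative (pre-epoch) timestamps on the correct day:
        // e.g. -12h of millis is day -1, not day 0.
        return (int) Math.floor((double) millis / MILLIS_PER_DAY) + CUT_OFF_DATE;
    }
}
```

Using floor rather than integer division is the important detail: Java's `/` truncates toward zero, which would map a timestamp twelve hours before the epoch to day 0 instead of day -1.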
[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801434

## File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ##

@@ -263,17 +265,35 @@ class NewDataFrameLoaderRDD[K, V](
       carbonLoadModel.setPreFetch(false)
       val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
-      val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
       val serializer = SparkEnv.get.closureSerializer.newInstance()
       var serializeBytes: Array[Byte] = null
-      while(partitionIterator.hasNext) {
-        val value = partitionIterator.next()
-        if (serializeBytes == null) {
-          serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
+      if (isDataFrame) {
+        val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit,
+          context)
+        while (partitionIterator.hasNext) {
+          val value = partitionIterator.next()
+          if (serializeBytes == null) {
+            serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
+          }
+          recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+            carbonLoadModel, context)
         }
-        recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+      } else {
+        // For internal row, no need of converter and re-arrange step,
+        model.setLoadWithoutConverterWithoutReArrangeStep(true)

Review comment: yes. done like that