[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569481217
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1348/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569480941
Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1335/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569477613
Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1325/
[jira] [Resolved] (CARBONDATA-3618) Update query should throw exception if key has more than one value
Jacky Li resolved CARBONDATA-3618.
Fix Version/s: 2.0.0
Resolution: Fixed

> Update query should throw exception if key has more than one value
> Key: CARBONDATA-3618
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3618
> Project: CarbonData
> Issue Type: Bug
> Reporter: Ajantha Bhat
> Assignee: Ajantha Bhat
> Priority: Minor
> Fix For: 2.0.0
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Problem: an update query should throw an exception if a key has more than one value.
> Cause: currently the update command adds multiple entries for the key, instead of throwing an exception, when the update result has more than one value to replace for a key.
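For context, a minimal sketch of the failure shape being fixed, assuming a SparkSession `spark` with CarbonData enabled; the table and column names are invented for illustration, and the UPDATE follows CarbonData's `SET (...) = (subquery)` form:

```scala
// Hypothetical repro: tables t and s are illustrative only.
spark.sql("CREATE TABLE t (id INT, name STRING) STORED AS carbondata")
spark.sql("CREATE TABLE s (id INT, name STRING) STORED AS carbondata")
spark.sql("INSERT INTO t SELECT 1, 'old'")
spark.sql("INSERT INTO s SELECT 1, 'a'")
spark.sql("INSERT INTO s SELECT 1, 'b'")  // key 1 now has two candidate values
// Before the fix this wrote multiple rows for id = 1; after CARBONDATA-3618
// it should throw an exception instead.
spark.sql("UPDATE t SET (name) = (SELECT s.name FROM s WHERE s.id = t.id)")
```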
[GitHub] [carbondata] asfgit closed pull request #3509: [CARBONDATA-3618] Update query should throw exception if key has more than one value
URL: https://github.com/apache/carbondata/pull/3509
[jira] [Resolved] (CARBONDATA-3614) Support Alter table set/unset table property for long_string_columns
Jacky Li resolved CARBONDATA-3614.
Fix Version/s: 2.0.0
Assignee: Ajantha Bhat
Resolution: Fixed

> Support Alter table set/unset table property for long_string_columns
> Key: CARBONDATA-3614
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3614
> Project: CarbonData
> Issue Type: Bug
> Reporter: Ajantha Bhat
> Assignee: Ajantha Bhat
> Priority: Minor
> Fix For: 2.0.0
> Time Spent: 10h 20m
> Remaining Estimate: 0h
>
> Support Alter table set/unset table property for long_string_columns
[GitHub] [carbondata] asfgit closed pull request #3504: [CARBONDATA-3614] Support Alter table properties set/unset for longstring columns
URL: https://github.com/apache/carbondata/pull/3504
[GitHub] [carbondata] jackylk commented on issue #3504: [CARBONDATA-3614] Support Alter table properties set/unset for longstring columns
URL: https://github.com/apache/carbondata/pull/3504#issuecomment-569476512
LGTM
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827632

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala

```
@@ -269,11 +281,90 @@ case class CarbonInsertFromStageCommand(
         SparkSQLUtil.sessionState(spark).newHadoopConf()
       ).map { row =>
         (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-      }
+    }
+
+    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+  }
+
+  /**
+   * Start global sort loading of partition table
+   */
+  private def startLoadingWithPartition(
+      spark: SparkSession,
+      table: CarbonTable,
+      loadModel: CarbonLoadModel,
+      stageInput: Seq[StageInput]
+  ): Unit = {
+    val partitionDataList = listPartitionFiles(stageInput)
+    val start = System.currentTimeMillis()
+    var index = 0
+    partitionDataList.map(
```

Review comment: format it like:

```
partitionDataList.map { case (partitionMap, splits) =>
  ...
}
```
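For readers skimming the diff, a self-contained sketch of the brace-with-pattern-match style the reviewer is asking for; the stand-in types replace CarbonData's `InputSplit`, and `foreach` is used because the loop body is side-effecting:

```scala
// Each element pairs a partition spec with the splits that belong to it.
val partitionDataList: Seq[(Map[String, Option[String]], Seq[String])] =
  Seq((Map("date_" -> Some("2019-12-28")), Seq("split-0", "split-1")))

partitionDataList.foreach { case (partitionMap, splits) =>
  // Destructuring in the closure replaces partitionData._1 / partitionData._2.
  println(s"loading ${splits.size} files for partition $partitionMap")
}
```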
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361828090

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala

```
@@ -898,12 +896,32 @@ case class CarbonLoadDataCommand(
       sortScope: SortScopeOptions.SortScope,
       isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
     // Converts the data as per the loading steps before give it to writer or sorter
-    val updatedRdd = convertData(
+    val convertedRdd = convertData(
       rdd,
       sparkSession,
       loadModel,
       isDataFrame,
       partitionValues)
+    val updatedRdd = if (isDataFrame) {
+      val columnCount = loadModel.getCsvHeaderColumns.length
+      convertedRdd.map{ row =>
```

Review comment: add space before `{`
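The nit rendered as a tiny compilable sketch; a plain `Seq` stands in for the RDD:

```scala
val convertedRdd = Seq(Array[Any]("a", 1), Array[Any]("b", 2))
// The diff has `.map{ row =>`; house style wants a space before the brace:
val updatedRdd = convertedRdd.map { row =>
  row.length
}
```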
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361828078

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala

```
@@ -794,8 +792,8 @@ case class CarbonLoadDataCommand(
       sparkSession,
       operationContext)
     val logicalPlan = if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
-      var numPartitions =
-        CarbonDataProcessorUtil.getGlobalSortPartitions(carbonLoadModel.getGlobalSortPartitions)
+      var numPartitions = 1
+      // CarbonDataProcessorUtil.getGlobalSortPartitions(carbonLoadModel.getGlobalSortPartitions)
```

Review comment: remove it
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361828062

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala

```
@@ -269,11 +281,90 @@ case class CarbonInsertFromStageCommand(
         SparkSQLUtil.sessionState(spark).newHadoopConf()
       ).map { row =>
         (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-      }
+    }
+
+    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+  }
+
+  /**
+   * Start global sort loading of partition table
+   */
+  private def startLoadingWithPartition(
+      spark: SparkSession,
+      table: CarbonTable,
+      loadModel: CarbonLoadModel,
+      stageInput: Seq[StageInput]
+  ): Unit = {
+    val partitionDataList = listPartitionFiles(stageInput)
+    val start = System.currentTimeMillis()
+    var index = 0
+    partitionDataList.map(
+      partitionData => {
+        index = index + 1
+        val partition = partitionData._1
+        val splits = partitionData._2
+        LOGGER.info(s"start to load ${splits.size} files into " +
+          s"${table.getDatabaseName}.${table.getTableName}")
+        val dataFrame = createInputDataFrameOfInternalRow(spark, table, splits)
+        val columns = dataFrame.columns
+        val header = columns.mkString(",")
+        val selectColumns = columns.filter(!partition.contains(_))
+        val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail : _*)
+        val loadCommand = CarbonLoadDataCommand(
+          databaseNameOp = Option(table.getDatabaseName),
+          tableName = table.getTableName,
+          factPathFromUser = null,
+          dimFilesPath = Seq(),
+          options = scala.collection.immutable.Map("fileheader" -> header),
+          isOverwriteTable = false,
+          inputSqlString = null,
+          dataFrame = Some(selectedDataFrame),
+          updateModel = None,
+          tableInfoOp = None,
+          internalOptions = Map.empty,
+          partition = partition
+        )
+        loadCommand.run(spark)
+      }
+    )

     LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
   }

+  private def listPartitionFiles(stageInputs : Seq[StageInput]):
+      Seq[(Map[String, Option[String]], Seq[InputSplit])] = {
+    val partitionMap = new util.HashMap[Map[String, Option[String]], util.List[InputSplit]]()
+    stageInputs.foreach(
+      stageInput => {
+        val locations = stageInput.getLocations.asScala
+        locations.foreach(
+          location => {
+            val partition = location.getPartitions.asScala.map(t => (t._1, Option(t._2))).toMap
```

Review comment: Why is `Option` required? Can't we remove it?
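One plausible answer to the question, sketched under the assumption that a partition value can be null (for example Hive's default partition): wrapping with `Option(...)` turns a null value into `None` instead of leaking null into the map.

```scala
// Assumption: getPartitions may yield null values for some partitions.
val raw: Map[String, String] = Map("hour_" -> "12", "date_" -> null)
val safe: Map[String, Option[String]] = raw.map { case (k, v) => (k, Option(v)) }
// safe == Map("hour_" -> Some("12"), "date_" -> None)
// Dropping Option would keep the raw null and risk NPEs in later lookups.
```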
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827985

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala (same @@ -269,11 +281,90 @@ hunk as quoted above, at these lines)

```
+        LOGGER.info(s"start to load ${splits.size} files into " +
+          s"${table.getDatabaseName}.${table.getTableName}")
```

Review comment: suggest mentioning in the log which partition this load is for
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827670

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala (same hunk as quoted above, at this line)

```
+  private def listPartitionFiles(stageInputs : Seq[StageInput]):
```

Review comment: describe the return tuple in a comment
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827670

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala (same hunk as quoted above, at this line)

```
+  private def listPartitionFiles(stageInputs : Seq[StageInput]):
```

Review comment: describe the return value in a comment, otherwise it is not easy to understand
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827656

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala (same hunk as quoted above, at this line)

```
+  private def listPartitionFiles(stageInputs : Seq[StageInput]):
```

Review comment: move `:` to the next line
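The three review asks on this method (document the returned tuple, move the `:`) combined into one compilable sketch; `StageInput` and `InputSplit` here are simplified stand-ins for the CarbonData classes, and the body is only an assumed grouping equivalent:

```scala
case class InputSplit(path: String)
case class StageInput(partitions: Map[String, String], splits: Seq[InputSplit])

/**
 * Groups all stage-input splits by partition spec.
 *
 * @return one entry per distinct partition: the partition column -> value map,
 *         paired with every input split that belongs to that partition
 */
def listPartitionFiles(stageInputs: Seq[StageInput])
  : Seq[(Map[String, Option[String]], Seq[InputSplit])] = {
  stageInputs
    .groupBy(_.partitions.map { case (k, v) => (k, Option(v)) })
    .map { case (partition, inputs) => (partition, inputs.flatMap(_.splits)) }
    .toSeq
}
```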
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827632

File path: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala (same hunk as quoted above, at this line)

```
+    partitionDataList.map(
```

Review comment: format it like:

```
partitionDataList.map { partitionData =>
  ...
}
```
[GitHub] [carbondata] jackylk commented on a change in pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#discussion_r361827544

File path: integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala

```
@@ -189,4 +194,20 @@ class TestCarbonPartitionWriter extends QueryTest {
     output.asScala
   }

+  private def delDir(dir: File): Boolean = {
+    if (dir.isDirectory) {
+      val children = dir.list
+      if (children != null) {
+        var i = 0
+        while (i < children.length) {
```

Review comment: declare a val like `val len = children.length`
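A plausible completion of `delDir` with the reviewer's suggestion applied; the tail of the method is not in the quoted hunk, so the recursion and the final `delete()` are assumptions:

```scala
import java.io.File

def delDir(dir: File): Boolean = {
  if (dir.isDirectory) {
    val children = dir.list
    if (children != null) {
      val len = children.length  // hoisted once, as the reviewer suggests
      var i = 0
      while (i < len) {
        delDir(new File(dir, children(i)))  // assumed recursive delete
        i += 1
      }
    }
  }
  dir.delete()
}
```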
[GitHub] [carbondata] jackylk commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361824649

File path: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java

```
@@ -77,7 +76,6 @@ public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPa
   private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
       ColumnPage inputPage) {
     switch (columnSpec.getColumnType()) {
-      case GLOBAL_DICTIONARY:
       case DIRECT_DICTIONARY:
```

Review comment: Yes, if we can think of a good name, we can consider it in another PR
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569442508
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1347/
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3515: [CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column
URL: https://github.com/apache/carbondata/pull/3515#discussion_r361808788

File path: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala

```
@@ -442,7 +442,8 @@ object DataLoadProcessBuilderOnSpark {
       .map { row =>
         new GenericRow(row.getData.asInstanceOf[Array[Any]])
```

Review comment: The RDD creation here is not required; please remove it, and please add a comment explaining why this API is used instead of the existing one.
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569442151
Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1334/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569438006
Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1324/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569436744
Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1346/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#issuecomment-569436583
Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1332/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569434077
Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1333/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#issuecomment-569430944
Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1345/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569428344
Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1323/
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#issuecomment-569426381
Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1322/
[jira] [Updated] (CARBONDATA-3640) Insert from stage command support partition table.
Zhi Liu updated CARBONDATA-3640:
Description: Currently, CarbonInsertFromStageCommand only supports writing to a non-partition table. Sometimes we also need it to support writing to a partition table.

> Insert from stage command support partition table.
> Key: CARBONDATA-3640
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3640
> Project: CarbonData
> Issue Type: New Feature
> Reporter: Zhi Liu
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently, CarbonInsertFromStageCommand only supports writing to a non-partition table. Sometimes we also need it to support writing to a partition table.
[GitHub] [carbondata] niuge01 commented on issue #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542#issuecomment-569423869
please test this
[GitHub] [carbondata] niuge01 opened a new pull request #3542: [CARBONDATA-3640] Insert from stage command support partition table
URL: https://github.com/apache/carbondata/pull/3542

This feature is actually implemented by the CarbonDataLoadCommand.

- [ ] Any interfaces changed? NO
- [ ] Any backward compatibility impacted? NO
- [ ] Document update required? NA
- [ ] Testing done
  Please provide details on
  - Whether new unit test cases have been added or why no new tests are required?
  - How it is tested? Please attach test report.
  - Is it a performance related change? Please attach the performance test report.
  - Any additional information to help reviewers in testing this change.
  YES
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
[GitHub] [carbondata] jackylk commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361798058

File path: processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java

```
@@ -98,70 +82,21 @@ public void initialize() throws IOException {
         configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
             .toString());
     List<FieldConverter> fieldConverterList = new ArrayList<>();
-    localCaches = new Map[fields.length];
     long lruCacheStartTime = System.currentTimeMillis();
-    DictionaryClient client = createDictionaryClient();
-    dictClients.add(client);
     for (int i = 0; i < fields.length; i++) {
-      localCaches[i] = new ConcurrentHashMap<>();
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
-          .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client,
-              configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
-              configuration.getParentTablePath(), isConvertToBinary,
+          .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary,
               (String) configuration.getDataLoadProperty(
                   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER));
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
-    fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    fieldConverters = fieldConverterList.toArray(new FieldConverter[0]);
```

Review comment: It is recommended to pass a zero-length array to `.toArray` in Java
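The zero-length-array idiom the comment refers to, shown from Scala against `java.util.List` (the collection contents are illustrative): passing `new T[0]` lets `toArray` allocate a correctly sized, correctly typed array itself, and modern JVMs optimize this form at least as well as the presized variant.

```scala
import java.util

val list = new util.ArrayList[String]()
list.add("a")
list.add("b")
// Equivalent of Java's list.toArray(new String[0]):
val arr: Array[String] = list.toArray(new Array[String](0))
```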
[jira] [Created] (CARBONDATA-3640) Insert from stage command support partition table.
Zhi Liu created CARBONDATA-3640:
Summary: Insert from stage command support partition table.
Key: CARBONDATA-3640
URL: https://issues.apache.org/jira/browse/CARBONDATA-3640
Project: CarbonData
Issue Type: New Feature
Reporter: Zhi Liu
[GitHub] [carbondata] jackylk commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361797805

File path: docs/ddl-of-carbondata.md

```
@@ -83,7 +83,6 @@ CarbonData DDL statements are documented here,which includes:
 | Property | Description |
 | -------- | ----------- |
-| [DICTIONARY_INCLUDE](#dictionary-encoding-configuration) | Columns for which dictionary needs to be generated |
 | [NO_INVERTED_INDEX](#inverted-index-configuration) | Columns to exclude from inverted index generation |
 | [INVERTED_INDEX](#inverted-index-configuration) | Columns to include for inverted index generation |
```

Review comment: inverted index is not removed, it is required
[GitHub] [carbondata] jackylk commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361797763

File path: docs/configuration-parameters.md

```
@@ -90,7 +90,6 @@ This section provides the details of all the configurations required for the Car
 | enable.data.loading.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time. It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all data loading performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | carbon.dictionary.chunk.size | 1 | CarbonData generates dictionary keys and writes them to separate dictionary file during data loading. To optimize the IO, this configuration determines the number of dictionary keys to be persisted to dictionary file at a time. **NOTE:** Writing to file also serves as a commit point to the dictionary generated. Increasing more values in memory causes more data loss during system or application failure. It is advised to alter this configuration judiciously. |
 | dictionary.worker.threads | 1 | CarbonData supports Optimized data loading by relying on a dictionary server. Dictionary server helps to maintain dictionary values independent of the data loading and there by avoids reading the same input data multiples times. This configuration determines the number of concurrent dictionary generation or request that needs to be served by the dictionary server. **NOTE:** This configuration takes effect when ***carbon.options.single.pass*** is configured as true. Please refer to *carbon.options.single.pass* to understand how dictionary server optimizes data loading. |
```

Review comment: fixed
[GitHub] [carbondata] jackylk commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361797761

File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java

```
@@ -704,8 +704,6 @@ public boolean isHivePartitionTable() {
   public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
     AbsoluteTableIdentifier absoluteTableIdentifier =
         tableInfo.getOrCreateAbsoluteTableIdentifier();
-    absoluteTableIdentifier.setDictionaryPath(
-        tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.DICTIONARY_PATH));
     return absoluteTableIdentifier;
```

Review comment: fixed
[GitHub] [carbondata] jackylk commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361797717

File path: conf/carbon.properties.template

```
@@ -85,7 +85,6 @@ carbon.major.compaction.size=1024
 ##Min max feature is added to enhance query performance. To disable this feature, make it false.
 #carbon.enableMinMax=true

- Global Dictionary Configurations
```

Review comment: they are still required
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361797527

File path: integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala

```
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink
+
+import java.io.{File, InputStreamReader}
+import java.util
+import java.util.{Collections, Properties}
+
+import com.google.gson.Gson
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class TestCarbonPartitionWriter extends QueryTest {
+
+  val tableName = "test_flink_partition"
+
+  @Test
+  def testLocal(): Unit = {
+    sql(s"drop table if exists $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+         | STORED AS carbondata
+         | PARTITIONED BY (hour_ string, date_ string)
```

Review comment: can you add 2 more test cases: 1. use Date type as partition column; 2. use Int type as partition column
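A hedged sketch of one of the requested cases (Int partition column). It only exercises the table shape through SQL rather than the Flink writer path, and it assumes the suite's `sql`/`checkAnswer` helpers inherited from `QueryTest`:

```scala
import org.apache.spark.sql.Row

@Test
def testIntPartitionColumn(): Unit = {
  sql(s"drop table if exists $tableName").collect()
  sql(
    s"""
       | CREATE TABLE $tableName (stringField string, shortField short)
       | STORED AS carbondata
       | PARTITIONED BY (intField int)
     """.stripMargin).collect()
  // The partition column comes last in the projected insert order.
  sql(s"insert into $tableName select 'a', 1, 10")
  checkAnswer(
    sql(s"select stringField from $tableName where intField = 10"),
    Seq(Row("a")))
}
```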
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361797390

File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java

```
@@ -17,10 +17,229 @@
 package org.apache.carbon.flink;

+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
 /**
  * This class is a wrapper of CarbonWriter in SDK.
  * It not only write data to carbon with CarbonWriter in SDK, also generate segment file.
  */
-public abstract class CarbonWriter extends ProxyFileWriter {
+public abstract class CarbonWriter extends ProxyFileWriter {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+  public CarbonWriter(final CarbonWriterFactory factory,
+      final String identifier, final CarbonTable table) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.identifier = identifier;
+    this.table = table;
+  }
+
+  private final CarbonWriterFactory factory;
+
+  private final String identifier;
+
+  protected final CarbonTable table;
+
+  @Override
+  public CarbonWriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * @return when there is no data file uploaded, then return null.
+   */
+  protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) {
+    if (!this.table.isHivePartitionTable()) {
+      final File[] files = new File(localPath).listFiles();
+      if (files == null) {
+        return null;
+      }
+      Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+      for (File file : files) {
+        fileNameMapLength.put(file.getName(), file.length());
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+        }
+        try {
+          CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+        } catch (CarbonDataWriterException exception) {
+          LOGGER.error(exception.getMessage(), exception);
+          throw exception;
+        }
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+        }
+      }
+      return new StageInput(remotePath, fileNameMapLength);
+    } else {
+      final List partitionLocationList = new ArrayList<>();
+      final List partitions = new ArrayList<>();
+      uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions);
+      if (partitionLocationList.isEmpty()) {
+        return null;
+      } else {
+        return new StageInput(remotePath, partitionLocationList);
+      }
+    }
+  }
+
+  private static void uploadSegmentDataFiles(
+      final File directory, final String remotePath,
+      final List partitionLocationList,
+      final List partitions
+  ) {
+    final File[] files = directory.listFiles();
+    if (files == null) {
+      return;
+    }
+    Map<String, Long> fileNameMapLength = new HashMap<>();
+    for (File file : files) {
+      if (file.isDirectory()) {
+        partitions.add(file.getName());
+        uploadSegmentDataFiles(file, remotePath, partitionLocationList, partitions);
+        partitions.remove(partitions.size() - 1);
+        continue;
+      }
+      fileNameMapLength.put(file.getName(), file.length());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+      }
+      try {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+      } catch (CarbonDataWriterException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+        throw exception;
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+      }
+    }
+    if (!fileNameMapLength.isEmpty()) {
+      final Map partitionMap = new
```
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796705

File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java

```
@@ -184,29 +204,15 @@ public void close() {
     }
   }

-  private Map uploadSegmentDataFiles(
-      final String localPath, final String remotePath) {
-    final File[] files = new File(localPath).listFiles();
-    if (files == null) {
-      return new HashMap<>(0);
+  private void closeWriters() throws IOException {
+    if (this.writerFactory == null) {
+      return;
     }
-    Map fileNameMapLength = new HashMap<>(files.length);
-    for (File file : files) {
-      fileNameMapLength.put(file.getName(), file.length());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
-      }
-      try {
-        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
-      } catch (CarbonDataWriterException exception) {
-        LOGGER.error(exception.getMessage(), exception);
-        throw exception;
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
-      }
+    final List writers =
```

Review comment (suggestion):

```suggestion
    final List writers =
```
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361797118

File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java (same @@ -17,10 +17,229 @@ hunk as quoted above, at this line)

```
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
```

Review comment (suggestion):

```suggestion
        LOGGER.debug("Upload file[" + file.getAbsolutePath()
```
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796904 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. + */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { +return null; + } + Map fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { +fileNameMapLength.put(file.getName(), file.length()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); +} +try { + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); +} catch (CarbonDataWriterException exception) { + LOGGER.error(exception.getMessage(), exception); + throw exception; +} +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); Review comment: ```suggestion LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] finished."); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
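The diffs in these threads keep an `if (LOGGER.isDebugEnabled())` guard around every debug call. With Log4j 1.x string concatenation, the message is built before `debug()` is ever invoked, so the guard is what skips that cost when DEBUG is off. A minimal sketch of the idiom, assuming only Log4j 1.x on the classpath (the same `org.apache.log4j.Logger` the diff imports); names and paths are illustrative:

```java
import org.apache.log4j.Logger;

public class GuardedDebug {
  private static final Logger LOGGER = Logger.getLogger(GuardedDebug.class);

  static void upload(String localPath, String remotePath) {
    // Without the guard, the concatenated message string is allocated even
    // when DEBUG is disabled; with it, a debug-off call costs one boolean check.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Upload file[" + localPath + "] to [" + remotePath + "] start.");
    }
  }

  public static void main(String[] args) {
    upload("/tmp/segment_0/part-0", "s3a://bucket/store/segment_0");
  }
}
```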
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361797038 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. 
+ */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { +return null; + } + Map fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { +fileNameMapLength.put(file.getName(), file.length()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); +} +try { + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); +} catch (CarbonDataWriterException exception) { + LOGGER.error(exception.getMessage(), exception); + throw exception; +} +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); +} + } + return new StageInput(remotePath, fileNameMapLength); +} else { + final List partitionLocationList = new ArrayList<>(); + final List partitions = new ArrayList<>(); + uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); + if (partitionLocationList.isEmpty()) { +return null; + } else { +return new StageInput(remotePath, partitionLocationList); + } +} + } + + private static void uploadSegmentDataFiles( + final File directory, final String remotePath, + final List partitionLocationList, + final List partitions + ) { +final File[] files = directory.listFiles(); +if (files == null) { Review comment: ```suggestion if (files == null || files.length == 0) { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
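The suggested `files == null || files.length == 0` guard reflects the `java.io.File` contract: `listFiles()` returns null when the path does not denote a directory or an I/O error occurs, and an empty array for a directory with no entries, so both cases should short-circuit the upload. A self-contained sketch (the method name is illustrative):

```java
import java.io.File;

public class ListFilesGuard {
  // True only when the path is a readable directory with at least one entry.
  static boolean hasEntries(File directory) {
    final File[] files = directory.listFiles();
    return files != null && files.length != 0;
  }

  public static void main(String[] args) {
    System.out.println(hasEntries(new File(".")));            // usually true
    System.out.println(hasEntries(new File("no-such-dir")));  // false: listFiles() returned null
  }
}
```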
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796993 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. 
+ */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { +return null; + } + Map fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { +fileNameMapLength.put(file.getName(), file.length()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); +} +try { + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); +} catch (CarbonDataWriterException exception) { + LOGGER.error(exception.getMessage(), exception); + throw exception; +} +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); +} + } + return new StageInput(remotePath, fileNameMapLength); +} else { + final List partitionLocationList = new ArrayList<>(); + final List partitions = new ArrayList<>(); + uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); + if (partitionLocationList.isEmpty()) { +return null; + } else { +return new StageInput(remotePath, partitionLocationList); + } +} + } + + private static void uploadSegmentDataFiles( + final File directory, final String remotePath, Review comment: ```suggestion final File localPath, final String remotePath, ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796993 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. 
+ */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { +return null; + } + Map fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { +fileNameMapLength.put(file.getName(), file.length()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); +} +try { + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); +} catch (CarbonDataWriterException exception) { + LOGGER.error(exception.getMessage(), exception); + throw exception; +} +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); +} + } + return new StageInput(remotePath, fileNameMapLength); +} else { + final List partitionLocationList = new ArrayList<>(); + final List partitions = new ArrayList<>(); + uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); + if (partitionLocationList.isEmpty()) { +return null; + } else { +return new StageInput(remotePath, partitionLocationList); + } +} + } + + private static void uploadSegmentDataFiles( + final File directory, final String remotePath, Review comment: ```suggestion final File localDirectory, final String remotePath, ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796894 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. + */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { +return null; + } + Map fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { +fileNameMapLength.put(file.getName(), file.length()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); +} +try { + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); Review comment: create a constant for 1024 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
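One hypothetical shape of the "create a constant for 1024" refactor; the constant's name, its home class, and reading the third argument as a copy buffer size are all assumptions for illustration, not the actual patch:

```java
public final class CarbonFlinkConstants {
  private CarbonFlinkConstants() { }

  // Hypothetical name: the value passed as the third argument of
  // CarbonUtil.copyCarbonDataFileToCarbonStorePath(...) in the diff above.
  public static final int COPY_BUFFER_SIZE = 1024;
}
```

The call sites would then read `copyCarbonDataFileToCarbonStorePath(path, remotePath, CarbonFlinkConstants.COPY_BUFFER_SIZE)`, which documents the magic number and keeps the two call sites in sync.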
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796868 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. + */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { Review comment: ```suggestion if (files == null || files.length == 0) { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796793 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { +ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); +} +this.factory = factory; +this.identifier = identifier; +this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { +return this.factory; + } + + @Override + public String getIdentifier() { +return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return null. + */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { +if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { +return null; + } + Map fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { +fileNameMapLength.put(file.getName(), file.length()); +if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); +} +try { + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); +} catch (CarbonDataWriterException exception) { + LOGGER.error(exception.getMessage(), exception); + throw exception; +} +if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); +} + } + return new StageInput(remotePath, fileNameMapLength); +} else { + final List partitionLocationList = new ArrayList<>(); + final List partitions = new ArrayList<>(); + uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, partitions); + if (partitionLocationList.isEmpty()) { +return null; + } else { +return new StageInput(remotePath, partitionLocationList); + } +} + } + + private static void uploadSegmentDataFiles( Review comment: please add comment to describe this function This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
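A sketch of the kind of description the reviewer asks for; the wording is an assumption inferred from the diff, not the comment that was actually committed:

```java
import java.io.File;
import java.util.List;

public abstract class UploadHelperDoc {
  /**
   * Recursively walks a local segment directory. Each subdirectory is one
   * partition level: its name is pushed onto {@code partitions} for the
   * duration of the recursion and popped afterwards. Each regular file is
   * copied to {@code remotePath}, and its name and length are recorded so
   * the caller can build the StageInput for the segment.
   */
  static void uploadSegmentDataFiles(File directory, String remotePath,
      List partitionLocationList, List partitions) {
    // Body as in the diff quoted above; raw List is used here because the
    // archived diff strips the generic type parameters.
  }
}
```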
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796748 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter { +public abstract class CarbonWriter extends ProxyFileWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); Review comment: ```suggestion LogServiceFactory.getLogService(CarbonWriter.class.getName()); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
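The fix restores the per-class logger idiom: a logger is keyed by the class that declares it, so messages from `CarbonWriter` are attributed to `CarbonWriter` rather than to `CarbonS3Writer`, where the line was evidently copied from. In plain Log4j terms (CarbonData routes this through `LogServiceFactory`, but the keying is the same):

```java
import org.apache.log4j.Logger;

public abstract class LoggerOwnership {
  // Always pass the declaring class, never a sibling: the logger name drives
  // both the category shown in log output and per-class level configuration.
  private static final Logger LOGGER =
      Logger.getLogger(LoggerOwnership.class.getName());

  protected void open() {
    LOGGER.debug("Open writer. " + this);
  }
}
```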
[GitHub] [carbondata] jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table
jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796705 ## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java ## @@ -184,29 +204,15 @@ public void close() { } } - private Map uploadSegmentDataFiles( - final String localPath, final String remotePath) { -final File[] files = new File(localPath).listFiles(); -if (files == null) { - return new HashMap<>(0); + private void closeWriters() throws IOException { +if (this.writerFactory == null) { + return; } -Map fileNameMapLength = new HashMap<>(files.length); -for (File file : files) { - fileNameMapLength.put(file.getName(), file.length()); - if (LOGGER.isDebugEnabled()) { -LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); - } - try { -CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); - } catch (CarbonDataWriterException exception) { -LOGGER.error(exception.getMessage(), exception); -throw exception; - } - if (LOGGER.isDebugEnabled()) { -LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); - } +final List writers = Review comment: ```suggestion final List writers = ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569418286 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1343/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3528: [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery
CarbonDataQA1 commented on issue #3528: [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery URL: https://github.com/apache/carbondata/pull/3528#issuecomment-569418042 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1344/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive
CarbonDataQA1 commented on issue #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive URL: https://github.com/apache/carbondata/pull/3541#issuecomment-569417803 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1342/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive
CarbonDataQA1 commented on issue #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive URL: https://github.com/apache/carbondata/pull/3541#issuecomment-569412192 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1331/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569410371 Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1330/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3528: [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery
CarbonDataQA1 commented on issue #3528: [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery URL: https://github.com/apache/carbondata/pull/3528#issuecomment-569407467 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1321/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569407298 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1320/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3540: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
CarbonDataQA1 commented on issue #3540: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns URL: https://github.com/apache/carbondata/pull/3540#issuecomment-569407061 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1319/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3528: [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery
CarbonDataQA1 commented on issue #3528: [CARBONDATA-3630] update should support limit 1 sub query and empty result subquery URL: https://github.com/apache/carbondata/pull/3528#issuecomment-569407026 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1329/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3539: [HOTFIX] Optimize array length in loop in scala code
CarbonDataQA1 commented on issue #3539: [HOTFIX] Optimize array length in loop in scala code URL: https://github.com/apache/carbondata/pull/3539#issuecomment-569406972 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1339/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive
CarbonDataQA1 commented on issue #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive URL: https://github.com/apache/carbondata/pull/3541#issuecomment-569406901 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1318/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3539: [HOTFIX] Optimize array length in loop in scala code
CarbonDataQA1 commented on issue #3539: [HOTFIX] Optimize array length in loop in scala code URL: https://github.com/apache/carbondata/pull/3539#issuecomment-569406631 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1326/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3530: [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV
CarbonDataQA1 commented on issue #3530: [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV URL: https://github.com/apache/carbondata/pull/3530#issuecomment-569406601 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1328/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569406360 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1327/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3530: [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV
CarbonDataQA1 commented on issue #3530: [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV URL: https://github.com/apache/carbondata/pull/3530#issuecomment-569406146 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1340/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569405950 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1341/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3540: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
CarbonDataQA1 commented on issue #3540: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns URL: https://github.com/apache/carbondata/pull/3540#issuecomment-569405817 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1325/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3540: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
CarbonDataQA1 commented on issue #3540: [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns URL: https://github.com/apache/carbondata/pull/3540#issuecomment-569405635 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1338/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature URL: https://github.com/apache/carbondata/pull/3502#discussion_r361787554 ## File path: docs/ddl-of-carbondata.md ## @@ -83,7 +83,6 @@ CarbonData DDL statements are documented here,which includes: | Property | Description | | | | -| [DICTIONARY_INCLUDE](#dictionary-encoding-configuration) | Columns for which dictionary needs to be generated | | [NO_INVERTED_INDEX](#inverted-index-configuration) | Columns to exclude from inverted index generation| | [INVERTED_INDEX](#inverted-index-configuration) | Columns to include for inverted index generation | Review comment: no need This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature URL: https://github.com/apache/carbondata/pull/3502#discussion_r361788681 ## File path: processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java ## @@ -98,70 +82,21 @@ public void initialize() throws IOException { configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD) .toString()); List fieldConverterList = new ArrayList<>(); -localCaches = new Map[fields.length]; long lruCacheStartTime = System.currentTimeMillis(); -DictionaryClient client = createDictionaryClient(); -dictClients.add(client); for (int i = 0; i < fields.length; i++) { - localCaches[i] = new ConcurrentHashMap<>(); FieldConverter fieldConverter = FieldEncoderFactory.getInstance() - .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client, - configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord, - configuration.getParentTablePath(), isConvertToBinary, + .createFieldEncoder(fields[i], i, nullFormat, isEmptyBadRecord, isConvertToBinary, (String) configuration.getDataLoadProperty( CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER)); fieldConverterList.add(fieldConverter); } CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0); -fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]); +fieldConverters = fieldConverterList.toArray(new FieldConverter[0]); Review comment: why size is zero? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
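On the "why size is zero?" question: the array argument of `Collection.toArray(T[])` mainly supplies the runtime component type. When the passed array is too small, `toArray` allocates a fresh array of exactly the collection's size, so `new FieldConverter[0]` yields the same result as a presized array; on HotSpot the zero-length form is also commonly recommended as at least as fast. A runnable demonstration with strings:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ToArrayIdiom {
  public static void main(String[] args) {
    List<String> converters = new ArrayList<>(Arrays.asList("a", "b", "c"));

    // Presized: toArray fills the array we allocated.
    String[] presized = converters.toArray(new String[converters.size()]);

    // Zero-length: toArray sees the array is too small and allocates a
    // correctly sized one itself; the argument only carries the type.
    String[] zeroLength = converters.toArray(new String[0]);

    System.out.println(presized.length + " " + zeroLength.length);  // 3 3
  }
}
```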
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature URL: https://github.com/apache/carbondata/pull/3502#discussion_r361787085 ## File path: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ## @@ -704,8 +704,6 @@ public boolean isHivePartitionTable() { public AbsoluteTableIdentifier getAbsoluteTableIdentifier() { AbsoluteTableIdentifier absoluteTableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier(); -absoluteTableIdentifier.setDictionaryPath( - tableInfo.getFactTable().getTableProperties().get(CarbonCommonConstants.DICTIONARY_PATH)); return absoluteTableIdentifier; Review comment: use "return tableInfo.getOrCreateAbsoluteTableIdentifier()" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361788336

## File path: integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala

## @@ -767,78 +766,19 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       }
     }
-    // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
-      LOGGER.warn("dictionary_exclude option was deprecated, " +
-        "by default string column does not use global dictionary.")
-      dictExcludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
-      dictExcludeCols.foreach { dictExcludeCol =>
-        if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
-          val errorMsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
-            " does not exist in table or unsupported for complex child column. " +
-            "Please check the create table statement."
-          throw new MalformedCarbonCommandException(errorMsg)
-        } else {
-          val dataType = fields.find(x =>
-            x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
-          if (!isDataTypeSupportedForDictionary_Exclude(dataType)) {
-            val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
-              " data type column: " + dictExcludeCol
-            throw new MalformedCarbonCommandException(errorMsg)
-          } else if (varcharCols.exists(x => x.equalsIgnoreCase(dictExcludeCol))) {
-            throw new MalformedCarbonCommandException(
-              "DICTIONARY_EXCLUDE is unsupported for long string datatype column: " +
-              dictExcludeCol)
-          }
-        }
-      }
+      // dictionary_exclude is not supported since 2.0
+      throw new DeprecatedFeatureException("dictionary_exclude")
     }
-    // All included cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
-      dictIncludeCols =
-        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
-      dictIncludeCols.foreach { distIncludeCol =>
-        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
-          val errorMsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
-            " does not exist in table or unsupported for complex child column. " +
-            "Please check the create table statement."
-          throw new MalformedCarbonCommandException(errorMsg)
-        }
-        val rangeField = fields.find(_.column.equalsIgnoreCase(distIncludeCol.trim))
-        if ("binary".equalsIgnoreCase(rangeField.get.dataType.get)) {
-          throw new MalformedCarbonCommandException(
-            "DICTIONARY_INCLUDE is unsupported for binary data type column: " +
-            distIncludeCol.trim)
-        }
-        if (varcharCols.exists(x => x.equalsIgnoreCase(distIncludeCol.trim))) {
-          throw new MalformedCarbonCommandException(
-            "DICTIONARY_INCLUDE is unsupported for long string datatype column: " +
-            distIncludeCol.trim)
-        }
-      }
-    }
-
-    // include cols should not contain exclude cols
-    dictExcludeCols.foreach { dicExcludeCol =>
-      if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
-        val errorMsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
-          " with DICTIONARY_INCLUDE. Please check the create table statement."
-        throw new MalformedCarbonCommandException(errorMsg)
-      }
+      // dictionary_include is not supported since 2.0
+      throw new DeprecatedFeatureException("dictionary_include")
     }

     // by default consider all String cols as dims and if any dictionary include isn't present then
     // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims
     fields.foreach { field =>
-      if (dictExcludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        noDictionaryDims :+= field.column
-        dimFields += field
-      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        dimFields += field
-      } else if (field.dataType.get.toUpperCase.equals("TIMESTAMP") &&

Review comment: Need to check this change: if a timestamp column is excluded here, it may end up as a non-dictionary column.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
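For readers following the diff above: DeprecatedFeatureException is introduced by this PR and its definition is not shown in the hunk. A minimal sketch of what such a class could look like, assuming a plain RuntimeException subclass (the actual class in the PR may differ in package, constructor, and message text):

  // Sketch only; the PR's real DeprecatedFeatureException may differ.
  class DeprecatedFeatureException(featureName: String)
    extends RuntimeException(featureName + " is deprecated and no longer supported")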
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361787853

## File path: hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java

## @@ -60,11 +59,9 @@ public void initialize(CarbonColumn[] carbonColumns,
       Cache forwardDictionaryCache = cacheProvider
           .createCache(CacheType.FORWARD_DICTIONARY);
       dataTypes[i] = carbonColumns[i].getDataType();
-      String dictionaryPath = carbonTable.getTableInfo().getFactTable().getTableProperties()
-          .get(CarbonCommonConstants.DICTIONARY_PATH);
       dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
           carbonTable.getAbsoluteTableIdentifier(),
-          carbonColumns[i].getColumnIdentifier(), dataTypes[i], dictionaryPath));
+          carbonColumns[i].getColumnIdentifier(), dataTypes[i]));

Review comment: Use CarbonRowReadSupport instead of this class.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361786417

## File path: conf/carbon.properties.template

## @@ -85,7 +85,6 @@ carbon.major.compaction.size=1024
 ##Min max feature is added to enhance query performance. To disable this feature, make it false.
 #carbon.enableMinMax=true
- Global Dictionary Configurations

Review comment: How about "Timestamp/Date encoding Configurations"?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
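For context on the proposed rename: the keys that remain in that template section are the timestamp/date direct-dictionary settings. A sketch of how the renamed block might read — the key names carbon.cutOffTimestamp and carbon.timegranularity exist in the CarbonData configuration docs, but the values shown here are illustrative, not taken from this PR:

  #### Timestamp/Date encoding Configurations
  ## Base timestamp for direct-dictionary encoding of TIMESTAMP columns (illustrative value)
  #carbon.cutOffTimestamp=2000-01-01 00:00:00
  ## Granularity of the encoded time value, e.g. DAY/HOUR/MINUTE/SECOND
  #carbon.timegranularity=SECOND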
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361786862

## File path: core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java

## @@ -77,7 +76,6 @@ public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPa
   private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
       ColumnPage inputPage) {
     switch (columnSpec.getColumnType()) {
-      case GLOBAL_DICTIONARY:
       case DIRECT_DICTIONARY:

Review comment: Can we use a new name instead of DIRECT_DICTIONARY?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
QiangCai commented on a change in pull request #3502: [CARBONATA-3605] Remove global dictionary feature
URL: https://github.com/apache/carbondata/pull/3502#discussion_r361787474

## File path: docs/configuration-parameters.md

## @@ -90,7 +90,6 @@ This section provides the details of all the configurations required for the Car
 | enable.data.loading.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, thereby increasing the log file size significantly in a short span of time. It is advised to configure the log file size and retention parameters in log4j properties appropriately. Also, extensive logging is an increased IO operation and hence overall data loading performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | carbon.dictionary.chunk.size | 1 | CarbonData generates dictionary keys and writes them to a separate dictionary file during data loading. To optimize the IO, this configuration determines the number of dictionary keys to be persisted to the dictionary file at a time. **NOTE:** Writing to file also serves as a commit point for the dictionary generated. Keeping more values in memory causes more data loss during system or application failure. It is advised to alter this configuration judiciously. |
 | dictionary.worker.threads | 1 | CarbonData supports optimized data loading by relying on a dictionary server. The dictionary server helps to maintain dictionary values independent of the data loading and thereby avoids reading the same input data multiple times. This configuration determines the number of concurrent dictionary generation requests that need to be served by the dictionary server. **NOTE:** This configuration takes effect when ***carbon.options.single.pass*** is configured as true. Please refer to *carbon.options.single.pass* to understand how the dictionary server optimizes data loading. |

Review comment: Remove this.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
CarbonDataQA1 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569402636 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1337/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (CARBONDATA-3639) Global Sort CSV loading flow with Binary non-sort columns throws exception
[ https://issues.apache.org/jira/browse/CARBONDATA-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ajantha Bhat updated CARBONDATA-3639:
-------------------------------------
    Description:
h6. Global Sort CSV loading flow with Binary non-sort columns throws exception

Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
  org.apache.carbondata.streaming.parser.FieldConverter$.objectToString(FieldConverter.scala:53)
  org.apache.carbondata.spark.util.CarbonScalaUtil$.getString(CarbonScalaUtil.scala:71)
  org.apache.carbondata.spark.rdd.NewRddIterator$$anonfun$next$1.apply$mcVI$sp(NewCarbonDataLoadRDD.scala:358)
  scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
  org.apache.carbondata.spark.rdd.NewRddIterator.next(NewCarbonDataLoadRDD.scala:357)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:66)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:61)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:92)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:83)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:253)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:248)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  org.apache.spark.scheduler.Task.run(Task.scala:109)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:139)
    at org.apache.spark.TaskContextImpl.markTaskFailed(TaskContextImpl.scala:107)
    at org.apache.spark.scheduler.Task.run(Task.scala:114)
    ... 4 more

  was:
h6. Global Sort CSV loading flow with Binary non-sort columns throws exception

Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
(same stack trace as above)
    ... 4 more
2019-12-28 17:37:47 ERROR TaskSetManager:70 - Task 0 in stage 0.0
[jira] [Updated] (CARBONDATA-3639) Global Sort CSV loading flow with Binary non-sort columns throws exception
[ https://issues.apache.org/jira/browse/CARBONDATA-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ajantha Bhat updated CARBONDATA-3639:
-------------------------------------
    Description:
h6. Global Sort CSV loading flow with Binary non-sort columns throws exception

Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
  org.apache.carbondata.streaming.parser.FieldConverter$.objectToString(FieldConverter.scala:53)
  org.apache.carbondata.spark.util.CarbonScalaUtil$.getString(CarbonScalaUtil.scala:71)
  org.apache.carbondata.spark.rdd.NewRddIterator$$anonfun$next$1.apply$mcVI$sp(NewCarbonDataLoadRDD.scala:358)
  scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
  org.apache.carbondata.spark.rdd.NewRddIterator.next(NewCarbonDataLoadRDD.scala:357)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:66)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:61)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:92)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:83)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:253)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:248)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  org.apache.spark.scheduler.Task.run(Task.scala:109)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:139)
    at org.apache.spark.TaskContextImpl.markTaskFailed(TaskContextImpl.scala:107)
    at org.apache.spark.scheduler.Task.run(Task.scala:114)
    ... 4 more

2019-12-28 17:37:47 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 1 times; aborting job
2019-12-28 17:37:47 ERROR CarbonDataRDDFactory$:429 - org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.Exception: Dataload failed, String length cannot exceed 32000 characters
  at org.apache.carbondata.streaming.parser.FieldConverter$.objectToString(FieldConverter.scala:53)
  at org.apache.carbondata.spark.util.CarbonScalaUtil$.getString(CarbonScalaUtil.scala:71)
  at org.apache.carbondata.spark.rdd.NewRddIterator$$anonfun$next$1.apply$mcVI$sp(NewCarbonDataLoadRDD.scala:358)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
  at org.apache.carbondata.spark.rdd.NewRddIterator.next(NewCarbonDataLoadRDD.scala:357)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:66)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:61)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:92)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:83)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:253)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:248)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
  Suppressed: org.apache.spark.util.TaskCompletionListenerException: Exception 0:
[jira] [Created] (CARBONDATA-3639) Global Sort CSV loading flow with Binary non-sort columns throws exception
Ajantha Bhat created CARBONDATA-3639:
----------------------------------------

             Summary: Global Sort CSV loading flow with Binary non-sort columns throws exception
                 Key: CARBONDATA-3639
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3639
             Project: CarbonData
          Issue Type: Bug
            Reporter: Ajantha Bhat
            Assignee: Ajantha Bhat

Global Sort CSV loading flow with Binary non-sort columns throws exception

Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
Previous exception in task: Dataload failed, String length cannot exceed 32000 characters
  org.apache.carbondata.streaming.parser.FieldConverter$.objectToString(FieldConverter.scala:53)
  org.apache.carbondata.spark.util.CarbonScalaUtil$.getString(CarbonScalaUtil.scala:71)
  org.apache.carbondata.spark.rdd.NewRddIterator$$anonfun$next$1.apply$mcVI$sp(NewCarbonDataLoadRDD.scala:358)
  scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
  org.apache.carbondata.spark.rdd.NewRddIterator.next(NewCarbonDataLoadRDD.scala:357)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:66)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:61)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:92)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:83)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:253)
  org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:248)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  org.apache.spark.scheduler.Task.run(Task.scala:109)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:139)
    at org.apache.spark.TaskContextImpl.markTaskFailed(TaskContextImpl.scala:107)
    at org.apache.spark.scheduler.Task.run(Task.scala:114)
    ... 4 more

2019-12-28 17:37:47 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 1 times; aborting job
2019-12-28 17:37:47 ERROR CarbonDataRDDFactory$:429 - org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.Exception: Dataload failed, String length cannot exceed 32000 characters
  at org.apache.carbondata.streaming.parser.FieldConverter$.objectToString(FieldConverter.scala:53)
  at org.apache.carbondata.spark.util.CarbonScalaUtil$.getString(CarbonScalaUtil.scala:71)
  at org.apache.carbondata.spark.rdd.NewRddIterator$$anonfun$next$1.apply$mcVI$sp(NewCarbonDataLoadRDD.scala:358)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
  at org.apache.carbondata.spark.rdd.NewRddIterator.next(NewCarbonDataLoadRDD.scala:357)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:66)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$1.next(DataLoadProcessorStepOnSpark.scala:61)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:92)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$2.next(DataLoadProcessorStepOnSpark.scala:83)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:253)
  at org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark$$anon$6.next(DataLoadProcessorStepOnSpark.scala:248)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
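The trace above bottoms out in FieldConverter.objectToString, which applies the 32000-character string limit even when the column's payload is binary. One plausible shape of a fix is to branch on byte arrays before the length check; the sketch below is an assumption about the approach with a simplified signature, not the actual patch:

  import java.nio.charset.StandardCharsets

  // Simplified sketch: exempt binary (Array[Byte]) values from the string
  // length check, which is only meant for genuine string columns.
  def objectToString(value: Any, serializationNullFormat: String): String = value match {
    case null => serializationNullFormat
    case bytes: Array[Byte] =>
      new String(bytes, StandardCharsets.UTF_8)  // binary payload, no 32000-char check
    case s: String if s.length > 32000 =>
      throw new Exception("Dataload failed, String length cannot exceed 32000 characters")
    case s: String => s
    case other => other.toString
  }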
[GitHub] [carbondata] Indhumathi27 opened a new pull request #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive
Indhumathi27 opened a new pull request #3541: [WIP]Timeseries query is not hitting datamap if granularity in query is given case insensitive
URL: https://github.com/apache/carbondata/pull/3541

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [ ] Any interfaces changed?
 - [ ] Any backward compatibility impacted?
 - [ ] Document update required?
 - [ ] Testing done
       Please provide details on
       - Whether new unit test cases have been added or why no new tests are required?
       - How it is tested? Please attach test report.
       - Is it a performance related change? Please attach the performance test report.
       - Any additional information to help reviewers in testing this change.
 - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3504: [CARBONDATA-3614] Support Alter table properties set/unset for longstring columns
CarbonDataQA1 commented on issue #3504: [CARBONDATA-3614] Support Alter table properties set/unset for longstring columns URL: https://github.com/apache/carbondata/pull/3504#issuecomment-569402137 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1336/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (CARBONDATA-3638) For CSV loading flow, avoid creation of new String object as it is already String object
Ajantha Bhat created CARBONDATA-3638:
----------------------------------------

             Summary: For CSV loading flow, avoid creation of new String object as it is already String object
                 Key: CARBONDATA-3638
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3638
             Project: CarbonData
          Issue Type: Sub-task
            Reporter: Ajantha Bhat
            Assignee: Ajantha Bhat

-- This message was sent by Atlassian Jira (v8.3.4#803005)
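The change this summary describes amounts to short-circuiting the conversion when the incoming field is already a java.lang.String, saving one allocation per field per row. A minimal sketch of the pattern, using a hypothetical converter helper (not the actual CarbonData code):

  // Hypothetical helper illustrating the fix: return the existing String
  // instead of allocating a copy for every field of every row.
  def toStringValue(value: Any): String = value match {
    case null      => null
    case s: String => s          // already a String: no new object needed
    case other     => other.toString
  }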
[GitHub] [carbondata] niuge01 removed a comment on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
niuge01 removed a comment on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569400607 please test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] niuge01 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
niuge01 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569401658 please test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (CARBONDATA-3637) Improve insert into performance and decrease memory foot print
Ajantha Bhat created CARBONDATA-3637:
----------------------------------------

             Summary: Improve insert into performance and decrease memory foot print
                 Key: CARBONDATA-3637
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3637
             Project: CarbonData
          Issue Type: Improvement
            Reporter: Ajantha Bhat
            Assignee: Ajantha Bhat

Currently carbondata "insert into" uses the CarbonLoadDataCommand itself. The load process has steps such as parsing and a converter step with bad-record support. Insert into doesn't require these steps, as the data is already validated and converted from the source table or dataframe.

Some identified changes are below:

1. Refactor and separate load and insert at the driver side, to skip the converter step and unify the flow for no-sort and global-sort insert.
2. Avoid the reorder of each row by changing the select dataframe's projection order itself during insert into (see the sketch after this message).
3. For carbon-to-carbon insert, provide the ReadSupport and use the RecordReader (the vector reader currently doesn't support ReadSupport) to handle null values and the timestamp cutoff (direct dictionary) from the scanRDD result.
4. Handle insert into partition/non-partition tables in the local sort, global sort, no sort, range columns, and compaction flows.

The final goal is to improve insert performance by keeping only the required logic, and also to decrease the memory footprint.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
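To make point 2 above concrete: the reorder can happen once at plan time instead of once per row. A rough sketch of the idea — reorderForInsert and targetColumnOrder are hypothetical names for illustration, not the actual CarbonData API:

  import org.apache.spark.sql.DataFrame

  // Project the source once so every row already arrives in the table's
  // internal column order; this removes the per-row reorder from the load path.
  // `targetColumnOrder` is a hypothetical stand-in for the table schema order.
  def reorderForInsert(source: DataFrame, targetColumnOrder: Seq[String]): DataFrame =
    source.select(targetColumnOrder.map(source.col): _*)

Doing the reorder in the query plan lets Spark fold it into the scan, whereas rearranging each row inside the load pipeline is pure per-row overhead.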
[jira] [Created] (CARBONDATA-3636) Timeseries query is not hitting datamap if granularity in query is case insensitive
Indhumathi Muthumurugesh created CARBONDATA-3636:
------------------------------------------------

             Summary: Timeseries query is not hitting datamap if granularity in query is case insensitive
                 Key: CARBONDATA-3636
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3636
             Project: CarbonData
          Issue Type: Bug
            Reporter: Indhumathi Muthumurugesh

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature
CarbonDataQA1 commented on issue #3502: [CARBONATA-3605] Remove global dictionary feature URL: https://github.com/apache/carbondata/pull/3502#issuecomment-569401521 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1317/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3504: [CARBONDATA-3614] Support Alter table properties set/unset for longstring columns
CarbonDataQA1 commented on issue #3504: [CARBONDATA-3614] Support Alter table properties set/unset for longstring columns URL: https://github.com/apache/carbondata/pull/3504#issuecomment-569401428 Build Success with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1323/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3530: [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV
CarbonDataQA1 commented on issue #3530: [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV URL: https://github.com/apache/carbondata/pull/3530#issuecomment-569401396 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1316/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3509: [CARBONDATA-3618] Update query should throw exception if key has more than one value
CarbonDataQA1 commented on issue #3509: [CARBONDATA-3618] Update query should throw exception if key has more than one value URL: https://github.com/apache/carbondata/pull/3509#issuecomment-569401263 Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1335/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (CARBONDATA-3635) [Carbon-Flink] Reduce the time interval at which data is visible
Xingjun Hao created CARBONDATA-3635:
-----------------------------------

             Summary: [Carbon-Flink] Reduce the time interval at which data is visible
                 Key: CARBONDATA-3635
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3635
             Project: CarbonData
          Issue Type: Improvement
            Reporter: Xingjun Hao

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [WIP] Separate Insert and load to later optimize insert.
CarbonDataQA1 commented on issue #3538: [WIP] Separate Insert and load to later optimize insert. URL: https://github.com/apache/carbondata/pull/3538#issuecomment-569400849 Build Failed with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/1333/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] niuge01 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table
niuge01 commented on issue #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#issuecomment-569400607 please test this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3538: [WIP] Separate Insert and load to later optimize insert.
CarbonDataQA1 commented on issue #3538: [WIP] Separate Insert and load to later optimize insert. URL: https://github.com/apache/carbondata/pull/3538#issuecomment-569400483 Build Failed with Spark 2.2.1, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.2/1320/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (CARBONDATA-3634) Flink Loading support Complex\Array\Map\Binary
Xingjun Hao created CARBONDATA-3634:
-----------------------------------

             Summary: Flink Loading support Complex\Array\Map\Binary
                 Key: CARBONDATA-3634
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3634
             Project: CarbonData
          Issue Type: New Feature
            Reporter: Xingjun Hao

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [carbondata] CarbonDataQA1 commented on issue #3539: [HOTFIX] Optimize array length in loop in scala code
CarbonDataQA1 commented on issue #3539: [HOTFIX] Optimize array length in loop in scala code URL: https://github.com/apache/carbondata/pull/3539#issuecomment-569400341 Build Success with Spark 2.1.0, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.1/1315/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
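The title of PR #3539 refers to a common Scala micro-optimization: hoisting the length lookup out of hot loops (often alongside replacing for-comprehensions over arrays with while loops). A generic sketch of the pattern, not the PR's actual diff:

  // Cache arr.length once instead of re-evaluating it on every iteration.
  def sumInts(arr: Array[Int]): Long = {
    var total = 0L
    var i = 0
    val len = arr.length  // hoisted out of the loop
    while (i < len) {
      total += arr(i)
      i += 1
    }
    total
  }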