Repository: carbondata Updated Branches: refs/heads/master fb1516c00 -> 248e0c850
[CARBONDATA-2314] Removed block for Streaming with Preaggregate table 1. Removed block for Streaming with Preaggregate table 2. Added test case 3. Fixed loading issue if aggregate table is created after load This closes #2137 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/248e0c85 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/248e0c85 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/248e0c85 Branch: refs/heads/master Commit: 248e0c85031fe347a667b9f832c7248d0cb9f166 Parents: fb1516c Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com> Authored: Tue Apr 3 17:50:21 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Thu Apr 5 13:54:53 2018 +0530 ---------------------------------------------------------------------- .../carbondata/spark/rdd/CarbonScanRDD.scala | 10 +-- .../preaaggregate/PreAggregateTableHelper.scala | 11 ++- .../sql/execution/strategy/DDLStrategy.scala | 4 - .../strategy/StreamingTableStrategy.scala | 2 +- .../TestStreamingTableOperation.scala | 88 ++++++++++++++++++-- 5 files changed, 94 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 29acfff..efb20eb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -570,12 +570,12 @@ class CarbonScanRDD( .getProperty(queryOnPreAggStreamingKey, "false").toBoolean CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming) val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey + CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams + .getProperty(validateInputSegmentsKey, "true").toBoolean) + CarbonInputFormat + .setQuerySegment(conf, + carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey, "*")) if(queryOnPreAggStreaming) { - CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams - .getProperty(validateInputSegmentsKey, "true").toBoolean) - CarbonInputFormat - .setQuerySegment(conf, - carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey, "*")) carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey) carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey) carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey) http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala index 71545e7..94a8e81 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala @@ -183,8 +183,11 @@ case class PreAggregateTableHelper( } // check if any segment if available for load in the parent table val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) - .filter(segment => segment.getSegmentStatus == SegmentStatus.SUCCESS || - segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) + .collect { + case segment if segment.getSegmentStatus == SegmentStatus.SUCCESS || + segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS => + segment.getLoadName + } if (loadAvailable.nonEmpty) { // Passing segmentToLoad as * because we want to load all the segments into the // pre-aggregate table even if the user has set some segments on the parent table. @@ -192,8 +195,8 @@ case class PreAggregateTableHelper( .getDataFrame(sparkSession, loadCommand.logicalPlan.get)) PreAggregateUtil.startDataLoadForDataMap( TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)), - segmentToLoad = "*", - validateSegments = true, + segmentToLoad = loadAvailable.mkString(","), + validateSegments = false, loadCommand, isOverwrite = false, sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 7c5b0f0..a5a96af 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -231,10 +231,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { // if the table has 'preaggregate' DataMap, it doesn't support streaming now val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable - if (carbonTable.hasAggregationDataMap) { - throw new MalformedCarbonCommandException( - "The table has 'preaggregate' DataMap, it doesn't support streaming") - } // TODO remove this limitation later val property = properties.find(_._1.equalsIgnoreCase("streaming")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala index 7028dcf..f9c6c5f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala @@ -42,7 +42,7 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp case CarbonProjectForDeleteCommand(_, databaseNameOp, tableName, timestamp) => rejectIfStreamingTable( TableIdentifier(tableName, databaseNameOp), - "Date delete") + "Data delete") Nil case CarbonAlterTableAddColumnCommand(model) => rejectIfStreamingTable( http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 9761671..71ce2b2 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -157,7 +157,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("""DELETE FROM source WHERE d.c1 = 'a'""").show() } assert(exceptionMsgUpdate.getMessage.equals("Data update is not allowed for streaming table")) - assert(exceptionMsgDelete.getMessage.equals("Date delete is not allowed for streaming table")) + assert(exceptionMsgDelete.getMessage.equals("Data delete is not allowed for streaming table")) } test("test blocking alter table operation on streaming table") { @@ -336,6 +336,86 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("drop table agg_table2") } + test("test whether data is loaded into preaggregate after handoff is fired") { + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table2"), + Seq(Row(10))) + sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')") + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"), + Seq( + Row("name_10", 400000.0), + Row("name_14", 560000.0), + Row("name_12", 480000.0), + Row("name_11", 440000.0), + Row("name_13", 520000.0))) + checkAnswer(sql("select * from agg_table2_p1"), + Seq( + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0), + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0))) + + sql("drop table agg_table2") + } + + test("test whether data is loaded into preaggregate before handoff is fired") { + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table2"), + Seq(Row(10))) + sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')") + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"), + Seq( + Row("name_10", 400000.0), + Row("name_14", 560000.0), + Row("name_12", 480000.0), + Row("name_11", 440000.0), + Row("name_13", 520000.0))) + // sql("select * from agg_table2_p1").show() + checkAnswer(sql("select * from agg_table2_p1"), + Seq( + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0))) + + sql("drop table agg_table2") + } + test("test if timeseries load is successful when created on streaming table") { sql("drop table if exists timeseries_table") createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false) @@ -1419,12 +1499,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { assertResult("true")(resultStreaming(0).getString(1).trim) } - test("block streaming for 'preaggregate' table") { - sql("create datamap agg_table_block_agg0 on table streaming.agg_table_block using 'preaggregate' as select city, count(name) from streaming.agg_table_block group by city") - val msg = intercept[MalformedCarbonCommandException](sql("ALTER TABLE streaming.agg_table_block SET TBLPROPERTIES('streaming'='true')")) - assertResult("The table has 'preaggregate' DataMap, it doesn't support streaming")(msg.getMessage) - } - def createWriteSocketThread( serverSocket: ServerSocket, writeNums: Int,