Repository: carbondata
Updated Branches:
  refs/heads/master fb1516c00 -> 248e0c850


[CARBONDATA-2314] Removed block for Streaming with Preaggregate table

1. Removed the block that prevented enabling streaming on a table with a Preaggregate DataMap
2. Added test cases
3. Fixed a loading issue that occurred when the aggregate table is created after data has already been loaded

This closes #2137
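
For context, the flow this commit enables is roughly the following (a minimal
sketch modelled on the test cases added below; the table name "sales" and
datamap name "agg_sales" are illustrative, not from this commit):

    // streaming table with a 'preaggregate' datamap -- no longer rejected
    sql("CREATE TABLE sales(name STRING, salary DOUBLE) " +
      "STORED BY 'carbondata' TBLPROPERTIES('streaming'='true')")
    sql("CREATE DATAMAP agg_sales ON TABLE sales USING 'preaggregate' " +
      "AS SELECT name, SUM(salary) FROM sales GROUP BY name")
    // aggregate queries on the parent table can now be served while the
    // table keeps accepting streaming ingest
    sql("SELECT name, SUM(salary) FROM sales GROUP BY name").show()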


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/248e0c85
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/248e0c85
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/248e0c85

Branch: refs/heads/master
Commit: 248e0c85031fe347a667b9f832c7248d0cb9f166
Parents: fb1516c
Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com>
Authored: Tue Apr 3 17:50:21 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Thu Apr 5 13:54:53 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 10 +--
 .../preaaggregate/PreAggregateTableHelper.scala | 11 ++-
 .../sql/execution/strategy/DDLStrategy.scala    |  4 -
 .../strategy/StreamingTableStrategy.scala       |  2 +-
 .../TestStreamingTableOperation.scala           | 88 ++++++++++++++++++--
 5 files changed, 94 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 29acfff..efb20eb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -570,12 +570,12 @@ class CarbonScanRDD(
         .getProperty(queryOnPreAggStreamingKey, "false").toBoolean
       CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming)
       val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey
+      CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
+        .getProperty(validateInputSegmentsKey, "true").toBoolean)
+      CarbonInputFormat
+        .setQuerySegment(conf,
+          carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey, "*"))
       if(queryOnPreAggStreaming) {
-        CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getThreadParams
-          .getProperty(validateInputSegmentsKey, "true").toBoolean)
-        CarbonInputFormat
-          .setQuerySegment(conf,
-            carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey, "*"))
         carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey)
         carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey)
         carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey)
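
The hunk above moves the segment-related settings out of the
queryOnPreAggStreaming branch, so the CARBON_INPUT_SEGMENTS and
validate-segments thread parameters are now applied to every scan rather
than only to queries on a streaming preaggregate table. A condensed sketch
of the resulting control flow (using the variable names visible in the
hunk; the surrounding method is not shown here):

    // always honour the thread-local segment properties
    CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo
      .getThreadParams.getProperty(validateInputSegmentsKey, "true").toBoolean)
    CarbonInputFormat.setQuerySegment(conf,
      carbonSessionInfo.getThreadParams.getProperty(inputSegmentsKey, "*"))
    if (queryOnPreAggStreaming) {
      // the properties are one-shot for pre-agg streaming queries, so clear them
      carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey)
      carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey)
      carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey)
    }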

http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index 71545e7..94a8e81 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -183,8 +183,11 @@ case class PreAggregateTableHelper(
     }
     // check if any segment if available for load in the parent table
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
-      .filter(segment => segment.getSegmentStatus == SegmentStatus.SUCCESS ||
-                         segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      .collect {
+        case segment if segment.getSegmentStatus == SegmentStatus.SUCCESS ||
+          segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS =>
+          segment.getLoadName
+      }
     if (loadAvailable.nonEmpty) {
       // Passing segmentToLoad as * because we want to load all the segments into the
       // pre-aggregate table even if the user has set some segments on the parent table.
@@ -192,8 +195,8 @@
         .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
       PreAggregateUtil.startDataLoadForDataMap(
         TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
-        segmentToLoad = "*",
-        validateSegments = true,
+        segmentToLoad = loadAvailable.mkString(","),
+        validateSegments = false,
         loadCommand,
         isOverwrite = false,
         sparkSession)
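
With this change the helper no longer loads "*" with segment validation on;
it instead collects the load names of only the SUCCESS and
LOAD_PARTIAL_SUCCESS segments and loads exactly those, with validation off,
which presumably keeps streaming or otherwise non-loadable segments of the
parent table out of the newly created aggregate table. A small standalone
illustration of the collect pattern used above (hypothetical data types,
not the CarbonData API):

    case class Segment(loadName: String, status: String)
    val metadata = Seq(
      Segment("0", "SUCCESS"),
      Segment("1", "STREAMING"),
      Segment("2", "LOAD_PARTIAL_SUCCESS"))
    // keep only the loadable segments, mapping each to its load name
    val loadAvailable = metadata.collect {
      case s if s.status == "SUCCESS" || s.status == "LOAD_PARTIAL_SUCCESS" =>
        s.loadName
    }
    assert(loadAvailable.mkString(",") == "0,2") // passed as segmentToLoad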

http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7c5b0f0..a5a96af 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -231,10 +231,6 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         // if the table has 'preaggregate' DataMap, it doesn't support streaming now
         val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-        if (carbonTable.hasAggregationDataMap) {
-          throw new MalformedCarbonCommandException(
-            "The table has 'preaggregate' DataMap, it doesn't support streaming")
-        }
 
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))
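
Removing this check means the statement that the (now deleted) test at the
end of TestStreamingTableOperation used to expect to fail is expected to
succeed, along the lines of (table and datamap names taken from that
deleted test):

    sql("create datamap agg_table_block_agg0 on table streaming.agg_table_block " +
      "using 'preaggregate' as select city, count(name) from " +
      "streaming.agg_table_block group by city")
    // previously threw MalformedCarbonCommandException; now allowed
    sql("ALTER TABLE streaming.agg_table_block SET TBLPROPERTIES('streaming'='true')")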

http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 7028dcf..f9c6c5f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -42,7 +42,7 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
       case CarbonProjectForDeleteCommand(_, databaseNameOp, tableName, timestamp) =>
         rejectIfStreamingTable(
           TableIdentifier(tableName, databaseNameOp),
-          "Date delete")
+          "Data delete")
         Nil
       case CarbonAlterTableAddColumnCommand(model) =>
         rejectIfStreamingTable(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/248e0c85/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 9761671..71ce2b2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -157,7 +157,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       sql("""DELETE FROM source WHERE d.c1 = 'a'""").show()
     }
     assert(exceptionMsgUpdate.getMessage.equals("Data update is not allowed for streaming table"))
-    assert(exceptionMsgDelete.getMessage.equals("Date delete is not allowed for streaming table"))
+    assert(exceptionMsgDelete.getMessage.equals("Data delete is not allowed for streaming table"))
   }
 
   test("test blocking alter table operation on streaming table") {
@@ -336,6 +336,86 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table agg_table2")
   }
 
+  test("test whether data is loaded into preaggregate after handoff is fired") 
{
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = 
false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = 
CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, 
intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql(s"load data inpath '$csvDataDir' into table agg_table2 
options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, 
updated, file')")
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select 
name, sum(salary) from agg_table2 group by name")
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    // Data should be loaded into aggregate table as hand-off is fired
+    checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"),
+        Seq(
+          Row("name_10", 400000.0),
+          Row("name_14", 560000.0),
+          Row("name_12", 480000.0),
+          Row("name_11", 440000.0),
+          Row("name_13", 520000.0)))
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0),
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+
+    sql("drop table agg_table2")
+  }
+
+  test("test whether data is loaded into preaggregate before handoff is 
fired") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = 
false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = 
CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, 
intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql(s"load data inpath '$csvDataDir' into table agg_table2 
options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, 
updated, file')")
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select 
name, sum(salary) from agg_table2 group by name")
+    // Data should be loaded into aggregate table as hand-off is fired
+    checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"),
+      Seq(
+        Row("name_10", 400000.0),
+        Row("name_14", 560000.0),
+        Row("name_12", 480000.0),
+        Row("name_11", 440000.0),
+        Row("name_13", 520000.0)))
+    //    sql("select * from agg_table2_p1").show()
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+
+    sql("drop table agg_table2")
+  }
+
   test("test if timeseries load is successful when created on streaming 
table") {
     sql("drop table if exists timeseries_table")
     createTable(tableName = "timeseries_table", streaming = true, 
withBatchLoad = false)
@@ -1419,12 +1499,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult("true")(resultStreaming(0).getString(1).trim)
   }
 
-  test("block streaming for 'preaggregate' table") {
-    sql("create datamap agg_table_block_agg0 on table 
streaming.agg_table_block using 'preaggregate' as select city, count(name) from 
streaming.agg_table_block group by city")
-    val msg = intercept[MalformedCarbonCommandException](sql("ALTER TABLE 
streaming.agg_table_block SET TBLPROPERTIES('streaming'='true')"))
-    assertResult("The table has 'preaggregate' DataMap, it doesn't support 
streaming")(msg.getMessage)
-  }
-
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
