[24/50] carbondata git commit: [MINOR] Adding a testcase for stream-table join in StreamSQL

2018-07-30 Thread ravipesala
[MINOR] Adding a testcase for stream-table join in StreamSQL

This closes #2431
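
For context, the new test drives the full StreamSQL lifecycle: a streaming
source table fed by CSV files, a streaming sink table, and a continuous
query that joins each source mini-batch with a dimension table. A minimal
sketch of the core statements (table names as in the test below, column
list abbreviated; the sql() helper comes from the test suite):

    sql(
      """
        |CREATE STREAM stream123 ON TABLE sink
        |STMPROPERTIES('trigger'='ProcessingTime', 'interval'='1 seconds')
        |AS SELECT s.id, d.name, d.country
        |   FROM source s JOIN dim d ON s.id = d.id
      """.stripMargin)
    sql("SHOW STREAMS")          // the new stream should now be listed
    sql("DROP STREAM stream123") // stops the underlying streaming query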


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8b313a3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8b313a3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8b313a3

Branch: refs/heads/branch-1.4
Commit: f8b313a398a803b958b588a3fa80e194fca98567
Parents: 985115f
Author: Jacky Li 
Authored: Tue Jun 26 19:10:35 2018 +0800
Committer: ravipesala 
Committed: Tue Jul 31 00:11:26 2018 +0530

--
 .../TestStreamingTableOperation.scala   | 147 +--
 1 file changed, 131 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8b313a3/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
--
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 0771403..3073c59 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -2079,6 +2079,102 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assert(ex.getMessage.contains("'ddd' not found"))
   }
 
+  test("StreamSQL: stream join dimension table") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dim")
+
+    // dimension table to join the stream against
+    sql(
+      s"""
+         |CREATE TABLE dim(
+         |  id INT,
+         |  name STRING,
+         |  country STRING
+         |)
+         |STORED AS carbondata
+       """.stripMargin)
+    val inputDir = integrationPath + "/spark2/target/streamDim"
+    import spark.implicits._
+    spark.createDataset(Seq((1, "alice", "india"), (2, "bob", "france"), (3, "chris", "canada")))
+      .write.mode("overwrite").csv(inputDir)
+    sql(s"LOAD DATA INPATH '$inputDir' INTO TABLE dim OPTIONS('header'='false')")
+    sql("SELECT * FROM dim").show
+
+    val rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir, SaveMode.Overwrite, false)
+
+    // streaming source table backed by the CSV directory
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent DOUBLE,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    // streaming sink table that receives the joined rows
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | country STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent DOUBLE,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    // start the stream: every mini-batch of source is joined with dim
+    sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds')
+        |AS
+        |  SELECT s.id, d.name, d.country, s.salary, s.tax, s.percent,
+        |    s.birthday, s.register, s.updated
+        |  FROM source s
+        |  JOIN dim d ON s.id = d.id
+      """.stripMargin).show(false)
+
+    Thread.sleep(2000)
+    sql("SELECT * FROM sink").show
+
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append, false)
+    Thread.sleep(5000)
+
+    // after two mini-batches (10 rows each), 20 joined rows should be in the sink
+    checkAnswer(sql("SELECT count(*) FROM sink"), Seq(Row(20)))
+
+    sql("SELECT * FROM sink ORDER BY id").show
+    val row = sql("SELECT * FROM sink ORDER BY id, salary").head()
+    val expectedRow = Row(1, "alice", "india", 12.0, BigDecimal.valueOf(0.01), 80.01,
+      Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"),
+      Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(expectedRow)(row)
+
+    sql("DROP STREAM stream123")
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dim")
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -2239,23 +2335,42 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

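The test synchronizes with the one-second ProcessingTime trigger through
fixed sleeps (2 s after starting the stream, 5 s after appending the second
batch). A bounded-wait alternative, sketched here as a hypothetical helper
(not part of the patch) built on the suite's sql() method:

    // Poll the sink until the expected row count arrives or a timeout elapses.
    def waitForRowCount(expected: Long, timeoutMs: Long = 30000): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (sql("SELECT count(*) FROM sink").head().getLong(0) < expected &&
        System.currentTimeMillis() < deadline) {
        Thread.sleep(200)
      }
    }

    waitForRowCount(20) // then assert, e.g. checkAnswer(..., Seq(Row(20)))
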
carbondata git commit: [MINOR] Adding a testcase for stream-table join in StreamSQL

2018-07-24 Thread qiangcai
Repository: carbondata
Updated Branches:
  refs/heads/master 498502d2b -> 8840b7b56


[MINOR] Adding a testcase for stream-table join in StreamSQL

This closes #2431


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8840b7b5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8840b7b5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8840b7b5

Branch: refs/heads/master
Commit: 8840b7b56ba6ea180d1ee15b6e0fed9c5901ef98
Parents: 498502d
Author: Jacky Li 
Authored: Tue Jun 26 19:10:35 2018 +0800
Committer: QiangCai 
Committed: Tue Jul 24 15:09:46 2018 +0800

--
 .../TestStreamingTableOperation.scala   | 147 +--
 1 file changed, 131 insertions(+), 16 deletions(-)
--

