[24/50] carbondata git commit: [MINOR] Adding a testcase for stream-table join in StreamSQL
[MINOR] Adding a testcase for stream-table join in StreamSQL

This closes #2431

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8b313a3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8b313a3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8b313a3
Branch: refs/heads/branch-1.4
Commit: f8b313a398a803b958b588a3fa80e194fca98567
Parents: 985115f
Author: Jacky Li
Authored: Tue Jun 26 19:10:35 2018 +0800
Committer: ravipesala
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala | 147 +-
 1 file changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8b313a3/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala

diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 0771403..3073c59 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -2079,6 +2079,102 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assert(ex.getMessage.contains("'ddd' not found"))
   }
 
+  test("StreamSQL: stream join dimension table") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dim")
+
+    sql(
+      s"""
+         |CREATE TABLE dim(
+         |  id INT,
+         |  name STRING,
+         |  country STRING
+         |)
+         |STORED AS carbondata
+      """.stripMargin)
+    val inputDir = integrationPath + "/spark2/target/streamDim"
+    import spark.implicits._
+    spark.createDataset(Seq((1, "alice", "india"), (2, "bob", "france"), (3, "chris", "canada")))
+      .write.mode("overwrite").csv(inputDir)
+    sql(s"LOAD DATA INPATH '$inputDir' INTO TABLE dim OPTIONS('header'='false')")
+    sql("SELECT * FROM dim").show
+
+    val rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // first micro-batch input: 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir, SaveMode.Overwrite, false)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         |  id INT,
+         |  salary FLOAT,
+         |  tax DECIMAL(8,2),
+         |  percent DOUBLE,
+         |  birthday DATE,
+         |  register TIMESTAMP,
+         |  updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         |  'streaming'='source',
+         |  'format'='csv',
+         |  'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         |  id INT,
+         |  name STRING,
+         |  country STRING,
+         |  salary FLOAT,
+         |  tax DECIMAL(8,2),
+         |  percent DOUBLE,
+         |  birthday DATE,
+         |  register TIMESTAMP,
+         |  updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds')
+        |AS
+        |  SELECT s.id, d.name, d.country, s.salary, s.tax, s.percent, s.birthday, s.register, s.updated
+        |  FROM source s
+        |  JOIN dim d ON s.id = d.id
+      """.stripMargin).show(false)
+
+    Thread.sleep(2000)
+    sql("select * from sink").show
+
+    // second micro-batch input: another 10 rows
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append, false)
+    Thread.sleep(5000)
+
+    // after two mini-batches of 10 rows each, the sink should contain 20 rows
+    checkAnswer(sql("select count(*) from sink"), Seq(Row(20)))
+
+    sql("select * from sink order by id").show
+    val row = sql("select * from sink order by id, salary").head()
+    val expectedRow = Row(1, "alice", "india", 12.0, BigDecimal.valueOf(0.01), 80.01,
+      Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"),
+      Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(expectedRow)(row)
+
+    sql("DROP STREAM stream123")
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dim")
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -2239,23 +2335,42 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
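For readers new to StreamSQL: the CREATE STREAM ... AS SELECT statement in the test is CarbonData's SQL front end over Spark Structured Streaming, and the test exercises a stream-to-static join (the streaming source table joined against the static dim table). Below is a rough sketch of the same join written directly against the DataFrame API. The table names, column schema and the 1-second trigger are taken from the test; the paths, the local master, and the parquet stand-in for the carbondata sink are assumptions for illustration, not CarbonData's actual implementation.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object StreamDimJoinSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("stream-dim-join-sketch")
          .master("local[2]")
          .getOrCreate()

        // Static dimension table; assumes a table named "dim" already exists,
        // as created by the CREATE TABLE dim statement in the test.
        val dim = spark.table("dim")

        // Streaming csv source, mirroring TBLPROPERTIES('format'='csv', 'path'=...).
        // The schema string matches the source table in the test; the path is assumed.
        val source = spark.readStream
          .schema("id INT, salary FLOAT, tax DECIMAL(8,2), percent DOUBLE, " +
            "birthday DATE, register TIMESTAMP, updated TIMESTAMP")
          .csv("/tmp/streamSql")

        // Stream-to-static inner join on the dimension key, projecting the
        // same columns the CREATE STREAM statement selects.
        val joined = source.join(dim, "id")
          .select("id", "name", "country", "salary", "tax",
            "percent", "birthday", "register", "updated")

        // One micro-batch per second, as in STMPROPERTIES('interval'='1 seconds').
        // A parquet file sink stands in for the carbondata 'sink' table here.
        val query = joined.writeStream
          .trigger(Trigger.ProcessingTime("1 seconds"))
          .format("parquet")
          .option("path", "/tmp/sinkOut")
          .option("checkpointLocation", "/tmp/sinkOut/_checkpoint")
          .start()

        query.processAllAvailable()  // drain whatever input is present, then stop
        query.stop()
      }
    }

The SQL route in the test does the same thing declaratively, with the stream's lifecycle managed through SHOW STREAMS and DROP STREAM rather than through a StreamingQuery handle.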
carbondata git commit: [MINOR] Adding a testcase for stream-table join in StreamSQL
Repository: carbondata
Updated Branches:
  refs/heads/master 498502d2b -> 8840b7b56

[MINOR] Adding a testcase for stream-table join in StreamSQL

This closes #2431

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8840b7b5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8840b7b5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8840b7b5
Branch: refs/heads/master
Commit: 8840b7b56ba6ea180d1ee15b6e0fed9c5901ef98
Parents: 498502d
Author: Jacky Li
Authored: Tue Jun 26 19:10:35 2018 +0800
Committer: QiangCai
Committed: Tue Jul 24 15:09:46 2018 +0800

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala | 147 +-
 1 file changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8840b7b5/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala

The diff (index 0771403..3073c59) is identical to commit f8b313a3 above.
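One caveat that applies to the test in both commits: the fixed Thread.sleep(2000) and Thread.sleep(5000) waits before the assertions make the row count timing-sensitive on a slow machine. Assuming the query started by CREATE STREAM is registered with the session's StreamingQueryManager (an assumption about CarbonData internals, not verified here), a deterministic wait would be possible:

    import org.apache.spark.sql.SparkSession

    // Hypothetical helper: block until every active streaming query in this
    // session has processed all input available so far, instead of sleeping
    // for a fixed number of milliseconds.
    def awaitAllStreams(spark: SparkSession): Unit = {
      spark.streams.active.foreach(_.processAllAvailable())
    }

If CarbonData tracks its streams elsewhere, the sleeps would have to stay, ideally wrapped in a retry loop around the checkAnswer call.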