Klaus-xjp commented on a change in pull request #3947:
URL: https://github.com/apache/carbondata/pull/3947#discussion_r495646580
##
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##
@@ -92,128 +78,159 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
private def createTable(tableName: String, schema: StructType): Unit = {
val schemaString = schema.fields.map(x => x.name + " " + x.dataType.typeName).mkString(", ")
sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata tblproperties" +
-s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," +
-s"o_comment')")
- }
-
- override def afterAll {
-executorService.shutdownNow()
-dropTable()
+ s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname," +
+ s"o_comment')")
}
override def beforeEach(): Unit = {
Global.loading = false
}
- private def dropTable() = {
-sql("DROP TABLE IF EXISTS orders")
-sql("DROP TABLE IF EXISTS orders_overwrite")
- }
+ // --- INSERT OVERWRITE --
- // run the input SQL and block until it is running
- private def runSqlAsync(sql: String): Future[String] = {
-assert(!Global.loading)
-var count = 0
-val future = executorService.submit(
- new QueryTask(sql)
-)
-while (!Global.loading && count < 1000) {
- Thread.sleep(10)
- // to avoid dead loop in case WaitingIndexFactory is not invoked
- count += 1
-}
-future
+ test("insert into should fail if insert overwrite is in progress") {
+val firstCommand = "insert overwrite table orders select * from orders_overwrite"
+val secondCommand = "insert into table orders select * from orders_overwrite"
+
+val errorMessage = "Already insert overwrite is in progress"
+testConcurrentCommandFail(firstCommand, secondCommand,
+ ExceptionType.RUNTIME_EXCEPTION, errorMessage)
}
- // --- INSERT OVERWRITE --
+ test("insert overwrite should fail if insert overwrite is in progress") {
+val firstCommand = "insert overwrite table orders select * from orders_overwrite"
+val secondCommand = "insert overwrite table orders select * from orders_overwrite"
+
+val errorMessage = "Already insert overwrite is in progress"
+testConcurrentCommandFail(firstCommand, secondCommand,
+ ExceptionType.RUNTIME_EXCEPTION, errorMessage)
+ }
test("compaction should fail if insert overwrite is in progress") {
-val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
-val ex = intercept[ConcurrentOperationException]{
- sql("alter table orders compact 'MINOR'")
-}
-assert(future.get.contains("PASS"))
-assert(ex.getMessage.contains(
- "insert overwrite is in progress for table default.orders, compaction operation is not allowed"))
+val firstCommand = "insert overwrite table orders select * from orders_overwrite"
+val secondCommand = "alter table orders compact 'MINOR'"
+
+val errorMessage = "insert overwrite is in progress for table default.orders, " +
+ "compaction operation is not allowed"
+testConcurrentCommandFail(firstCommand, secondCommand,
+ ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
}
- // block updating records from table which has index. see PR2483
- ignore("update should fail if insert overwrite is in progress") {
-val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
-val ex = intercept[ConcurrentOperationException] {
- sql("update orders set (o_country)=('newCountry') where o_country='china'").show
-}
-assert(future.get.contains("PASS"))
-assert(ex.getMessage.contains(
- "loading is in progress for table default.orders, data update operation is not allowed"))
+ test("update should fail if insert overwrite is in progress") {
+val firstCommand = "insert overwrite table orders select * from orders_overwrite"
+val secondCommand = "update orders set (o_country)=('newCountry') where o_country='china'"
+
+val errorMessage = "insert overwrite is in progress for table default.orders, " +
+"data update operation is not allowed"
+testConcurrentCommandFail(firstCommand, secondCommand,
+ ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
}
- // block deleting records from table which has index. see PR2483
- ignore("delete should fail if insert overwrite is in progress") {
-val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
-val ex = intercept[ConcurrentOperationException] {
- sql("delete from orders where o_country='china'").show
-}
-assert(future.get.contains("PASS"))
-assert(ex.getMessage.contains(
-