This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b121ac6  [CARBONDATA-3980] Load fails with aborted exception when Bad 
records action is unspecified
b121ac6 is described below

commit b121ac6c18d03d30a22974bad7504870dd80b41a
Author: ShreelekhyaG <shreelu_ga...@yahoo.com>
AuthorDate: Thu Sep 10 16:30:05 2020 +0530

    [CARBONDATA-3980] Load fails with aborted exception when Bad records action 
is unspecified
    
    Why is this PR needed?
    Load fails with aborted exception when Bad records action is unspecified.
    
    When a partition column is loaded with a bad record value, the load fails 
with only a generic 'Job aborted' message on a cluster. The actual error 
message is visible only in the complete stack trace (for example, 'Data load 
failed due to bad record: The value with column name projectjoindate and column 
data type TIMESTAMP is not a valid TIMESTAMP type').
    
    What changes were proposed in this PR?
    Fix the bad record error message for partition columns. The error message 
is now stored in the operationContext map; if it is not null, 
CarbonLoadDataCommand throws a RuntimeException carrying that error message 
instead of rethrowing the original exception.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3919
---
 .../command/management/CarbonLoadDataCommand.scala    |  7 ++++++-
 .../command/management/CommonLoadUtils.scala          |  1 +
 .../StandardPartitionBadRecordLoggerTest.scala        | 19 +++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b17969b..d5c3c84 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -191,7 +191,12 @@ case class CarbonLoadDataCommand(databaseNameOp: 
Option[String],
         if (isUpdateTableStatusRequired) {
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
-        throw ex
+        val errorMessage = operationContext.getProperty("Error message")
+        if (errorMessage != null) {
+          throw new RuntimeException(errorMessage.toString, ex.getCause)
+        } else {
+          throw ex
+        }
     }
     Seq.empty
   }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f574e12..5c46127 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1064,6 +1064,7 @@ object CommonLoadUtils {
         if (loadParams.updateModel.isDefined) {
           CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get, 
executorMessage)
         }
+        loadParams.operationContext.setProperty("Error message", errorMessage)
         LOGGER.info(errorMessage)
         LOGGER.error(ex)
         throw ex
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index c19c51e..488291d 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -219,6 +219,25 @@ class StandardPartitionBadRecordLoggerTest extends 
QueryTest with BeforeAndAfter
     }
   }
 
+  test("test load with partition column having bad record value") {
+    sql("drop table if exists dataloadOptionTests")
+    sql("CREATE TABLE dataloadOptionTests (empno int, empname String, 
designation String, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, 
projectjoindate " +
+      "Timestamp, projectenddate Date,attendance int,utilization int,salary 
int) PARTITIONED BY " +
+      "(deptname String,doj Timestamp,projectcode int) STORED AS carbondata ")
+    val csvFilePath = s"$resourcesPath/data.csv"
+    val ex = intercept[RuntimeException] {
+      sql("LOAD DATA local inpath '" + csvFilePath +
+          "' INTO TABLE dataloadOptionTests OPTIONS 
('bad_records_action'='FAIL', 'DELIMITER'= '," +
+          "', 'QUOTECHAR'= '\"', 
'dateformat'='DD-MM-YYYY','timestampformat'='DD-MM-YYYY')")
+    }
+    assert(ex.getMessage.contains(
+      "DataLoad failure: Data load failed due to bad record: The value with 
column name " +
+      "projectjoindate and column data type TIMESTAMP is not a valid TIMESTAMP 
type.Please " +
+      "enable bad record logger to know the detail reason."))
+    sql("drop table dataloadOptionTests")
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS sales")
     sql("drop table IF EXISTS serializable_values")

Reply via email to