[CARBONDATA-2463] concurrent insert requires a separate temp path, which is differentiated by seg_id only
issue : for non-transaction table we are not setting any segment-id in loadmodel . temp location contains databasenaem_tablename_segId_taskid . during concurrent insert in same table, temp folder has to be created at separate location . only segment_id value will change during two concurrent insert. solution: insert flow for external table is same as carbon table., and we are not storing segment info to anywhere. so dummy unique segment-id can be assigned to loadmodel for nontransactional table. This closes #2284 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc4b7f9b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc4b7f9b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc4b7f9b Branch: refs/heads/spark-2.3 Commit: fc4b7f9b96eebdcaf6cd513d911a21761bbc5a77 Parents: 3d23fa6 Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Tue May 8 17:11:41 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Tue May 15 13:17:15 2018 +0530 ---------------------------------------------------------------------- .../TestNonTransactionalCarbonTable.scala | 43 +++++++++++++++++++- .../management/CarbonLoadDataCommand.scala | 2 + .../sdk/file/CarbonWriterBuilder.java | 4 +- .../sdk/file/CSVCarbonWriterTest.java | 2 +- .../CSVNonTransactionalCarbonWriterTest.java | 4 +- 5 files changed, 49 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index fb9c862..5ab1c60 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -19,8 +19,10 @@ package org.apache.carbondata.spark.testsuite.createTable import java.sql.Timestamp import java.io.{File, FileFilter, IOException} -import java.io.{File, FileFilter} import java.util +import java.util.concurrent.TimeUnit + +import scala.concurrent.ExecutionContext.Implicits.global import org.apache.commons.io.FileUtils import org.apache.spark.sql.Row @@ -35,6 +37,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration import org.apache.avro import org.apache.commons.lang.CharEncoding @@ -263,6 +267,43 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS sdkOutputTable") } + test("concurrently insert operation"){ + cleanTestData() + buildTestDataSingleFile() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + // with partition + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY + |'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + sql("drop table if exists t1") + sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'") + var i =0; + while (i<50){ + sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false) + i = i+1; + } + checkAnswer(sql("select count(*) from 
t1"),Seq(Row(50))) + val one = Future { + sql("insert into sdkOutputTable select * from t1 ") + } + val two = Future { + sql("insert into sdkOutputTable select * from t1 ") + } + + Await.result(Future.sequence(Seq(one, two)), Duration(300, TimeUnit.SECONDS)) + + checkAnswer(sql("select count(*) from sdkOutputTable"),Seq(Row(103))) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + test( "Read two sdk writer outputs before and after deleting the existing files and creating new " + "files with same schema and UUID") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 3e306fb..3bef4b6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -272,6 +272,8 @@ case class CarbonLoadDataCommand( if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { FileFactory.mkdirs(metadataDirectoryPath, fileType) } + } else { + carbonLoadModel.setSegmentId(System.currentTimeMillis().toString) } val partitionStatus = SegmentStatus.SUCCESS val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 1816539..a01f0d7 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -109,8 +109,8 @@ public class CarbonWriterBuilder { * by default it is system time in nano seconds. * @return updated CarbonWriterBuilder */ - public CarbonWriterBuilder taskNo(String taskNo) { - this.taskNo = taskNo; + public CarbonWriterBuilder taskNo(long taskNo) { + this.taskNo = String.valueOf(taskNo); return this; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index ba3d3ac..d68d85b 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -257,7 +257,7 @@ public class CSVCarbonWriterTest { try { CarbonWriterBuilder builder = CarbonWriter.builder() .withSchema(new Schema(fields)) - .isTransactionalTable(true).taskNo("5") + .isTransactionalTable(true).taskNo(5) .outputPath(path); CarbonWriter writer = builder.buildWriterForCSVInput(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc4b7f9b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java index 32fe6e8..19b0a42 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java @@ -96,7 +96,7 @@ public class CSVNonTransactionalCarbonWriterTest { .withSchema(schema) .isTransactionalTable(false) .uniqueIdentifier(System.currentTimeMillis()) - .taskNo(Long.toString(System.nanoTime())) + .taskNo(System.nanoTime()) .outputPath(path); if (sortColumns != null) { builder = builder.sortBy(sortColumns); @@ -160,7 +160,7 @@ public class CSVNonTransactionalCarbonWriterTest { .withSchema(new Schema(fields)) .uniqueIdentifier(System.currentTimeMillis()) .isTransactionalTable(false) - .taskNo(Long.toString(System.nanoTime())) + .taskNo(System.nanoTime()) .outputPath(path); CarbonWriter writer = builder.buildWriterForCSVInput();