This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 3098fec [CARBONDATA-3599] Support insert data from stage files written by SDK 3098fec is described below commit 3098fece425ec22282dd5f9d0b0508929a524a0e Author: Jacky Li <jacky.li...@qq.com> AuthorDate: Sun Dec 22 15:46:43 2019 +0800 [CARBONDATA-3599] Support insert data from stage files written by SDK A new SQL command is added to insert data from stage files written by SDK, like in Flink application. Usage: Use carbondata-flink module to write data to HDFS, S3, etc Then use following command to trigger loading into carbon table periodically. INSERT INTO dbName.tableName STAGE For more detail, see TestCarbonWriter.scala This closes #3488 --- .../core/datastore/impl/FileFactory.java | 63 ++++ .../core/indexstore/ExtendedBlocklet.java | 6 +- .../apache/carbondata/core/locks/LockUsage.java | 1 + .../core/reader/CarbonFooterReaderV3.java | 28 +- .../carbondata/core/statusmanager/StageInput.java | 20 ++ .../core/util/DataFileFooterConverterV3.java | 3 +- .../carbondata/core/util/path/CarbonTablePath.java | 59 ++-- .../apache/carbondata/hadoop/CarbonInputSplit.java | 2 +- docs/dml-of-carbondata.md | 17 + .../org/apache/carbon/flink/TestCarbonWriter.scala | 29 +- .../TestCreateTableWithBlockletSize.scala | 3 +- .../TestNonTransactionalCarbonTable.scala | 3 +- .../spark/load/DataLoadProcessBuilderOnSpark.scala | 87 ++++- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 + .../spark/rdd/CarbonTableCompactor.scala | 77 +---- .../management/CarbonInsertFromStageCommand.scala | 379 +++++++++++++++++++++ .../spark/sql/parser/CarbonSpark2SqlParser.scala | 11 +- .../processing/util/CarbonLoaderUtil.java | 18 + .../carbondata/sdk/file/CarbonSchemaReader.java | 3 +- .../carbondata/sdk/file/CSVCarbonWriterTest.java | 2 +- .../java/org/apache/carbondata/tool/DataFile.java | 2 +- 21 files changed, 690 insertions(+), 125 deletions(-) diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 711996f..d1d8c33 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -17,19 +17,30 @@ package org.apache.carbondata.core.datastore.impl; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.lang.reflect.Method; import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.util.List; import java.util.Locale; +import java.util.stream.Collectors; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.commons.io.FileUtils; @@ -614,4 +625,56 @@ public final class FileFactory { public static short getDefaultReplication(String path) { return getCarbonFile(path).getDefaultReplication(); } + + /** + * Write content into specified file path + * @param content content to write + * @param filePath file path to write + * @throws IOException if IO errors + */ + public static void writeFile(String content, String filePath) throws IOException { + AtomicFileOperations fileWrite = 
AtomicFileOperationFactory.getAtomicFileOperations(filePath); + BufferedWriter brWriter = null; + DataOutputStream dataOutputStream = null; + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + brWriter.write(content); + } catch (IOException ie) { + LOGGER.error("Error message: " + ie.getLocalizedMessage()); + fileWrite.setFailed(); + throw ie; + } finally { + try { + CarbonUtil.closeStreams(brWriter); + } finally { + fileWrite.close(); + } + } + } + + /** + * Read all lines in a specified file + * + * @param filePath file to read + * @param conf hadoop configuration + * @return file content + * @throws IOException if IO errors + */ + public static List<String> readLinesInFile( + String filePath, Configuration conf) throws IOException { + DataInputStream fileReader = null; + BufferedReader bufferedReader = null; + try { + fileReader = FileFactory.getDataInputStream(filePath, -1, conf); + bufferedReader = + new BufferedReader( + new InputStreamReader( + fileReader, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + return bufferedReader.lines().collect(Collectors.toList()); + } finally { + CarbonUtil.closeStreams(fileReader, bufferedReader); + } + } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index e391602..b20689c 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -51,11 +51,7 @@ public class ExtendedBlocklet extends Blocklet { public ExtendedBlocklet(String filePath, String blockletId, boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) { super(filePath, blockletId, compareBlockletIdForObjectMatching); - try { - 
this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, -1, version, null); - } catch (IOException e) { - throw new RuntimeException(e); - } + this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, -1, version, null); } public ExtendedBlocklet(String filePath, String blockletId, ColumnarFormatVersion version) { diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java index 14907c5..e0c04df 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java @@ -37,5 +37,6 @@ public class LockUsage { public static final String DATAMAP_STATUS_LOCK = "datamapstatus.lock"; public static final String CONCURRENT_LOAD_LOCK = "concurrentload.lock"; public static final String UPDATE_LOCK = "update.lock"; + public static final String INGEST_LOCK = "ingest.lock"; } diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java index 3fa714c..e0c6121 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java @@ -17,10 +17,14 @@ package org.apache.carbondata.core.reader; +import java.io.DataInputStream; import java.io.IOException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.format.FileFooter3; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TBase; /** @@ -32,11 +36,15 @@ public class CarbonFooterReaderV3 { //Fact file path private String filePath; + // size of the file + private long fileSize; + //start offset of the file footer private long footerOffset; - public CarbonFooterReaderV3(String filePath, long offset) 
{ + public CarbonFooterReaderV3(String filePath, long fileSize, long offset) { this.filePath = filePath; + this.fileSize = fileSize; this.footerOffset = offset; } @@ -49,7 +57,23 @@ public class CarbonFooterReaderV3 { public FileFooter3 readFooterVersion3() throws IOException { ThriftReader thriftReader = openThriftReader(filePath); thriftReader.open(); - //Set the offset from where it should read + + // If footer offset is 0, means caller does not set it, + // so we read it from the end of the file + if (footerOffset == 0) { + Configuration conf = FileFactory.getConfiguration(); + try (DataInputStream reader = FileFactory.getDataInputStream(filePath, conf)) { + long skipBytes = fileSize - CarbonCommonConstants.LONG_SIZE_IN_BYTE; + long skipped = reader.skip(skipBytes); + if (skipped != skipBytes) { + throw new IOException(String.format( + "expect skip %d bytes, but actually skipped %d bytes", skipBytes, skipped)); + } + footerOffset = reader.readLong(); + } + } + + // Set the offset from where it should read thriftReader.setReadOffset(footerOffset); FileFooter3 footer = (FileFooter3) thriftReader.read(); thriftReader.close(); diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java index 60f8b32..b4bf084 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java @@ -17,7 +17,15 @@ package org.apache.carbondata.core.statusmanager; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.hadoop.CarbonInputSplit; + +import org.apache.hadoop.mapreduce.InputSplit; public class StageInput { @@ -55,4 +63,16 @@ public class StageInput { public void 
setFiles(Map<String, Long> files) { this.files = files; } + + public List<InputSplit> createSplits() { + return + files.entrySet().stream().filter( + entry -> entry.getKey().endsWith(CarbonCommonConstants.FACT_FILE_EXT) + ).map( + entry -> CarbonInputSplit.from("-1", "0", + base + CarbonCommonConstants.FILE_SEPARATOR + entry.getKey(), + 0, entry.getValue(), ColumnarFormatVersion.V3, null) + ).collect(Collectors.toList()); + } + } diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java index 65ba609..b917559 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java @@ -64,7 +64,8 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter { CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath()); FileHeader fileHeader = carbonHeaderReader.readHeader(); CarbonFooterReaderV3 reader = - new CarbonFooterReaderV3(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset()); + new CarbonFooterReaderV3(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockLength(), + tableBlockInfo.getBlockOffset()); FileFooter3 footer = reader.readFooterVersion3(); return convertDataFileFooter(fileHeader, footer); } diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 6f46fb9..05894ea 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -17,8 +17,6 @@ package org.apache.carbondata.core.util.path; -import java.io.File; - import org.apache.carbondata.core.constants.CarbonCommonConstants; import 
org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; @@ -57,6 +55,7 @@ public class CarbonTablePath { private static final String STREAMING_CHECKPOINT_DIR = "checkpoint"; private static final String STAGE_DIR = "stage"; public static final String SUCCESS_FILE_SUBFIX = ".success"; + private static final String SNAPSHOT_FILE_NAME = "snapshot"; /** * This class provides static utility only. @@ -68,6 +67,11 @@ public class CarbonTablePath { return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + STAGE_DIR; } + public static String getStageSnapshotFile(String tablePath) { + return CarbonTablePath.getStageDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + SNAPSHOT_FILE_NAME; + } + /** * The method returns the folder path containing the carbon file. * @@ -77,7 +81,7 @@ public class CarbonTablePath { int lastIndex = carbonFilePath.lastIndexOf('/'); // below code for handling windows environment if (-1 == lastIndex) { - lastIndex = carbonFilePath.lastIndexOf(File.separator); + lastIndex = carbonFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR); } return carbonFilePath.substring(0, lastIndex); } @@ -132,42 +136,45 @@ public class CarbonTablePath { * Return absolute path of dictionary file */ public static String getDictionaryFilePath(String tablePath, String columnId) { - return getMetadataPath(tablePath) + File.separator + getDictionaryFileName(columnId); + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + getDictionaryFileName(columnId); } /** * Return absolute path of dictionary file */ public static String getExternalDictionaryFilePath(String dictionaryPath, String columnId) { - return dictionaryPath + File.separator + getDictionaryFileName(columnId); + return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR + + getDictionaryFileName(columnId); } /** * Return metadata path */ public static String getMetadataPath(String 
tablePath) { - return tablePath + File.separator + METADATA_DIR; + return tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR; } /** * Return absolute path of dictionary meta file */ public static String getExternalDictionaryMetaFilePath(String dictionaryPath, String columnId) { - return dictionaryPath + File.separator + columnId + DICTIONARY_META_EXT; + return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR + columnId + DICTIONARY_META_EXT; } /** * Return absolute path of dictionary meta file */ public static String getDictionaryMetaFilePath(String tablePath, String columnId) { - return getMetadataPath(tablePath) + File.separator + columnId + DICTIONARY_META_EXT; + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + columnId + + DICTIONARY_META_EXT; } /** * Return sortindex file path based on specified dictionary path */ public static String getExternalSortIndexFilePath(String dictionaryPath, String columnId) { - return dictionaryPath + File.separator + columnId + SORT_INDEX_EXT; + return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR + columnId + SORT_INDEX_EXT; } /** @@ -175,7 +182,8 @@ public class CarbonTablePath { */ public static String getExternalSortIndexFilePath(String dictionaryPath, String columnId, long dictOffset) { - return dictionaryPath + File.separator + columnId + "_" + dictOffset + SORT_INDEX_EXT; + return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR + + columnId + "_" + dictOffset + SORT_INDEX_EXT; } /** @@ -222,7 +230,7 @@ public class CarbonTablePath { * Return absolute path of table status file */ public static String getTableStatusFilePath(String tablePath) { - return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE; + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + TABLE_STATUS_FILE; } public static String getTableStatusFilePathWithUUID(String tablePath, String uuid) { @@ -285,7 +293,7 @@ public class CarbonTablePath { return 
getCarbonIndexFilePath(tablePath, taskId, segmentId, bucketNumber); default: String segmentDir = getSegmentPath(tablePath, segmentId); - return segmentDir + File.separator + getCarbonIndexFileName(taskId, + return segmentDir + CarbonCommonConstants.FILE_SEPARATOR + getCarbonIndexFileName(taskId, Integer.parseInt(bucketNumber), timeStamp, segmentId); } } @@ -303,7 +311,8 @@ public class CarbonTablePath { * Return the segment path from table path and segmentId */ public static String getSegmentPath(String tablePath, String segmentId) { - return getPartitionDir(tablePath) + File.separator + SEGMENT_PREFIX + segmentId; + return getPartitionDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + SEGMENT_PREFIX + segmentId; } /** @@ -344,27 +353,29 @@ public class CarbonTablePath { } public static String getCarbonStreamIndexFilePath(String segmentDir) { - return segmentDir + File.separator + getCarbonStreamIndexFileName(); + return segmentDir + CarbonCommonConstants.FILE_SEPARATOR + getCarbonStreamIndexFileName(); } // This partition is not used in any code logic, just keep backward compatibility public static final String DEPRECATED_PARTITION_ID = "0"; public static String getPartitionDir(String tablePath) { - return getFactDir(tablePath) + File.separator + PARTITION_PREFIX + + return getFactDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + PARTITION_PREFIX + CarbonTablePath.DEPRECATED_PARTITION_ID; } public static String getFactDir(String tablePath) { - return tablePath + File.separator + FACT_DIR; + return tablePath + CarbonCommonConstants.FILE_SEPARATOR + FACT_DIR; } public static String getStreamingLogDir(String tablePath) { - return tablePath + File.separator + STREAMING_DIR + File.separator + STREAMING_LOG_DIR; + return tablePath + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_DIR + + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_LOG_DIR; } public static String getStreamingCheckpointDir(String tablePath) { - return tablePath + File.separator + 
STREAMING_DIR + File.separator + STREAMING_CHECKPOINT_DIR; + return tablePath + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_DIR + + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_CHECKPOINT_DIR; } /** @@ -378,7 +389,7 @@ public class CarbonTablePath { String dataMapName, String shardName) { return new StringBuilder() .append(getDataMapStorePath(tablePath, segmentId, dataMapName)) - .append(File.separator) + .append(CarbonCommonConstants.FILE_SEPARATOR) .append(shardName) .toString(); } @@ -392,9 +403,9 @@ public class CarbonTablePath { String dataMapName) { return new StringBuilder() .append(tablePath) - .append(File.separator) + .append(CarbonCommonConstants.FILE_SEPARATOR) .append(dataMapName) - .append(File.separator) + .append(CarbonCommonConstants.FILE_SEPARATOR) .append(segmentId) .toString(); } @@ -760,11 +771,11 @@ public class CarbonTablePath { public static String generateBadRecordsPath(String badLogStoreLocation, String segmentId, String taskNo, boolean isTransactionalTable) { if (!isTransactionalTable) { - return badLogStoreLocation + File.separator + "SdkWriterBadRecords" + return badLogStoreLocation + CarbonCommonConstants.FILE_SEPARATOR + "SdkWriterBadRecords" + CarbonCommonConstants.FILE_SEPARATOR + taskNo; } else { - return badLogStoreLocation + File.separator + segmentId + CarbonCommonConstants.FILE_SEPARATOR - + taskNo; + return badLogStoreLocation + CarbonCommonConstants.FILE_SEPARATOR + segmentId + + CarbonCommonConstants.FILE_SEPARATOR + taskNo; } } } diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 14ea5b5..3092a4a 100644 --- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -291,7 +291,7 @@ public class CarbonInputSplit extends FileSplit } public static CarbonInputSplit from(String segmentId, String blockletId, String path, 
long start, - long length, ColumnarFormatVersion version, String dataMapWritePath) throws IOException { + long length, ColumnarFormatVersion version, String dataMapWritePath) { return new CarbonInputSplit(segmentId, blockletId, path, start, length, version, null, dataMapWritePath); } diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md index 33faa9c..190551a 100644 --- a/docs/dml-of-carbondata.md +++ b/docs/dml-of-carbondata.md @@ -21,6 +21,7 @@ CarbonData DML statements are documented here,which includes: * [LOAD DATA](#load-data) * [INSERT DATA](#insert-data-into-carbondata-table) +* [INSERT DATA STAGE](#insert-data-into-carbondata-table-from-stage-input-files) * [Load Data Using Static Partition](#load-data-using-static-partition) * [Load Data Using Dynamic Partition](#load-data-using-dynamic-partition) * [UPDATE AND DELETE](#update-and-delete) @@ -356,6 +357,22 @@ CarbonData DML statements are documented here,which includes: INSERT OVERWRITE TABLE table1 SELECT * FROM TABLE2 ``` +### INSERT DATA INTO CARBONDATA TABLE From Stage Input Files + + Stage input files are data files written by an external application (such as Flink). These files + are committed but not loaded into the table. + + You can use this command to insert them into the table, so that they become visible for query. + + ``` + INSERT INTO <CARBONDATA TABLE> STAGE + ``` + + Examples: + ``` + INSERT INTO table1 STAGE + ``` + ### Load Data Using Static Partition This command allows you to load data using static partition. 
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala index 55085f8..799c810 100644 --- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala @@ -17,6 +17,7 @@ package org.apache.carbon.flink +import java.io.File import java.util.Properties import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -26,9 +27,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest - import org.junit.Test +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.path.CarbonTablePath + class TestCarbonWriter extends QueryTest { val tableName = "test_flink" @@ -43,12 +46,14 @@ class TestCarbonWriter extends QueryTest { """.stripMargin ).collect() - try { - val rootPath = System.getProperty("user.dir") + "/target/test-classes" + val rootPath = System.getProperty("user.dir") + "/target/test-classes" - val dataTempPath = rootPath + "/data/temp/" - val dataPath = rootPath + "/data/" + val dataTempPath = rootPath + "/data/temp/" + val dataPath = rootPath + "/data/" + new File(dataPath).delete() + new File(dataPath).mkdir() + try { val tablePath = storeLocation + "/" + tableName + "/" val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation) @@ -69,7 +74,7 @@ class TestCarbonWriter extends QueryTest { @throws[InterruptedException] override def onFinish(): Unit = { - Thread.sleep(10000L) + Thread.sleep(5000L) } } val stream = environment.addSource(source) @@ -92,9 +97,17 @@ class TestCarbonWriter extends QueryTest { throw new UnsupportedOperationException(exception) } - checkAnswer(sql(s"select 
count(1) from $tableName"), Seq(Row(0))) + sql(s"INSERT INTO $tableName STAGE") + + checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000))) + + // ensure the stage snapshot file and all stage files are deleted + assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath))) + assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty) + } finally { -// sql(s"drop table if exists $tableName").collect() + sql(s"drop table if exists $tableName").collect() + new File(dataPath).delete() } } diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala index 9b37446..2054a7b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala @@ -62,8 +62,7 @@ class TestCreateTableWithBlockletSize extends QueryTest with BeforeAndAfterAll { val buffer = fileReader .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), dataFile.getSize - 8, 8) val footerReader = new CarbonFooterReaderV3( - dataFile.getAbsolutePath, - buffer.getLong) + dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong) val footer = footerReader.readFooterVersion3 assertResult(2)(footer.blocklet_index_list.size) assertResult(2)(footer.blocklet_info_list3.size) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index cfd23ed..0360e35 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -2576,8 +2576,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { val buffer = fileReader .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), dataFile.getSize - 8, 8) val footerReader = new CarbonFooterReaderV3( - dataFile.getAbsolutePath, - buffer.getLong) + dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong) val footer = footerReader.readFooterVersion3 // without page_size configuration there will be only 1 page, now it will be more. assert(footer.getBlocklet_info_list3.get(0).number_number_of_pages != 1) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index ea83827..f7b8668 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -19,31 +19,40 @@ package org.apache.carbondata.spark.load import java.util.Comparator +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext} +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.GenericRow import 
org.apache.spark.sql.execution.command.ExecutionErrors -import org.apache.spark.sql.util.SparkSQLUtil +import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter} import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util._ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses} +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant} +import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CommonUtil +import org.apache.carbondata.store.CarbonRowReadSupport /** * 
Use sortBy operator in spark to load the data @@ -369,6 +378,72 @@ object DataLoadProcessBuilderOnSpark { carbonTaskInfo.setTaskId(CarbonUtil.generateUUID()) ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo) } + + /** + * create CarbonLoadModel for global_sort + */ + def createLoadModelForGlobalSort( + sparkSession: SparkSession, + carbonTable: CarbonTable + ): CarbonLoadModel = { + val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf() + CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName) + CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName) + CarbonTableOutputFormat.setCarbonTable(conf, carbonTable) + val fieldList = carbonTable.getCreateOrderColumn + .asScala + .map { column => + new StructField(column.getColName, column.getDataType) + } + CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava)) + val loadModel = CarbonTableOutputFormat.getLoadModel(conf) + loadModel.setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N") + loadModel.setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + ",false") + loadModel.setBadRecordsAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName + ",force") + loadModel.setIsEmptyDataBadRecord( + DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false") + val globalSortPartitions = + carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions") + if (globalSortPartitions != null) { + loadModel.setGlobalSortPartitions(globalSortPartitions) + } + loadModel + } + + /** + * create DataFrame basing on specified splits + */ + def createInputDataFrame( + sparkSession: SparkSession, + carbonTable: CarbonTable, + splits: Seq[InputSplit] + ): DataFrame = { + val columns = carbonTable + .getCreateOrderColumn + .asScala + .map(_.getColName) + .toArray + val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns) + val rdd: RDD[Row] = new 
CarbonScanRDD[CarbonRow]( + sparkSession, + columnProjection = new CarbonProjection(columns), + null, + carbonTable.getAbsoluteTableIdentifier, + carbonTable.getTableInfo.serialize, + carbonTable.getTableInfo, + new CarbonInputMetrics, + null, + null, + classOf[CarbonRowReadSupport], + splits.asJava) + .map { row => + new GenericRow(row.getData.asInstanceOf[Array[Any]]) + } + sparkSession.createDataFrame(rdd, schema) + } } class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] { diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 635afee..0656e49 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -192,6 +192,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES") protected val CARBONCLI = carbonKeyWord("CARBONCLI") protected val PATH = carbonKeyWord("PATH") + protected val INSERT = carbonKeyWord("INSERT") + protected val STAGE = carbonKeyWord("STAGE") protected val doubleQuotedString = "\"([^\"]+)\"".r protected val singleQuotedString = "'([^']+)'".r diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 0eef9dc..56b6a2e 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -374,14 +374,17 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel, carbonMergerMapping: CarbonMergerMapping): 
Array[(String, Boolean)] = { - val dataFrame = dataFrameOfSegments( + val splits = splitsOfSegments( sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, carbonMergerMapping.validSegments) + val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame( + sparkSession, + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, + splits.asScala) // generate LoadModel which can be used global_sort flow - val outputModel = getLoadModelForGlobalSort( - sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, - carbonMergerMapping.validSegments) + val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort( + sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable) outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1)) DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( sparkSession, @@ -394,38 +397,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } /** - * create DataFrame basing on specified segments - */ - def dataFrameOfSegments( - sparkSession: SparkSession, - carbonTable: CarbonTable, - segments: Array[Segment] - ): DataFrame = { - val columns = carbonTable - .getCreateOrderColumn() - .asScala - .map(_.getColName) - .toArray - val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns) - val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow]( - sparkSession, - columnProjection = new CarbonProjection(columns), - null, - carbonTable.getAbsoluteTableIdentifier, - carbonTable.getTableInfo.serialize, - carbonTable.getTableInfo, - new CarbonInputMetrics, - null, - null, - classOf[CarbonRowReadSupport], - splitsOfSegments(sparkSession, carbonTable, segments)) - .map { row => - new GenericRow(row.getData.asInstanceOf[Array[Any]]) - } - sparkSession.createDataFrame(rdd, schema) - } - - /** * get splits of specified segments */ def splitsOfSegments( @@ -445,38 +416,4 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, new 
CarbonTableInputFormat[Object].getSplits(job) } - /** - * create CarbonLoadModel for global_sort compaction - */ - def getLoadModelForGlobalSort( - sparkSession: SparkSession, - carbonTable: CarbonTable, - segments: Array[Segment] - ): CarbonLoadModel = { - val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf() - CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName) - CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName) - CarbonTableOutputFormat.setCarbonTable(conf, carbonTable) - val fieldList = carbonTable.getCreateOrderColumn - .asScala - .map { column => - new StructField(column.getColName, column.getDataType) - } - CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava)) - val loadModel = CarbonTableOutputFormat.getLoadModel(conf) - loadModel.setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N") - loadModel.setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false") - loadModel.setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",force") - loadModel.setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false") - val globalSortPartitions = - carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions") - if (globalSortPartitions != null) { - loadModel.setGlobalSortPartitions(globalSortPartitions) - } - loadModel - } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala new file mode 100644 index 0000000..0c665a8 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.io.{InputStreamReader, IOException} +import java.util +import java.util.Collections +import java.util.concurrent.{Executors, ExecutorService} + +import scala.collection.JavaConverters._ + +import com.google.gson.Gson +import org.apache.hadoop.conf.Configuration +import org.apache.log4j.Logger +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{Checker, DataCommand} +import org.apache.spark.sql.util.SparkSQLUtil + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput} +import 
org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark + +/** + * Collect stage input files and trigger a loading into carbon table. + * + * @param databaseNameOp database name + * @param tableName table name + */ +case class CarbonInsertFromStageCommand( + databaseNameOp: Option[String], + tableName: String +) extends DataCommand { + + @transient var LOGGER: Logger = _ + + override def processData(spark: SparkSession): Seq[Row] = { + LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + Checker.validateTableExists(databaseNameOp, tableName, spark) + val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark) + val hadoopConf = spark.sessionState.newHadoopConf() + FileFactory.getConfiguration.addResource(hadoopConf) + setAuditTable(table) + + if (!table.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") + } + if (table.isChildDataMap || table.isChildTable) { + throw new MalformedCarbonCommandException("Unsupported operation on child table") + } + + val tablePath = table.getTablePath + val stagePath = CarbonTablePath.getStageDir(tablePath) + val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath) + var loadModel: CarbonLoadModel = null + val lock = acquireIngestLock(table) + + try { + // 1. Check whether we need to recover from previous failure + // We use a snapshot file to indicate whether there was failure in previous + // ingest operation. 
A Snapshot file will be created when an ingest operation + starts and will be deleted only after the whole ingest operation is finished, + which includes two actions: + // 1) action1: changing segment status to SUCCESS and + // 2) action2: deleting all involved stage files. + // + // If one of these two actions fails, the snapshot file will still exist, so + that recovery is needed. + // + // To do the recovery, do the following steps: + // 1) Check if corresponding segment in table status is SUCCESS, + // means deleting stage files had failed. So need to read the stage + // file list from the snapshot file and delete them again. + // 2) Check if corresponding segment in table status is INSERT_IN_PROGRESS, + // means data loading had failed. So need to read the stage file list + // from the snapshot file and load again. + recoverIfRequired(snapshotFilePath, table, hadoopConf) + + // 2. Start ingesting, steps: + // 1) read all existing stage files + // 2) read all stage files to collect input files for data loading + // 3) add a new segment entry in table status as INSERT_IN_PROGRESS, + // 4) write all existing stage file names into a new snapshot file + // 5) do actual loading + // 6) write segment file and update segment state to SUCCESS in table status + // 7) delete stage files used for loading + // 8) delete the snapshot file + + // 1) read all existing stage files + val stageFiles = listStageFiles(stagePath, hadoopConf) + if (stageFiles.isEmpty) { + // no stage files, so do nothing + LOGGER.warn("files not found under stage metadata folder") + return Seq.empty + } + + // 2) read all stage files to collect input files for data loading + // create a thread pool to read them + val numThreads = Math.min(Math.max(stageFiles.length, 1), 10) + val executorService = Executors.newFixedThreadPool(numThreads) + val stageInputs = collectStageInputs(executorService, stageFiles) + + // 3) add new segment with INSERT_IN_PROGRESS into table status + loadModel =
DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table) + CarbonLoaderUtil.recordNewLoadMetadata(loadModel) + + // 4) write all existing stage file names and segmentId into a new snapshot file + // The content of snapshot file is: first line is segmentId, followed by each line is + // one stage file name + val content = + (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n") + FileFactory.writeFile(content, snapshotFilePath) + + // 5) perform data loading + startLoading(spark, table, loadModel, stageInputs) + + // 6) write segment file and update the segment entry to SUCCESS + val segmentFileName = SegmentFileStore.writeSegmentFile( + table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString) + SegmentFileStore.updateTableStatusFile( + table, loadModel.getSegmentId, segmentFileName, + table.getCarbonTableIdentifier.getTableId, + new SegmentFileStore(table.getTablePath, segmentFileName), + SegmentStatus.SUCCESS) + + // 7) delete stage files + deleteStageFiles(executorService, stageFiles) + + // 8) delete the snapshot file + FileFactory.getCarbonFile(snapshotFilePath).delete() + } catch { + case ex: Throwable => + LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex) + if (loadModel != null) { + CarbonLoaderUtil.updateTableStatusForFailure(loadModel) + } + throw ex + } finally { + lock.unlock() + } + Seq.empty + } + + /** + * Check whether there was failure in previous ingest process and try to recover + */ + private def recoverIfRequired( + snapshotFilePath: String, + table: CarbonTable, + conf: Configuration): Unit = { + if (!FileFactory.isFileExist(snapshotFilePath)) { + // everything is fine + return + } + + // something wrong, read the snapshot file and do recovery steps + // 1. check segment state in table status file + // 2. If in SUCCESS state, delete all stage files read in snapshot file + // 3.
If in IN_PROGRESS state, delete the entry in table status and load again + LOGGER.info(s"snapshot file found ($snapshotFilePath), start recovery process") + val lines = FileFactory.readLinesInFile(snapshotFilePath, conf) + if (lines.size() < 2) { + throw new RuntimeException("Invalid snapshot file, " + lines.size() + " lines") + } + + val segmentId = lines.get(0) + val stageFileNames = lines.remove(0) + LOGGER.info(s"Segment $segmentId need recovery, ${stageFileNames.length} stage files") + + // lock the table status + var lock = CarbonLockFactory.getCarbonLockObj( + table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK) + if (!lock.lockWithRetries()) { + throw new RuntimeException(s"Failed to lock table status for " + + s"${table.getDatabaseName}.${table.getTableName}") + } + try { + val segments = SegmentStatusManager.readTableStatusFile( + CarbonTablePath.getTableStatusFilePath(table.getTablePath) + ) + val matchedSegment = segments.filter(_.getLoadName.equals(segmentId)) + if (matchedSegment.length != 1) { + throw new RuntimeException("unexpected " + matchedSegment.length + " segment found") + } + matchedSegment(0).getSegmentStatus match { + case SegmentStatus.SUCCESS => + // delete all stage files + lock.unlock() + lock = null + LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to delete " + + s"${stageFileNames.length} stage files") + val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10) + val executorService = Executors.newFixedThreadPool(numThreads) + stageFileNames.map { fileName => + executorService.submit(new Runnable { + override def run(): Unit = { + FileFactory.getCarbonFile( + CarbonTablePath.getStageDir(table.getTablePath) + + CarbonCommonConstants.FILE_SEPARATOR + fileName + ).delete() + } + }) + }.map { future => + future.get() + } + case SegmentStatus.INSERT_IN_PROGRESS => + // delete entry in table status and load again + LOGGER.info(s"Segment $segmentId is in INSERT_IN_PROGRESS state, about to delete the " + + 
s"segment entry and load again") + val segmentToWrite = segments.filterNot(_.getLoadName.equals(segmentId)) + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(table.getTablePath), + segmentToWrite) + case other => + throw new RuntimeException(s"Segment $segmentId is in unexpected state: $other") + } + } finally { + if (lock != null) { + lock.unlock() + } + } + LOGGER.info(s"Finish recovery, delete snapshot file: $snapshotFilePath") + FileFactory.getCarbonFile(snapshotFilePath).delete() + } + + /** + * Start global sort loading + */ + private def startLoading( + spark: SparkSession, + table: CarbonTable, + loadModel: CarbonLoadModel, + stageInput: Seq[StageInput] + ): Unit = { + val splits = stageInput.flatMap(_.createSplits().asScala) + LOGGER.info(s"start to load ${splits.size} files into " + + s"${table.getDatabaseName}.${table.getTableName}") + val start = System.currentTimeMillis() + val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits) + DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( + spark, + Option(dataFrame), + loadModel, + SparkSQLUtil.sessionState(spark).newHadoopConf() + ).map { row => + (row._1, FailureCauses.NONE == row._2._2.failureCauses) + } + + LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms") + } + + /** + * Read stage files and return input files + */ + private def collectStageInputs( + executorService: ExecutorService, + stageFiles: Array[(CarbonFile, CarbonFile)] + ): Seq[StageInput] = { + val startTime = System.currentTimeMillis() + val output = Collections.synchronizedList(new util.ArrayList[StageInput]()) + val gson = new Gson() + stageFiles.map { stage => + executorService.submit(new Runnable { + override def run(): Unit = { + val filePath = stage._1.getAbsolutePath + val stream = FileFactory.getDataInputStream(filePath) + try { + val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput]) + 
output.add(stageInput) + } finally { + stream.close() + } + } + }) + }.map { future => + future.get() + } + LOGGER.info(s"read stage files taken ${System.currentTimeMillis() - startTime}ms") + output.asScala + } + + /** + * Delete stage file and success file + */ + private def deleteStageFiles( + executorService: ExecutorService, + stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = { + val startTime = System.currentTimeMillis() + stageFiles.map { files => + executorService.submit(new Runnable { + override def run(): Unit = { + files._1.delete() + files._2.delete() + } + }) + }.map { future => + future.get() + } + LOGGER.info(s"finished to delete stage files, time taken: " + + s"${System.currentTimeMillis() - startTime}ms") + } + + /* + * Collect all stage files and matching success files. + * A stage file without a success file will not be collected + */ + private def listStageFiles( + loadDetailsDir: String, + hadoopConf: Configuration + ): Array[(CarbonFile, CarbonFile)] = { + val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf) + if (dir.exists()) { + val allFiles = dir.listFiles() + val successFiles = allFiles.filter { file => + file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX) + }.map { file => + (file.getName.substring(0, file.getName.indexOf(".")), file) + }.toMap + allFiles.filter { file => + !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX) + }.filter { file => + successFiles.contains(file.getName) + }.map { file => + (file, successFiles(file.getName)) + } + } else { + Array.empty + } + } + + /** + * INGEST operation does not support concurrent execution, so one lock is used per table + */ + private def acquireIngestLock(table: CarbonTable): ICarbonLock = { + val tableIdentifier = table.getAbsoluteTableIdentifier + val lock = CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.INGEST_LOCK) + val retryCount = CarbonLockUtil.getLockProperty( + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT + ) + val maxTimeout = CarbonLockUtil.getLockProperty( + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT + ) + if (lock.lockWithRetries(retryCount, maxTimeout)) { + lock + } else { + throw new IOException( + s"Not able to acquire the lock for table status file for $tableIdentifier") + } + } + + override protected def opName: String = "INSERT STAGE" +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 81923ea..7e0b13e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -77,7 +77,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { protected lazy val startCommand: Parser[LogicalPlan] = loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords | datamapManagement | alterTableFinishStreaming | stream | cli | - cacheManagement | alterDataMap + cacheManagement | alterDataMap | insertStageData protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | addLoad @@ -458,6 +458,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonAddLoadCommand(dbName, tableName, optionsList.toMap) } + /** + * INSERT INTO [dbName.]tableName STAGE + */ + protected lazy val insertStageData: Parser[LogicalPlan] = + INSERT ~ INTO ~> (ident <~ ".").? ~ ident <~ STAGE <~ opt(";") ^^ { + case dbName ~ tableName => + CarbonInsertFromStageCommand(dbName, tableName) + } + protected lazy val cleanFiles: Parser[LogicalPlan] = CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? 
~ ident <~ opt(";") ^^ { case databaseName ~ tableName => diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 1a40ff4..622dfaa 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -163,6 +163,24 @@ public final class CarbonLoaderUtil { } /** + * Append a new load metadata into table status file + * + * @param loadModel load model + * @return boolean which determines whether status update is done or not + * @throws IOException + */ + public static boolean recordNewLoadMetadata(CarbonLoadModel loadModel) throws IOException { + LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails(); + loadModel.setFactTimeStamp(System.currentTimeMillis()); + CarbonLoaderUtil.populateNewLoadMetaEntry( + newLoadMetaEntry, + SegmentStatus.INSERT_IN_PROGRESS, + loadModel.getFactTimeStamp(), + false); + return recordNewLoadMetadata(newLoadMetaEntry, loadModel, true, false); + } + + /** * This API deletes the content of the non Transactional Tables when insert overwrite is set true. 
* * @param loadModel diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java index 7929810..2732d3a 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java @@ -319,7 +319,8 @@ public class CarbonSchemaReader { ByteBuffer buffer = fileReader.readByteBuffer(FileFactory.getUpdatedFilePath(dataFilePath), fileSize - 8, 8); fileReader.finish(); - CarbonFooterReaderV3 footerReader = new CarbonFooterReaderV3(dataFilePath, buffer.getLong()); + CarbonFooterReaderV3 footerReader = + new CarbonFooterReaderV3(dataFilePath, fileSize, buffer.getLong()); FileFooter3 footer = footerReader.readFooterVersion3(); if (null != footer.getExtra_info()) { return footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO) diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 533480c..ef2ec5e 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -428,7 +428,7 @@ public class CSVCarbonWriterTest { dataFile.getPath()), dataFile.getSize() - 8, 8); fileReader.finish(); CarbonFooterReaderV3 footerReader = - new CarbonFooterReaderV3(dataFile.getAbsolutePath(), buffer.getLong()); + new CarbonFooterReaderV3(dataFile.getAbsolutePath(), dataFile.getSize(), buffer.getLong()); FileFooter3 footer = footerReader.readFooterVersion3(); Assert.assertEquals(2, footer.blocklet_index_list.size()); Assert.assertEquals(2, footer.blocklet_info_list3.size()); diff --git a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java index 
4ed3945..a1fdad5 100644 --- a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java +++ b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java @@ -170,7 +170,7 @@ class DataFile { this.footerOffset = buffer.getLong(); this.footerSizeInBytes = this.fileSizeInBytes - footerOffset; CarbonFooterReaderV3 footerReader = - new CarbonFooterReaderV3(dataFile.getAbsolutePath(), footerOffset); + new CarbonFooterReaderV3(dataFile.getAbsolutePath(), dataFile.getSize(), footerOffset); return footerReader.readFooterVersion3(); }