This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3098fec  [CARBONDATA-3599] Support insert data from stage files 
written by SDK
3098fec is described below

commit 3098fece425ec22282dd5f9d0b0508929a524a0e
Author: Jacky Li <jacky.li...@qq.com>
AuthorDate: Sun Dec 22 15:46:43 2019 +0800

    [CARBONDATA-3599] Support insert data from stage files written by SDK
    
    A new SQL command is added to insert data from stage files written by the
    SDK, for example from a Flink application.
    
    Usage:
    Use the carbondata-flink module to write data to HDFS, S3, etc.
    Then use the following command periodically to trigger loading into the
    carbon table.
    
    INSERT INTO dbName.tableName STAGE
    For more details, see TestCarbonWriter.scala
    
    This closes #3488
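
A minimal sketch of the periodic trigger described above. This is an illustration only, not part of the patch; the database/table name, the Carbon-enabled SparkSession, and the scheduling are assumptions.

```scala
// Hedged sketch: trigger loading of committed stage files from a Spark application.
// "db.sample" is a placeholder; schedule the call (e.g. every few minutes) as needed.
import org.apache.spark.sql.SparkSession

def loadStageFiles(spark: SparkSession): Unit = {
  // Assumes a Carbon-enabled SparkSession so the Carbon SQL parser is active
  spark.sql("INSERT INTO db.sample STAGE")
}
```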
---
 .../core/datastore/impl/FileFactory.java           |  63 ++++
 .../core/indexstore/ExtendedBlocklet.java          |   6 +-
 .../apache/carbondata/core/locks/LockUsage.java    |   1 +
 .../core/reader/CarbonFooterReaderV3.java          |  28 +-
 .../carbondata/core/statusmanager/StageInput.java  |  20 ++
 .../core/util/DataFileFooterConverterV3.java       |   3 +-
 .../carbondata/core/util/path/CarbonTablePath.java |  59 ++--
 .../apache/carbondata/hadoop/CarbonInputSplit.java |   2 +-
 docs/dml-of-carbondata.md                          |  17 +
 .../org/apache/carbon/flink/TestCarbonWriter.scala |  29 +-
 .../TestCreateTableWithBlockletSize.scala          |   3 +-
 .../TestNonTransactionalCarbonTable.scala          |   3 +-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  87 ++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   2 +
 .../spark/rdd/CarbonTableCompactor.scala           |  77 +----
 .../management/CarbonInsertFromStageCommand.scala  | 379 +++++++++++++++++++++
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  11 +-
 .../processing/util/CarbonLoaderUtil.java          |  18 +
 .../carbondata/sdk/file/CarbonSchemaReader.java    |   3 +-
 .../carbondata/sdk/file/CSVCarbonWriterTest.java   |   2 +-
 .../java/org/apache/carbondata/tool/DataFile.java  |   2 +-
 21 files changed, 690 insertions(+), 125 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 711996f..d1d8c33 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -17,19 +17,30 @@
 
 package org.apache.carbondata.core.datastore.impl;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.lang.reflect.Method;
 import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.List;
 import java.util.Locale;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 
 import org.apache.commons.io.FileUtils;
@@ -614,4 +625,56 @@ public final class FileFactory {
   public static short getDefaultReplication(String path) {
     return getCarbonFile(path).getDefaultReplication();
   }
+
+  /**
+   * Write content into specified file path
+   * @param content content to write
+   * @param filePath file path to write
+   * @throws IOException if IO errors
+   */
+  public static void writeFile(String content, String filePath) throws 
IOException {
+    AtomicFileOperations fileWrite = 
AtomicFileOperationFactory.getAtomicFileOperations(filePath);
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      brWriter.write(content);
+    } catch (IOException ie) {
+      LOGGER.error("Error message: " + ie.getLocalizedMessage());
+      fileWrite.setFailed();
+      throw ie;
+    } finally {
+      try {
+        CarbonUtil.closeStreams(brWriter);
+      } finally {
+        fileWrite.close();
+      }
+    }
+  }
+
+  /**
+   * Read all lines in a specified file
+   *
+   * @param filePath file to read
+   * @param conf hadoop configuration
+   * @return file content
+   * @throws IOException if IO errors
+   */
+  public static List<String> readLinesInFile(
+      String filePath, Configuration conf) throws IOException {
+    DataInputStream fileReader = null;
+    BufferedReader bufferedReader = null;
+    try {
+      fileReader = FileFactory.getDataInputStream(filePath, -1, conf);
+      bufferedReader =
+          new BufferedReader(
+              new InputStreamReader(
+                  fileReader, 
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      return bufferedReader.lines().collect(Collectors.toList());
+    } finally {
+      CarbonUtil.closeStreams(fileReader, bufferedReader);
+    }
+  }
 }
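
A hedged sketch of how the two new FileFactory helpers could be used together; the path is a placeholder and this snippet is not part of the patch.

```scala
// Sketch only: write a small text file atomically, then read it back line by line.
import scala.collection.JavaConverters._
import org.apache.carbondata.core.datastore.impl.FileFactory

val snapshotPath = "/tmp/carbon/stage/snapshot"   // placeholder path
FileFactory.writeFile("segmentId\nstage-file-1\nstage-file-2", snapshotPath)
val lines = FileFactory.readLinesInFile(snapshotPath, FileFactory.getConfiguration).asScala
// lines: Buffer("segmentId", "stage-file-1", "stage-file-2")
```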
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index e391602..b20689c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -51,11 +51,7 @@ public class ExtendedBlocklet extends Blocklet {
   public ExtendedBlocklet(String filePath, String blockletId,
       boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion 
version) {
     super(filePath, blockletId, compareBlockletIdForObjectMatching);
-    try {
-      this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, -1, version, null);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, -1, 
version, null);
   }
 
   public ExtendedBlocklet(String filePath, String blockletId, 
ColumnarFormatVersion version) {
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java 
b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index 14907c5..e0c04df 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -37,5 +37,6 @@ public class LockUsage {
   public static final String DATAMAP_STATUS_LOCK = "datamapstatus.lock";
   public static final String CONCURRENT_LOAD_LOCK = "concurrentload.lock";
   public static final String UPDATE_LOCK = "update.lock";
+  public static final String INGEST_LOCK = "ingest.lock";
 
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
index 3fa714c..e0c6121 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
@@ -17,10 +17,14 @@
 
 package org.apache.carbondata.core.reader;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.format.FileFooter3;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -32,11 +36,15 @@ public class CarbonFooterReaderV3 {
   //Fact file path
   private String filePath;
 
+  // size of the file
+  private long fileSize;
+
   //start offset of the file footer
   private long footerOffset;
 
-  public CarbonFooterReaderV3(String filePath, long offset) {
+  public CarbonFooterReaderV3(String filePath, long fileSize, long offset) {
     this.filePath = filePath;
+    this.fileSize = fileSize;
     this.footerOffset = offset;
   }
 
@@ -49,7 +57,23 @@ public class CarbonFooterReaderV3 {
   public FileFooter3 readFooterVersion3() throws IOException {
     ThriftReader thriftReader = openThriftReader(filePath);
     thriftReader.open();
-    //Set the offset from where it should read
+
+    // If footer offset is 0, means caller does not set it,
+    // so we read it from the end of the file
+    if (footerOffset == 0) {
+      Configuration conf = FileFactory.getConfiguration();
+      try (DataInputStream reader = FileFactory.getDataInputStream(filePath, 
conf)) {
+        long skipBytes = fileSize - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
+        long skipped = reader.skip(skipBytes);
+        if (skipped != skipBytes) {
+          throw new IOException(String.format(
+              "expect skip %d bytes, but actually skipped %d bytes", 
skipBytes, skipped));
+        }
+        footerOffset = reader.readLong();
+      }
+    }
+
+    // Set the offset from where it should read
     thriftReader.setReadOffset(footerOffset);
     FileFooter3 footer = (FileFooter3) thriftReader.read();
     thriftReader.close();
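
With the added fileSize argument, a caller that does not know the footer offset can pass 0 and let the reader resolve it from the trailing 8 bytes of the file. A hedged sketch (file path and size are placeholders):

```scala
// Sketch only: read a V3 footer without knowing the footer offset up front.
import org.apache.carbondata.core.reader.CarbonFooterReaderV3

val dataFilePath = "/tmp/store/part-0-0.carbondata"  // placeholder
val dataFileSize = 4096L                             // placeholder: actual file length in bytes
val reader = new CarbonFooterReaderV3(dataFilePath, dataFileSize, 0L)
val footer = reader.readFooterVersion3()             // offset read from the last long in the file
```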
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
index 60f8b32..b4bf084 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
@@ -17,7 +17,15 @@
 
 package org.apache.carbondata.core.statusmanager;
 
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
 
 public class StageInput {
 
@@ -55,4 +63,16 @@ public class StageInput {
   public void setFiles(Map<String, Long> files) {
     this.files = files;
   }
+
+  public List<InputSplit> createSplits() {
+    return
+        files.entrySet().stream().filter(
+            entry -> 
entry.getKey().endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+        ).map(
+            entry -> CarbonInputSplit.from("-1", "0",
+                base + CarbonCommonConstants.FILE_SEPARATOR + entry.getKey(),
+                0, entry.getValue(), ColumnarFormatVersion.V3, null)
+        ).collect(Collectors.toList());
+  }
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 65ba609..b917559 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -64,7 +64,8 @@ public class DataFileFooterConverterV3 extends 
AbstractDataFileFooterConverter {
     CarbonHeaderReader carbonHeaderReader = new 
CarbonHeaderReader(tableBlockInfo.getFilePath());
     FileHeader fileHeader = carbonHeaderReader.readHeader();
     CarbonFooterReaderV3 reader =
-        new CarbonFooterReaderV3(tableBlockInfo.getFilePath(), 
tableBlockInfo.getBlockOffset());
+        new CarbonFooterReaderV3(tableBlockInfo.getFilePath(), 
tableBlockInfo.getBlockLength(),
+            tableBlockInfo.getBlockOffset());
     FileFooter3 footer = reader.readFooterVersion3();
     return convertDataFileFooter(fileHeader, footer);
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 6f46fb9..05894ea 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.core.util.path;
 
-import java.io.File;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -57,6 +55,7 @@ public class CarbonTablePath {
   private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
   private static final String STAGE_DIR = "stage";
   public static final String  SUCCESS_FILE_SUBFIX = ".success";
+  private static final String SNAPSHOT_FILE_NAME = "snapshot";
 
   /**
    * This class provides static utility only.
@@ -68,6 +67,11 @@ public class CarbonTablePath {
     return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
STAGE_DIR;
   }
 
+  public static String getStageSnapshotFile(String tablePath) {
+    return CarbonTablePath.getStageDir(tablePath) + 
CarbonCommonConstants.FILE_SEPARATOR +
+        SNAPSHOT_FILE_NAME;
+  }
+
   /**
    * The method returns the folder path containing the carbon file.
    *
@@ -77,7 +81,7 @@ public class CarbonTablePath {
     int lastIndex = carbonFilePath.lastIndexOf('/');
     // below code for handling windows environment
     if (-1 == lastIndex) {
-      lastIndex = carbonFilePath.lastIndexOf(File.separator);
+      lastIndex = 
carbonFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
     }
     return carbonFilePath.substring(0, lastIndex);
   }
@@ -132,42 +136,45 @@ public class CarbonTablePath {
    * Return absolute path of dictionary file
    */
   public static String getDictionaryFilePath(String tablePath, String 
columnId) {
-    return getMetadataPath(tablePath) + File.separator + 
getDictionaryFileName(columnId);
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR +
+        getDictionaryFileName(columnId);
   }
 
   /**
    * Return absolute path of dictionary file
    */
   public static String getExternalDictionaryFilePath(String dictionaryPath, 
String columnId) {
-    return dictionaryPath + File.separator + getDictionaryFileName(columnId);
+    return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR +
+        getDictionaryFileName(columnId);
   }
 
   /**
    * Return metadata path
    */
   public static String getMetadataPath(String tablePath) {
-    return tablePath + File.separator + METADATA_DIR;
+    return tablePath + CarbonCommonConstants.FILE_SEPARATOR + METADATA_DIR;
   }
 
   /**
    * Return absolute path of dictionary meta file
    */
   public static String getExternalDictionaryMetaFilePath(String 
dictionaryPath, String columnId) {
-    return dictionaryPath + File.separator + columnId + DICTIONARY_META_EXT;
+    return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR + columnId + 
DICTIONARY_META_EXT;
   }
 
   /**
    * Return absolute path of dictionary meta file
    */
   public static String getDictionaryMetaFilePath(String tablePath, String 
columnId) {
-    return getMetadataPath(tablePath) + File.separator + columnId + 
DICTIONARY_META_EXT;
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
columnId +
+        DICTIONARY_META_EXT;
   }
 
   /**
    * Return sortindex file path based on specified dictionary path
    */
   public static String getExternalSortIndexFilePath(String dictionaryPath, 
String columnId) {
-    return dictionaryPath + File.separator + columnId + SORT_INDEX_EXT;
+    return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR + columnId + 
SORT_INDEX_EXT;
   }
 
   /**
@@ -175,7 +182,8 @@ public class CarbonTablePath {
    */
   public static String getExternalSortIndexFilePath(String dictionaryPath, 
String columnId,
       long dictOffset) {
-    return dictionaryPath + File.separator + columnId + "_" + dictOffset + 
SORT_INDEX_EXT;
+    return dictionaryPath + CarbonCommonConstants.FILE_SEPARATOR +
+        columnId + "_" + dictOffset + SORT_INDEX_EXT;
   }
 
   /**
@@ -222,7 +230,7 @@ public class CarbonTablePath {
    * Return absolute path of table status file
    */
   public static String getTableStatusFilePath(String tablePath) {
-    return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE;
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
TABLE_STATUS_FILE;
   }
 
   public static String getTableStatusFilePathWithUUID(String tablePath, String 
uuid) {
@@ -285,7 +293,7 @@ public class CarbonTablePath {
         return getCarbonIndexFilePath(tablePath, taskId, segmentId, 
bucketNumber);
       default:
         String segmentDir = getSegmentPath(tablePath, segmentId);
-        return segmentDir + File.separator + getCarbonIndexFileName(taskId,
+        return segmentDir + CarbonCommonConstants.FILE_SEPARATOR + 
getCarbonIndexFileName(taskId,
             Integer.parseInt(bucketNumber), timeStamp, segmentId);
     }
   }
@@ -303,7 +311,8 @@ public class CarbonTablePath {
    * Return the segment path from table path and segmentId
    */
   public static String getSegmentPath(String tablePath, String segmentId) {
-    return getPartitionDir(tablePath) + File.separator + SEGMENT_PREFIX + 
segmentId;
+    return getPartitionDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+        + SEGMENT_PREFIX + segmentId;
   }
 
   /**
@@ -344,27 +353,29 @@ public class CarbonTablePath {
   }
 
   public static String getCarbonStreamIndexFilePath(String segmentDir) {
-    return segmentDir + File.separator + getCarbonStreamIndexFileName();
+    return segmentDir + CarbonCommonConstants.FILE_SEPARATOR + 
getCarbonStreamIndexFileName();
   }
 
   // This partition is not used in any code logic, just keep backward 
compatibility
   public static final String DEPRECATED_PARTITION_ID = "0";
 
   public static String getPartitionDir(String tablePath) {
-    return getFactDir(tablePath) + File.separator + PARTITION_PREFIX +
+    return getFactDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
PARTITION_PREFIX +
         CarbonTablePath.DEPRECATED_PARTITION_ID;
   }
 
   public static String getFactDir(String tablePath) {
-    return tablePath + File.separator + FACT_DIR;
+    return tablePath + CarbonCommonConstants.FILE_SEPARATOR + FACT_DIR;
   }
 
   public static String getStreamingLogDir(String tablePath) {
-    return tablePath + File.separator + STREAMING_DIR + File.separator + 
STREAMING_LOG_DIR;
+    return tablePath + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_DIR +
+        CarbonCommonConstants.FILE_SEPARATOR + STREAMING_LOG_DIR;
   }
 
   public static String getStreamingCheckpointDir(String tablePath) {
-    return tablePath + File.separator + STREAMING_DIR + File.separator + 
STREAMING_CHECKPOINT_DIR;
+    return tablePath + CarbonCommonConstants.FILE_SEPARATOR + STREAMING_DIR +
+        CarbonCommonConstants.FILE_SEPARATOR + STREAMING_CHECKPOINT_DIR;
   }
 
   /**
@@ -378,7 +389,7 @@ public class CarbonTablePath {
       String dataMapName, String shardName) {
     return new StringBuilder()
         .append(getDataMapStorePath(tablePath, segmentId, dataMapName))
-        .append(File.separator)
+        .append(CarbonCommonConstants.FILE_SEPARATOR)
         .append(shardName)
         .toString();
   }
@@ -392,9 +403,9 @@ public class CarbonTablePath {
       String dataMapName) {
     return new StringBuilder()
         .append(tablePath)
-        .append(File.separator)
+        .append(CarbonCommonConstants.FILE_SEPARATOR)
         .append(dataMapName)
-        .append(File.separator)
+        .append(CarbonCommonConstants.FILE_SEPARATOR)
         .append(segmentId)
         .toString();
   }
@@ -760,11 +771,11 @@ public class CarbonTablePath {
   public static String generateBadRecordsPath(String badLogStoreLocation, 
String segmentId,
       String taskNo, boolean isTransactionalTable) {
     if (!isTransactionalTable) {
-      return badLogStoreLocation + File.separator + "SdkWriterBadRecords"
+      return badLogStoreLocation + CarbonCommonConstants.FILE_SEPARATOR + 
"SdkWriterBadRecords"
           + CarbonCommonConstants.FILE_SEPARATOR + taskNo;
     } else {
-      return badLogStoreLocation + File.separator + segmentId + 
CarbonCommonConstants.FILE_SEPARATOR
-          + taskNo;
+      return badLogStoreLocation + CarbonCommonConstants.FILE_SEPARATOR + 
segmentId +
+          CarbonCommonConstants.FILE_SEPARATOR + taskNo;
     }
   }
 }
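
For reference, a hedged sketch of the stage-related paths used by this change; the table path is a placeholder.

```scala
// Sketch only: locate the stage directory and the new snapshot file for a table.
import org.apache.carbondata.core.util.path.CarbonTablePath

val tablePath = "/user/warehouse/db/sample"                        // placeholder
val stageDir = CarbonTablePath.getStageDir(tablePath)              // <table metadata dir>/stage
val snapshotFile = CarbonTablePath.getStageSnapshotFile(tablePath) // <stage dir>/snapshot
```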
diff --git 
a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 14ea5b5..3092a4a 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -291,7 +291,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public static CarbonInputSplit from(String segmentId, String blockletId, 
String path, long start,
-      long length, ColumnarFormatVersion version, String dataMapWritePath) 
throws IOException {
+      long length, ColumnarFormatVersion version, String dataMapWritePath) {
     return new CarbonInputSplit(segmentId, blockletId, path, start, length, 
version, null,
         dataMapWritePath);
   }
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 33faa9c..190551a 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -21,6 +21,7 @@ CarbonData DML statements are documented here,which includes:
 
 * [LOAD DATA](#load-data)
 * [INSERT DATA](#insert-data-into-carbondata-table)
+* [INSERT DATA 
STAGE](#insert-data-into-carbondata-table-from-stage-input-files)
 * [Load Data Using Static Partition](#load-data-using-static-partition)
 * [Load Data Using Dynamic Partition](#load-data-using-dynamic-partition)
 * [UPDATE AND DELETE](#update-and-delete)
@@ -356,6 +357,22 @@ CarbonData DML statements are documented here,which 
includes:
   INSERT OVERWRITE TABLE table1 SELECT * FROM TABLE2
   ```
 
+### INSERT DATA INTO CARBONDATA TABLE From Stage Input Files
+
+  Stage input files are data files written by an external application (such as Flink).
+  These files are committed but not yet loaded into the table.
+  
+  You can use this command to insert them into the table, making them visible for query.
+  
+  ```
+  INSERT INTO <CARBONDATA TABLE> STAGE
+  ```
+
+  Examples:
+  ```
+  INSERT INTO table1 STAGE
+  ```
+
 ### Load Data Using Static Partition 
 
   This command allows you to load data using static partition.
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 55085f8..799c810 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbon.flink
 
+import java.io.File
 import java.util.Properties
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -26,9 +27,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-
 import org.junit.Test
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 class TestCarbonWriter extends QueryTest {
 
   val tableName = "test_flink"
@@ -43,12 +46,14 @@ class TestCarbonWriter extends QueryTest {
       """.stripMargin
     ).collect()
 
-    try {
-      val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
-      val dataTempPath = rootPath + "/data/temp/"
-      val dataPath = rootPath + "/data/"
+    val dataTempPath = rootPath + "/data/temp/"
+    val dataPath = rootPath + "/data/"
+    new File(dataPath).delete()
+    new File(dataPath).mkdir()
 
+    try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
       val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
@@ -69,7 +74,7 @@ class TestCarbonWriter extends QueryTest {
 
         @throws[InterruptedException]
         override def onFinish(): Unit = {
-          Thread.sleep(10000L)
+          Thread.sleep(5000L)
         }
       }
       val stream = environment.addSource(source)
@@ -92,9 +97,17 @@ class TestCarbonWriter extends QueryTest {
           throw new UnsupportedOperationException(exception)
       }
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(0)))
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+
+      // ensure the stage snapshot file and all stage files are deleted
+      
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
+      
assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+
     } finally {
-//      sql(s"drop table if exists $tableName").collect()
+      sql(s"drop table if exists $tableName").collect()
+      new File(dataPath).delete()
     }
   }
 
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
index 9b37446..2054a7b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
@@ -62,8 +62,7 @@ class TestCreateTableWithBlockletSize extends QueryTest with 
BeforeAndAfterAll {
       val buffer = fileReader
         .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), 
dataFile.getSize - 8, 8)
       val footerReader = new CarbonFooterReaderV3(
-        dataFile.getAbsolutePath,
-        buffer.getLong)
+        dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong)
       val footer = footerReader.readFooterVersion3
       assertResult(2)(footer.blocklet_index_list.size)
       assertResult(2)(footer.blocklet_info_list3.size)
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index cfd23ed..0360e35 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2576,8 +2576,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
       val buffer = fileReader
         .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), 
dataFile.getSize - 8, 8)
       val footerReader = new CarbonFooterReaderV3(
-        dataFile.getAbsolutePath,
-        buffer.getLong)
+        dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong)
       val footer = footerReader.readFooterVersion3
       // without page_size configuration there will be only 1 page, now it 
will be more.
       assert(footer.getBlocklet_info_list3.get(0).number_number_of_pages != 1)
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index ea83827..f7b8668 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -19,31 +19,40 @@ package org.apache.carbondata.spark.load
 
 import java.util.Comparator
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.spark.{Accumulator, CarbonInputMetrics, 
DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, 
StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
CarbonDimension}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, 
DataField, DataLoadProcessBuilder, FailureCauses}
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
TableOptionConstant}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * Use sortBy operator in spark to load the data
@@ -369,6 +378,72 @@ object DataLoadProcessBuilderOnSpark {
     carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
   }
+
+  /**
+   * create CarbonLoadModel for global_sort
+   */
+  def createLoadModelForGlobalSort(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable
+  ): CarbonLoadModel = {
+    val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+    CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+    CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
+    CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
+    val fieldList = carbonTable.getCreateOrderColumn
+      .asScala
+      .map { column =>
+        new StructField(column.getColName, column.getDataType)
+      }
+    CarbonTableOutputFormat.setInputSchema(conf, new 
StructType(fieldList.asJava))
+    val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
+    loadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
+    loadModel.setBadRecordsLoggerEnable(
+      TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + ",false")
+    loadModel.setBadRecordsAction(
+      TableOptionConstant.BAD_RECORDS_ACTION.getName + ",force")
+    loadModel.setIsEmptyDataBadRecord(
+      DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false")
+    val globalSortPartitions =
+      
carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions")
+    if (globalSortPartitions != null) {
+      loadModel.setGlobalSortPartitions(globalSortPartitions)
+    }
+    loadModel
+  }
+
+  /**
+   * create DataFrame basing on specified splits
+   */
+  def createInputDataFrame(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      splits: Seq[InputSplit]
+  ): DataFrame = {
+    val columns = carbonTable
+      .getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .toArray
+    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
+    val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+      sparkSession,
+      columnProjection = new CarbonProjection(columns),
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      carbonTable.getTableInfo.serialize,
+      carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      null,
+      null,
+      classOf[CarbonRowReadSupport],
+      splits.asJava)
+      .map { row =>
+        new GenericRow(row.getData.asInstanceOf[Array[Any]])
+      }
+    sparkSession.createDataFrame(rdd, schema)
+  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
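
A hedged sketch of how the two new helpers are combined by a caller, mirroring what CarbonInsertFromStageCommand and the compactor do; the table and splits are assumed to exist.

```scala
// Sketch only: build a load model and an input DataFrame, then run a global-sort load.
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark

def loadSplits(spark: SparkSession, table: CarbonTable, splits: Seq[InputSplit]): Unit = {
  val loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
  val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
  DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
    spark, Option(dataFrame), loadModel, SparkSQLUtil.sessionState(spark).newHadoopConf())
}
```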
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 635afee..0656e49 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -192,6 +192,8 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
   protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES")
   protected val CARBONCLI = carbonKeyWord("CARBONCLI")
   protected val PATH = carbonKeyWord("PATH")
+  protected val INSERT = carbonKeyWord("INSERT")
+  protected val STAGE = carbonKeyWord("STAGE")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 0eef9dc..56b6a2e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -374,14 +374,17 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
       sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
       carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
-    val dataFrame = dataFrameOfSegments(
+    val splits = splitsOfSegments(
       sparkSession,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
       carbonMergerMapping.validSegments)
+    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(
+      sparkSession,
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+      splits.asScala)
     // generate LoadModel which can be used global_sort flow
-    val outputModel = getLoadModelForGlobalSort(
-      sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-      carbonMergerMapping.validSegments)
+    val outputModel = 
DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
+      sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
     outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
     DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
       sparkSession,
@@ -394,38 +397,6 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
   }
 
   /**
-   * create DataFrame basing on specified segments
-   */
-  def dataFrameOfSegments(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      segments: Array[Segment]
-  ): DataFrame = {
-    val columns = carbonTable
-          .getCreateOrderColumn()
-      .asScala
-      .map(_.getColName)
-      .toArray
-    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
-      sparkSession,
-      columnProjection = new CarbonProjection(columns),
-      null,
-      carbonTable.getAbsoluteTableIdentifier,
-      carbonTable.getTableInfo.serialize,
-      carbonTable.getTableInfo,
-      new CarbonInputMetrics,
-      null,
-      null,
-      classOf[CarbonRowReadSupport],
-      splitsOfSegments(sparkSession, carbonTable, segments))
-      .map { row =>
-        new GenericRow(row.getData.asInstanceOf[Array[Any]])
-      }
-    sparkSession.createDataFrame(rdd, schema)
-  }
-
-  /**
    * get splits of specified segments
    */
   def splitsOfSegments(
@@ -445,38 +416,4 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
     new CarbonTableInputFormat[Object].getSplits(job)
   }
 
-  /**
-   * create CarbonLoadModel for global_sort compaction
-   */
-  def getLoadModelForGlobalSort(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      segments: Array[Segment]
-  ): CarbonLoadModel = {
-    val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
-    CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
-    CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
-    CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
-    val fieldList = carbonTable.getCreateOrderColumn
-      .asScala
-      .map { column =>
-        new StructField(column.getColName, column.getDataType)
-      }
-    CarbonTableOutputFormat.setInputSchema(conf, new 
StructType(fieldList.asJava))
-    val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
-    loadModel.setSerializationNullFormat(
-      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N")
-    loadModel.setBadRecordsLoggerEnable(
-      TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false")
-    loadModel.setBadRecordsAction(
-      TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",force")
-    loadModel.setIsEmptyDataBadRecord(
-      DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false")
-    val globalSortPartitions =
-      
carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions")
-    if (globalSortPartitions != null) {
-      loadModel.setGlobalSortPartitions(globalSortPartitions)
-    }
-    loadModel
-  }
 }
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
new file mode 100644
index 0000000..0c665a8
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.io.{InputStreamReader, IOException}
+import java.util
+import java.util.Collections
+import java.util.concurrent.{Executors, ExecutorService}
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager, StageInput}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+
+/**
+ * Collect stage input files and trigger a loading into carbon table.
+ *
+ * @param databaseNameOp database name
+ * @param tableName table name
+ */
+case class CarbonInsertFromStageCommand(
+    databaseNameOp: Option[String],
+    tableName: String
+) extends DataCommand {
+
+  @transient var LOGGER: Logger = _
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    FileFactory.getConfiguration.addResource(hadoopConf)
+    setAuditTable(table)
+
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
+    if (table.isChildDataMap || table.isChildTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on 
child table")
+    }
+
+    val tablePath = table.getTablePath
+    val stagePath = CarbonTablePath.getStageDir(tablePath)
+    val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
+    var loadModel: CarbonLoadModel = null
+    val lock = acquireIngestLock(table)
+
+    try {
+      // 1. Check whether we need to recover from a previous failure
+      // We use a snapshot file to indicate whether there was a failure in the
+      // previous ingest operation. A snapshot file is created when an ingest
+      // operation starts and is deleted only after the whole ingest operation
+      // is finished, which includes two actions:
+      //   1) action1: changing segment status to SUCCESS and
+      //   2) action2: deleting all involved stage files.
+      //
+      // If either of these two actions fails, the snapshot file will still exist,
+      // so recovery is needed.
+      //
+      // To do the recovery, do following steps:
+      //   1) If the corresponding segment in table status is SUCCESS, deleting the
+      //      stage files had failed, so read the stage file list from the snapshot
+      //      file and delete them again.
+      //   2) If the corresponding segment in table status is INSERT_IN_PROGRESS,
+      //      data loading had failed, so read the stage file list from the snapshot
+      //      file and load again.
+      recoverIfRequired(snapshotFilePath, table, hadoopConf)
+
+      // 2. Start ingesting, steps:
+      //   1) read all existing stage files
+      //   2) read all stage files to collect input files for data loading
+      //   3) add a new segment entry in table status as INSERT_IN_PROGRESS,
+      //   4) write all existing stage file names into a new snapshot file
+      //   5) do actual loading
+      //   6) write segment file and update segment state to SUCCESS in table 
status
+      //   7) delete stage files used for loading
+      //   8) delete the snapshot file
+
+      // 1) read all existing stage files
+      val stageFiles = listStageFiles(stagePath, hadoopConf)
+      if (stageFiles.isEmpty) {
+        // no stage files, so do nothing
+        LOGGER.warn("files not found under stage metadata folder")
+        return Seq.empty
+      }
+
+      // 2) read all stage files to collect input files for data loading
+      // create a thread pool to read them
+      val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
+      val executorService = Executors.newFixedThreadPool(numThreads)
+      val stageInputs = collectStageInputs(executorService, stageFiles)
+
+      // 3) add new segment with INSERT_IN_PROGRESS into table status
+      loadModel = 
DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
+      CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
+
+      // 4) write all existing stage file names and segmentId into a new 
snapshot file
+      // The content of the snapshot file is: the first line is the segmentId,
+      // and each following line is one stage file name
+      val content =
+        (Seq(loadModel.getSegmentId) ++ 
stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
+      FileFactory.writeFile(content, snapshotFilePath)
+
+      // 5) perform data loading
+      startLoading(spark, table, loadModel, stageInputs)
+
+      // 6) write segment file and update the segment entry to SUCCESS
+      val segmentFileName = SegmentFileStore.writeSegmentFile(
+        table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
+      SegmentFileStore.updateTableStatusFile(
+        table, loadModel.getSegmentId, segmentFileName,
+        table.getCarbonTableIdentifier.getTableId,
+        new SegmentFileStore(table.getTablePath, segmentFileName),
+        SegmentStatus.SUCCESS)
+
+      // 7) delete stage files
+      deleteStageFiles(executorService, stageFiles)
+
+      // 8) delete the snapshot file
+      FileFactory.getCarbonFile(snapshotFilePath).delete()
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"failed to insert 
${table.getDatabaseName}.${table.getTableName}", ex)
+        if (loadModel != null) {
+          CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
+        }
+        throw ex
+    } finally {
+      lock.unlock()
+    }
+    Seq.empty
+  }
+
+  /**
+   * Check whether there was failure in previous ingest process and try to 
recover
+   */
+  private def recoverIfRequired(
+      snapshotFilePath: String,
+      table: CarbonTable,
+      conf: Configuration): Unit = {
+    if (!FileFactory.isFileExist(snapshotFilePath)) {
+      // everything is fine
+      return
+    }
+
+    // something wrong, read the snapshot file and do recover steps
+    // 1. check segment state in table status file
+    // 2. If in SUCCESS state, delete all stage files recorded in the snapshot file
+    // 3. If in IN_PROGRESS state, delete the entry in table status and load 
again
+    LOGGER.info(s"snapshot file found ($snapshotFilePath), start recovery 
process")
+    val lines = FileFactory.readLinesInFile(snapshotFilePath, conf)
+    if (lines.size() < 2) {
+      throw new RuntimeException("Invalid snapshot file, " + lines.size() + " 
lines")
+    }
+
+    val segmentId = lines.get(0)
+    val stageFileNames = lines.asScala.drop(1)
+    LOGGER.info(s"Segment $segmentId needs recovery, ${stageFileNames.length} stage files")
+
+    // lock the table status
+    var lock = CarbonLockFactory.getCarbonLockObj(
+      table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    if (!lock.lockWithRetries()) {
+      throw new RuntimeException(s"Failed to lock table status for " +
+                                 
s"${table.getDatabaseName}.${table.getTableName}")
+    }
+    try {
+      val segments = SegmentStatusManager.readTableStatusFile(
+        CarbonTablePath.getTableStatusFilePath(table.getTablePath)
+      )
+      val matchedSegment = segments.filter(_.getLoadName.equals(segmentId))
+      if (matchedSegment.length != 1) {
+        throw new RuntimeException("unexpected " + matchedSegment.length + " 
segment found")
+      }
+      matchedSegment(0).getSegmentStatus match {
+        case SegmentStatus.SUCCESS =>
+          // delete all stage files
+          lock.unlock()
+          lock = null
+          LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to 
delete " +
+                      s"${stageFileNames.length} stage files")
+          val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10)
+          val executorService = Executors.newFixedThreadPool(numThreads)
+          stageFileNames.map { fileName =>
+            executorService.submit(new Runnable {
+              override def run(): Unit = {
+                FileFactory.getCarbonFile(
+                  CarbonTablePath.getStageDir(table.getTablePath) +
+                  CarbonCommonConstants.FILE_SEPARATOR + fileName
+                ).delete()
+              }
+            })
+          }.map { future =>
+            future.get()
+          }
+        case SegmentStatus.INSERT_IN_PROGRESS =>
+          // delete entry in table status and load again
+          LOGGER.info(s"Segment $segmentId is in INSERT_IN_PROGRESS state, 
about to delete the " +
+                      s"segment entry and load again")
+          val segmentToWrite = 
segments.filterNot(_.getLoadName.equals(segmentId))
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+            CarbonTablePath.getTableStatusFilePath(table.getTablePath),
+            segmentToWrite)
+        case other =>
+          throw new RuntimeException(s"Segment $segmentId is in unexpected 
state: $other")
+      }
+    } finally {
+      if (lock != null) {
+        lock.unlock()
+      }
+    }
+    LOGGER.info(s"Finish recovery, delete snapshot file: $snapshotFilePath")
+    FileFactory.getCarbonFile(snapshotFilePath).delete()
+  }
+
+  /**
+   * Start global sort loading
+   */
+  private def startLoading(
+      spark: SparkSession,
+      table: CarbonTable,
+      loadModel: CarbonLoadModel,
+      stageInput: Seq[StageInput]
+  ): Unit = {
+    val splits = stageInput.flatMap(_.createSplits().asScala)
+    LOGGER.info(s"start to load ${splits.size} files into " +
+                s"${table.getDatabaseName}.${table.getTableName}")
+    val start = System.currentTimeMillis()
+    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, 
table, splits)
+    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+      spark,
+      Option(dataFrame),
+      loadModel,
+      SparkSQLUtil.sessionState(spark).newHadoopConf()
+    ).map { row =>
+        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+      }
+
+    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() 
- start}ms")
+  }
+
+  /**
+   * Read stage files and return input files
+   */
+  private def collectStageInputs(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]
+  ): Seq[StageInput] = {
+    val startTime = System.currentTimeMillis()
+    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
+    val gson = new Gson()
+    stageFiles.map { stage =>
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          val filePath = stage._1.getAbsolutePath
+          val stream = FileFactory.getDataInputStream(filePath)
+          try {
+            val stageInput = gson.fromJson(new InputStreamReader(stream), 
classOf[StageInput])
+            output.add(stageInput)
+          } finally {
+            stream.close()
+          }
+        }
+      })
+    }.map { future =>
+      future.get()
+    }
+    LOGGER.info(s"read stage files taken ${System.currentTimeMillis() - 
startTime}ms")
+    output.asScala
+  }
+
+  /**
+   * Delete stage file and success file
+   */
+  private def deleteStageFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    stageFiles.map { files =>
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          files._1.delete()
+          files._2.delete()
+        }
+      })
+    }.map { future =>
+      future.get()
+    }
+    LOGGER.info(s"finished to delete stage files, time taken: " +
+                s"${System.currentTimeMillis() - startTime}ms")
+  }
+
+  /*
+   * Collect all stage files and matched success files.
+   * A stage file without success file will not be collected
+   */
+  private def listStageFiles(
+      loadDetailsDir: String,
+      hadoopConf: Configuration
+  ): Array[(CarbonFile, CarbonFile)] = {
+    val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
+    if (dir.exists()) {
+      val allFiles = dir.listFiles()
+      val successFiles = allFiles.filter { file =>
+        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.map { file =>
+        (file.getName.substring(0, file.getName.indexOf(".")), file)
+      }.toMap
+      allFiles.filter { file =>
+        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.filter { file =>
+        successFiles.contains(file.getName)
+      }.map { file =>
+        (file, successFiles(file.getName))
+      }
+    } else {
+      Array.empty
+    }
+  }
+
+  /**
+   * INGEST operation does not support concurrency, so there is one lock per table
+   */
+  private def acquireIngestLock(table: CarbonTable): ICarbonLock = {
+    val tableIdentifier = table.getAbsoluteTableIdentifier
+    val lock = CarbonLockFactory.getCarbonLockObj(tableIdentifier, 
LockUsage.INGEST_LOCK)
+    val retryCount = CarbonLockUtil.getLockProperty(
+      CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK,
+      CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT
+    )
+    val maxTimeout = CarbonLockUtil.getLockProperty(
+      CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+      CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT
+    )
+    if (lock.lockWithRetries(retryCount, maxTimeout)) {
+      lock
+    } else {
+      throw new IOException(
+        s"Not able to acquire the lock for table status file for 
$tableIdentifier")
+    }
+  }
+
+  override protected def opName: String = "INSERT STAGE"
+}
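
A hedged sketch of the snapshot file protocol this command relies on: the first line is the segment id and each following line is a stage file name (see step 4 above). Recovery reads it back roughly as below; the table path is a placeholder.

```scala
// Sketch only: read a snapshot file during recovery.
import scala.collection.JavaConverters._
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

val snapshotFilePath = CarbonTablePath.getStageSnapshotFile("/user/warehouse/db/sample") // placeholder
val lines = FileFactory.readLinesInFile(snapshotFilePath, FileFactory.getConfiguration).asScala
val segmentId = lines.head          // segment to look up in table status
val stageFileNames = lines.drop(1)  // stage files to delete (SUCCESS) or re-load (IN_PROGRESS)
```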
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 81923ea..7e0b13e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -77,7 +77,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val startCommand: Parser[LogicalPlan] =
     loadManagement | showLoads | alterTable | restructure | updateTable | 
deleteRecords |
     datamapManagement | alterTableFinishStreaming | stream | cli |
-    cacheManagement | alterDataMap
+    cacheManagement | alterDataMap | insertStageData
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | 
addLoad
@@ -458,6 +458,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         CarbonAddLoadCommand(dbName, tableName, optionsList.toMap)
     }
 
+  /**
+   * INSERT INTO [dbName.]tableName STAGE
+   */
+  protected lazy val insertStageData: Parser[LogicalPlan] =
+    INSERT ~ INTO ~> (ident <~ ".").? ~ ident <~ STAGE <~ opt(";") ^^ {
+      case dbName ~ tableName =>
+        CarbonInsertFromStageCommand(dbName, tableName)
+    }
+
   protected lazy val cleanFiles: Parser[LogicalPlan] =
     CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
       case databaseName ~ tableName =>
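
The new rule accepts an optional database qualifier and an optional trailing semicolon. A hedged sketch of statements it is expected to parse, assuming a Carbon-enabled SparkSession and placeholder table names:

```scala
// Sketch only: statements matched by the insertStageData rule.
import org.apache.spark.sql.SparkSession

def examples(spark: SparkSession): Unit = {
  spark.sql("INSERT INTO sample STAGE")      // table in the current database
  spark.sql("INSERT INTO db.sample STAGE")   // database-qualified table name
}
```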
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1a40ff4..622dfaa 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -163,6 +163,24 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * Append a new load metadata into table status file
+   *
+   * @param loadModel load model
+   * @return boolean which determines whether status update is done or not
+   * @throws IOException
+   */
+  public static boolean recordNewLoadMetadata(CarbonLoadModel loadModel) 
throws IOException {
+    LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
+    loadModel.setFactTimeStamp(System.currentTimeMillis());
+    CarbonLoaderUtil.populateNewLoadMetaEntry(
+        newLoadMetaEntry,
+        SegmentStatus.INSERT_IN_PROGRESS,
+        loadModel.getFactTimeStamp(),
+        false);
+    return recordNewLoadMetadata(newLoadMetaEntry, loadModel, true, false);
+  }
+
+  /**
    * This API deletes the content of the non Transactional Tables when insert 
overwrite is set true.
    *
    * @param loadModel
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index 7929810..2732d3a 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -319,7 +319,8 @@ public class CarbonSchemaReader {
     ByteBuffer buffer =
         
fileReader.readByteBuffer(FileFactory.getUpdatedFilePath(dataFilePath), 
fileSize - 8, 8);
     fileReader.finish();
-    CarbonFooterReaderV3 footerReader = new CarbonFooterReaderV3(dataFilePath, 
buffer.getLong());
+    CarbonFooterReaderV3 footerReader =
+        new CarbonFooterReaderV3(dataFilePath, fileSize, buffer.getLong());
     FileFooter3 footer = footerReader.readFooterVersion3();
     if (null != footer.getExtra_info()) {
       return 
footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO)
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 533480c..ef2ec5e 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -428,7 +428,7 @@ public class CSVCarbonWriterTest {
           dataFile.getPath()), dataFile.getSize() - 8, 8);
       fileReader.finish();
       CarbonFooterReaderV3 footerReader =
-          new CarbonFooterReaderV3(dataFile.getAbsolutePath(), 
buffer.getLong());
+          new CarbonFooterReaderV3(dataFile.getAbsolutePath(), 
dataFile.getSize(), buffer.getLong());
       FileFooter3 footer = footerReader.readFooterVersion3();
       Assert.assertEquals(2, footer.blocklet_index_list.size());
       Assert.assertEquals(2, footer.blocklet_info_list3.size());
diff --git a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java 
b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
index 4ed3945..a1fdad5 100644
--- a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
+++ b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
@@ -170,7 +170,7 @@ class DataFile {
     this.footerOffset = buffer.getLong();
     this.footerSizeInBytes = this.fileSizeInBytes - footerOffset;
     CarbonFooterReaderV3 footerReader =
-        new CarbonFooterReaderV3(dataFile.getAbsolutePath(), footerOffset);
+        new CarbonFooterReaderV3(dataFile.getAbsolutePath(), 
dataFile.getSize(), footerOffset);
     return footerReader.readFooterVersion3();
   }
 
