This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b0bdab2   [CARBONDATA-3640][CARBONDATA-3557] Support flink ingest carbon partition table
b0bdab2 is described below

commit b0bdab2597dd658eceaea0b87672c76e06eaf340
Author: liuzhi <371684...@qq.com>
AuthorDate: Mon Dec 30 10:05:52 2019 +0800

    [CARBONDATA-3640][CARBONDATA-3557] Support flink ingest carbon partition table
    
    Add support for the Flink carbon sink to write partitioned CarbonData files
    as stage files.
    Add support for the INSERT STAGE command to load stage files into the
    CarbonData table.
    
     This closes #3542
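
    A minimal usage sketch, assembled from the TestCarbonPartitionWriter test
    added in this change. The CarbonWriterFactory construction is elided (its
    builder arguments are not shown here), and the paths, table name, source
    and partition-column index are illustrative placeholders, not part of this
    patch:

        import java.util.Properties

        import org.apache.carbon.flink.{CarbonLocalProperty, CarbonWriterFactory, ProxyFileSystem}
        import org.apache.flink.api.java.functions.KeySelector
        import org.apache.flink.core.fs.Path
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
        import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
        import org.apache.flink.streaming.api.functions.source.SourceFunction

        // Writer properties for the local ("Local" type) carbon writer.
        val writerProperties = new Properties
        writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, "/tmp/carbon/temp/")
        writerProperties.setProperty(CarbonLocalProperty.DATA_PATH, "/tmp/carbon/data/")
        // Close the open writers and commit stage data after every 100 written rows.
        writerProperties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "100")

        // Placeholder: built via CarbonWriterFactory.builder("Local")..., passing the
        // target database/table and writerProperties; see TestCarbonPartitionWriter.
        val factory: CarbonWriterFactory = ???
        // Placeholder: any source that emits Array[AnyRef] rows whose last two fields
        // are the partition columns (hour_, date_) of the target table.
        val source: SourceFunction[Array[AnyRef]] = ???

        val environment = StreamExecutionEnvironment.getExecutionEnvironment
        environment.enableCheckpointing(2000L)

        val stream = environment.addSource(source)
        val streamSink =
          StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build

        // Key by the hour_ partition value so rows of one partition are written
        // by one writer sub-task.
        stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
          override def getKey(value: Array[AnyRef]): AnyRef = value(3)
        }).addSink(streamSink)
        environment.execute

        // After the sink has committed its stage files, load them into the table
        // (spark is an active SparkSession).
        spark.sql("INSERT INTO test_flink_partition STAGE")
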
---
 .../carbondata/core/statusmanager/StageInput.java  |  58 ++++++
 .../apache/carbondata/core/util/DataTypeUtil.java  |  23 ++-
 .../org/apache/carbon/flink/ProxyFileWriter.java   |   4 +-
 .../carbon/flink/ProxyFileWriterFactory.java       |   2 +-
 .../org/apache/carbon/flink/ProxyRecoverable.java  |  18 +-
 .../carbon/flink/ProxyRecoverableOutputStream.java |   6 +-
 .../carbon/flink/ProxyRecoverableSerializer.java   |   8 +-
 .../apache/carbon/flink/CarbonLocalProperty.java   |   2 +
 .../org/apache/carbon/flink/CarbonLocalWriter.java | 150 +++++++-------
 .../carbon/flink/CarbonLocalWriterFactory.java     |  64 +-----
 .../org/apache/carbon/flink/CarbonS3Property.java  |   2 +
 .../org/apache/carbon/flink/CarbonS3Writer.java    | 124 ++++++------
 .../apache/carbon/flink/CarbonS3WriterFactory.java |  71 +------
 .../java/org/apache/carbon/flink/CarbonWriter.java | 221 ++++++++++++++++++++-
 .../apache/carbon/flink/CarbonWriterFactory.java   |   8 +-
 ...riter.scala => TestCarbonPartitionWriter.scala} | 123 +++++++++---
 .../org/apache/carbon/flink/TestCarbonWriter.scala |  10 +-
 .../scala/org/apache/carbon/flink/TestSource.scala |  16 +-
 .../management/CarbonInsertFromStageCommand.scala  | 137 ++++++++++++-
 .../command/management/CarbonLoadDataCommand.scala |  44 ++--
 20 files changed, 760 insertions(+), 331 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
index b4bf084..10dd51d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
@@ -39,6 +39,12 @@ public class StageInput {
    */
   private Map<String, Long> files;
 
+  /**
+   * the list of partition data information in this StageInput
+   * @see PartitionLocation
+   */
+  private List<PartitionLocation> locations;
+
   public StageInput() {
 
   }
@@ -48,6 +54,11 @@ public class StageInput {
     this.files = files;
   }
 
+  public StageInput(String base, List<PartitionLocation> locations) {
+    this.base = base;
+    this.locations = locations;
+  }
+
   public String getBase() {
     return base;
   }
@@ -64,6 +75,14 @@ public class StageInput {
     this.files = files;
   }
 
+  public List<PartitionLocation> getLocations() {
+    return this.locations;
+  }
+
+  public void setLocations(final List<PartitionLocation> locations) {
+    this.locations = locations;
+  }
+
   public List<InputSplit> createSplits() {
     return
         files.entrySet().stream().filter(
@@ -75,4 +94,43 @@ public class StageInput {
         ).collect(Collectors.toList());
   }
 
+  public static final class PartitionLocation {
+
+    public PartitionLocation() {
+
+    }
+
+    public PartitionLocation(final Map<String, String> partitions, final 
Map<String, Long> files) {
+      this.partitions = partitions;
+      this.files = files;
+    }
+
+    /**
+     * the mapping of (partitionColumn, partitionValue) for this partition.
+     */
+    private Map<String, String> partitions;
+
+    /**
+     * the mapping of (file, length) in this partition.
+     */
+    private Map<String, Long> files;
+
+    public Map<String, String> getPartitions() {
+      return this.partitions;
+    }
+
+    public void setPartitions(final Map<String, String> partitions) {
+      this.partitions = partitions;
+    }
+
+    public Map<String, Long> getFiles() {
+      return this.files;
+    }
+
+    public void setFiles(final Map<String, Long> files) {
+      this.files = files;
+    }
+
+  }
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index a33f2d4..c07f08b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -75,7 +75,7 @@ public final class DataTypeUtil {
   /**
    * DataType converter for different computing engines
    */
-  private static DataTypeConverter converter;
+  private static final ThreadLocal<DataTypeConverter> converter = new 
ThreadLocal<>();
 
   /**
    * This method will convert a given value to its specific type
@@ -105,7 +105,7 @@ public final class DataTypeUtil {
           new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
-        return converter.convertFromBigDecimalToDecimal(decimal);
+        return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -144,7 +144,7 @@ public final class DataTypeUtil {
           new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
-        return converter.convertFromBigDecimalToDecimal(decimal);
+        return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -457,7 +457,7 @@ public final class DataTypeUtil {
       }
     } else {
       // Default action for String/Varchar
-      return converter.convertFromStringToUTF8String(dimensionValue);
+      return 
getDataTypeConverter().convertFromStringToUTF8String(dimensionValue);
     }
   }
 
@@ -974,7 +974,7 @@ public final class DataTypeUtil {
    */
   public static void setDataTypeConverter(DataTypeConverter converterLocal) {
     if (converterLocal != null) {
-      converter = converterLocal;
+      converter.set(converterLocal);
       timeStampformatter.remove();
       dateformatter.remove();
     }
@@ -989,10 +989,17 @@ public final class DataTypeUtil {
   }
 
   public static DataTypeConverter getDataTypeConverter() {
-    if (converter == null) {
-      converter = new DataTypeConverterImpl();
+    DataTypeConverter dataTypeConverter = converter.get();
+    if (dataTypeConverter == null) {
+      synchronized (converter) {
+        dataTypeConverter = converter.get();
+        if (dataTypeConverter == null) {
+          dataTypeConverter = new DataTypeConverterImpl();
+          converter.set(dataTypeConverter);
+        }
+      }
     }
-    return converter;
+    return dataTypeConverter;
   }
 
   public static DataType valueOf(String name) {
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
index f8ef039..58f9c1d 100644
--- 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
@@ -25,7 +25,9 @@ public abstract class ProxyFileWriter<OUT> implements 
BulkWriter<OUT> {
 
   public abstract ProxyFileWriterFactory getFactory();
 
-  public abstract String getPartition();
+  public abstract String getIdentifier();
+
+  public abstract String getPath();
 
   public abstract void commit() throws IOException;
 
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
index bb683a3..8fff763 100644
--- 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
@@ -74,7 +74,7 @@ public abstract class ProxyFileWriterFactory<OUT> implements 
BulkWriter.Factory<
     this.configuration = configuration;
   }
 
-  public abstract ProxyFileWriter<OUT> create(String partition) throws 
IOException;
+  public abstract ProxyFileWriter<OUT> create(String identifier, String path) 
throws IOException;
 
   public static class Configuration implements Serializable {
 
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
index 11e5d5e..3a89d34 100644
--- 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
@@ -25,18 +25,22 @@ public final class ProxyRecoverable
   public ProxyRecoverable(
       final String writerType,
       final ProxyFileWriterFactory.Configuration writerConfiguration,
-      final String partition
+      final String writerIdentifier,
+      final String writePath
   ) {
     this.writerType = writerType;
     this.writerConfiguration = writerConfiguration;
-    this.partition = partition;
+    this.writerIdentifier = writerIdentifier;
+    this.writePath = writePath;
   }
 
   private final String writerType;
 
   private final ProxyFileWriterFactory.Configuration writerConfiguration;
 
-  private final String partition;
+  private final String writerIdentifier;
+
+  private final String writePath;
 
   public String getWriterType() {
     return this.writerType;
@@ -46,8 +50,12 @@ public final class ProxyRecoverable
     return this.writerConfiguration;
   }
 
-  public String getPartition() {
-    return this.partition;
+  public String getWriterIdentifier() {
+    return this.writerIdentifier;
+  }
+
+  public String getWritePath() {
+    return this.writePath;
   }
 
 }
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
index 1e59209..18c10d7 100644
--- 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
@@ -82,7 +82,8 @@ public final class ProxyRecoverableOutputStream extends 
RecoverableFsDataOutputS
         new ProxyRecoverable(
             this.writer.getFactory().getType(),
             this.writer.getFactory().getConfiguration(),
-            this.writer.getPartition()
+            this.writer.getIdentifier(),
+            this.writer.getPath()
         )
     );
   }
@@ -118,7 +119,8 @@ public final class ProxyRecoverableOutputStream extends 
RecoverableFsDataOutputS
         throw new UnsupportedOperationException();
       }
       
writerFactory.setConfiguration(this.recoverable.getWriterConfiguration());
-      return writerFactory.create(this.recoverable.getPartition());
+      return writerFactory.create(this.recoverable.getWriterIdentifier(),
+          this.recoverable.getWritePath());
     }
 
   }
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
index 8bf80bd..a79df8d 100644
--- 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
@@ -53,7 +53,8 @@ public final class ProxyRecoverableSerializer
     final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
     serializeString(byteBuffer, proxyRecoverable.getWriterType());
     serializeConfiguration(byteBuffer, 
proxyRecoverable.getWriterConfiguration());
-    serializeString(byteBuffer, proxyRecoverable.getPartition());
+    serializeString(byteBuffer, proxyRecoverable.getWriterIdentifier());
+    serializeString(byteBuffer, proxyRecoverable.getWritePath());
     final byte[] bytes = new byte[byteBuffer.position()];
     byteBuffer.position(0);
     byteBuffer.get(bytes);
@@ -113,8 +114,9 @@ public final class ProxyRecoverableSerializer
     final String writerType = deserializeString(byteBuffer);
     final ProxyFileWriterFactory.Configuration writerConfiguration =
         deserializeConfiguration(byteBuffer);
-    final String partition = deserializeString(byteBuffer);
-    return new ProxyRecoverable(writerType, writerConfiguration, partition);
+    final String writerIdentifier = deserializeString(byteBuffer);
+    final String writePath = deserializeString(byteBuffer);
+    return new ProxyRecoverable(writerType, writerConfiguration, 
writerIdentifier, writePath);
   }
 
   private static ProxyFileWriterFactory.Configuration deserializeConfiguration(
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
index 3383e8c..c1be532 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
@@ -23,6 +23,8 @@ public final class CarbonLocalProperty {
 
   public static final String DATA_PATH = "carbon.writer.local.data.path";
 
+  static final String COMMIT_THRESHOLD = 
"carbon.writer.local.commit.threshold";
+
   private CarbonLocalProperty() {
     // private constructor.
   }
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index dcfe8b3..c24c3bf 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -19,12 +19,14 @@ package org.apache.carbon.flink;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.StageInput;
@@ -32,6 +34,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.carbon.core.metadata.StageManager;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
 
@@ -42,48 +45,63 @@ final class CarbonLocalWriter extends CarbonWriter {
 
   CarbonLocalWriter(
       final CarbonLocalWriterFactory factory,
+      final String identifier,
       final CarbonTable table,
-      final org.apache.carbondata.sdk.file.CarbonWriter writer,
-      final String writePath,
-      final String writePartition
+      final String writePath
   ) {
-    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Open writer. " + this.toString());
-    }
-    this.factory = factory;
-    this.table = table;
-    this.writer = writer;
+    super(factory, identifier, table);
+    final Properties writerProperties = 
factory.getConfiguration().getWriterProperties();
+    final String commitThreshold =
+        writerProperties.getProperty(CarbonLocalProperty.COMMIT_THRESHOLD);
+    this.writerFactory = new WriterFactory(table, writePath) {
+      @Override
+      protected org.apache.carbondata.sdk.file.CarbonWriter newWriter(
+          final Object[] row) {
+        try {
+          return org.apache.carbondata.sdk.file.CarbonWriter.builder()
+              .outputPath(super.getWritePath(row))
+              .writtenBy("flink")
+              
.withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath()))
+              .withCsvInput()
+              .build();
+        } catch (IOException | InvalidLoadOptionException exception) {
+          // TODO
+          throw new UnsupportedOperationException(exception);
+        }
+      }
+    };
     this.writePath = writePath;
-    this.writePartition = writePartition;
+    this.writeCommitThreshold =
+        commitThreshold == null ? Long.MAX_VALUE : 
Long.parseLong(commitThreshold);
+    this.writeCount = new AtomicLong(0);
     this.flushed = true;
   }
 
-  private final CarbonLocalWriterFactory factory;
-
-  private final CarbonTable table;
-
-  private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+  private final WriterFactory writerFactory;
 
   private final String writePath;
 
-  private final String writePartition;
+  private final long writeCommitThreshold;
 
-  private volatile boolean flushed;
+  private final AtomicLong writeCount;
 
-  @Override
-  public CarbonLocalWriterFactory getFactory() {
-    return this.factory;
-  }
+  private volatile boolean flushed;
 
   @Override
-  public String getPartition() {
-    return this.writePartition;
+  public String getPath() {
+    return this.writePath;
   }
 
   @Override
-  public void addElement(final String element) throws IOException {
-    this.writer.write(element);
+  public void addElement(final Object[] element) throws IOException {
+    this.writerFactory.getWriter(element).write(element);
+    this.writeCount.incrementAndGet();
+    if (this.writeCount.get() >= this.writeCommitThreshold) {
+      this.closeWriters();
+      this.commit();
+      this.writerFactory.reset();
+      this.writeCount.set(0);
+    }
     this.flushed = false;
   }
 
@@ -94,7 +112,7 @@ final class CarbonLocalWriter extends CarbonWriter {
     }
     synchronized (this) {
       if (!this.flushed) {
-        this.writer.close();
+        this.closeWriters();
         this.flushed = true;
       }
     }
@@ -116,23 +134,27 @@ final class CarbonLocalWriter extends CarbonWriter {
       LOGGER.debug("Commit write. " + this.toString());
     }
     try {
-      final Properties writerProperties = 
this.factory.getConfiguration().getWriterProperties();
+      final Properties writerProperties =
+          this.getFactory().getConfiguration().getWriterProperties();
       String dataPath = 
writerProperties.getProperty(CarbonLocalProperty.DATA_PATH);
       if (dataPath == null) {
         throw new IllegalArgumentException(
                 "Writer property [" + CarbonLocalProperty.DATA_PATH + "] is 
not set."
         );
       }
-      dataPath = dataPath + this.table.getDatabaseName() + "/"
-          + this.table.getTableName() + "/" + this.writePartition + "/";
-      Map<String, Long> fileList =
-              this.uploadSegmentDataFiles(this.writePath + 
"Fact/Part0/Segment_null/", dataPath);
+      dataPath = dataPath + this.table.getDatabaseName() + 
CarbonCommonConstants.FILE_SEPARATOR
+          + this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR;
+      tryCreateLocalDirectory(new File(dataPath));
+      StageInput stageInput = this.uploadSegmentDataFiles(this.writePath, 
dataPath);
+      if (stageInput == null) {
+        return;
+      }
       try {
-        String stageDir = CarbonTablePath.getStageDir(
-            table.getAbsoluteTableIdentifier().getTablePath());
-        tryCreateLocalDirectory(new File(stageDir));
-        String stageInputPath = stageDir + "/" + this.writePartition;
-        StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, 
fileList));
+        String stageInputPath = CarbonTablePath.getStageDir(
+            table.getAbsoluteTableIdentifier().getTablePath()) +
+            CarbonCommonConstants.FILE_SEPARATOR + UUID.randomUUID();
+        tryCreateLocalDirectory(new File(stageInputPath));
+        StageManager.writeStageInput(stageInputPath, stageInput);
       } catch (Throwable exception) {
         this.deleteSegmentDataFilesQuietly(dataPath);
         throw exception;
@@ -148,13 +170,13 @@ final class CarbonLocalWriter extends CarbonWriter {
 
   @Override
   public void close() {
-    if (this.writer == null) {
+    if (this.writerFactory == null) {
       return;
     }
     try {
       synchronized (this) {
         if (!this.flushed) {
-          this.writer.close();
+          this.closeWriters();
           this.flushed = true;
         }
       }
@@ -169,33 +191,15 @@ final class CarbonLocalWriter extends CarbonWriter {
     }
   }
 
-  private Map<String, Long> uploadSegmentDataFiles(final String localPath, 
final String remotePath)
-          throws IOException {
-    final File[] files = new File(localPath).listFiles();
-    if (files == null) {
-      return new HashMap<>(0);
+  private void closeWriters() throws IOException {
+    if (this.writerFactory == null) {
+      return;
     }
-    Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
-    for (File file : files) {
-      fileNameMapLength.put(file.getName(), file.length());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] start.");
-      }
-      try {
-        final File remoteFile = new File(remotePath + file.getName());
-        if (!remoteFile.exists()) {
-          tryCreateLocalFile(remoteFile);
-        }
-        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
-      } catch (CarbonDataWriterException exception) {
-        LOGGER.error(exception.getMessage(), exception);
-        throw exception;
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] end.");
-      }
+    final List<org.apache.carbondata.sdk.file.CarbonWriter> writers =
+        this.writerFactory.getWriters();
+    for (org.apache.carbondata.sdk.file.CarbonWriter writer : writers) {
+      writer.close();
     }
-    return fileNameMapLength;
   }
 
   private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
@@ -206,18 +210,6 @@ final class CarbonLocalWriter extends CarbonWriter {
     }
   }
 
-  private static void tryCreateLocalFile(final File file) throws IOException {
-    if (file.exists()) {
-      return;
-    }
-    if (file.getParentFile() != null) {
-      tryCreateLocalDirectory(file.getParentFile());
-    }
-    if (!file.createNewFile()) {
-      throw new IOException("File [" + file.getCanonicalPath() + "] is 
exist.");
-    }
-  }
-
   private static void tryCreateLocalDirectory(final File file) throws 
IOException {
     if (file.exists()) {
       return;
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
index 4c24a8b..c1f1e99 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
@@ -18,20 +18,10 @@
 package org.apache.carbon.flink;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.sdk.file.CarbonWriter;
-import org.apache.carbondata.sdk.file.Schema;
 
 public final class CarbonLocalWriterFactory extends CarbonWriterFactory {
 
@@ -51,58 +41,16 @@ public final class CarbonLocalWriterFactory extends 
CarbonWriterFactory {
               "Writer property [" + CarbonLocalProperty.DATA_TEMP_PATH + "] is 
not set."
       );
     }
-    final String writePartition = UUID.randomUUID().toString().replace("-", 
"");
-    final String writePath = writeTempPath + "_" + writePartition + "/";
+    final String writerIdentifier = UUID.randomUUID().toString();
+    final String writePath = writeTempPath + writerIdentifier.replace("-", "") 
+ "/";
     final CarbonTable table = this.getTable();
-    final CarbonTable clonedTable =
-        
CarbonTable.buildFromTableInfo(TableInfo.deserialize(table.getTableInfo().serialize()));
-    clonedTable.getTableInfo().setTablePath(writePath);
-    final org.apache.carbondata.sdk.file.CarbonWriter writer;
-    try {
-      writer = CarbonWriter.builder()
-          .outputPath("")
-          .writtenBy("flink")
-          .withTable(clonedTable)
-          .withTableProperties(this.getTableProperties())
-          .withJsonInput(this.getTableSchema(clonedTable))
-          .build();
-    } catch (InvalidLoadOptionException exception) {
-      // TODO
-      throw new UnsupportedOperationException(exception);
-    }
-    return new CarbonLocalWriter(this, table, writer, writePath, 
writePartition);
+    return new CarbonLocalWriter(this, writerIdentifier, table, writePath);
   }
 
   @Override
-  protected CarbonLocalWriter create0(final String partition) throws 
IOException {
-    final Properties writerProperties = 
this.getConfiguration().getWriterProperties();
-    final String writeTempPath = 
writerProperties.getProperty(CarbonLocalProperty.DATA_TEMP_PATH);
-    if (writeTempPath == null) {
-      throw new IllegalArgumentException(
-              "Writer property [" + CarbonLocalProperty.DATA_TEMP_PATH + "] is 
not set."
-      );
-    }
-    final String writePath = writeTempPath + "_" + partition + "/";
-    final CarbonTable table = this.getTable();
-    return new CarbonLocalWriter(this, table, null, writePath, partition);
-  }
-
-  private Schema getTableSchema(final CarbonTable table) {
-    final List<CarbonColumn> columnList = table.getCreateOrderColumn();
-    final List<ColumnSchema> columnSchemaList = new 
ArrayList<>(columnList.size());
-    for (CarbonColumn column : columnList) {
-      columnSchemaList.add(column.getColumnSchema());
-    }
-    return new Schema(columnSchemaList);
-  }
-
-  private Map<String, String> getTableProperties() {
-    final Properties tableProperties = 
this.getConfiguration().getTableProperties();
-    final Map<String, String> tablePropertyMap = new 
HashMap<>(tableProperties.size());
-    for (String propertyName : tableProperties.stringPropertyNames()) {
-      tablePropertyMap.put(propertyName, 
tableProperties.getProperty(propertyName));
-    }
-    return tablePropertyMap;
+  protected CarbonLocalWriter create0(final String identifier, final String 
path)
+      throws IOException {
+    return new CarbonLocalWriter(this, identifier, this.getTable(), path);
   }
 
 }
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
index 6d9d94b..66fa12b 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
@@ -29,6 +29,8 @@ final class CarbonS3Property {
 
   static final String DATA_PATH = "carbon.writer.s3.data.path";
 
+  static final String COMMIT_THRESHOLD = "carbon.writer.s3.commit.threshold";
+
   private CarbonS3Property() {
     // private constructor.
   }
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index e5aeca4..0c8ccbd 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -19,13 +19,14 @@ package org.apache.carbon.flink;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.StageInput;
@@ -34,6 +35,7 @@ import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.carbon.core.metadata.StageManager;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
@@ -45,52 +47,68 @@ final class CarbonS3Writer extends CarbonWriter {
 
   CarbonS3Writer(
       final CarbonS3WriterFactory factory,
+      final String identifier,
       final CarbonTable table,
-      final org.apache.carbondata.sdk.file.CarbonWriter writer,
       final String writePath,
-      final String writePartition,
       final Configuration configuration
   ) {
-    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Open writer. " + this.toString());
-    }
-    this.factory = factory;
-    this.table = table;
-    this.writer = writer;
+    super(factory, identifier, table);
+    final Properties writerProperties = 
factory.getConfiguration().getWriterProperties();
+    final String commitThreshold =
+        writerProperties.getProperty(CarbonS3Property.COMMIT_THRESHOLD);
+    this.writerFactory = new WriterFactory(table, writePath) {
+      @Override
+      protected org.apache.carbondata.sdk.file.CarbonWriter newWriter(
+          final Object[] row) {
+        try {
+          return org.apache.carbondata.sdk.file.CarbonWriter.builder()
+              .outputPath(super.getWritePath(row))
+              .writtenBy("flink")
+              
.withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath()))
+              .withCsvInput()
+              .withHadoopConf(configuration)
+              .build();
+        } catch (IOException | InvalidLoadOptionException exception) {
+          // TODO
+          throw new UnsupportedOperationException(exception);
+        }
+      }
+    };
     this.writePath = writePath;
-    this.writePartition = writePartition;
+    this.writeCommitThreshold =
+        commitThreshold == null ? Long.MAX_VALUE : 
Long.parseLong(commitThreshold);
+    this.writeCount = new AtomicLong(0);
     this.configuration = configuration;
     this.flushed = true;
   }
 
-  private final CarbonS3WriterFactory factory;
-
-  private final CarbonTable table;
-
-  private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+  private final WriterFactory writerFactory;
 
   private final String writePath;
 
-  private final String writePartition;
+  private final long writeCommitThreshold;
+
+  private final AtomicLong writeCount;
 
   private final Configuration configuration;
 
   private volatile boolean flushed;
 
   @Override
-  public CarbonS3WriterFactory getFactory() {
-    return this.factory;
+  public String getPath() {
+    return this.writePath;
   }
 
   @Override
-  public String getPartition() {
-    return this.writePartition;
-  }
-
-  @Override
-  public void addElement(final String element) throws IOException {
-    this.writer.write(element);
+  public void addElement(final Object[] element) throws IOException {
+    this.writerFactory.getWriter(element).write(element);
+    this.writeCount.incrementAndGet();
+    if (this.writeCount.get() >= this.writeCommitThreshold) {
+      this.closeWriters();
+      this.commit();
+      this.writerFactory.reset();
+      this.writeCount.set(0);
+    }
     this.flushed = false;
   }
 
@@ -101,7 +119,7 @@ final class CarbonS3Writer extends CarbonWriter {
     }
     synchronized (this) {
       if (!this.flushed) {
-        this.writer.close();
+        this.closeWriters();
         this.flushed = true;
       }
     }
@@ -126,7 +144,8 @@ final class CarbonS3Writer extends CarbonWriter {
     ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
         .getNonSerializableExtraInfo().put("carbonConf", this.configuration);
     try {
-      final Properties writerProperties = 
this.factory.getConfiguration().getWriterProperties();
+      final Properties writerProperties =
+          this.getFactory().getConfiguration().getWriterProperties();
       String dataPath = 
writerProperties.getProperty(CarbonS3Property.DATA_PATH);
       if (dataPath == null) {
         throw new IllegalArgumentException(
@@ -139,15 +158,16 @@ final class CarbonS3Writer extends CarbonWriter {
         );
       }
       dataPath = dataPath + this.table.getDatabaseName() + 
CarbonCommonConstants.FILE_SEPARATOR +
-          this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR +
-          this.writePartition + CarbonCommonConstants.FILE_SEPARATOR;
-      Map<String, Long> fileList =
-          this.uploadSegmentDataFiles(this.writePath + 
"Fact/Part0/Segment_null/", dataPath);
+          this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR;
+      StageInput stageInput = this.uploadSegmentDataFiles(this.writePath, 
dataPath);
+      if (stageInput == null) {
+        return;
+      }
       try {
         String stageInputPath = CarbonTablePath.getStageDir(
             table.getAbsoluteTableIdentifier().getTablePath()) +
-            CarbonCommonConstants.FILE_SEPARATOR + this.writePartition;
-        StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, 
fileList));
+            CarbonCommonConstants.FILE_SEPARATOR + UUID.randomUUID();
+        StageManager.writeStageInput(stageInputPath, stageInput);
       } catch (Throwable exception) {
         this.deleteSegmentDataFilesQuietly(dataPath);
         throw exception;
@@ -163,13 +183,13 @@ final class CarbonS3Writer extends CarbonWriter {
 
   @Override
   public void close() {
-    if (this.writer == null) {
+    if (this.writerFactory == null) {
       return;
     }
     try {
       synchronized (this) {
         if (!this.flushed) {
-          this.writer.close();
+          this.closeWriters();
           this.flushed = true;
         }
       }
@@ -184,29 +204,15 @@ final class CarbonS3Writer extends CarbonWriter {
     }
   }
 
-  private Map<String, Long> uploadSegmentDataFiles(
-      final String localPath, final String remotePath) {
-    final File[] files = new File(localPath).listFiles();
-    if (files == null) {
-      return new HashMap<>(0);
+  private void closeWriters() throws IOException {
+    if (this.writerFactory == null) {
+      return;
     }
-    Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
-    for (File file : files) {
-      fileNameMapLength.put(file.getName(), file.length());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] start.");
-      }
-      try {
-        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
-      } catch (CarbonDataWriterException exception) {
-        LOGGER.error(exception.getMessage(), exception);
-        throw exception;
-      }
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] end.");
-      }
+    final List<org.apache.carbondata.sdk.file.CarbonWriter> writers =
+        this.writerFactory.getWriters();
+    for (org.apache.carbondata.sdk.file.CarbonWriter writer : writers) {
+      writer.close();
     }
-    return fileNameMapLength;
   }
 
   private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
index f1ab483..5cf4ce9 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
@@ -18,21 +18,11 @@
 package org.apache.carbon.flink;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
-import org.apache.carbondata.sdk.file.CarbonWriter;
-import org.apache.carbondata.sdk.file.Schema;
 
 public final class CarbonS3WriterFactory extends CarbonWriterFactory {
 
@@ -49,46 +39,21 @@ public final class CarbonS3WriterFactory extends 
CarbonWriterFactory {
     final String writeTempPath = 
writerProperties.getProperty(CarbonS3Property.DATA_TEMP_PATH);
     if (writeTempPath == null) {
       throw new IllegalArgumentException(
-              "Writer property [" + CarbonS3Property.DATA_TEMP_PATH + "] is 
not set."
+          "Writer property [" + CarbonS3Property.DATA_TEMP_PATH + "] is not 
set."
       );
     }
-    final String writePartition = UUID.randomUUID().toString().replace("-", 
"");
-    final String writePath = writeTempPath + "_" + writePartition + "/";
+    final String writerIdentifier = UUID.randomUUID().toString();
+    final String writePath = writeTempPath + writerIdentifier.replace("-", "") 
+ "/";
     final CarbonTable table = this.getTable();
-    final CarbonTable clonedTable =
-        
CarbonTable.buildFromTableInfo(TableInfo.deserialize(table.getTableInfo().serialize()));
-    clonedTable.getTableInfo().setTablePath(writePath);
-    final org.apache.hadoop.conf.Configuration configuration = 
this.getS3Configuration();
-    final CarbonWriter writer;
-    try {
-      writer = CarbonWriter.builder()
-          .outputPath("")
-          .writtenBy("flink")
-          .withTable(clonedTable)
-          .withTableProperties(this.getTableProperties())
-          .withJsonInput(this.getTableSchema(clonedTable))
-          .withHadoopConf(configuration)
-          .build();
-    } catch (InvalidLoadOptionException exception) {
-      // TODO
-      throw new UnsupportedOperationException(exception);
-    }
-    return new CarbonS3Writer(this, table, writer, writePath, writePartition, 
configuration);
+    return new CarbonS3Writer(this, writerIdentifier, table,
+        writePath, this.getS3Configuration());
   }
 
   @Override
-  protected CarbonS3Writer create0(final String partition) throws IOException {
-    final Properties writerProperties = 
this.getConfiguration().getWriterProperties();
-    final String writeTempPath = 
writerProperties.getProperty(CarbonS3Property.DATA_TEMP_PATH);
-    if (writeTempPath == null) {
-      throw new IllegalArgumentException(
-              "Writer property [" + CarbonS3Property.DATA_TEMP_PATH + "] is 
not set."
-      );
-    }
-    final String writePath = writeTempPath + "_" + partition + "/";
-    final CarbonTable table = this.getTable();
-    final org.apache.hadoop.conf.Configuration configuration = 
this.getS3Configuration();
-    return new CarbonS3Writer(this, table, null, writePath, partition, 
configuration);
+  protected CarbonS3Writer create0(final String identifier, final String path)
+      throws IOException {
+    return new CarbonS3Writer(this, identifier, this.getTable(),
+        path, this.getS3Configuration());
   }
 
   @Override
@@ -97,24 +62,6 @@ public final class CarbonS3WriterFactory extends 
CarbonWriterFactory {
     return super.getTable();
   }
 
-  private Schema getTableSchema(final CarbonTable table) {
-    final List<CarbonColumn> columnList = table.getCreateOrderColumn();
-    final List<ColumnSchema> columnSchemaList = new 
ArrayList<>(columnList.size());
-    for (CarbonColumn column : columnList) {
-      columnSchemaList.add(column.getColumnSchema());
-    }
-    return new Schema(columnSchemaList);
-  }
-
-  private Map<String, String> getTableProperties() {
-    final Properties tableProperties = 
this.getConfiguration().getTableProperties();
-    final Map<String, String> tablePropertyMap = new 
HashMap<>(tableProperties.size());
-    for (String propertyName : tableProperties.stringPropertyNames()) {
-      tablePropertyMap.put(propertyName, 
tableProperties.getProperty(propertyName));
-    }
-    return tablePropertyMap;
-  }
-
   private org.apache.hadoop.conf.Configuration getS3Configuration() {
     final Properties writerProperties = 
this.getConfiguration().getWriterProperties();
     final String accessKey = 
writerProperties.getProperty(CarbonS3Property.ACCESS_KEY);
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
index d5ddac5..847b312 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
@@ -17,10 +17,229 @@
 
 package org.apache.carbon.flink;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
 /**
  * This class is a wrapper of CarbonWriter in SDK.
  * It not only write data to carbon with CarbonWriter in SDK, also generate 
segment file.
  */
-public abstract class CarbonWriter extends ProxyFileWriter<String> {
+public abstract class CarbonWriter extends ProxyFileWriter<Object[]> {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+  public CarbonWriter(final CarbonWriterFactory factory,
+      final String identifier, final CarbonTable table) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.identifier = identifier;
+    this.table = table;
+  }
+
+  private final CarbonWriterFactory factory;
+
+  private final String identifier;
+
+  protected final CarbonTable table;
+
+  @Override
+  public CarbonWriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * @return the uploaded data files as a StageInput, or <code>null</code> when no data file was uploaded.
+   */
+  protected StageInput uploadSegmentDataFiles(final String localPath, final 
String remotePath) {
+    if (!this.table.isHivePartitionTable()) {
+      final File[] files = new File(localPath).listFiles();
+      if (files == null) {
+        return null;
+      }
+      Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+      for (File file : files) {
+        fileNameMapLength.put(file.getName(), file.length());
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath 
+ "] start.");
+        }
+        try {
+          
CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
+        } catch (CarbonDataWriterException exception) {
+          LOGGER.error(exception.getMessage(), exception);
+          throw exception;
+        }
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] end.");
+        }
+      }
+      return new StageInput(remotePath, fileNameMapLength);
+    } else {
+      final List<StageInput.PartitionLocation> partitionLocationList = new 
ArrayList<>();
+      final List<String> partitions = new ArrayList<>();
+      uploadSegmentDataFiles(new File(localPath), remotePath, 
partitionLocationList, partitions);
+      if (partitionLocationList.isEmpty()) {
+        return null;
+      } else {
+        return new StageInput(remotePath, partitionLocationList);
+      }
+    }
+  }
+
+  private static void uploadSegmentDataFiles(
+      final File directory, final String remotePath,
+      final List<StageInput.PartitionLocation> partitionLocationList,
+      final List<String> partitions
+  ) {
+    final File[] files = directory.listFiles();
+    if (files == null) {
+      return;
+    }
+    Map<String, Long> fileNameMapLength = new HashMap<>();
+    for (File file : files) {
+      if (file.isDirectory()) {
+        partitions.add(file.getName());
+        uploadSegmentDataFiles(file, remotePath, partitionLocationList, 
partitions);
+        partitions.remove(partitions.size() - 1);
+        continue;
+      }
+      fileNameMapLength.put(file.getName(), file.length());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] start.");
+      }
+      try {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
+      } catch (CarbonDataWriterException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+        throw exception;
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] end.");
+      }
+    }
+    if (!fileNameMapLength.isEmpty()) {
+      final Map<String, String> partitionMap = new 
HashMap<>(partitions.size());
+      for (String partition : partitions) {
+        final String[] segments = partition.split("=");
+        partitionMap.put(segments[0].trim(), segments[1].trim());
+      }
+      partitionLocationList.add(
+          new StageInput.PartitionLocation(
+              partitionMap,
+              fileNameMapLength
+          )
+      );
+    }
+  }
+
+  protected abstract static class WriterFactory {
+
+    public WriterFactory(final CarbonTable table, final String writePath) {
+      final List<ColumnSchema> partitionColumns;
+      if (table.getPartitionInfo() == null) {
+        partitionColumns = Collections.emptyList();
+      } else {
+        partitionColumns = table.getPartitionInfo().getColumnSchemaList();
+      }
+      this.table = table;
+      this.partitionColumns = partitionColumns;
+      this.writePath = writePath;
+      this.root = new Node();
+      this.writers = new ArrayList<>();
+    }
+
+    private final CarbonTable table;
+
+    private final List<ColumnSchema> partitionColumns;
+
+    private final String writePath;
+
+    private final Node root;
+
+    private final List<org.apache.carbondata.sdk.file.CarbonWriter> writers;
+
+    public List<org.apache.carbondata.sdk.file.CarbonWriter> getWriters() {
+      return this.writers;
+    }
+
+    public org.apache.carbondata.sdk.file.CarbonWriter getWriter(final 
Object[] row) {
+      Node node = this.root;
+      for (int index = 0; index < this.partitionColumns.size(); index++) {
+        final Object columnValue = 
row[this.partitionColumns.get(index).getSchemaOrdinal()];
+        if (columnValue == null) {
+          // TODO
+          throw new UnsupportedOperationException();
+        }
+        Node child = node.children.get(columnValue);
+        if (child == null) {
+          child = new Node();
+          node.children.put(columnValue, child);
+        }
+        node = child;
+      }
+      if (node.writer == null) {
+        node.writer = this.newWriter(row);
+        this.writers.add(node.writer);
+      }
+      return node.writer;
+    }
+
+    protected String getWritePath(final Object[] row) {
+      if (this.partitionColumns.isEmpty()) {
+        return this.writePath;
+      }
+      final StringBuilder stringBuilder = new StringBuilder();
+      stringBuilder.append(this.writePath);
+      for (int index = 0; index < this.partitionColumns.size(); index++) {
+        final ColumnSchema columnSchema = this.partitionColumns.get(index);
+        final Object columnValue = row[columnSchema.getSchemaOrdinal()];
+        stringBuilder.append(columnSchema.getColumnName());
+        stringBuilder.append("=");
+        stringBuilder.append(columnValue.toString());
+        stringBuilder.append(CarbonCommonConstants.FILE_SEPARATOR);
+      }
+      return stringBuilder.toString();
+    }
+
+    protected abstract org.apache.carbondata.sdk.file.CarbonWriter 
newWriter(final Object[] row);
+
+    public void reset() {
+      this.writers.clear();
+      this.root.children.clear();
+      this.root.writer = null;
+    }
+
+    private static final class Node {
+
+      final Map<Object, Node> children = new HashMap<>();
+
+      org.apache.carbondata.sdk.file.CarbonWriter writer;
+
+    }
+
+  }
 
 }
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
index d3257c9..15c608c 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 
-public abstract class CarbonWriterFactory extends 
ProxyFileWriterFactory<String> {
+public abstract class CarbonWriterFactory extends 
ProxyFileWriterFactory<Object[]> {
 
   public static CarbonWriterFactoryBuilder builder(final String type) {
     return CarbonWriterFactoryBuilder.get(type);
@@ -44,14 +44,14 @@ public abstract class CarbonWriterFactory extends 
ProxyFileWriterFactory<String>
   }
 
   @Override
-  public CarbonWriter create(final String partition) throws IOException {
+  public CarbonWriter create(final String identifier, final String path) 
throws IOException {
     this.setCarbonProperties();
-    return this.create0(partition);
+    return this.create0(identifier, path);
   }
 
   protected abstract CarbonWriter create0() throws IOException;
 
-  protected abstract CarbonWriter create0(String partition) throws IOException;
+  protected abstract CarbonWriter create0(String identifier, String path) 
throws IOException;
 
   protected CarbonTable getTable() throws IOException {
     final Configuration configuration = this.getConfiguration();
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
similarity index 52%
copy from 
integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
copy to 
integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 799c810..cc3c4b4 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -17,11 +17,17 @@
 
 package org.apache.carbon.flink
 
-import java.io.File
-import java.util.Properties
+import java.io.{File, InputStreamReader}
+import java.util
+import java.util.{Collections, Properties}
 
+import com.google.gson.Gson
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
@@ -29,12 +35,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Test
 
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import scala.collection.JavaConverters._
 
-class TestCarbonWriter extends QueryTest {
+class TestCarbonPartitionWriter extends QueryTest {
 
-  val tableName = "test_flink"
+  val tableName = "test_flink_partition"
 
   @Test
   def testLocal(): Unit = {
@@ -43,6 +48,8 @@ class TestCarbonWriter extends QueryTest {
       s"""
          | CREATE TABLE $tableName (stringField string, intField int, 
shortField short)
          | STORED AS carbondata
+         | PARTITIONED BY (hour_ string, date_ string)
+         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 
'SORT_SCOPE'='GLOBAL_SORT')
       """.stripMargin
     ).collect()
 
@@ -50,7 +57,7 @@ class TestCarbonWriter extends QueryTest {
 
     val dataTempPath = rootPath + "/data/temp/"
     val dataPath = rootPath + "/data/"
-    new File(dataPath).delete()
+    delDir(new File(dataPath))
     new File(dataPath).mkdir()
 
     try {
@@ -60,21 +67,26 @@ class TestCarbonWriter extends QueryTest {
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
-      environment.setParallelism(1)
+      environment.setParallelism(6)
       environment.enableCheckpointing(2000L)
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
       val dataCount = 10000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
-        override def get(index: Int): String = {
-          Thread.sleep(1L)
-          "{\"stringField\": \"test" + index + "\", \"intField\": " + index + 
", \"shortField\": 12345}"
+        override def get(index: Int): Array[AnyRef] = {
+          val data = new Array[AnyRef](5)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data(3) = Integer.toString(TestSource.randomCache.get().nextInt(24))
+          data(4) = "20191218"
+          data
         }
 
         @throws[InterruptedException]
         override def onFinish(): Unit = {
-          Thread.sleep(5000L)
+          Thread.sleep(30000L)
         }
       }
       val stream = environment.addSource(source)
@@ -88,7 +100,9 @@ class TestCarbonWriter extends QueryTest {
       )
       val streamSink = StreamingFileSink.forBulkFormat(new 
Path(ProxyFileSystem.DEFAULT_URI), factory).build
 
-      stream.addSink(streamSink)
+      stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
+        override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return 
hour_
+      }).addSink(streamSink)
 
       try environment.execute
       catch {
@@ -97,29 +111,40 @@ class TestCarbonWriter extends QueryTest {
           throw new UnsupportedOperationException(exception)
       }
 
+      val dataLocation = dataPath + "default" + 
CarbonCommonConstants.FILE_SEPARATOR +
+        tableName + CarbonCommonConstants.FILE_SEPARATOR
+
+      assertResult(true)(FileFactory.isFileExist(dataLocation))
+      assertResult(false)(FileFactory
+        .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+
+      // ensure that the carbon data file count in the data directory
+      // matches the data file count recorded in the stage files.
+      assertResult(true)(FileFactory.getCarbonFile(dataLocation).listFiles().length ==
+        collectStageInputs(CarbonTablePath.getStageDir(tablePath)).map(
+          stageInput =>
+            stageInput.getLocations.asScala.map(location => location.getFiles.size()).sum
+        ).sum
+      )
+
       sql(s"INSERT INTO $tableName STAGE")
 
       checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
 
-      // ensure the stage snapshot file and all stage files are deleted
-      assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
-      assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
-
     } finally {
       sql(s"drop table if exists $tableName").collect()
-      new File(dataPath).delete()
+      delDir(new File(dataPath))
     }
   }
 
   private def newWriterProperties(
-                                   dataTempPath: String,
-                                   dataPath: String,
-                                   storeLocation: String) = {
+     dataTempPath: String,
+     dataPath: String,
+     storeLocation: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
-    properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
-    properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+    properties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
     properties
   }
 
@@ -134,4 +159,56 @@ class TestCarbonWriter extends QueryTest {
     properties
   }
 
+  private def collectStageInputs(loadDetailsDir: String): Seq[StageInput] = {
+    val dir = FileFactory.getCarbonFile(loadDetailsDir)
+    val stageFiles = if (dir.exists()) {
+      val allFiles = dir.listFiles()
+      val successFiles = allFiles.filter { file =>
+        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.map { file =>
+        (file.getName.substring(0, file.getName.indexOf(".")), file)
+      }.toMap
+      allFiles.filter { file =>
+        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.filter { file =>
+        successFiles.contains(file.getName)
+      }.map { file =>
+        (file, successFiles(file.getName))
+      }
+    } else {
+      Array.empty
+    }
+
+    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
+    val gson = new Gson()
+    stageFiles.map { stage =>
+      val filePath = stage._1.getAbsolutePath
+      val stream = FileFactory.getDataInputStream(filePath)
+      try {
+        val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
+        output.add(stageInput)
+      } finally {
+        stream.close()
+      }
+    }
+    output.asScala
+  }
+
+  private def delDir(dir: File): Boolean = {
+    if (dir.isDirectory) {
+      val children = dir.list
+      if (children != null) {
+        val length = children.length
+        var i = 0
+        while (i < length) {
+          if (!delDir(new File(dir, children(i)))) {
+              return false
+          }
+          i += 1
+        }
+      }
+    }
+    dir.delete()
+  }
+
 }
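
For readers following the assertions above, a hedged illustration of the stage-file shape
that collectStageInputs parses for a partitioned table: instead of the flat "files" map
used for non-partitioned writes, each stage file carries a "locations" list with
per-partition file maps. The JSON keys are assumed to mirror the StageInput and
PartitionLocation field names, and the concrete path, file name and size below are made
up for illustration only.

    val sampleStageJson =
      """{
        |  "base": "/data/default/test_flink_partition",
        |  "locations": [{
        |    "partitions": {"hour_": "3", "date_": "20191218"},
        |    "files": {"part-0-0.carbondata": 2048}
        |  }]
        |}""".stripMargin
    val stageInput = new Gson().fromJson(sampleStageJson, classOf[StageInput])
    // one PartitionLocation per partition directory written by the Flink sink
    assert(stageInput.getLocations.get(0).getPartitions.get("hour_") == "3")
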
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 799c810..67c7bab 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -67,9 +67,13 @@ class TestCarbonWriter extends QueryTest {
       val dataCount = 10000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
-        override def get(index: Int): String = {
+        override def get(index: Int): Array[AnyRef] = {
           Thread.sleep(1L)
-          "{\"stringField\": \"test" + index + "\", \"intField\": " + index + 
", \"shortField\": 12345}"
+          val data = new Array[AnyRef](3)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data
         }
 
         @throws[InterruptedException]
@@ -118,8 +122,6 @@ class TestCarbonWriter extends QueryTest {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
-    properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
-    properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
     properties
   }
 
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
index 88ac173..d257ce1 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
@@ -1,17 +1,19 @@
 package org.apache.carbon.flink
 
+import java.util.Random
+
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 
-abstract class TestSource(val dataCount: Int) extends SourceFunction[String] with CheckpointedFunction {
+abstract class TestSource(val dataCount: Int) extends SourceFunction[Array[AnyRef]] with CheckpointedFunction {
   private var dataIndex = 0
   private var dataIndexState: ListState[Integer] = _
   private var running = false
 
   @throws[Exception]
-  def get(index: Int): String
+  def get(index: Int): Array[AnyRef]
 
   @throws[Exception]
   def onFinish(): Unit = {
@@ -19,7 +21,7 @@ abstract class TestSource(val dataCount: Int) extends SourceFunction[String] wit
   }
 
   @throws[Exception]
-  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
+  override def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]): Unit = {
     this.running = true
     while ( {
       this.running && this.dataIndex < this.dataCount
@@ -49,4 +51,12 @@ abstract class TestSource(val dataCount: Int) extends SourceFunction[String] wit
       this.dataIndex = dataIndex
     }
   }
+}
+
+object TestSource {
+
+  val randomCache = new ThreadLocal[Random] {
+    override def initialValue(): Random = new Random()
+  }
+
 }
\ No newline at end of file
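
The checkpointing halves of TestSource are only partially visible in this hunk. As a rough
sketch under that caveat (illustrative bodies, not the committed code), the ListState[Integer]
is typically wired so a restarted source resumes from the last snapshotted index:

    override def snapshotState(context: FunctionSnapshotContext): Unit = {
      // keep exactly one entry: the index of the next row to emit
      this.dataIndexState.clear()
      this.dataIndexState.add(this.dataIndex)
    }

    override def initializeState(context: FunctionInitializationContext): Unit = {
      this.dataIndexState = context.getOperatorStateStore.getListState(
        new ListStateDescriptor[Integer]("dataIndex", classOf[Integer]))
      // restore progress after a restart so already-emitted rows are not duplicated
      import scala.collection.JavaConverters._
      for (restoredIndex <- this.dataIndexState.get().asScala) {
        this.dataIndex = restoredIndex
      }
    }
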
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 5d7359d..eb63d03 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -26,10 +26,15 @@ import scala.collection.JavaConverters._
 
 import com.google.gson.Gson
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
-import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -37,14 +42,17 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 /**
  * Collect stage input files and trigger a loading into carbon table.
@@ -137,7 +145,11 @@ case class CarbonInsertFromStageCommand(
       FileFactory.writeFile(content, snapshotFilePath)
 
       // 5) perform data loading
-      startLoading(spark, table, loadModel, stageInputs)
+      if (table.isHivePartitionTable) {
+        startLoadingWithPartition(spark, table, loadModel, stageInputs)
+      } else {
+        startLoading(spark, table, loadModel, stageInputs)
+      }
 
       // 6) write segment file and update the segment entry to SUCCESS
       val segmentFileName = SegmentFileStore.writeSegmentFile(
@@ -269,12 +281,95 @@ case class CarbonInsertFromStageCommand(
       SparkSQLUtil.sessionState(spark).newHadoopConf()
     ).map { row =>
         (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-      }
+    }
 
     LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() 
- start}ms")
   }
 
   /**
+   * Start global sort loading of a partition table
+   */
+  private def startLoadingWithPartition(
+      spark: SparkSession,
+      table: CarbonTable,
+      loadModel: CarbonLoadModel,
+      stageInput: Seq[StageInput]
+    ): Unit = {
+    val partitionDataList = listPartitionFiles(stageInput)
+    val start = System.currentTimeMillis()
+    var index = 0
+    partitionDataList.map {
+      case (partition, splits) =>
+        index = index + 1
+        LOGGER.info(s"start to load ${splits.size} files into " +
+          s"${table.getDatabaseName}.${table.getTableName}. " +
+          s"Partition information: ${partition.mkString(",")}")
+        val dataFrame = createInputDataFrameOfInternalRow(spark, table, splits)
+        val columns = dataFrame.columns
+        val header = columns.mkString(",")
+        val selectColumns = columns.filter(!partition.contains(_))
+        val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
+
+        val loadCommand = CarbonLoadDataCommand(
+          databaseNameOp = Option(table.getDatabaseName),
+          tableName = table.getTableName,
+          factPathFromUser = null,
+          dimFilesPath = Seq(),
+          options = scala.collection.immutable.Map("fileheader" -> header),
+          isOverwriteTable = false,
+          inputSqlString = null,
+          dataFrame = Some(selectedDataFrame),
+          updateModel = None,
+          tableInfoOp = None,
+          internalOptions = Map.empty,
+          partition = partition
+        )
+        loadCommand.run(spark)
+    }
+    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() 
- start}ms")
+  }
+
+  /**
+   * @return a list of (partitionMap, InputSplits) pairs.
+   *         partitionMap contains every partition column name and value.
+   *         InputSplits holds the data file information of the corresponding partition.
+   */
+  private def listPartitionFiles(
+      stageInputs : Seq[StageInput]
+    ): Seq[(Map[String, Option[String]], Seq[InputSplit])] = {
+    val partitionMap = new util.HashMap[Map[String, Option[String]], util.List[InputSplit]]()
+    stageInputs.foreach (
+      stageInput => {
+        val locations = stageInput.getLocations.asScala
+        locations.foreach (
+          location => {
+            val partition = location.getPartitions.asScala.map(t => (t._1, Option(t._2))).toMap
+            var splits = partitionMap.get(partition)
+            if (splits == null) {
+              partitionMap.put(partition, new util.ArrayList[InputSplit]())
+              splits = partitionMap.get(partition)
+            }
+            splits.addAll (
+              location.getFiles.asScala
+              .filter(_._1.endsWith(CarbonCommonConstants.FACT_FILE_EXT))
+              .map(
+                file => {
+                  CarbonInputSplit.from(
+                    "-1", "0",
+                    stageInput.getBase + CarbonCommonConstants.FILE_SEPARATOR + file._1, 0,
+                    file._2, ColumnarFormatVersion.V3, null
+                  )
+                }
+              ).toList.asJava
+            )
+          }
+        )
+      }
+    )
+    partitionMap.asScala.map(entry => (entry._1, entry._2.asScala)).toSeq
+  }
+
+  /**
    * Read stage files and return input files
    */
   private def collectStageInputs(
@@ -375,5 +470,37 @@ case class CarbonInsertFromStageCommand(
     }
   }
 
+  /**
+   * Create a DataFrame based on the specified splits
+   */
+  private def createInputDataFrameOfInternalRow(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      splits: Seq[InputSplit]
+    ): DataFrame = {
+    val columns = carbonTable
+      .getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .toArray
+    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
+    val rdd: RDD[Row] = new CarbonScanRDD[InternalRow](
+        sparkSession,
+        columnProjection = new CarbonProjection(columns),
+        null,
+        carbonTable.getAbsoluteTableIdentifier,
+        carbonTable.getTableInfo.serialize,
+        carbonTable.getTableInfo,
+        new CarbonInputMetrics,
+        null,
+        null,
+        classOf[SparkRowReadSupportImpl],
+        splits.asJava
+      ).map { row =>
+        new GenericRow(row.toSeq(schema).toArray)
+      }
+    sparkSession.createDataFrame(rdd, schema)
+  }
+
   override protected def opName: String = "INSERT STAGE"
 }
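
As a simplified, hedged restatement of what listPartitionFiles computes (the helper name and
tuple shapes below are illustrative, not part of the commit): every .carbondata file referenced
by the stage inputs is tagged with its partition-spec map and then grouped, so each distinct
partition spec ends up with the list of data files that belong to it.

    // (partitionSpec, fileName, fileLength) triples flattened out of the stage inputs
    def groupFilesByPartition(
        flattened: Seq[(Map[String, Option[String]], String, Long)]
      ): Map[Map[String, Option[String]], Seq[(String, Long)]] = {
      flattened
        .filter(_._2.endsWith(CarbonCommonConstants.FACT_FILE_EXT))   // keep carbondata files only
        .groupBy(_._1)                                                // group by partition spec
        .map { case (partition, entries) => (partition, entries.map(e => (e._2, e._3))) }
    }
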
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 382ed12..fdcb06f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command.management
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -38,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference,
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
@@ -51,30 +50,29 @@ import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants, SortScopeOptions}
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, IndexServerLoadEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.indexserver.DistributedRDDUtils
 import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -898,12 +896,32 @@ case class CarbonLoadDataCommand(
       sortScope: SortScopeOptions.SortScope,
       isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
     // Converts the data as per the loading steps before giving it to the writer or sorter
-    val updatedRdd = convertData(
+    val convertedRdd = convertData(
       rdd,
       sparkSession,
       loadModel,
       isDataFrame,
       partitionValues)
+    val updatedRdd = if (isDataFrame) {
+      val columnCount = loadModel.getCsvHeaderColumns.length
+      convertedRdd.map { row =>
+        val array = new Array[AnyRef](columnCount)
+        val data = row.getData
+        var i = 0
+        while (i < columnCount) {
+          data(i) match {
+            case string: String =>
+              array(i) = UTF8String.fromString(string)
+            case _ =>
+              array(i) = data(i)
+          }
+          i = i + 1
+        }
+        array
+      }.map(row => InternalRow.fromSeq(row))
+    } else {
+      convertedRdd.map(row => InternalRow.fromSeq(row.getData))
+    }
     val catalogAttributes = catalogTable.schema.toAttributes
     var attributes = curAttributes.map(a => {
       catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
@@ -985,7 +1003,7 @@ case class CarbonLoadDataCommand(
       sparkSession: SparkSession,
       model: CarbonLoadModel,
       isDataFrame: Boolean,
-      partitionValues: Array[String]): RDD[InternalRow] = {
+      partitionValues: Array[String]): RDD[CarbonRow] = {
     val sc = sparkSession.sparkContext
     val info =
       model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
@@ -1021,7 +1039,7 @@ case class CarbonLoadDataCommand(
       .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
     val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
-      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         DataLoadProcessorStepOnSpark.inputAndconvertFunc(
           rows,
           index,
@@ -1029,7 +1047,7 @@ case class CarbonLoadDataCommand(
           partialSuccessAccum,
           inputStepRowCounter,
           keepActualData = true)
-      }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
+      }.filter(_ != null)
 
     finalRDD
   }
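
A condensed, hedged view of the DataFrame branch added in the first hunk above (toInternalRow
is an illustrative helper name, not part of the commit): string values coming out of the
conversion step are re-wrapped as Spark's UTF8String before the row is handed on as an
InternalRow, while all other values pass through unchanged.

    def toInternalRow(row: CarbonRow, columnCount: Int): InternalRow = {
      val array = new Array[AnyRef](columnCount)
      val data = row.getData
      var i = 0
      while (i < columnCount) {
        array(i) = data(i) match {
          case s: String => UTF8String.fromString(s) // Spark's internal string representation
          case other => other
        }
        i += 1
      }
      InternalRow.fromSeq(array)
    }
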
