This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b8ac84  [CARBONDATA-3597] Support Merge for SCD and CCD scenarios
5b8ac84 is described below

commit 5b8ac8478847ef6962f39362f8e16f30a9cb6a03
Author: ravipesala <ravi.pes...@gmail.com>
AuthorDate: Mon Dec 30 11:38:36 2019 +0800

    [CARBONDATA-3597] Support Merge for SCD and CCD scenarios
    
    Added a DataFrame API to merge datasets online and apply the configured
    actions according to the match conditions.
    
    The supported DataSet API is as follows (a concrete usage sketch is given
    after the listing):
    
    targetDS.merge(sourceDS, <condition>).
      whenMatched(<condition>).
      updateExpr(updateMap).
      insertExpr(insertMap_u).
      whenNotMatched(<condition>).
      insertExpr(insertMap).
      whenNotMatchedAndExistsOnlyOnTarget(<condition>).
      delete().
      insertHistoryTableExpr(insertMap_d, <table_name>).
      execute()
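    
    For example, a minimal usage sketch (adapted from the MergeTestCase added
    in this commit; the "A"/"B" aliases and the column mappings are only
    illustrative, and col comes from org.apache.spark.sql.functions):
    
      val updateMap = Map(col("id") -> col("A.id"),
        col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
    
      targetDS.as("A").merge(sourceDS.as("B"), "A.id=B.id").
        whenMatched("A.state <> B.state").
        updateExpr(updateMap).
        execute()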
    
    This closes #3483
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  53 +-
 .../impl/DictionaryBasedResultCollector.java       |  19 +-
 .../RestructureBasedDictionaryResultCollector.java |   9 +-
 .../collector/impl/RowIdBasedResultCollector.java  |   9 +-
 .../scan/executor/impl/AbstractQueryExecutor.java  |   1 +
 .../scan/executor/infos/BlockExecutionInfo.java    |  14 +
 .../carbondata/core/scan/model/QueryModel.java     |  14 +
 .../core/scan/result/BlockletScannedResult.java    |   1 -
 .../scan/result/impl/FilterQueryScannedResult.java |   2 -
 .../result/impl/NonFilterQueryScannedResult.java   |   1 -
 .../statusmanager/SegmentUpdateStatusManager.java  |  74 ++-
 dev/javastyle-config.xml                           |   3 -
 .../carbondata/hadoop/api/CarbonInputFormat.java   |   8 +-
 .../hadoop/api/CarbonTableInputFormat.java         |  13 +-
 .../spark/testsuite/merge/MergeTestCase.scala      | 501 +++++++++++++++++++
 .../spark/rdd/CarbonDeltaRowScanRDD.scala          |  90 ++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |   2 +-
 .../apache/spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../SparkGenericRowReadSupportImpl.java            |  59 +++
 .../indexserver/DistributedRDDUtils.scala          |   4 +-
 .../scala/org/apache/spark/sql/CarbonSession.scala |  11 +
 .../command/mutation/DeleteExecution.scala         | 260 ++++++----
 .../mutation/merge/CarbonMergeDataSetCommand.scala | 531 +++++++++++++++++++++
 .../merge/CarbonMergeDataSetException.scala        |  33 ++
 .../mutation/merge/HistoryTableLoadHelper.scala    | 136 ++++++
 .../mutation/merge/MergeDataSetBuilder.scala       | 134 ++++++
 .../command/mutation/merge/MergeProjection.scala   | 114 +++++
 .../command/mutation/merge/MutationAction.scala    | 174 +++++++
 .../command/mutation/merge/TranxManager.scala      |  62 +++
 .../command/mutation/merge/interfaces.scala        |  88 ++++
 30 files changed, 2259 insertions(+), 163 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 2b3096e..c9b4360 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -134,30 +134,7 @@ public class CarbonUpdateUtil {
         List<SegmentUpdateDetails> oldList = new 
ArrayList(Arrays.asList(oldDetails));
 
         for (SegmentUpdateDetails newBlockEntry : updateDetailsList) {
-          int index = oldList.indexOf(newBlockEntry);
-          if (index != -1) {
-            // update the element in existing list.
-            SegmentUpdateDetails blockDetail = oldList.get(index);
-            if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() || 
(isCompaction)) {
-              blockDetail
-                  
.setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
-            }
-            
blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
-            blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
-            
blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
-            // If the start and end time is different then the delta is there 
in multiple files so
-            // add them to the list to get the delta files easily with out 
listing.
-            if (!blockDetail.getDeleteDeltaStartTimestamp()
-                .equals(blockDetail.getDeleteDeltaEndTimestamp())) {
-              
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp());
-              
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp());
-            } else {
-              blockDetail.setDeltaFileStamps(null);
-            }
-          } else {
-            // add the new details to the list.
-            oldList.add(newBlockEntry);
-          }
+          mergeSegmentUpdate(isCompaction, oldList, newBlockEntry);
         }
 
         segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, 
updateStatusFileIdentifier);
@@ -180,6 +157,34 @@ public class CarbonUpdateUtil {
     return status;
   }
 
+  public static void mergeSegmentUpdate(boolean isCompaction, List<SegmentUpdateDetails> oldList,
+      SegmentUpdateDetails newBlockEntry) {
+    int index = oldList.indexOf(newBlockEntry);
+    if (index != -1) {
+      // update the element in the existing list.
+      SegmentUpdateDetails blockDetail = oldList.get(index);
+      if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() || isCompaction) {
+        blockDetail
+            .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
+      }
+      blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
+      blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
+      blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
+      // If the start and end timestamps differ, the delta exists in multiple files, so
+      // add them to the list to get the delta files easily without listing.
+      if (!blockDetail.getDeleteDeltaStartTimestamp()
+          .equals(blockDetail.getDeleteDeltaEndTimestamp())) {
+        blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp());
+        blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp());
+      } else {
+        blockDetail.setDeltaFileStamps(null);
+      }
+    } else {
+      // add the new details to the list.
+      oldList.add(newBlockEntry);
+    }
+  }
+
   /**
    * Update table status
    * @param updatedSegmentsList
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index d011da3..554d11a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -98,6 +98,8 @@ public class DictionaryBasedResultCollector extends 
AbstractScannedResultCollect
   private Map<Integer, Map<CarbonDimension, ByteBuffer>> 
mergedComplexDimensionDataMap =
       new HashMap<>();
 
+  private boolean readOnlyDelta;
+
   public DictionaryBasedResultCollector(BlockExecutionInfo 
blockExecutionInfos) {
     super(blockExecutionInfos);
     queryDimensions = executionInfo.getProjectionDimensions();
@@ -105,7 +107,7 @@ public class DictionaryBasedResultCollector extends 
AbstractScannedResultCollect
     initDimensionAndMeasureIndexesForFillingData();
     isDimensionExists = queryDimensions.length > 0;
     this.comlexDimensionInfoMap = executionInfo.getComlexDimensionInfoMap();
-
+    this.readOnlyDelta = executionInfo.isReadOnlyDelta();
   }
 
   /**
@@ -136,6 +138,16 @@ public class DictionaryBasedResultCollector extends 
AbstractScannedResultCollect
       }
     }
     while (scannedResult.hasNext() && rowCounter < batchSize) {
+      scannedResult.incrementCounter();
+      if (readOnlyDelta) {
+        if 
(!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+          continue;
+        }
+      } else {
+        if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) 
{
+          continue;
+        }
+      }
       Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionExists) {
         surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
@@ -151,11 +163,6 @@ public class DictionaryBasedResultCollector extends 
AbstractScannedResultCollect
           fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, 
complexTypeKeyArray,
               comlexDimensionInfoMap, row, i, 
queryDimensions[i].getDimension().getOrdinal());
         }
-      } else {
-        scannedResult.incrementCounter();
-      }
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
-        continue;
       }
       fillMeasureData(scannedResult, row);
       if (isStructQueryType) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 3627e00..522aaf1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -74,6 +74,10 @@ public class RestructureBasedDictionaryResultCollector 
extends DictionaryBasedRe
     Map<Integer, GenericQueryType> comlexDimensionInfoMap =
         executionInfo.getComlexDimensionInfoMap();
     while (scannedResult.hasNext() && rowCounter < batchSize) {
+      scannedResult.incrementCounter();
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+        continue;
+      }
       Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionExists) {
         surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
@@ -101,11 +105,6 @@ public class RestructureBasedDictionaryResultCollector 
extends DictionaryBasedRe
               comlexDimensionInfoMap, row, i, executionInfo
                   
.getProjectionDimensions()[segmentDimensionsIdx++].getDimension().getOrdinal());
         }
-      } else {
-        scannedResult.incrementCounter();
-      }
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
-        continue;
       }
       fillMeasureData(scannedResult, row);
       listBasedResult.add(row);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
index 2111b02..7a0732b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
@@ -45,6 +45,10 @@ public class RowIdBasedResultCollector extends 
DictionaryBasedResultCollector {
     byte[][] complexTypeKeyArray;
     int columnCount = queryDimensions.length + queryMeasures.length;
     while (scannedResult.hasNext() && rowCounter < batchSize) {
+      scannedResult.incrementCounter();
+      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+        continue;
+      }
       Object[] row = new Object[columnCount + 3];
       row[columnCount] = scannedResult.getBlockletNumber();
       row[columnCount + 1] = scannedResult.getCurrentPageCounter();
@@ -59,13 +63,8 @@ public class RowIdBasedResultCollector extends 
DictionaryBasedResultCollector {
           fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, 
complexTypeKeyArray,
               comlexDimensionInfoMap, row, i, 
queryDimensions[i].getDimension().getOrdinal());
         }
-      } else {
-        scannedResult.incrementCounter();
       }
       row[columnCount + 2] = scannedResult.getCurrentRowId();
-      if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
-        continue;
-      }
       fillMeasureData(scannedResult, row);
       listBasedResult.add(row);
       rowCounter++;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index c891ba2..ab21819 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -487,6 +487,7 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
     blockExecutionInfo
         .setTotalNumberDimensionToRead(
             segmentProperties.getDimensionOrdinalToChunkMapping().size());
+    blockExecutionInfo.setReadOnlyDelta(queryModel.isReadOnlyDelta());
     if (queryModel.isReadPageByPage()) {
       blockExecutionInfo.setPrefetchBlocklet(false);
       LOGGER.info("Query prefetch is: false, read page by page");
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index 0bd053c..51368dd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -228,6 +228,12 @@ public class BlockExecutionInfo {
   private ReusableDataBuffer[] measureResusableDataBuffer;
 
   /**
+   * It is used to read only the deleted data of a particular version. It will be used to get the
+   * old updated/deleted data before update.
+   */
+  private boolean readOnlyDelta;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -659,4 +665,12 @@ public class BlockExecutionInfo {
   public void setMeasureResusableDataBuffer(ReusableDataBuffer[] 
measureResusableDataBuffer) {
     this.measureResusableDataBuffer = measureResusableDataBuffer;
   }
+
+  public boolean isReadOnlyDelta() {
+    return readOnlyDelta;
+  }
+
+  public void setReadOnlyDelta(boolean readOnlyDelta) {
+    this.readOnlyDelta = readOnlyDelta;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index d604e15..3f300e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -118,6 +118,12 @@ public class QueryModel {
    */
   private boolean isDirectVectorFill;
 
+  /**
+   * It is used to read only the deleted data of a particular version. It will be used to get the
+   * old updated/deleted data before update.
+   */
+  private boolean readOnlyDelta;
+
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     this.table = carbonTable;
@@ -396,6 +402,14 @@ public class QueryModel {
     isDirectVectorFill = directVectorFill;
   }
 
+  public boolean isReadOnlyDelta() {
+    return readOnlyDelta;
+  }
+
+  public void setReadOnlyDelta(boolean readOnlyDelta) {
+    this.readOnlyDelta = readOnlyDelta;
+  }
+
   @Override
   public String toString() {
     return String.format("scan on table %s.%s, %d projection columns with 
filter (%s)",
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index a4c1c6c..6c0ab4d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -247,7 +247,6 @@ public abstract class BlockletScannedResult {
       column = 
dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
           .fillSurrogateKey(rowId, column, completeKey);
     }
-    rowCounter++;
     return completeKey;
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 245135a..f338888 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -42,7 +42,6 @@ public class FilterQueryScannedResult extends 
BlockletScannedResult {
    */
   @Override
   public byte[] getDictionaryKeyArray() {
-    ++currentRow;
     return getDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
@@ -52,7 +51,6 @@ public class FilterQueryScannedResult extends 
BlockletScannedResult {
    */
   @Override
   public int[] getDictionaryKeyIntegerArray() {
-    ++currentRow;
     return 
getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index c9f6b0c..98576fa 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -51,7 +51,6 @@ public class NonFilterQueryScannedResult extends 
BlockletScannedResult {
    */
   @Override
   public int[] getDictionaryKeyIntegerArray() {
-    ++currentRow;
     return getDictionaryKeyIntegerArray(currentRow);
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 4cefea2..63f9471 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -17,20 +17,8 @@
 
 package org.apache.carbondata.core.statusmanager;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.io.*;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -80,15 +68,30 @@ public class SegmentUpdateStatusManager {
 
   public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails) {
+    this(table, segmentDetails, null);
+  }
+
+  /**
+   * It takes the updateVersion as an additional parameter. The user can specify the
+   * updateVersion for which the data should be retrieved. It is useful to get the historical
+   * changed data of a particular version.
+   */
+  public SegmentUpdateStatusManager(CarbonTable table,
+      LoadMetadataDetails[] segmentDetails, String updateVersion) {
     this.identifier = table.getAbsoluteTableIdentifier();
     // current it is used only for read function scenarios, as file update 
always requires to work
     // on latest file status.
     this.segmentDetails = segmentDetails;
     updateDetails = readLoadMetadata();
+    updateUpdateDetails(updateVersion);
     populateMap();
   }
 
   public SegmentUpdateStatusManager(CarbonTable table) {
+    this(table, (String) null);
+  }
+
+  public SegmentUpdateStatusManager(CarbonTable table, String updateVersion) {
     this.identifier = table.getAbsoluteTableIdentifier();
     // current it is used only for read function scenarios, as file update 
always requires to work
     // on latest file status.
@@ -104,10 +107,34 @@ public class SegmentUpdateStatusManager {
     } else {
       updateDetails = new SegmentUpdateDetails[0];
     }
+    updateUpdateDetails(updateVersion);
     populateMap();
   }
 
   /**
+   * It keeps only the SegmentUpdateDetails of the given updateVersion; it is used to get the
+   * historical data of updated/deleted rows.
+   */
+  private void updateUpdateDetails(String updateVersion) {
+    if (updateVersion != null) {
+      List<SegmentUpdateDetails> newupdateDetails = new ArrayList<>();
+      for (SegmentUpdateDetails updateDetail : updateDetails) {
+        if (updateDetail.getDeltaFileStamps() != null) {
+          if (updateDetail.getDeltaFileStamps().contains(updateVersion)) {
+            HashSet<String> set = new HashSet<>();
+            set.add(updateVersion);
+            updateDetail.setDeltaFileStamps(set);
+            newupdateDetails.add(updateDetail);
+          }
+        } else if 
(updateDetail.getDeleteDeltaStartTimestamp().equalsIgnoreCase(updateVersion)) {
+          newupdateDetails.add(updateDetail);
+        }
+      }
+      updateDetails = newupdateDetails.toArray(new SegmentUpdateDetails[0]);
+    }
+  }
+
+  /**
    * populate the block and its details in a map.
    */
   private void populateMap() {
@@ -640,21 +667,30 @@ public class SegmentUpdateStatusManager {
    * @return
    */
   public SegmentUpdateDetails[] readLoadMetadata() {
+    // get the updated status file identifier from the table status.
+    String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
+    return readLoadMetadata(tableUpdateStatusIdentifier, 
identifier.getTablePath());
+  }
+
+  /**
+   * This method loads segment update details
+   *
+   * @return
+   */
+  public static SegmentUpdateDetails[] readLoadMetadata(String 
tableUpdateStatusIdentifier,
+      String tablePath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
     SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray;
 
-    // get the updated status file identifier from the table status.
-    String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
-
     if (StringUtils.isEmpty(tableUpdateStatusIdentifier)) {
       return new SegmentUpdateDetails[0];
     }
 
     String tableUpdateStatusPath =
-        CarbonTablePath.getMetadataPath(identifier.getTablePath()) +
+        CarbonTablePath.getMetadataPath(tablePath) +
             CarbonCommonConstants.FILE_SEPARATOR + tableUpdateStatusIdentifier;
     AtomicFileOperations fileOperation =
         
AtomicFileOperationFactory.getAtomicFileOperations(tableUpdateStatusPath);
@@ -735,7 +771,7 @@ public class SegmentUpdateStatusManager {
    *
    * @param streams - streams to close.
    */
-  private void closeStreams(Closeable... streams) {
+  private static void closeStreams(Closeable... streams) {
     // Added if to avoid NullPointerException in case one stream is being 
passed as null
     if (null != streams) {
       for (Closeable stream : streams) {
diff --git a/dev/javastyle-config.xml b/dev/javastyle-config.xml
index 824cbc8..332c85a 100644
--- a/dev/javastyle-config.xml
+++ b/dev/javastyle-config.xml
@@ -175,9 +175,6 @@
             <message key="import.redundancy" value="Redundant import {0}."/>
         </module>
 
-        <!-- Checks for star import. -->
-        <module name="AvoidStarImport"/>
-
         <!-- Checks for placement of the left curly brace ('{'). -->
         <module name="LeftCurly"/>
 
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 10aabf2..ae8db43 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -120,6 +120,7 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
   private static final String FGDATAMAP_PRUNING = 
"mapreduce.input.carboninputformat.fgdatamap";
   private static final String READ_COMMITTED_SCOPE =
       "mapreduce.input.carboninputformat.read.committed.scope";
+  private static final String READ_ONLY_DELTA = "readDeltaOnly";
 
   // record segment number and hit blocks
   protected int numSegments = 0;
@@ -688,11 +689,16 @@ m filterExpression
     if (dataMapFilter != null) {
       checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit);
     }
-    return new QueryModelBuilder(carbonTable)
+    QueryModel queryModel = new QueryModelBuilder(carbonTable)
         .projectColumns(projectColumns)
         .filterExpression(dataMapFilter)
         .dataConverter(getDataTypeConverter(configuration))
         .build();
+    String readDeltaOnly = configuration.get(READ_ONLY_DELTA);
+    if (readDeltaOnly != null && Boolean.parseBoolean(readDeltaOnly)) {
+      queryModel.setReadOnlyDelta(true);
+    }
+    return queryModel;
   }
 
   /**
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 5468c24..c47cdd6 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -85,6 +85,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       "mapreduce.input.carboninputformat.transactional";
   public static final String DATABASE_NAME = 
"mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = 
"mapreduce.input.carboninputformat.tableName";
+  public static final String UPDATE_DELTA_VERSION = "updateDeltaVersion";
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
   private ReadCommittedScope readCommittedScope;
@@ -107,9 +108,15 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     }
     this.readCommittedScope = getReadCommitted(job, identifier);
     LoadMetadataDetails[] loadMetadataDetails = 
readCommittedScope.getSegmentList();
-
-    SegmentUpdateStatusManager updateStatusManager =
-        new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
+    String updateDeltaVersion = 
job.getConfiguration().get(UPDATE_DELTA_VERSION);
+    SegmentUpdateStatusManager updateStatusManager;
+    if (updateDeltaVersion != null) {
+      updateStatusManager =
+          new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails, 
updateDeltaVersion);
+    } else {
+      updateStatusManager =
+          new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
+    }
     List<String> invalidSegmentIds = new ArrayList<>();
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
new file mode 100644
index 0000000..f91bce0
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.merge
+
+import scala.collection.JavaConverters._
+import java.sql.Date
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.CarbonSession._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test class for the merge DataSet API (SCD and CCD scenarios).
+ */
+
+class MergeTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+
+  }
+
+  def generateData(numOrders: Int = 10): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
+      }.toDF("id", "name", "c_name", "quantity", "price", "state")
+  }
+
+  def generateFullCDC(
+      numOrders: Int,
+      numUpdatedOrders: Int,
+      newState: Int,
+      oldState: Int,
+      numNewOrders: Int
+  ): DataFrame = {
+    import sqlContext.implicits._
+    val ds1 = sqlContext.sparkContext.parallelize(numNewOrders+1 to 
(numOrders), 4)
+      .map {x =>
+        if (x <= numNewOrders + numUpdatedOrders) {
+          ("id"+x, s"order$x",s"customer$x", x*10, x*75, newState)
+        } else {
+          ("id"+x, s"order$x",s"customer$x", x*10, x*75, oldState)
+        }
+      }.toDF("id", "name", "c_name", "quantity", "price", "state")
+    val ds2 = sqlContext.sparkContext.parallelize(1 to numNewOrders, 4)
+      .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, oldState)
+      }.toDS().toDF()
+    ds1.union(ds2)
+  }
+
+  private def initialize = {
+    val initframe = generateData(10)
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", 
"order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+    (dwSelframe, odsframe)
+  }
+
+  test("test basic merge update with all mappings") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    val updateMap = Map("id" -> "A.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge update with few mappings") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, "A.id=B.id").whenMatched("A.state <> 
B.state").updateExpr(updateMap).execute()
+
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge update with few mappings and expressions") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    val updateMap = Map("id" -> "A.id",
+      "price" -> "B.price * 100",
+      "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+      col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+
+    checkAnswer(sql("select price from order where where state = 2"), 
Seq(Row(22500), Row(30000)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test basic merge update with few mappings with out condition") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, 
col("A.id").equalTo(col("B.id"))).whenMatched().updateExpr(updateMap).execute()
+
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test merge insert with condition") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      "c_name" -> col("B.c_name"),
+      col("quantity") -> "B.quantity",
+      col("price") -> col("B.price"),
+      col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).
+      whenNotMatched(col("A.id").isNull.and(col("B.id").isNotNull)).
+      insertExpr(insertMap).execute()
+
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), 
Seq(Row(2)))
+  }
+
+  test("test merge update and insert with out condition") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> col("B.price"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+
+    val st = System.currentTimeMillis()
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test merge update and insert with condition") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> col("B.price"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= 
Seq(WhenNotMatched(Some(col("A.id").isNull.and(col("B.id").isNotNull))).addAction(InsertAction(insertMap)))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test merge update and insert with condition and expression") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= 
Seq(WhenNotMatched(Some(col("A.id").isNull.and(col("B.id").isNotNull))).addAction(InsertAction(insertMap)))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), 
Seq(Row(7500)))
+  }
+
+  test("test merge with only delete action") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    matches ++= 
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
+  }
+
+  test("test merge update and delete action") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= 
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+  }
+
+  test("test merge update and insert with condition and expression and delete 
action") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+    matches ++= 
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), 
Seq(Row(7500)))
+  }
+
+  test("test merge update with insert, insert with condition and expression 
and delete with insert action") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> "B.price + 1",
+      col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+    val insertMap_u = Map(col("id") -> col("A.id"),
+      col("name") -> col("A.name"),
+      col("c_name") -> lit("insert"),
+      col("quantity") -> col("A.quantity"),
+      col("price") -> expr("A.price"),
+      col("state") -> col("A.state")).asInstanceOf[Map[Any, Any]]
+
+    val insertMap_d = Map(col("id") -> col("A.id"),
+      col("name") -> col("A.name"),
+      col("c_name") -> lit("delete"),
+      col("quantity") -> col("A.quantity"),
+      col("price") -> expr("A.price"),
+      col("state") -> col("A.state")).asInstanceOf[Map[Any, Any]]
+
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).
+      whenMatched(col("A.state") =!= col("B.state")).
+      updateExpr(updateMap).insertExpr(insertMap_u).
+      whenNotMatched().
+      insertExpr(insertMap).
+      whenNotMatchedAndExistsOnlyOnTarget().
+      delete().
+      insertExpr(insertMap_d).
+      execute()
+    sql("select * from order").show()
+    checkAnswer(sql("select count(*) from order where c_name = 'delete'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order where c_name = 'insert'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(14)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), 
Seq(Row(7500)))
+  }
+
+  test("test merge update with insert, insert with condition and expression 
and delete with insert history action") {
+    sql("drop table if exists order")
+    sql("drop table if exists order_hist")
+    sql("create table order_hist(id string, name string, c_name string, 
quantity int, price int, state int) stored as carbondata")
+    val (dwSelframe, odsframe) = initialize
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state"))
+
+    val insertMap_u = Map(col("id") -> col("id"),
+      col("name") -> col("name"),
+      col("c_name") -> lit("insert"),
+      col("quantity") -> col("quantity"),
+      col("price") -> expr("price"),
+      col("state") -> col("state"))
+
+    val insertMap_d = Map(col("id") -> col("id"),
+      col("name") -> col("name"),
+      col("c_name") -> lit("delete"),
+      col("quantity") -> col("quantity"),
+      col("price") -> expr("price"),
+      col("state") -> col("state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
 TableIdentifier("order_hist"))))
+    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+    matches ++= 
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
 TableIdentifier("order_hist"))))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), 
matches.toList)).run(sqlContext.sparkSession)
+    checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), 
Seq(Row(7500)))
+    checkAnswer(sql("select count(*) from order_hist where c_name = 
'delete'"), Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order_hist where c_name = 
'insert'"), Seq(Row(2)))
+  }
+
+  test("check the scd ") {
+    sql("drop table if exists customers")
+
+    val initframe =
+    sqlContext.sparkSession.createDataFrame(Seq(
+      Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")),
+      Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), null),
+      Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), null),
+      Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null)
+    ).asJava, StructType(Seq(StructField("customerId", IntegerType), 
StructField("address", StringType), StructField("current", BooleanType), 
StructField("effectiveDate", DateType), StructField("endDate", DateType))))
+    initframe.printSchema()
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "customers")
+      .mode(SaveMode.Overwrite)
+      .save()
+    var customer = sqlContext.read.format("carbondata").option("tableName", 
"customers").load()
+    customer = customer.as("A")
+    var updates =
+    sqlContext.sparkSession.createDataFrame(Seq(
+      Row(1, "new address for 1", Date.valueOf("2018-03-03")),
+      Row(3, "current address for 3", Date.valueOf("2018-04-04")),    // new 
address same as current address for customer 3
+      Row(4, "new address for 4", Date.valueOf("2018-04-04"))
+    ).asJava, StructType(Seq(StructField("customerId", IntegerType), 
StructField("address", StringType), StructField("effectiveDate", DateType))))
+    updates = updates.as("B")
+
+    val updateMap = Map(col("current") -> lit(false),
+      col("endDate") -> col("B.effectiveDate")).asInstanceOf[Map[Any, Any]]
+
+    val insertMap = Map(col("customerId") -> col("B.customerId"),
+      col("address") -> col("B.address"),
+      col("current") -> lit(true),
+      col("effectiveDate") -> col("B.effectiveDate"),
+      col("endDate") -> lit(null)).asInstanceOf[Map[Any, Any]]
+
+    val insertMap_u = Map(col("customerId") -> col("B.customerId"),
+      col("address") -> col("B.address"),
+      col("current") -> lit(true),
+      col("effectiveDate") -> col("B.effectiveDate"),
+      col("endDate") -> lit(null)).asInstanceOf[Map[Any, Any]]
+
+    customer.merge(updates, "A.customerId=B.customerId").
+      whenMatched((col("A.address") =!= 
col("B.address")).and(col("A.current").equalTo(lit(true)))).
+      updateExpr(updateMap).
+      insertExpr(insertMap_u).
+      
whenNotMatched(col("A.customerId").isNull.and(col("B.customerId").isNotNull)).
+      insertExpr(insertMap).
+      execute()
+
+    checkAnswer(sql("select count(*) from customers"), Seq(Row(6)))
+    checkAnswer(sql("select count(*) from customers where current='true'"), 
Seq(Row(4)))
+    checkAnswer(sql("select count(*) from customers where effectivedate is not 
null and enddate is not null"), Seq(Row(1)))
+
+  }
+
+  test("check the ccd ") {
+    sql("drop table if exists target")
+
+    val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), 
StructField("value", StringType))))
+
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = sqlContext.read.format("carbondata").option("tableName", 
"target").load()
+    var ccd =
+      sqlContext.sparkSession.createDataFrame(Seq(
+        Row("a", "10", false,  0),
+        Row("a", null, true, 1),   // a was updated and then deleted
+        Row("b", null, true, 2),   // b was just deleted once
+        Row("c", null, true, 3),   // c was deleted and then updated twice
+        Row("c", "20", false, 4),
+        Row("c", "200", false, 5),
+        Row("e", "100", false, 6)  // new key
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("newValue", StringType),
+          StructField("deleted", BooleanType), StructField("time", 
IntegerType))))
+    ccd.createOrReplaceTempView("changes")
+
+    ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as 
deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM 
changes GROUP BY key)")
+
+    val updateMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
+
+    val insertMap = Map("key" -> "B.key", "value" -> 
"B.newValue").asInstanceOf[Map[Any, Any]]
+
+    target.as("A").merge(ccd.as("B"), "A.key=B.key").
+      whenMatched("B.deleted=false").
+      updateExpr(updateMap).
+      whenNotMatched("B.deleted=false").
+      insertExpr(insertMap).
+      whenMatched("B.deleted=true").
+      delete().execute()
+    checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
+    checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), 
Row("d", "3"), Row("e", "100")))
+  }
+
+  override def afterAll {
+    sql("drop table if exists order")
+  }
+}
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
new file mode 100644
index 0000000..ea32bdf
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Partition
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datamap.DataMapFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.util.DataTypeConverter
+import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.spark.InitInputMetrics
+
+/**
+ * It can get the deleted/updated records on any particular update version. It is useful to get the
+ * records changed on any particular update transaction.
+ */
+class CarbonDeltaRowScanRDD[T: ClassTag](
+    @transient private val spark: SparkSession,
+    @transient private val serializedTableInfo: Array[Byte],
+    @transient private val tableInfo: TableInfo,
+    @transient override val partitionNames: Seq[PartitionSpec],
+    override val columnProjection: CarbonProjection,
+    var filter: DataMapFilter,
+    identifier: AbsoluteTableIdentifier,
+    inputMetricsStats: InitInputMetrics,
+    override val dataTypeConverterClz: Class[_ <: DataTypeConverter] =
+    classOf[SparkDataTypeConverterImpl],
+    override val readSupportClz: Class[_ <: CarbonReadSupport[_]] =
+    SparkReadSupport.readSupportClass,
+    deltaVersionToRead: String) extends
+  CarbonScanRDD[T](
+    spark,
+    columnProjection,
+    filter,
+    identifier,
+    serializedTableInfo,
+    tableInfo,
+    inputMetricsStats,
+    partitionNames,
+    dataTypeConverterClz,
+    readSupportClz) {
+  override def internalGetPartitions: Array[Partition] = {
+    val table = CarbonTable.buildFromTableInfo(getTableInfo)
+    val updateStatusManager = new SegmentUpdateStatusManager(table, 
deltaVersionToRead)
+
+    val parts = super.internalGetPartitions
+    parts.map { p =>
+      val partition = p.asInstanceOf[CarbonSparkPartition]
+      val splits = partition.multiBlockSplit.getAllSplits.asScala.filter { s =>
+        updateStatusManager.getDetailsForABlock(
+          CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId, 
s.getBlockPath)) != null
+      }.asJava
+      new CarbonSparkPartition(partition.rddId, partition.index,
+        new CarbonMultiBlockSplit(splits, 
partition.multiBlockSplit.getLocations))
+    }.filter(p => p.multiBlockSplit.getAllSplits.size() > 
0).asInstanceOf[Array[Partition]]
+  }
+
+  override def createInputFormat(conf: Configuration): 
CarbonTableInputFormat[Object] = {
+    val format = super.createInputFormat(conf)
+    conf.set("updateDeltaVersion", deltaVersionToRead)
+    conf.set("readDeltaOnly", "true")
+    format
+  }
+}
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a6dfc8a..5d75742 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -638,7 +638,7 @@ class CarbonScanRDD[T: ClassTag](
   }
 
 
-  private def createInputFormat(conf: Configuration): 
CarbonTableInputFormat[Object] = {
+  def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = 
{
     val format = new CarbonTableInputFormat[Object]
     CarbonInputFormat.setTablePath(conf,
       identifier.appendWithLocalPrefix(identifier.getTablePath))
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index ca35adc..62e52d0 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -206,7 +206,7 @@ object DistributionUtil {
    * @param sparkContext
    * @return
    */
-  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
+  def getConfiguredExecutors(sparkContext: SparkContext): Int = {
     var confExecutors: Int = 0
     if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", 
false)) {
       // default value for spark.dynamicAllocation.maxExecutors is infinity
diff --git 
a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java
 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java
new file mode 100644
index 0000000..a76c79d
--- /dev/null
+++ 
b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.readsupport;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import 
org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+
+public class SparkGenericRowReadSupportImpl extends 
DictionaryDecodeReadSupport<Row> {
+
+  @Override
+  public void initialize(CarbonColumn[] carbonColumns,
+      CarbonTable carbonTable) throws IOException {
+    super.initialize(carbonColumns, carbonTable);
+  }
+
+  @Override
+  public Row readRow(Object[] data) {
+    assert (data.length == dictionaries.length);
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+      }
+      if (dataTypes[i] == DataTypes.DATE) {
+        Calendar c = Calendar.getInstance();
+        c.setTime(new Date(0));
+        c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]);
+        data[i] = new Date(c.getTime().getTime());
+      } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
+        data[i] = new Timestamp((long) data[i] / 1000);
+      }
+    }
+    return new GenericRow(data);
+  }
+}
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 233ad4d..bf5bc40 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.DFSClient.Conf
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.spark.Partition
 import org.apache.spark.sql.SparkSession
@@ -33,10 +32,9 @@ import 
org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import 
org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, 
TableStatusReadCommittedScope}
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, 
OperationListenerBus}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 
 object DistributedRDDUtils {
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index deefcd1..063eaf5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.QueryExecution
+import 
org.apache.spark.sql.execution.command.mutation.merge.MergeDataSetBuilder
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.sql.profiler.{Profiler, SQLStart}
@@ -283,6 +284,16 @@ object CarbonSession {
     }
   }
 
+  implicit class DataSetMerge(val ds: Dataset[Row]) {
+    def merge(srcDS: Dataset[Row], expr: String): MergeDataSetBuilder = {
+      new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
+    }
+
+    def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = {
+      new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
+    }
+  }
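(Editor's illustration, not part of this patch: with the DataSetMerge implicit in scope, a merge is started directly from the target DataFrame. The variables sparkSession and changesDF, the table name "target_table" and the column "id" are hypothetical.)

    import org.apache.spark.sql.CarbonSession._

    // the target must be backed by a carbon table; the source can be any DataFrame
    val targetDS = sparkSession.table("target_table")
    val sourceDS = changesDF
    // merge(...) returns a MergeDataSetBuilder; match conditions and actions
    // are chained on it before calling execute()
    val builder = targetDS.merge(sourceDS, targetDS.col("id") === sourceDS.col("id"))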
+
   def threadSet(key: String, value: String): Unit = {
     var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (currentThreadSessionInfo == null) {
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index ecb74dc..d744e96 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, 
DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
-import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
+import org.apache.carbondata.core.mutate.data.{BlockMappingVO, 
RowCountDetailsVO}
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
ThreadLocalSessionInfo}
@@ -55,35 +55,71 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl
 object DeleteExecution {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
+  def deleteDeltaExecution(
+      databaseNameOp: Option[String],
+      tableName: String,
+      sparkSession: SparkSession,
+      dataRdd: RDD[Row],
+      timestamp: String,
+      isUpdateOperation: Boolean,
+      executorErrors: ExecutionErrors): (Seq[Segment], Long) = {
+
+    val (res, blockMappingVO) = deleteDeltaExecutionInternal(databaseNameOp,
+      tableName, sparkSession, dataRdd, timestamp, isUpdateOperation, 
executorErrors)
+    var segmentsTobeDeleted = Seq.empty[Segment]
+    var operatedRowCount = 0L
+    // if no loads are present then no need to do anything.
+    if (res.flatten.isEmpty) {
+      return (segmentsTobeDeleted, operatedRowCount)
+    }
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
+    // update new status file
+    segmentsTobeDeleted =
+      checkAndUpdateStatusFiles(executorErrors,
+        res, carbonTable, timestamp,
+        blockMappingVO, isUpdateOperation)
+
+    if (executorErrors.failureCauses == FailureCauses.NONE) {
+      operatedRowCount = res.flatten.map(_._2._3).sum
+    }
+    (segmentsTobeDeleted, operatedRowCount)
+  }
+
   /**
    * generate the delete delta files in each segment as per the RDD.
    * @return it gives the segments which needs to be deleted.
    */
-  def deleteDeltaExecution(
+  def deleteDeltaExecutionInternal(
       databaseNameOp: Option[String],
       tableName: String,
       sparkSession: SparkSession,
       dataRdd: RDD[Row],
       timestamp: String,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors): (Seq[Segment], Long) = {
+      executorErrors: ExecutionErrors,
+      tupleId: Option[Int] = None):
+  (Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, 
Long))]], BlockMappingVO) = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, 
ExecutionErrors, Long))]] = null
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val tablePath = absoluteTableIdentifier.getTablePath
-    var segmentsTobeDeleted = Seq.empty[Segment]
-    var operatedRowCount = 0L
 
     val deleteRdd = if (isUpdateOperation) {
       val schema =
         
org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
           CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
           org.apache.spark.sql.types.StringType)))
-      val rdd = dataRdd
-        .map(row => Row(row.get(row.fieldIndex(
-          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+      val rdd = tupleId match {
+        case Some(id) =>
+          dataRdd
+            .map(row => Row(row.get(id)))
+        case _ =>
+          dataRdd
+            .map(row => Row(row.get(row.fieldIndex(
+              CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+      }
       sparkSession.createDataFrame(rdd, schema).rdd
     } else {
       dataRdd
@@ -91,16 +127,26 @@ object DeleteExecution {
 
     val (carbonInputFormat, job) = 
createCarbonInputFormat(absoluteTableIdentifier)
     CarbonInputFormat.setTableInfo(job.getConfiguration, 
carbonTable.getTableInfo)
-    val keyRdd = deleteRdd.map({ row =>
-      val tupleId: String = row
-        
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-      val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
-      (key, row)
-    }).groupByKey()
+    val keyRdd = tupleId match {
+      case Some(id) =>
+        deleteRdd.map { row =>
+          val tupleId: String = row.getString(id)
+          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+          (key, row)
+        }.groupByKey()
+      case _ =>
+        deleteRdd.map { row =>
+          val tupleId: String = row
+            
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+          (key, row)
+        }.groupByKey()
+    }
 
     // if no loads are present then no need to do anything.
     if (keyRdd.partitions.length == 0) {
-      return (segmentsTobeDeleted, operatedRowCount)
+      return (Array.empty[List[(SegmentStatus,
+        (SegmentUpdateDetails, ExecutionErrors, Long))]], null)
     }
     val blockMappingVO =
       carbonInputFormat.getBlockRowCount(
@@ -144,76 +190,6 @@ object DeleteExecution {
           result
         }).collect()
 
-    // if no loads are present then no need to do anything.
-    if (res.flatten.isEmpty) {
-      return (segmentsTobeDeleted, operatedRowCount)
-    }
-
-    // update new status file
-    checkAndUpdateStatusFiles()
-
-    // all or none : update status file, only if complete delete opeartion is 
successfull.
-    def checkAndUpdateStatusFiles(): Unit = {
-      val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
-      val segmentDetails = new util.HashSet[Segment]()
-      res.foreach(resultOfSeg => resultOfSeg.foreach(
-        resultOfBlock => {
-          if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
-            blockUpdateDetailsList.add(resultOfBlock._2._1)
-            segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
-            // if this block is invalid then decrement block count in map.
-            if 
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
-              CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
-                blockMappingVO.getSegmentNumberOfBlockMapping)
-            }
-          } else {
-            // In case of failure , clean all related delete delta files
-            CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-            val errorMsg =
-              "Delete data operation is failed due to failure in creating 
delete delta file for " +
-              "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
-              resultOfBlock._2._1.getBlockName
-            executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-            executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-
-            if (executorErrors.failureCauses == FailureCauses.NONE) {
-              executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-              executorErrors.errorMsg = errorMsg
-            }
-            LOGGER.error(errorMsg)
-            return
-          }
-        }
-      )
-      )
-
-      val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
-        
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
-
-      segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala
-
-      // this is delete flow so no need of putting timestamp in the status 
file.
-      if (CarbonUpdateUtil
-            .updateSegmentStatus(blockUpdateDetailsList, carbonTable, 
timestamp, false) &&
-          CarbonUpdateUtil
-            .updateTableMetadataStatus(segmentDetails,
-              carbonTable,
-              timestamp,
-              !isUpdateOperation,
-              listOfSegmentToBeMarkedDeleted)
-      ) {
-        LOGGER.info(s"Delete data operation is successful for ${ database }.${ 
tableName }")
-      } else {
-        // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-        val errorMessage = "Delete data operation is failed due to failure " +
-                           "in table status updation."
-        LOGGER.error("Delete data operation is failed due to failure in table 
status updation.")
-        executorErrors.failureCauses = 
FailureCauses.STATUS_FILE_UPDATION_FAILURE
-        executorErrors.errorMsg = errorMessage
-      }
-    }
-
     def deleteDeltaFunc(index: Int,
         key: String,
         iter: Iterator[Row],
@@ -322,10 +298,118 @@ object DeleteExecution {
       resultIter
     }
 
-    if (executorErrors.failureCauses == FailureCauses.NONE) {
-       operatedRowCount = res.flatten.map(_._2._3).sum
+    (res, blockMappingVO)
+  }
+
+  // all or none: update the status file only if the complete delete operation is successful.
+  def checkAndUpdateStatusFiles(
+      executorErrors: ExecutionErrors,
+      res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, 
Long))]],
+      carbonTable: CarbonTable,
+      timestamp: String,
+      blockMappingVO: BlockMappingVO,
+      isUpdateOperation: Boolean): Seq[Segment] = {
+    val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+    val segmentDetails = new util.HashSet[Segment]()
+    res.foreach(resultOfSeg => resultOfSeg.foreach(
+      resultOfBlock => {
+        if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
+          blockUpdateDetailsList.add(resultOfBlock._2._1)
+          segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
+          // if this block is invalid then decrement block count in map.
+          if 
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
+            CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+              blockMappingVO.getSegmentNumberOfBlockMapping)
+          }
+        } else {
+          // In case of failure , clean all related delete delta files
+          CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+          val errorMsg =
+            "Delete data operation failed due to failure in creating the delete delta file " +
+            "for segment: " + resultOfBlock._2._1.getSegmentName + " and block: " +
+            resultOfBlock._2._1.getBlockName
+          executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+          executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+          if (executorErrors.failureCauses == FailureCauses.NONE) {
+            executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+            executorErrors.errorMsg = errorMsg
+          }
+          LOGGER.error(errorMsg)
+          return Seq.empty[Segment]
+        }
+      }))
+
+    val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+      
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+    val segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala
+
+    // this is the delete flow, so there is no need to put a timestamp in the status file.
+    if (CarbonUpdateUtil
+          .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, 
false) &&
+        CarbonUpdateUtil
+          .updateTableMetadataStatus(segmentDetails,
+            carbonTable,
+            timestamp,
+            !isUpdateOperation,
+            listOfSegmentToBeMarkedDeleted)
+    ) {
+      LOGGER.info(s"Delete data operation is successful for " +
+                  s"${ carbonTable.getDatabaseName }.${ 
carbonTable.getTableName }")
+    } else {
+      // In case of failure , clean all related delete delta files
+      CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+      val errorMessage = "Delete data operation is failed due to failure " +
+                         "in table status updation."
+      LOGGER.error("Delete data operation is failed due to failure in table 
status updation.")
+      executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
+      executorErrors.errorMsg = errorMessage
     }
-    (segmentsTobeDeleted, operatedRowCount)
+    segmentsTobeDeleted
+  }
+
+  // all or none: update the status file only if the complete delete operation is successful.
+  def processSegments(executorErrors: ExecutionErrors,
+      res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, 
Long))]],
+      carbonTable: CarbonTable,
+      timestamp: String,
+      blockMappingVO: BlockMappingVO): (util.List[SegmentUpdateDetails], 
Seq[Segment]) = {
+    val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+    val segmentDetails = new util.HashSet[Segment]()
+    res.foreach(resultOfSeg => resultOfSeg.foreach(
+      resultOfBlock => {
+        if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
+          blockUpdateDetailsList.add(resultOfBlock._2._1)
+          segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
+          // if this block is invalid then decrement block count in map.
+          if 
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
+            CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+              blockMappingVO.getSegmentNumberOfBlockMapping)
+          }
+        } else {
+          // In case of failure , clean all related delete delta files
+          CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+          val errorMsg =
+            "Delete data operation failed due to failure in creating the delete delta file " +
+            "for segment: " + resultOfBlock._2._1.getSegmentName + " and block: " +
+            resultOfBlock._2._1.getBlockName
+          executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+          executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+          if (executorErrors.failureCauses == FailureCauses.NONE) {
+            executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+            executorErrors.errorMsg = errorMsg
+          }
+          LOGGER.error(errorMsg)
+          return (blockUpdateDetailsList, Seq.empty[Segment])
+        }
+      }))
+
+    val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+      
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+    (blockUpdateDetailsList, listOfSegmentToBeMarkedDeleted.asScala)
   }
 
   /**
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
new file mode 100644
index 0000000..3c0acc6
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.sql.{AnalysisException, 
CarbonDatasourceHadoopRelation, Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, 
LongAccumulator}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * This command merges the data of the source dataset into the target dataset backed by a
+ * carbon table.
+ * @param targetDsOri Target dataset to merge the data into. It must be backed by a carbon table.
+ * @param srcDS  Source dataset; it can be any data.
+ * @param mergeMatches It contains the join condition and the list of match conditions to apply.
+ */
+case class CarbonMergeDataSetCommand(
+    targetDsOri: Dataset[Row],
+    srcDS: Dataset[Row],
+    var mergeMatches: MergeDataSetMatches)
+  extends DataCommand {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * It merges the data of the source dataset into the target dataset backed by a carbon table.
+   * Basically it performs a full outer join between source and target and applies the given
+   * conditions as a "case when" expression to derive the status used to process each row.
+   * The status can be insert/update/delete. It can also insert the history (update/delete)
+   * data into a history table.
+   *
+   */
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val rltn = collectCarbonRelation(targetDsOri.logicalPlan)
+    // Target dataset must be backed by carbondata table.
+    if (rltn.length != 1) {
+      throw new UnsupportedOperationException(
+        "Carbon table supposed to be present in merge dataset")
+    }
+    // validate the merge matches and actions.
+    validateMergeActions(mergeMatches, targetDsOri, sparkSession)
+    val carbonTable = rltn.head.carbonRelation.carbonTable
+    val hasDelAction = mergeMatches.matchList
+      .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
+    val hasUpdateAction = mergeMatches.matchList
+      .exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
+    val (insertHistOfUpdate, insertHistOfDelete) = 
getInsertHistoryStatus(mergeMatches)
+    // Get all the required columns of targetDS by going through all match 
conditions and actions.
+    val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, 
sparkSession)
+    // select only the required columns; it avoids a lot of unnecessary shuffling.
+    val targetDs = targetDsOri.select(columns: _*)
+    // Update the update mapping with the unfilled columns. From here on the system assumes
+    // all mappings exist.
+    mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
+    // Generate all condition combinations as one column and add it as 'status'.
+    val condition = generateStatusColumnWithAllCombinations(mergeMatches)
+
+    // Add the getTupleId() UDF so each target row carries the tuple id needed for delete delta.
+    val frame = 
targetDs.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+      expr("getTupleId()")).withColumn("exist_on_target", lit(1)).join(
+      srcDS.withColumn("exist_on_src", lit(1)),
+      // Do the full outer join to get the data from both sides without 
missing anything.
+      // TODO As per the match conditions choose the join, sometimes it might 
be possible to use
+      // left_outer join.
+      mergeMatches.joinExpr, "full_outer").withColumn("status", condition)
+    if (LOGGER.isDebugEnabled) {
+      frame.explain()
+    }
+    val tableCols =
+      
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).
+        
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    val builder = new CarbonLoadModelBuilder(carbonTable)
+    val options = Seq(("fileheader", tableCols.mkString(","))).toMap
+    val model = builder.build(options.asJava, 
CarbonUpdateUtil.readCurrentTime, "1")
+    model.setLoadWithoutConverterStep(true)
+    val newLoadMetaEntry = new LoadMetadataDetails
+    CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
+      SegmentStatus.INSERT_IN_PROGRESS,
+      model.getFactTimeStamp,
+      false)
+    CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, 
false)
+
+    model.setCsvHeader(tableCols.mkString(","))
+
+    val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { 
m =>
+      m.getActions.map {
+        case u: UpdateAction => MergeProjection(tableCols, frame, rltn.head, 
sparkSession, u)
+        case i: InsertAction => MergeProjection(tableCols, frame, rltn.head, 
sparkSession, i)
+        case d: DeleteAction => MergeProjection(tableCols, frame, rltn.head, 
sparkSession, d)
+        case _ => null
+      }.filter(_ != null)
+    }
+
+    val st = System.currentTimeMillis()
+    // Create accumulators to log the stats
+    val stats = Stats(createLongAccumalator("insertedRows"),
+      createLongAccumalator("updatedRows"),
+      createLongAccumalator("deletedRows"))
+    val processedRDD = processIUD(sparkSession, frame, carbonTable, model, 
projections, stats)
+
+    val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
+    val trxMgr = TranxManager(model.getFactTimeStamp)
+
+    val mutationAction = MutationActionFactory.getMutationAction(sparkSession,
+      carbonTable, hasDelAction, hasUpdateAction,
+      insertHistOfUpdate, insertHistOfDelete)
+
+    val tuple = mutationAction.handleAction(processedRDD, executorErrors, 
trxMgr)
+
+    // In case the user only has an insert action.
+    if (!(hasDelAction || hasUpdateAction)) {
+      processedRDD.count()
+    }
+    LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
+    LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
+    LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
+    LOGGER.info(
+      "Time taken to merge data: " + (System.currentTimeMillis() - st) +
+      " ms, mutation result: " + tuple)
+
+    val segment = new Segment(model.getSegmentId,
+      SegmentFileStore.genSegmentFileName(
+        model.getSegmentId,
+        System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
+      CarbonTablePath.getSegmentPath(carbonTable.getTablePath,
+        model.getSegmentId), Map.empty[String, String].asJava)
+    val writeSegment =
+      SegmentFileStore.writeSegmentFile(carbonTable, segment)
+
+    if (writeSegment) {
+      SegmentFileStore.updateTableStatusFile(
+        carbonTable,
+        model.getSegmentId,
+        segment.getSegmentFileName,
+        carbonTable.getCarbonTableIdentifier.getTableId,
+        new SegmentFileStore(carbonTable.getTablePath, 
segment.getSegmentFileName),
+        SegmentStatus.SUCCESS)
+    } else {
+      CarbonLoaderUtil.updateTableStatusForFailure(model)
+    }
+
+    if (hasDelAction || hasUpdateAction) {
+      if (CarbonUpdateUtil.updateSegmentStatus(tuple._1, carbonTable,
+        trxMgr.getLatestTrx.toString, false) &&
+          CarbonUpdateUtil
+            .updateTableMetadataStatus(
+              model.getLoadMetadataDetails.asScala.map(l =>
+                new Segment(l.getMergedLoadName,
+                  l.getSegmentFile)).toSet.asJava,
+              carbonTable,
+              trxMgr.getLatestTrx.toString,
+              true,
+              tuple._2.asJava)) {
+        LOGGER.info(s"Merge data operation is successful for " +
+                    s"${ carbonTable.getDatabaseName }.${ 
carbonTable.getTableName }")
+      } else {
+        throw new CarbonMergeDataSetException("Saving update status or table 
status failed")
+      }
+    }
+    // Load the history table if the insertHistoryTable action was added by the user.
+    HistoryTableLoadHelper.loadHistoryTable(sparkSession, rltn.head, 
carbonTable,
+      trxMgr, mutationAction, mergeMatches)
+    Seq.empty
+  }
+
+  /**
+   * Based on the status of each row, it either inserts the data or updates/deletes it.
+   */
+  private def processIUD(sparkSession: SparkSession,
+      frame: DataFrame,
+      carbonTable: CarbonTable,
+      model: CarbonLoadModel,
+      projections: Seq[Seq[MergeProjection]],
+      stats: Stats) = {
+    val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+    val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, 
conf)
+    val frameCols = frame.queryExecution.analyzed.output
+    val status = frameCols.length - 1
+    val tupleId = frameCols.zipWithIndex
+      
.find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+    val insertedRows = stats.insertedRows
+    val updatedRows = stats.updatedRows
+    val deletedRows = stats.deletedRows
+    
frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
+      mapPartitionsWithIndex { case (index, iter) =>
+        val confB = config.value.value
+        CarbonTableOutputFormat.setCarbonTable(confB, carbonTable)
+        model.setTaskNo(index.toString)
+        CarbonTableOutputFormat.setLoadModel(confB, model)
+        val jobId = new JobID(UUID.randomUUID.toString, 0)
+        val task = new TaskID(jobId, TaskType.MAP, index)
+        val attemptID = new TaskAttemptID(task, index)
+        val context = new TaskAttemptContextImpl(confB, attemptID)
+        val writer = new CarbonTableOutputFormat().getRecordWriter(context)
+        val writable = new ObjectArrayWritable
+        val projLen = projections.length
+        val schema =
+          org.apache.spark.sql.types.StructType(Seq(
+            StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, 
StringType),
+            StructField("status", IntegerType)))
+        new Iterator[Row] {
+          override def hasNext: Boolean = {
+            if (iter.hasNext) {
+              true
+            } else {
+              writer.close(context)
+              false
+            }
+          }
+
+          override def next(): Row = {
+            val row = iter.next()
+            val rowWithSchema = row.asInstanceOf[GenericRowWithSchema]
+            val is = row.get(status)
+            var isUpdate = false
+            var isDelete = false
+            var insertedCount = 0
+            if (is != null) {
+              val isInt = is.asInstanceOf[Int]
+              var i = 0;
+              while (i < projLen) {
+                if ((isInt & (1 << i)) == (1 << i)) {
+                  projections(i).foreach { p =>
+                    if (!p.isDelete) {
+                      if (p.isUpdate) {
+                        isUpdate = p.isUpdate
+                      }
+                      writable.set(p(rowWithSchema))
+                      writer.write(NullWritable.get(), writable)
+                      insertedCount += 1
+                    } else {
+                      isDelete = true
+                    }
+                  }
+                }
+                i = i + 1
+              }
+            }
+            val newArray = new Array[Any](2)
+            newArray(0) = row.getString(tupleId)
+            if (isUpdate && isDelete) {
+              newArray(1) = 102
+              updatedRows.add(1)
+              deletedRows.add(1)
+              insertedCount -= 1
+            } else if (isUpdate) {
+              updatedRows.add(1)
+              newArray(1) = 101
+              insertedCount -= 1
+            } else if (isDelete) {
+              newArray(1) = 100
+              deletedRows.add(1)
+            } else {
+              newArray(1) = is
+            }
+            insertedRows.add(insertedCount)
+            new GenericRowWithSchema(newArray, schema)
+          }
+        }
+      }.cache()
+  }
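(Editor's sketch, not part of this patch: processIUD emits rows of (tupleId, status) where 100 marks a delete, 101 an update and 102 a row that was both updated and deleted under different match conditions; any other value means the row only triggered inserts. A downstream consumer could split the cached RDD as below; "processedRDD" refers to the RDD returned above.)

    // rows whose tuple id needs a delete delta entry
    val deleteTuples = processedRDD.filter { row =>
      val status = row.get(1)
      status == 100 || status == 102
    }
    // rows whose tuple id was updated
    val updateTuples = processedRDD.filter { row =>
      val status = row.get(1)
      status == 101 || status == 102
    }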
+
+  private def createLongAccumalator(name: String) = {
+    val acc = new LongAccumulator
+    acc.setValue(0)
+    acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), 
false)
+    AccumulatorContext.register(acc)
+    acc
+  }
+
+  /**
+   * It generates conditions for all possible scenarios and adds an integer number for each match.
+   * There can be scenarios where one row matches two conditions; in that case the actions of
+   * both matches should be applied to the row.
+   *  For example :
+   *    whenmatched(a=c1)
+   *    update()
+   *    whennotmatched(b=d1)
+   *    insert()
+   *    whennotmatched(b=d2)
+   *    insert()
+   *
+   *  The above merge statement will be converted to
+   *    (case when a=c1 and b=d1 and b=d2 then 7
+   *         when a=c1 and b=d1 then 6
+   *         when a=c1 and b=d2 then 5
+   *         when a=c1 then 4
+   *         when b=d1 and b=d2 then 3
+   *         when b=d1 then 2
+   *         when b=d2 then 1) as status
+   *
+   *   So it is not recommended to use many merge conditions, as the number of case when
+   *   branches grows exponentially.
+   *
+   * @param mergeMatches
+   * @return
+   */
+  def generateStatusColumnWithAllCombinations(mergeMatches: 
MergeDataSetMatches): Column = {
+    var exprList = new ArrayBuffer[(Column, Int)]()
+    val matchList = mergeMatches.matchList
+    val len = matchList.length
+    val N = Math.pow(2d, len.toDouble).toInt
+    var i = 1
+    while (i < N) {
+      var status = 0
+      var column: Column = null
+      val code = Integer.toBinaryString(N | i).substring(1)
+      var j = 0
+      while (j < len) {
+        if (code.charAt(j) == '1') {
+          val mergeMatch = matchList(j)
+          if (column == null) {
+            if (mergeMatch.getExp.isDefined) {
+              column = mergeMatch.getExp.get
+            }
+          } else {
+            if (mergeMatch.getExp.isDefined) {
+              column = column.and(mergeMatch.getExp.get)
+            }
+          }
+          mergeMatch match {
+            case wm: WhenMatched =>
+              val existsOnBoth = col("exist_on_target").isNotNull.and(
+                col("exist_on_src").isNotNull)
+              column = if (column == null) {
+                existsOnBoth
+              } else {
+                column.and(existsOnBoth)
+              }
+            case wnm: WhenNotMatched =>
+              val existsOnSrc = col("exist_on_target").isNull.and(
+                col("exist_on_src").isNotNull)
+              column = if (column == null) {
+                existsOnSrc
+              } else {
+                column.and(existsOnSrc)
+              }
+            case wnm: WhenNotMatchedAndExistsOnlyOnTarget =>
+              val existsOnSrc = col("exist_on_target").isNotNull.and(
+                col("exist_on_src").isNull)
+              column = if (column == null) {
+                existsOnSrc
+              } else {
+                column.and(existsOnSrc)
+              }
+            case _ =>
+          }
+          status = status | 1 << j
+        }
+        j += 1
+      }
+      if (column == null) {
+        column = lit(true) === lit(true)
+      }
+      exprList += ((column, status))
+      i += 1
+    }
+    exprList = exprList.reverse
+    var condition: Column = null
+    exprList.foreach { case (col, status) =>
+      if (condition == null) {
+        condition = when(col, lit(status))
+      } else {
+        condition = condition.when(col, lit(status))
+      }
+    }
+    condition.otherwise(lit(null))
+  }
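(Editor's sketch, not part of this patch: the generated status value is simply a bitmask over the match list, so a consumer only needs one bit test per match condition, exactly as processIUD does above. A minimal standalone decode, using the three-match example from the scaladoc:)

    // returns the indexes of the match conditions a row satisfied
    def matchedIndexes(status: Int, matchCount: Int): Seq[Int] = {
      (0 until matchCount).filter(i => (status & (1 << i)) == (1 << i))
    }

    // status 5 is binary 101: the first and third match conditions apply
    assert(matchedIndexes(5, 3) == Seq(0, 2))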
+
+  private def getSelectExpressionsOnExistingDF(existingDs: Dataset[Row],
+      mergeMatches: MergeDataSetMatches, sparkSession: SparkSession): 
Seq[Column] = {
+    var projects = Seq.empty[Attribute]
+    val existAttrs = existingDs.queryExecution.analyzed.output
+    projects ++= selectAttributes(mergeMatches.joinExpr.expr, existingDs, 
sparkSession)
+    mergeMatches.matchList.foreach { m =>
+      if (m.getExp.isDefined) {
+        projects ++= selectAttributes(m.getExp.get.expr, existingDs, 
sparkSession)
+      }
+      m.getActions.foreach {
+        case u: UpdateAction =>
+          projects ++= existAttrs.filterNot { f =>
+            u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name))
+          }
+        case i: InsertAction =>
+          if (!existAttrs.forall(f => i.insertMap
+            .exists(_._1.toString().equalsIgnoreCase(f.name)))) {
+            throw new CarbonMergeDataSetException(
+              "Not all source columns are mapped for insert action " + 
i.insertMap)
+          }
+          i.insertMap.foreach { case (k, v) =>
+            projects ++= selectAttributes(v.expr, existingDs, sparkSession)
+          }
+        case _ =>
+      }
+    }
+    projects.map(_.name.toLowerCase).distinct.map { p =>
+      existingDs.col(p)
+    }
+  }
+
+  private def updateMappingIfNotExists(mergeMatches: MergeDataSetMatches,
+      existingDs: Dataset[Row]): MergeDataSetMatches = {
+    val existAttrs = existingDs.queryExecution.analyzed.output
+    val updateCommand = mergeMatches.matchList.map { m =>
+      val updateAction = m.getActions.map {
+        case u: UpdateAction =>
+          if (u.updateMap.isEmpty) {
+            throw new CarbonMergeDataSetException(
+              "At least one column supposed to be updated for update action")
+          }
+          val attributes = existAttrs.filterNot { f =>
+            u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name))
+          }
+          val newMap = attributes.map(a => (existingDs.col(a.name), 
existingDs.col(a.name))).toMap
+          u.copy(u.updateMap ++ newMap)
+        case other => other
+      }
+      m.updateActions(updateAction)
+    }
+    mergeMatches.copy(matchList =
+      
updateCommand.filterNot(_.getActions.exists(_.isInstanceOf[DeleteAction]))
+      ++ 
updateCommand.filter(_.getActions.exists(_.isInstanceOf[DeleteAction])))
+  }
+
+  private def selectAttributes(expression: Expression, existingDs: 
Dataset[Row],
+      sparkSession: SparkSession, throwError: Boolean = false) = {
+    expression.collect {
+      case a: Attribute =>
+        val resolved = existingDs.queryExecution
+          .analyzed.resolveQuoted(a.name, 
sparkSession.sessionState.analyzer.resolver)
+        if (resolved.isDefined) {
+          resolved.get.toAttribute
+        } else if (throwError) {
+          throw new CarbonMergeDataSetException(
+            expression + " cannot be resolved with dataset " + existingDs)
+        } else {
+          null
+        }
+    }.filter(_ != null)
+  }
+
+  private def collectCarbonRelation(plan: LogicalPlan): 
Seq[CarbonDatasourceHadoopRelation] = {
+    plan collect {
+      case l: LogicalRelation if 
l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    }
+  }
+
+  private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches) = {
+    val insertHistOfUpdate = mergeMatches.matchList.exists(p =>
+      p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
+      && p.getActions.exists(_.isInstanceOf[UpdateAction]))
+    val insertHistOfDelete = mergeMatches.matchList.exists(p =>
+      p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
+      && p.getActions.exists(_.isInstanceOf[DeleteAction]))
+    (insertHistOfUpdate, insertHistOfDelete)
+  }
+
+  private def validateMergeActions(mergeMatches: MergeDataSetMatches,
+      existingDs: Dataset[Row], sparkSession: SparkSession): Unit = {
+    val existAttrs = existingDs.queryExecution.analyzed.output
+    if (mergeMatches.matchList.exists(m => 
m.getActions.exists(_.isInstanceOf[DeleteAction])
+                                           && 
m.getActions.exists(_.isInstanceOf[UpdateAction]))) {
+      throw new AnalysisException(
+        "Delete and update action should not be under same merge condition")
+    }
+    if (mergeMatches.matchList.count(m => 
m.getActions.exists(_.isInstanceOf[DeleteAction])) > 1) {
+      throw new AnalysisException("Delete action should not be more than once 
across merge")
+    }
+    mergeMatches.matchList.foreach { f =>
+      if (f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])) {
+        if (!(f.getActions.exists(_.isInstanceOf[UpdateAction]) ||
+              f.getActions.exists(_.isInstanceOf[DeleteAction]))) {
+          throw new AnalysisException("For inserting to history table, " +
+                              "it should be along with either update or delete 
action")
+        }
+        val value = 
f.getActions.find(_.isInstanceOf[InsertInHistoryTableAction]).get.
+          asInstanceOf[InsertInHistoryTableAction]
+        if (!existAttrs.forall(f => value.insertMap
+          .exists(_._1.toString().equalsIgnoreCase(f.name)))) {
+          throw new AnalysisException(
+            "Not all source columns are mapped for insert action " + 
value.insertMap)
+        }
+        value.insertMap.foreach { case (k, v) =>
+          selectAttributes(v.expr, existingDs, sparkSession, true)
+        }
+      }
+    }
+  }
+
+  override protected def opName: String = "MERGE DATASET"
+}
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala
new file mode 100644
index 0000000..be59b9a
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+/**
+ * Exception during merge operation.
+ */
+class CarbonMergeDataSetException(msg: String, exception: Throwable)
+  extends Exception(msg, exception) {
+
+  def this(exception: Throwable) {
+    this("", exception)
+  }
+
+  def this(msg: String) {
+    this(msg, null)
+  }
+
+}
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
new file mode 100644
index 0000000..6ef7c37
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.DataTypeConverterImpl
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonDeltaRowScanRDD
+import org.apache.carbondata.spark.readsupport.SparkGenericRowReadSupportImpl
+
+object HistoryTableLoadHelper {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * Load the history table by reading the data from the target table using the last
+   * transactions. Here we read the deleted/updated data from the target table through
+   * the delta files and load it into the history table.
+   */
+  def loadHistoryTable(sparkSession: SparkSession,
+      rltn: CarbonDatasourceHadoopRelation,
+      carbonTable: CarbonTable,
+      trxMgr: TranxManager,
+      mutationAction: MutationAction,
+      mergeMatches: MergeDataSetMatches): Unit = {
+    if (!mutationAction.isInstanceOf[HandleUpdateAndDeleteAction]) {
+      val insert = mergeMatches
+        .matchList
+        .filter { f =>
+          f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
+        }
+        .head
+        .getActions
+        .find(_.isInstanceOf[InsertInHistoryTableAction])
+        .get
+        .asInstanceOf[InsertInHistoryTableAction]
+      // Get the history table dataframe.
+      val histDataFrame: Dataset[Row] = sparkSession.table(insert.historyTable)
+      // check if the user wants to insert update history records into history 
table.
+      val updateDataFrame = if (trxMgr.getUpdateTrx != -1) {
+        // Get the insertHistoryAction related to update action.
+        val insertHist = mergeMatches.matchList.filter { f =>
+          f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) &&
+          f.getActions.exists(_.isInstanceOf[UpdateAction])
+        
}.head.getActions.filter(_.isInstanceOf[InsertInHistoryTableAction]).head.
+          asInstanceOf[InsertInHistoryTableAction]
+        // Create the dataframe to fetch history updated records.
+        Some(createHistoryDataFrame(sparkSession, rltn, carbonTable, 
insertHist,
+          histDataFrame, trxMgr.getUpdateTrx))
+      } else {
+        None
+      }
+      // check if the user wants to insert delete history records into history 
table.
+      val delDataFrame = if (trxMgr.getDeleteTrx != -1) {
+        val insertHist = mergeMatches.matchList.filter { f =>
+          f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) &&
+          f.getActions.exists(_.isInstanceOf[DeleteAction])
+        
}.head.getActions.filter(_.isInstanceOf[InsertInHistoryTableAction]).head.
+          asInstanceOf[InsertInHistoryTableAction]
+        Some(createHistoryDataFrame(sparkSession, rltn, carbonTable, 
insertHist,
+          histDataFrame: Dataset[Row], trxMgr.getDeleteTrx))
+      } else {
+        None
+      }
+
+      val unionDf = (updateDataFrame, delDataFrame) match {
+        case (Some(u), Some(d)) => u.union(d)
+        case (Some(u), None) => u
+        case (None, Some(d)) => d
+        case _ => throw new CarbonMergeDataSetException("Some thing is wrong")
+      }
+
+      val alias = carbonTable.getTableName + System.currentTimeMillis()
+      unionDf.createOrReplaceTempView(alias)
+      val start = System.currentTimeMillis()
+      sparkSession.sql(s"insert into ${ insert.historyTable.quotedString } " +
+                       s"select * from ${ alias }")
+      LOGGER.info("Time taken to insert into history table " + 
(System.currentTimeMillis() - start))
+    }
+  }
+
+  /**
+   * It creates the dataframe to fetch deleted/updated records in the 
particular transaction.
+   */
+  private def createHistoryDataFrame(sparkSession: SparkSession,
+      rltn: CarbonDatasourceHadoopRelation,
+      carbonTable: CarbonTable,
+      insertHist: InsertInHistoryTableAction,
+      histDataFrame: Dataset[Row],
+      factTimestamp: Long) = {
+    val rdd1 = new CarbonDeltaRowScanRDD[Row](sparkSession,
+      carbonTable.getTableInfo.serialize(),
+      carbonTable.getTableInfo,
+      null,
+      new CarbonProjection(
+        carbonTable.getCreateOrderColumn().asScala.map(_.getColName).toArray),
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      new CarbonInputMetrics,
+      classOf[DataTypeConverterImpl],
+      classOf[SparkGenericRowReadSupportImpl],
+      factTimestamp.toString)
+
+    val frame1 = sparkSession.createDataFrame(rdd1, rltn.carbonRelation.schema)
+    val histOutput = histDataFrame.queryExecution.analyzed.output
+    val cols = histOutput.map { a =>
+      insertHist.insertMap.find(p => p._1.toString().equalsIgnoreCase(a.name)) 
match {
+        case Some((k, v)) => v
+        case _ =>
+          throw new CarbonMergeDataSetException(
+            " All columns of history table are mapped in " + insertHist)
+      }
+    }
+    frame1.select(cols: _*)
+  }
+
+}
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala
new file mode 100644
index 0000000..2525dcd
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, Column, Dataset, Row, 
SparkSession}
+import org.apache.spark.sql.functions.expr
+
+/**
+ * Builder class to generate and execute merge
+ */
+class MergeDataSetBuilder(existingDsOri: Dataset[Row], currDs: Dataset[Row],
+    joinExpr: Column, sparkSession: SparkSession) {
+
+  def this(existingDsOri: Dataset[Row], currDs: Dataset[Row],
+      joinExpr: String, sparkSession: SparkSession) {
+    this(existingDsOri, currDs, expr(joinExpr), sparkSession)
+  }
+
+  val matchList: util.List[MergeMatch] = new util.ArrayList[MergeMatch]()
+
+  def whenMatched(): MergeDataSetBuilder = {
+    matchList.add(WhenMatched())
+    this
+  }
+
+  def whenMatched(expression: String): MergeDataSetBuilder = {
+    matchList.add(WhenMatched(Some(expr(expression))))
+    this
+  }
+
+  def whenMatched(expression: Column): MergeDataSetBuilder = {
+    matchList.add(WhenMatched(Some(expression)))
+    this
+  }
+
+  def whenNotMatched(): MergeDataSetBuilder = {
+    matchList.add(WhenNotMatched())
+    this
+  }
+
+  def whenNotMatched(expression: String): MergeDataSetBuilder = {
+    matchList.add(WhenNotMatched(Some(expr(expression))))
+    this
+  }
+
+  def whenNotMatched(expression: Column): MergeDataSetBuilder = {
+    matchList.add(WhenNotMatched(Some(expression)))
+    this
+  }
+
+  def whenNotMatchedAndExistsOnlyOnTarget(): MergeDataSetBuilder = {
+    matchList.add(WhenNotMatchedAndExistsOnlyOnTarget())
+    this
+  }
+
+  def whenNotMatchedAndExistsOnlyOnTarget(expression: String): 
MergeDataSetBuilder = {
+    matchList.add(WhenNotMatchedAndExistsOnlyOnTarget(Some(expr(expression))))
+    this
+  }
+
+  def whenNotMatchedAndExistsOnlyOnTarget(expression: Column): 
MergeDataSetBuilder = {
+    matchList.add(WhenNotMatchedAndExistsOnlyOnTarget(Some(expression)))
+    this
+  }
+
+  def updateExpr(expression: Map[Any, Any]): MergeDataSetBuilder = {
+    checkBuilder
+    matchList.get(matchList.size() - 
1).addAction(UpdateAction(convertMap(expression)))
+    this
+  }
+
+  def insertExpr(expression: Map[Any, Any]): MergeDataSetBuilder = {
+    checkBuilder
+    matchList.get(matchList.size() - 
1).addAction(InsertAction(convertMap(expression)))
+    this
+  }
+
+  def delete(): MergeDataSetBuilder = {
+    checkBuilder
+    matchList.get(matchList.size() - 1).addAction(DeleteAction())
+    this
+  }
+
+  def build(): CarbonMergeDataSetCommand = {
+    checkBuilder
+    CarbonMergeDataSetCommand(existingDsOri, currDs,
+      MergeDataSetMatches(joinExpr, matchList.asScala.toList))
+  }
+
+  def execute(): Unit = {
+    build().run(sparkSession)
+  }
+
+  private def convertMap(exprMap: Map[Any, Any]): Map[Column, Column] = {
+    if (exprMap.exists{ case (k, v) =>
+      !(checkType(k) && checkType(v))
+    }) {
+      throw new AnalysisException(
+        "Expression map should only contain either String or Column " + 
exprMap)
+    }
+    def checkType(obj: Any) = obj.isInstanceOf[String] || 
obj.isInstanceOf[Column]
+    def convert(obj: Any) =
+      if (obj.isInstanceOf[Column]) obj.asInstanceOf[Column] else 
expr(obj.toString)
+    exprMap.map{ case (k, v) =>
+      (convert(k), convert(v))
+    }
+  }
+
+  private def checkBuilder(): Unit = {
+    if (matchList.size() == 0) {
+      throw new AnalysisException("Atleast one matcher should be called before 
calling an action")
+    }
+  }
+
+}
+
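(Editor's illustration, not part of this patch: a concrete SCD-style use of the builder above. All table, dataset and column names are hypothetical, and the target table is assumed to have only the columns id, price and updated_at; note that updateExpr/insertExpr maps accept either String or Column keys and values, as enforced by convertMap.)

    val targetDS = sparkSession.table("target_table").as("A")
    val changesDS = sparkSession.table("change_feed").as("B")

    new MergeDataSetBuilder(targetDS, changesDS, "A.id = B.id", sparkSession)
      .whenMatched("B.deleted = false")
      .updateExpr(Map("price" -> "B.price", "updated_at" -> "B.updated_at"))
      .whenNotMatched("B.deleted = false")
      .insertExpr(Map("id" -> "B.id", "price" -> "B.price", "updated_at" -> "B.updated_at"))
      .whenNotMatchedAndExistsOnlyOnTarget()
      .delete()
      .execute()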
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
new file mode 100644
index 0000000..1245bd4
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, GenericRowWithSchema, InterpretedMutableProjection, Projection}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{DateType, TimestampType}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Creates the projection for each action, like update, delete or insert.
+ */
+case class MergeProjection(
+    @transient tableCols: Seq[String],
+    @transient ds: Dataset[Row],
+    @transient rltn: CarbonDatasourceHadoopRelation,
+    @transient sparkSession: SparkSession,
+    @transient mergeAction: MergeAction) {
+
+  private val cutOffDate = Integer.MAX_VALUE >> 1
+
+  val isUpdate = mergeAction.isInstanceOf[UpdateAction]
+  val isDelete = mergeAction.isInstanceOf[DeleteAction]
+
+  def apply(row: GenericRowWithSchema): Array[Object] = {
+    // TODO we can avoid these multiple conversions if this is added as a SparkPlan node.
+    val values = row.toSeq.map {
+      case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+      case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d)
+      case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b)
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+      case value => value
+    }
+
+    val outputRow = projection(new GenericInternalRow(values.toArray))
+      .asInstanceOf[GenericInternalRow]
+
+    val array = outputRow.values.clone()
+    var i = 0
+    while (i < array.length) {
+      output(i).dataType match {
+        case d: DateType =>
+          if (array(i) == null) {
+            array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+          } else {
+            array(i) = (array(i).asInstanceOf[Int] + cutOffDate)
+          }
+        case d: TimestampType =>
+          if (array(i) == null) {
+            array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+          } else {
+            array(i) = (array(i).asInstanceOf[Long] / 1000)
+          }
+
+        case _ =>
+      }
+      i += 1
+    }
+    array.asInstanceOf[Array[Object]]
+  }
+
+  val (projection, output) = generateProjection
+
+  private def generateProjection: (Projection, Array[Expression]) = {
+    val existingDsOutput = rltn.carbonRelation.schema.toAttributes
+    val colsMap = mergeAction match {
+      case UpdateAction(updateMap) => updateMap
+      case InsertAction(insertMap) => insertMap
+      case _ => null
+    }
+    if (colsMap != null) {
+      val output = new Array[Expression](tableCols.length)
+      val expecOutput = new Array[Expression](tableCols.length)
+      colsMap.foreach { case (k, v) =>
+        val tableIndex = tableCols.indexOf(k.toString().toLowerCase)
+        if (tableIndex < 0) {
+          throw new CarbonMergeDataSetException(s"Mapping is wrong $colsMap")
+        }
+        output(tableIndex) = v.expr.transform {
+          case a: Attribute if !a.resolved =>
+            ds.queryExecution.analyzed.resolveQuoted(a.name,
+              sparkSession.sessionState.analyzer.resolver).get
+        }
+        expecOutput(tableIndex) =
+          existingDsOutput.find(_.name.equalsIgnoreCase(tableCols(tableIndex))).get
+      }
+      if (output.contains(null)) {
+        throw new CarbonMergeDataSetException(s"Not all columns are mapped")
+      }
+      (new InterpretedMutableProjection(output, ds.queryExecution.analyzed.output), expecOutput)
+    } else {
+      (null, null)
+    }
+  }
+}
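
A sketch of how an update mapping drives this projection. The column names, the "B" alias
and the values tableCols, joinedDs, relation, spark and joinedRow are assumptions made for
illustration:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{col, expr}

    // Keys of the map are target table columns, values are expressions over the joined dataset.
    val tableCols = Seq("id", "name", "price")
    val updateMap: Map[Column, Column] = Map(
      col("id") -> col("B.id"),
      col("name") -> col("B.name"),
      col("price") -> expr("B.price * 1.1"))
    val projection = MergeProjection(tableCols, joinedDs, relation, spark, UpdateAction(updateMap))
    // apply() evaluates the mapped expressions, re-orders them into the target column order
    // and rewrites date/timestamp values into Carbon's internal representation.
    val newValues: Array[Object] = projection(joinedRow)

joinedRow stands for a GenericRowWithSchema taken from the joined target/source dataset.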
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala
new file mode 100644
index 0000000..7410686
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.command.mutation.DeleteExecution
+
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, SegmentUpdateDetails}
+import org.apache.carbondata.processing.loading.FailureCauses
+
+/**
+ * It applies the mutations like update and delete delta onto the store.
+ */
+abstract class MutationAction(sparkSession: SparkSession, carbonTable: CarbonTable) {
+
+  /**
+   * The RDD of tuple ids and delta status will be processed here to write the delta to the store.
+   */
+  def handleAction(dataRDD: RDD[Row],
+      executorErrors: ExecutionErrors,
+      trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment])
+
+  protected def handle(sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      factTimestamp: Long,
+      dataRDD: RDD[Row],
+      executorErrors: ExecutionErrors,
+      condition: (Int) => Boolean): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    val update = dataRDD.filter { row =>
+      val status = row.get(1)
+      status != null && condition(status.asInstanceOf[Int])
+    }
+    val tuple1 = DeleteExecution.deleteDeltaExecutionInternal(Some(carbonTable.getDatabaseName),
+      carbonTable.getTableName,
+      sparkSession, update,
+      factTimestamp.toString,
+      true, executorErrors, Some(0))
+    MutationActionFactory.checkErrors(executorErrors)
+    val tupleProcessed1 = DeleteExecution.processSegments(executorErrors, tuple1._1, carbonTable,
+      factTimestamp.toString, tuple1._2)
+    MutationActionFactory.checkErrors(executorErrors)
+    tupleProcessed1
+  }
+
+}
+
+/**
+ * It applies the update delta records to the store in one transaction.
+ */
+case class HandleUpdateAction(sparkSession: SparkSession, carbonTable: CarbonTable)
+  extends MutationAction(sparkSession, carbonTable) {
+
+  override def handleAction(dataRDD: RDD[Row],
+      executorErrors: ExecutionErrors,
+      trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this),
+      dataRDD, executorErrors, (status) => (status == 101) || (status == 102))
+  }
+}
+
+/**
+ * It applies the delete delta records to the store in one transaction.
+ */
+case class HandleDeleteAction(sparkSession: SparkSession, carbonTable: CarbonTable)
+  extends MutationAction(sparkSession, carbonTable) {
+
+  override def handleAction(dataRDD: RDD[Row],
+      executorErrors: ExecutionErrors,
+      trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this),
+      dataRDD, executorErrors, (status) => (status == 100) || (status == 102))
+  }
+}
+
+/**
+ * It applies multiple mutations of delta records to the store in multiple transactions.
+ */
+case class MultipleMutationAction(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    mutations: Seq[MutationAction])
+  extends MutationAction(sparkSession, carbonTable) {
+
+  override def handleAction(dataRDD: RDD[Row],
+      executorErrors: ExecutionErrors,
+      trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    var (updates: util.List[SegmentUpdateDetails], segs: Seq[Segment]) =
+      (new util.ArrayList[SegmentUpdateDetails], Seq.empty[Segment])
+    mutations.foreach { m =>
+      val (l, r) = m.handleAction(dataRDD, executorErrors, trxMgr)
+      l.asScala.foreach { entry =>
+        CarbonUpdateUtil.mergeSegmentUpdate(false, updates, entry)
+      }
+      segs ++= r
+    }
+    (updates, segs.distinct)
+  }
+}
+
+/**
+ * It applies the delete and update delta records to the store in a single transaction.
+ */
+case class HandleUpdateAndDeleteAction(sparkSession: SparkSession, carbonTable: CarbonTable)
+  extends MutationAction(sparkSession, carbonTable) {
+
+  override def handleAction(dataRDD: RDD[Row],
+      executorErrors: ExecutionErrors,
+      trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this),
+      dataRDD, executorErrors, (status) => (status == 100) || (status == 101) || (status == 102))
+  }
+}
+
+object MutationActionFactory {
+
+  /**
+   * Factory method to generate the appropriate mutation action for update and delete.
+   */
+  def getMutationAction(sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      hasDelAction: Boolean,
+      hasUpAction: Boolean,
+      hasInsrtHistUpd: Boolean,
+      hasInsrtHistDel: Boolean): MutationAction = {
+    var actions = Seq.empty[MutationAction]
+    // If the merge has a history-insert action then write the delete delta in two separate
+    // actions, as it is needed to know which records were deleted and which were inserted.
+    if (hasInsrtHistDel || hasInsrtHistUpd) {
+      if (hasUpAction) {
+        actions ++= Seq(HandleUpdateAction(sparkSession, carbonTable))
+      }
+      if (hasDelAction) {
+        actions ++= Seq(HandleDeleteAction(sparkSession, carbonTable))
+      }
+    } else {
+      // If there is no history insert action then apply it in single flow.
+      actions ++= Seq(HandleUpdateAndDeleteAction(sparkSession, carbonTable))
+    }
+    if (actions.length == 1) {
+      actions.head
+    } else {
+      // If it has multiple actions to apply then combine them into a multi action.
+      MultipleMutationAction(sparkSession, carbonTable, actions)
+    }
+  }
+
+  def checkErrors(executorErrors: ExecutionErrors): Unit = {
+    // Check for any failures that occurred during delete delta execution
+    if (executorErrors.failureCauses != FailureCauses.NONE) {
+      throw new CarbonMergeDataSetException(executorErrors.errorMsg)
+    }
+  }
+}
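
A hedged wiring sketch for these actions. The values spark, carbonTable, statusRDD and
errors are assumed to exist already; statusRDD carries rows of (tupleId, status), and
judging from the filters above status 101 marks rows to update, 100 rows to delete and
102 rows needing both:

    val trxMgr = TranxManager(System.currentTimeMillis())
    val action = MutationActionFactory.getMutationAction(spark, carbonTable,
      hasDelAction = true, hasUpAction = true,
      hasInsrtHistUpd = false, hasInsrtHistDel = false)
    // With no history-insert action this resolves to a single HandleUpdateAndDeleteAction,
    // so the update and delete deltas are written under one transaction number.
    val (updateDetails, touchedSegments) = action.handleAction(statusRDD, errors, trxMgr)
    MutationActionFactory.checkErrors(errors)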
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala
new file mode 100644
index 0000000..8b5e4e8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+/**
+ * It manages the transaction numbers for update and delete operations. Since update and
+ * delete delta records are applied in separate transactions, it is required to keep track
+ * of the transaction numbers.
+ */
+case class TranxManager(factTimestamp: Long) {
+
+  private var newFactTimestamp: Long = factTimestamp
+  private var mutationMap = Map.empty[MutationAction, Long]
+
+  def getNextTransaction(mutationAction: MutationAction): Long = {
+    if (mutationMap.isEmpty) {
+      mutationMap ++= Map[MutationAction, Long]((mutationAction, newFactTimestamp))
+    } else {
+      if (mutationMap.get(mutationAction).isDefined) {
+        return mutationMap(mutationAction)
+      } else {
+        newFactTimestamp = newFactTimestamp + 1
+        mutationMap ++= Map[MutationAction, Long]((mutationAction, newFactTimestamp))
+      }
+    }
+    newFactTimestamp
+  }
+
+  def getLatestTrx: Long = newFactTimestamp
+
+  def getUpdateTrx: Long = {
+    val map = mutationMap.filter(_._1.isInstanceOf[HandleUpdateAction])
+    if (map.isEmpty) {
+      -1
+    } else {
+      map.head._2
+    }
+  }
+
+  def getDeleteTrx: Long = {
+    val map = mutationMap.filter(_._1.isInstanceOf[HandleDeleteAction])
+    if (map.isEmpty) {
+      -1
+    } else {
+      map.head._2
+    }
+  }
+}
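
The transaction numbers are handed out as follows (spark and carbonTable are assumed to
exist; the timestamp is illustrative):

    val trxMgr = TranxManager(1577678316000L)
    val upd = HandleUpdateAction(spark, carbonTable)
    val del = HandleDeleteAction(spark, carbonTable)
    trxMgr.getNextTransaction(upd)   // 1577678316000: the first caller keeps the fact timestamp
    trxMgr.getNextTransaction(del)   // 1577678316001: a new action bumps the timestamp by one
    trxMgr.getNextTransaction(upd)   // 1577678316000: repeated calls reuse the assigned number
    trxMgr.getUpdateTrx              // 1577678316000
    trxMgr.getDeleteTrx              // 1577678316001
    trxMgr.getLatestTrx              // 1577678316001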
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
new file mode 100644
index 0000000..91f0322
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.util.LongAccumulator
+
+/**
+ * It describes the type of match, like whenMatched or whenNotMatched, and holds all the
+ * actions to be performed when this match passes.
+ */
+abstract class MergeMatch extends Serializable {
+
+  var list: ArrayBuffer[MergeAction] = new ArrayBuffer[MergeAction]()
+
+  def getExp: Option[Column]
+
+  def addAction(action: MergeAction): MergeMatch = {
+    list += action
+    this
+  }
+
+  def getActions: List[MergeAction] = {
+    list.toList
+  }
+
+  def updateActions(actions: List[MergeAction]): MergeMatch = {
+    list = new ArrayBuffer[MergeAction]()
+    list ++= actions
+    this
+  }
+}
+
+/**
+ * It describes the type of action, like update, delete or insert.
+ */
+trait MergeAction extends Serializable
+
+/**
+ * It is the holder that keeps all the matches and the join condition.
+ */
+case class MergeDataSetMatches(joinExpr: Column, matchList: List[MergeMatch]) extends Serializable
+
+case class WhenMatched(expression: Option[Column] = None) extends MergeMatch {
+  override def getExp: Option[Column] = expression
+}
+
+case class WhenNotMatched(expression: Option[Column] = None) extends MergeMatch {
+  override def getExp: Option[Column] = expression
+}
+
+case class WhenNotMatchedAndExistsOnlyOnTarget(expression: Option[Column] = None)
+  extends MergeMatch {
+  override def getExp: Option[Column] = expression
+}
+
+case class UpdateAction(updateMap: Map[Column, Column]) extends MergeAction
+
+case class InsertAction(insertMap: Map[Column, Column]) extends MergeAction
+
+/**
+ * It inserts the history data into the history table.
+ */
+case class InsertInHistoryTableAction(insertMap: Map[Column, Column], historyTable: TableIdentifier)
+  extends MergeAction
+
+case class DeleteAction() extends MergeAction
+
+case class Stats(insertedRows: LongAccumulator,
+    updatedRows: LongAccumulator,
+    deletedRows: LongAccumulator)
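
These case classes are what MergeDataSetBuilder assembles under the hood; the same structure
can also be built directly and handed to CarbonMergeDataSetCommand. The datasets, aliases and
column mappings below are illustrative:

    import org.apache.spark.sql.functions.col

    val matches = MergeDataSetMatches(col("A.id") === col("B.id"), List(
      WhenMatched().addAction(UpdateAction(Map(col("value") -> col("B.value")))),
      WhenNotMatched().addAction(InsertAction(
        Map(col("id") -> col("B.id"), col("value") -> col("B.value")))),
      WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction())))
    CarbonMergeDataSetCommand(targetDs, sourceDs, matches).run(spark)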
