PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations (addendum)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5b493962
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5b493962
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5b493962

Branch: refs/heads/4.x-HBase-1.2
Commit: 5b493962a437a264bb45f9698223a59f43623373
Parents: 4968e88
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Tue Nov 21 03:13:53 2017 +0000
Committer: James Taylor <jtay...@salesforce.com>
Committed: Sat Dec 16 16:42:54 2017 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java |   5 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  11 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   7 +-
 .../apache/phoenix/execute/MutationState.java   | 127 ++++++++++++-------
 .../java/org/apache/phoenix/util/IndexUtil.java |   4 +-
 .../org/apache/phoenix/util/KeyValueUtil.java   |   5 +-
 6 files changed, 98 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b493962/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 10fd7f8..e5b57e3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -33,7 +33,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -52,8 +51,8 @@ import 
org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseOwnClusterIT;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.monitoring.MetricType;
@@ -285,7 +284,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
     private PhoenixConnection 
getConnectionWithTableOrderPreservingMutationState() throws SQLException {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new 
PhoenixConnection(con.unwrap(PhoenixConnection.class));
-        final 
Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = 
Maps.newTreeMap(new TableRefComparator());
+        final Map<TableRef, MultiRowMutationState> mutations = 
Maps.newTreeMap(new TableRefComparator());
         // passing a null mutation state forces the 
connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b493962/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f9ca300..a06e2ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
@@ -43,6 +42,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -91,7 +91,6 @@ import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
 
 public class DeleteCompiler {
@@ -121,14 +120,14 @@ public class DeleteCompiler {
         final int maxSize = 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int maxSizeBytes = 
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         final int batchSize = Math.min(connection.getMutateBatchSize(), 
maxSize);
-        Map<ImmutableBytesPtr,RowMutationState> mutations = 
Maps.newHashMapWithExpectedSize(batchSize);
-        List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
+        MultiRowMutationState mutations = new MultiRowMutationState(batchSize);
+        List<MultiRowMutationState> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the 
index table and
         // the data table through a single query to save executing an 
additional one.
         if (!otherTableRefs.isEmpty()) {
             indexMutations = 
Lists.newArrayListWithExpectedSize(otherTableRefs.size());
             for (int i = 0; i < otherTableRefs.size(); i++) {
-                
indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
+                indexMutations.add(new MultiRowMutationState(batchSize));
             }
         }
         List<PColumn> pkColumns = table.getPKColumns();
@@ -644,7 +643,7 @@ public class DeleteCompiler {
             // keys for our ranges
             ScanRanges ranges = context.getScanRanges();
             Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
-            Map<ImmutableBytesPtr,RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+            MultiRowMutationState mutation = new 
MultiRowMutationState(ranges.getPointLookupCount());
             while (iterator.hasNext()) {
                 mutation.put(new 
ImmutableBytesPtr(iterator.next().getLowerRange()),
                         new RowMutationState(PRow.DELETE_MARKER, 0,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b493962/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index a51fd4c..a81a427 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 import org.apache.phoenix.expression.Determinism;
@@ -116,7 +117,7 @@ import com.google.common.collect.Sets;
 public class UpsertCompiler {
 
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] 
columnIndexes,
-            PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
+            PTable table, MultiRowMutationState mutation,
             PhoenixStatement statement, boolean useServerTimestamp, 
IndexMaintainer maintainer,
             byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) 
throws SQLException {
         long columnValueSize = 0;
@@ -197,7 +198,7 @@ public class UpsertCompiler {
             }
         }
         int rowCount = 0;
-        Map<ImmutableBytesPtr, RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(batchSize);
+        MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
         PTable table = tableRef.getTable();
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
@@ -1177,7 +1178,7 @@ public class UpsertCompiler {
                     throw new IllegalStateException();
                 }
             }
-            Map<ImmutableBytesPtr, RowMutationState> mutation = 
Maps.newHashMapWithExpectedSize(1);
+            MultiRowMutationState mutation = new MultiRowMutationState(1);
             IndexMaintainer indexMaintainer = null;
             byte[][] viewConstants = null;
             if (table.getIndexType() == IndexType.LOCAL) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b493962/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 15e905a..993438e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -123,7 +123,7 @@ public class MutationState implements SQLCloseable {
     private final long batchSize;
     private final long batchSizeBytes;
     private long batchCount = 0L;
-    private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
+    private final Map<TableRef, MultiRowMutationState> mutations;
     private final Set<String> uncommittedPhysicalNames = 
Sets.newHashSetWithExpectedSize(10);
 
     private long sizeOffset;
@@ -131,7 +131,7 @@ public class MutationState implements SQLCloseable {
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations 
= Collections.emptyMap();
+    private Map<TableRef, MultiRowMutationState> txMutations = 
Collections.emptyMap();
 
     final PhoenixTransactionContext phoenixTransactionContext;
 
@@ -159,12 +159,12 @@ public class MutationState implements SQLCloseable {
     }
 
     private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection 
connection, boolean subTask, PhoenixTransactionContext txContext, long 
sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, 
txContext);
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, 
MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
 
     MutationState(long maxSize, long maxSizeBytes, PhoenixConnection 
connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Map<TableRef, MultiRowMutationState> mutations,
             boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
@@ -189,7 +189,7 @@ public class MutationState implements SQLCloseable {
         }
     }
 
-    public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+    public MutationState(TableRef table, MultiRowMutationState mutations, long 
sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  
throws SQLException {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         if (!mutations.isEmpty()) {
             this.mutations.put(table, mutations);
@@ -350,7 +350,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public static MutationState emptyMutationState(long maxSize, long 
maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, 
connection, Collections.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, 
connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, 
null);
         state.sizeOffset = 0;
         return state;
     }
@@ -372,12 +372,12 @@ public class MutationState implements SQLCloseable {
         return sizeOffset + numRows;
     }
     
-    private void joinMutationState(TableRef tableRef, 
Map<ImmutableBytesPtr,RowMutationState> srcRows,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
dstMutations) {
+    private void joinMutationState(TableRef tableRef, MultiRowMutationState 
srcRows,
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         PTable table = tableRef.getTable();
         boolean isIndex = table.getType() == PTableType.INDEX;
         boolean incrementRowCount = dstMutations == this.mutations;
-        Map<ImmutableBytesPtr,RowMutationState> existingRows = 
dstMutations.put(tableRef, srcRows);
+        MultiRowMutationState existingRows = dstMutations.put(tableRef, 
srcRows);
         if (existingRows != null) { // Rows for that table already exist
             // Loop through new rows and replace existing with new
             for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : 
srcRows.entrySet()) {
@@ -389,8 +389,12 @@ public class MutationState implements SQLCloseable {
                         Map<PColumn,byte[]> newRow = 
rowEntry.getValue().getColumnValues();
                         // if new row is PRow.DELETE_MARKER, it means delete, 
and we don't need to merge it with existing row. 
                         if (newRow != PRow.DELETE_MARKER) {
+                            // decrement estimated size by the size of the old 
row
+                            
estimatedSize-=existingRowMutationState.calculateEstimatedSize();
                             // Merge existing column values with new column 
values
                             existingRowMutationState.join(rowEntry.getValue());
+                            // increment estimated size by the size of the new 
row
+                            
estimatedSize+=existingRowMutationState.calculateEstimatedSize();
                             // Now that the existing row has been merged with 
the new row, replace it back
                             // again (since it was merged with the new one 
above).
                             existingRows.put(rowEntry.getKey(), 
existingRowMutationState);
@@ -399,6 +403,8 @@ public class MutationState implements SQLCloseable {
                 } else {
                     if (incrementRowCount && !isIndex) { // Don't count index 
rows in row count
                         numRows++;
+                        // increment estimated size by the size of the new row
+                        estimatedSize += 
rowEntry.getValue().calculateEstimatedSize();
                     }
                 }
             }
@@ -406,22 +412,25 @@ public class MutationState implements SQLCloseable {
             dstMutations.put(tableRef, existingRows);
         } else {
             // Size new map at batch size as that's what it'll likely grow to.
-            Map<ImmutableBytesPtr,RowMutationState> newRows = 
Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+            MultiRowMutationState newRows = new 
MultiRowMutationState(connection.getMutateBatchSize());
             newRows.putAll(srcRows);
             dstMutations.put(tableRef, newRows);
             if (incrementRowCount && !isIndex) {
                 numRows += srcRows.size();
+                // if we added all the rows from newMutationState we can just 
increment the
+                // estimatedSize by newMutationState.estimatedSize
+                estimatedSize +=  srcRows.estimatedSize;
             }
         }
     }
     
-    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, 
RowMutationState>> srcMutations, 
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
dstMutations) {
+    private void joinMutationState(Map<TableRef, MultiRowMutationState> 
srcMutations, 
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         // Merge newMutation with this one, keeping state from newMutation for 
any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : srcMutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : 
srcMutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
-            Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+            MultiRowMutationState srcRows = entry.getValue();
             joinMutationState(tableRef, srcRows, dstMutations);
         }
     }
@@ -439,19 +448,7 @@ public class MutationState implements SQLCloseable {
         
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
-        int oldNumRows = this.numRows;
         joinMutationState(newMutationState.mutations, this.mutations);
-        if (newMutationState.numRows>0) {
-            // if we added all the rows from newMutationState we can just 
increment the
-            // estimatedSize by newMutationState.estimatedSize
-            if (newMutationState.numRows == this.numRows-oldNumRows) {
-                this.estimatedSize +=  newMutationState.estimatedSize;
-            }
-            // we merged the two mutation states so we need to recalculate the 
size
-            else {
-                this.estimatedSize = 
KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
-            }
-        }
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
                 txMutations = 
Maps.newHashMapWithExpectedSize(mutations.size());
@@ -489,7 +486,7 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final 
TableRef tableRef, final MultiRowMutationState values,
             final long mutationTimestamp, final long serverTimestamp, boolean 
includeAllIndexes, final boolean sendAll) { 
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with 
immutable rows through this client-side mechanism
@@ -524,10 +521,10 @@ public class MutationState implements SQLCloseable {
                     // we may also have to include delete mutations for 
immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
-                        Map<ImmutableBytesPtr, RowMutationState> 
rowToColumnMap = mutations.remove(key);
-                        if (rowToColumnMap!=null) {
+                        MultiRowMutationState multiRowMutationState = 
mutations.remove(key);
+                        if (multiRowMutationState!=null) {
                             final List<Mutation> deleteMutations = 
Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, 
serverTimestamp, rowToColumnMap, deleteMutations, null);
+                            generateMutations(tableRef, mutationTimestamp, 
serverTimestamp, multiRowMutationState, deleteMutations, null);
                             indexMutations.addAll(deleteMutations);
                         }
                     }
@@ -546,14 +543,14 @@ public class MutationState implements SQLCloseable {
     }
 
     private void generateMutations(final TableRef tableRef, final long 
mutationTimestamp,
-            final long serverTimestamp, final Map<ImmutableBytesPtr, 
RowMutationState> values,
+            final long serverTimestamp, final MultiRowMutationState values,
             final List<Mutation> mutationList, final List<Mutation> 
mutationsPertainingToIndex) {
         final PTable table = tableRef.getTable();
         boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
                 values.entrySet().iterator();
         long timestampToUse = mutationTimestamp;
-        Map<ImmutableBytesPtr, RowMutationState> modifiedValues = 
Maps.newHashMap();
+        MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = 
iterator.next();
             byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
@@ -628,7 +625,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean 
includeMutableIndexes, final Long tableTimestamp) {
-        final Iterator<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>> iterator = 
this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = 
this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Collections.emptyIterator();
         }
@@ -636,7 +633,7 @@ public class MutationState implements SQLCloseable {
         final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
         final long mutationTimestamp = getMutationTimestamp(scn);
         return new Iterator<Pair<byte[],List<Mutation>>>() {
-            private Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
+            private Map.Entry<TableRef, MultiRowMutationState> current = 
iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = 
init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -700,14 +697,14 @@ public class MutationState implements SQLCloseable {
     private long[] validateAll() throws SQLException {
         int i = 0;
         long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : mutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : 
mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
             timeStamps[i++] = validateAndGetServerTimestamp(tableRef, 
entry.getValue());
         }
         return timeStamps;
     }
     
-    private long validateAndGetServerTimestamp(TableRef tableRef, 
Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException 
{
+    private long validateAndGetServerTimestamp(TableRef tableRef, 
MultiRowMutationState rowKeyToColumnMap) throws SQLException {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
@@ -919,7 +916,7 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
-        Map<ImmutableBytesPtr, RowMutationState> valuesMap;
+        MultiRowMutationState multiRowMutationState;
         Map<TableInfo,List<Mutation>> physicalTableMutationMap = 
Maps.newLinkedHashMap(); 
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing 
mutations to tables")) {
@@ -928,16 +925,16 @@ public class MutationState implements SQLCloseable {
             while (tableRefIterator.hasNext()) {
                 // at this point we are going through mutations for each table
                 final TableRef tableRef = tableRefIterator.next();
-                valuesMap = mutations.get(tableRef);
-                if (valuesMap == null || valuesMap.isEmpty()) {
+                multiRowMutationState = mutations.get(tableRef);
+                if (multiRowMutationState == null || 
multiRowMutationState.isEmpty()) {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a 
problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? 
validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
+                long serverTimestamp = serverTimeStamps == null ? 
validateAndGetServerTimestamp(tableRef, multiRowMutationState) : 
serverTimeStamps[i++];
                 Long scn = connection.getSCN();
                 long mutationTimestamp = scn == null ? 
HConstants.LATEST_TIMESTAMP : scn;
                 final PTable table = tableRef.getTable();
-                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, 
sendAll);
+                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, 
serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
@@ -955,7 +952,7 @@ public class MutationState implements SQLCloseable {
                 // involved in the transaction since none of them would have 
been
                 // committed in the event of a failure.
                 if (table.isTransactional()) {
-                    addUncommittedStatementIndexes(valuesMap.values());
+                    
addUncommittedStatementIndexes(multiRowMutationState.values());
                     if (txMutations.isEmpty()) {
                         txMutations = 
Maps.newHashMapWithExpectedSize(mutations.size());
                     }
@@ -964,7 +961,7 @@ public class MutationState implements SQLCloseable {
                     // in the event that we need to replay the commit.
                     // Copy TableRef so we have the original PTable and know 
when the
                     // indexes have changed.
-                    joinMutationState(new TableRef(tableRef), valuesMap, 
txMutations);
+                    joinMutationState(new TableRef(tableRef), 
multiRowMutationState, txMutations);
                 }
             }
             long serverTimestamp = HConstants.LATEST_TIMESTAMP;
@@ -1188,7 +1185,7 @@ public class MutationState implements SQLCloseable {
     }
     
     private int[] getUncommittedStatementIndexes() {
-        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : 
mutations.values()) {
+        for (MultiRowMutationState rowMutationMap : mutations.values()) {
             addUncommittedStatementIndexes(rowMutationMap.values());
         }
         return uncommittedStatementIndexes;
@@ -1221,7 +1218,7 @@ public class MutationState implements SQLCloseable {
     }
 
     public void commit() throws SQLException {
-        Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = 
Collections.emptyMap();
+        Map<TableRef, MultiRowMutationState> txMutations = 
Collections.emptyMap();
         int retryCount = 0;
         do {
             boolean sendSuccessful=false;
@@ -1431,6 +1428,46 @@ public class MutationState implements SQLCloseable {
         }
     }
     
+    public static class MultiRowMutationState {
+        private Map<ImmutableBytesPtr,RowMutationState> 
rowKeyToRowMutationState;
+        private long estimatedSize;
+        
+        public MultiRowMutationState(int size) {
+            this.rowKeyToRowMutationState = 
Maps.newHashMapWithExpectedSize(size);
+            this.estimatedSize = 0;
+        }
+        
+        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState 
rowMutationState) { 
+            estimatedSize += rowMutationState.calculateEstimatedSize();
+            return rowKeyToRowMutationState.put(ptr, rowMutationState);
+        }
+        
+        public void putAll(MultiRowMutationState other) {
+            estimatedSize += other.estimatedSize;
+            rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
+        }
+        
+        public boolean isEmpty() {
+            return rowKeyToRowMutationState.isEmpty();
+        }
+        
+        public int size() {
+            return rowKeyToRowMutationState.size();
+        }
+        
+        public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() {
+            return rowKeyToRowMutationState.entrySet();
+        }
+        
+        public void clear(){
+            rowKeyToRowMutationState.clear();
+        }
+        
+        public Collection<RowMutationState> values() {
+            return rowKeyToRowMutationState.values();
+        }
+    }
+    
     public static class RowMutationState {
         @Nonnull private Map<PColumn,byte[]> columnValues;
         private int[] statementIndexes;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b493962/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index b23ea1b..74f91b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -73,7 +73,7 @@ import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -296,7 +296,7 @@ public class IndexUtil {
     }
     
     public static List<Mutation> generateIndexData(final PTable table, PTable 
index,
-            final Map<ImmutableBytesPtr, RowMutationState> valuesMap, 
List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, 
PhoenixConnection connection)
+            final MultiRowMutationState multiRowMutationState, List<Mutation> 
dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
             throws SQLException {
         try {
                final ImmutableBytesPtr ptr = new ImmutableBytesPtr();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5b493962/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index 318c9d6..df6a349 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -188,10 +189,10 @@ public class KeyValueUtil {
      * @return estimated row size
      */
     public static long
-            getEstimatedRowMutationSize(Map<TableRef, Map<ImmutableBytesPtr, 
RowMutationState>> tableMutationMap) {
+            getEstimatedRowMutationSize(Map<TableRef, MultiRowMutationState> 
tableMutationMap) {
         long size = 0;
         // iterate over table
-        for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
tableEntry : tableMutationMap.entrySet()) {
+        for (Entry<TableRef, MultiRowMutationState> tableEntry : 
tableMutationMap.entrySet()) {
             // iterate over rows
             for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : 
tableEntry.getValue().entrySet()) {
                 size += calculateRowMutationSize(rowEntry);

Reply via email to