PHOENIX-4381 Calculate the estimatedSize of MutationState incrementally
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ef3bce18 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ef3bce18 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ef3bce18 Branch: refs/heads/system-catalog Commit: ef3bce18fe7373b66136d933cc364001dff2c3f8 Parents: 2053905 Author: Thomas D'Silva <tdsi...@apache.org> Authored: Wed Nov 15 18:54:04 2017 -0800 Committer: Thomas D'Silva <tdsi...@apache.org> Committed: Wed Nov 15 20:59:53 2017 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 15 ++++- .../org/apache/phoenix/util/KeyValueUtil.java | 65 +++++++++----------- 2 files changed, 43 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef3bce18/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 1f47a33..0cdb010 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -127,6 +127,7 @@ public class MutationState implements SQLCloseable { private long sizeOffset; private int numRows = 0; + private long estimatedSize = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); @@ -193,6 +194,7 @@ public class MutationState implements SQLCloseable { this.mutations.put(table, mutations); } this.numRows = mutations.size(); + this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations); throwIfTooBig(); } @@ -354,7 +356,6 @@ public class MutationState implements SQLCloseable { throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build() .buildException(); } - long estimatedSize = KeyValueUtil.getEstimatedRowSize(mutations); if (estimatedSize > maxSizeBytes) { resetState(); throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED) @@ -433,7 +434,12 @@ public class MutationState implements SQLCloseable { phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; + int oldNumRows = this.numRows; joinMutationState(newMutationState.mutations, this.mutations); + // here we increment the estimated size by the fraction of new rows we added from the newMutationState + if (newMutationState.numRows>0) { + this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize; + } if (!newMutationState.txMutations.isEmpty()) { if (txMutations.isEmpty()) { txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); @@ -968,6 +974,8 @@ public class MutationState implements SQLCloseable { long mutationCommitTime = 0; long numFailedMutations = 0;; long startTime = 0; + long startNumRows = numRows; + long startEstimatedSize = estimatedSize; do { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); @@ -1004,8 +1012,8 @@ public class MutationState implements SQLCloseable { for (List<Mutation> mutationBatch : mutationBatchList) { hTable.batch(mutationBatch); batchCount++; + if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName)); } - if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName)); child.stop(); child.stop(); shouldRetry = false; @@ -1015,6 +1023,8 @@ public class MutationState implements SQLCloseable { if (tableInfo.isDataTable()) { numRows -= numMutations; + // decrement estimated size by the fraction of rows we sent to hbase + estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize; } // Remove batches as we process them mutations.remove(origTableRef); @@ -1180,6 +1190,7 @@ public class MutationState implements SQLCloseable { private void resetState() { numRows = 0; + estimatedSize = 0; this.mutations.clear(); resetTransactionalState(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef3bce18/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index 4234df5..2dfe1b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -192,46 +192,41 @@ public class KeyValueUtil { * @return estimated row size */ public static long - getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { + getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) { long size = 0; - // iterate over tables - for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations - .entrySet()) { - PTable table = tableEntry.getKey().getTable(); - // iterate over rows - for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue() - .entrySet()) { - int rowLength = rowEntry.getKey().getLength(); - Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); - switch (table.getImmutableStorageScheme()) { - case ONE_CELL_PER_COLUMN: - // iterate over columns - for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { - PColumn pColumn = colValueEntry.getKey(); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - pColumn.getFamilyName().getBytes().length, - pColumn.getColumnQualifierBytes().length, - colValueEntry.getValue().length); - } - break; - case SINGLE_CELL_ARRAY_WITH_OFFSETS: - // we store all the column values in a single key value that contains all the - // column values followed by an offset array + PTable table = tableRef.getTable(); + // iterate over rows + for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) { + int rowLength = rowEntry.getKey().getLength(); + Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); + switch (table.getImmutableStorageScheme()) { + case ONE_CELL_PER_COLUMN: + // iterate over columns + for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { + PColumn pColumn = colValueEntry.getKey(); size += - PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, - colValueMap); - break; + KeyValue.getKeyValueDataStructureSize(rowLength, + pColumn.getFamilyName().getBytes().length, + pColumn.getColumnQualifierBytes().length, + colValueEntry.getValue().length); } - // count the empty key value - Pair<byte[], byte[]> emptyKeyValueInfo = - EncodedColumnsUtil.getEmptyKeyValueInfo(table); + break; + case SINGLE_CELL_ARRAY_WITH_OFFSETS: + // we store all the column values in a single key value that contains all the + // column values followed by an offset array size += - KeyValue.getKeyValueDataStructureSize(rowLength, - SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), - emptyKeyValueInfo.getFirst().length, - emptyKeyValueInfo.getSecond().length); + PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, + colValueMap); + break; } + // count the empty key value + Pair<byte[], byte[]> emptyKeyValueInfo = + EncodedColumnsUtil.getEmptyKeyValueInfo(table); + size += + KeyValue.getKeyValueDataStructureSize(rowLength, + SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), + emptyKeyValueInfo.getFirst().length, + emptyKeyValueInfo.getSecond().length); } return size; }