PHOENIX-3997 UngroupedAggregateRegionObserver.commitBatchWithHTable() should not check the memstore size and wait for flush.
Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e9498bf4 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e9498bf4 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e9498bf4 Branch: refs/heads/4.x-HBase-1.1 Commit: e9498bf4704d438969e67557c7d45b3a76c65458 Parents: bd11d86 Author: Geoffrey Jacoby <geoffrey.jac...@careerbuilder.com> Authored: Thu Jul 13 13:34:54 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Mon Jul 24 15:42:20 2017 -0700 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 127 ++++++++++--------- 1 file changed, 67 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9498bf4/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a949058..a07b5d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -200,24 +200,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); } - private void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize, - byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException { + private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { if (mutations.isEmpty()) { - return; + return; } - for (Mutation m : mutations) { - if (indexMaintainersPtr != null) { - m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); - } - if (indexUUID != null) { - m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); - } - if (txState != null) { - m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - } - - Mutation[] mutationArray = new Mutation[mutations.size()]; + + Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { @@ -233,34 +221,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } - - private void commitBatchWithHTable(HTable table, Region region, List<Mutation> mutations, byte[] indexUUID, - long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) throws IOException { - if (mutations.isEmpty()) { - return; - } + + private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) { for (Mutation m : mutations) { - if (indexMaintainersPtr != null) { - m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); - } - if (txState != null) { - m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - if (indexUUID != null) { - m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); - } - } - // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the - // flush happen which decrease the memstore size and then writes allowed on the region. - for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { - try { - checkForRegionClosing(); - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } + if (indexMaintainersPtr != null) { + m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); + } + if (indexUUID != null) { + m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); + } + if (txState != null) { + m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } } + } + + private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException { + if (mutations.isEmpty()) { + return; + } + logger.debug("Committing batch of " + mutations.size() + " mutations for " + table); try { table.batch(mutations); @@ -404,7 +384,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] deleteCF = null; byte[] emptyCF = null; HTable targetHTable = null; - boolean areMutationInSameRegion = true; + boolean isPKChanging = false; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (upsertSelectTable != null) { isUpsert = true; @@ -412,10 +392,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes()); selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); values = new byte[projectedTable.getPKColumns().size()][]; - areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(), - region.getTableDesc().getTableName().getName()) == 0 - && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); - + isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); } else { byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG); isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0; @@ -724,15 +701,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, - areMutationInSameRegion, targetHTable, useIndexProto); + commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, + txState, targetHTable, useIndexProto, isPKChanging); mutations.clear(); } // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState, - useIndexProto); + setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, useIndexProto); + commitBatch(region, indexMutations, blockingMemStoreSize); indexMutations.clear(); } aggregators.aggregate(rowAggregators, result); @@ -741,12 +718,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); if (!mutations.isEmpty()) { commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, - areMutationInSameRegion, targetHTable, useIndexProto); + targetHTable, useIndexProto, isPKChanging); mutations.clear(); } if (!indexMutations.isEmpty()) { - commitBatch(region, indexMutations, null, blockingMemStoreSize, indexMaintainersPtr, txState, useIndexProto); + commitBatch(region, indexMutations, blockingMemStoreSize); indexMutations.clear(); } } @@ -802,18 +779,48 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } - private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize, - byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable, boolean useIndexProto) + private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize, + byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto, + boolean isPKChanging) throws IOException { - if (!areMutationsForSameRegion) { - assert hTable != null;// table cannot be null - commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, - txState, useIndexProto); + List<Mutation> localRegionMutations = Lists.newArrayList(); + List<Mutation> remoteRegionMutations = Lists.newArrayList(); + setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto); + separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, + isPKChanging); + commitBatch(region, localRegionMutations, blockingMemStoreSize); + commitBatchWithHTable(targetHTable, remoteRegionMutations); + localRegionMutations.clear(); + remoteRegionMutations.clear(); + } + + private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations, + List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations, + boolean isPKChanging){ + boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region); + //if we're writing to the same table, but the PK can change, that means that some + //mutations might be in our current region, and others in a different one. + if (areMutationsInSameTable && isPKChanging) { + HRegionInfo regionInfo = region.getRegionInfo(); + for (Mutation mutation : mutations){ + if (regionInfo.containsRow(mutation.getRow())){ + localRegionMutations.add(mutation); + } else { + remoteRegionMutations.add(mutation); + } + } + } else if (areMutationsInSameTable && !isPKChanging) { + localRegionMutations.addAll(mutations); } else { - commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState, useIndexProto); + remoteRegionMutations.addAll(mutations); } } + private boolean areMutationsInSameTable(HTable targetHTable, Region region) { + return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(), + region.getTableDesc().getTableName().getName()) == 0); + } + @Override public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException {