This is an automated email from the ASF dual-hosted git repository. pboado pushed a commit to branch 5.x-cdh6 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 0a5aec82e1c978306ea2cd13e76caa4ed2e00733 Author: Kiran Kumar Maturi <maturi.ki...@gmail.com> AuthorDate: Tue Feb 26 11:39:41 2019 +0000 PHOENIX-5137 check region close before commiting a batch for index rebuild --- .../UngroupedAggregateRegionObserver.java | 30 +++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 6b27a88..40b6faa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -392,7 +392,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver super.clear(); } } - + + private long getBlockingMemstoreSize(Region region, Configuration conf) { + long flushSize = region.getTableDescriptor().getMemStoreFlushSize(); + + if (flushSize <= 0) { + flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); + } + return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; + } + @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException { RegionCoprocessorEnvironment env = c.getEnvironment(); @@ -524,12 +535,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = env.getConfiguration(); - long flushSize = region.getTableDescriptor().getMemStoreFlushSize(); - - if (flushSize <= 0) { - flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, - TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); - } /** * Slow down the writes if the memstore size more than @@ -537,9 +542,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * bytes. This avoids flush storm to hdfs for cases like index building where reads and * write happen to all the table regions in the server. */ - final long blockingMemStoreSize = flushSize * ( - conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; + final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf); boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if(buildLocalIndex) { @@ -1101,6 +1104,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); MutationList mutations = new MutationList(maxBatchSize); region.startRegionOperation(); byte[] uuidValue = ServerCacheClient.generateId(); @@ -1142,7 +1146,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - commitBatchWithRetries(region, mutations, -1); + checkForRegionClosing(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1151,7 +1156,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); if (!mutations.isEmpty()) { - commitBatchWithRetries(region, mutations, -1); + checkForRegionClosing(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); } } } catch (IOException e) {