[phoenix] 25/34: PHOENIX-5137 check region close before committing a batch for index rebuild
This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.14-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git commit 91988be2055423b5623576bbd1fdab4ea9e75d86 Author: Kiran Kumar Maturi AuthorDate: Fri Feb 22 09:45:13 2019 +0530 PHOENIX-5137 check region close before commiting a batch for index rebuild --- .../UngroupedAggregateRegionObserver.java | 30 +- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 703ff97..2eb15a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -260,7 +260,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return; } -Mutation[] mutationArray = new Mutation[mutations.size()]; + Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. 
for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { @@ -371,6 +371,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver super.clear(); } } + + private long getBlockingMemstoreSize(Region region, Configuration conf) { + long flushSize = region.getTableDesc().getMemStoreFlushSize(); + + if (flushSize <= 0) { + flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); + } + return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1); + } @Override protected RegionScanner doPostScannerOpen(final ObserverContext c, final Scan scan, final RegionScanner s) throws IOException, SQLException { @@ -487,12 +498,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = env.getConfiguration(); -long flushSize = region.getTableDesc().getMemStoreFlushSize(); - -if (flushSize <= 0) { -flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, -HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); -} /** * Slow down the writes if the memstore size more than @@ -500,9 +505,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * bytes. This avoids flush storm to hdfs for cases like index building where reads and * write happen to all the table regions in the server. 
*/ -final long blockingMemStoreSize = flushSize * ( -conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; +final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ; boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if(buildLocalIndex) { @@ -1043,6 +1046,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); +final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); MutationList mutations = new MutationList(maxBatchSize); region.startRegionOperation(); byte[] uuidValue = ServerCacheClient.generateId(); @@ -1084,7 +1088,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { -commitBatchWithRetries(region, mutations, -1); +checkForRegionClosingOrSplitting(); +commitBatchWithRetries(region, mutations, blockingMemstoreSize); uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1093,
[phoenix] 25/34: PHOENIX-5137 check region close before committing a batch for index rebuild
This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.14-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git commit da6248a78c1861613cb43eb90c988eaf9457d36e Author: Kiran Kumar Maturi AuthorDate: Fri Feb 22 09:45:13 2019 +0530 PHOENIX-5137 check region close before commiting a batch for index rebuild --- .../UngroupedAggregateRegionObserver.java | 30 +- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 703ff97..2eb15a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -260,7 +260,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return; } -Mutation[] mutationArray = new Mutation[mutations.size()]; + Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. 
for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { @@ -371,6 +371,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver super.clear(); } } + + private long getBlockingMemstoreSize(Region region, Configuration conf) { + long flushSize = region.getTableDesc().getMemStoreFlushSize(); + + if (flushSize <= 0) { + flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); + } + return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1); + } @Override protected RegionScanner doPostScannerOpen(final ObserverContext c, final Scan scan, final RegionScanner s) throws IOException, SQLException { @@ -487,12 +498,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = env.getConfiguration(); -long flushSize = region.getTableDesc().getMemStoreFlushSize(); - -if (flushSize <= 0) { -flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, -HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); -} /** * Slow down the writes if the memstore size more than @@ -500,9 +505,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * bytes. This avoids flush storm to hdfs for cases like index building where reads and * write happen to all the table regions in the server. 
*/ -final long blockingMemStoreSize = flushSize * ( -conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; +final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ; boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if(buildLocalIndex) { @@ -1043,6 +1046,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); +final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); MutationList mutations = new MutationList(maxBatchSize); region.startRegionOperation(); byte[] uuidValue = ServerCacheClient.generateId(); @@ -1084,7 +1088,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { -commitBatchWithRetries(region, mutations, -1); +checkForRegionClosingOrSplitting(); +commitBatchWithRetries(region, mutations, blockingMemstoreSize); uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1093,
[phoenix] 25/34: PHOENIX-5137 check region close before committing a batch for index rebuild
This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.14-HBase-1.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git commit 1916b3d76cdc9a190680476828442da1dd9b91cc Author: Kiran Kumar Maturi AuthorDate: Fri Feb 22 09:45:13 2019 +0530 PHOENIX-5137 check region close before commiting a batch for index rebuild --- .../UngroupedAggregateRegionObserver.java | 30 +- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 703ff97..2eb15a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -260,7 +260,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return; } -Mutation[] mutationArray = new Mutation[mutations.size()]; + Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. 
for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { @@ -371,6 +371,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver super.clear(); } } + + private long getBlockingMemstoreSize(Region region, Configuration conf) { + long flushSize = region.getTableDesc().getMemStoreFlushSize(); + + if (flushSize <= 0) { + flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); + } + return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1); + } @Override protected RegionScanner doPostScannerOpen(final ObserverContext c, final Scan scan, final RegionScanner s) throws IOException, SQLException { @@ -487,12 +498,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = env.getConfiguration(); -long flushSize = region.getTableDesc().getMemStoreFlushSize(); - -if (flushSize <= 0) { -flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, -HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); -} /** * Slow down the writes if the memstore size more than @@ -500,9 +505,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * bytes. This avoids flush storm to hdfs for cases like index building where reads and * write happen to all the table regions in the server. 
*/ -final long blockingMemStoreSize = flushSize * ( -conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; +final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ; boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if(buildLocalIndex) { @@ -1043,6 +1046,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); +final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); MutationList mutations = new MutationList(maxBatchSize); region.startRegionOperation(); byte[] uuidValue = ServerCacheClient.generateId(); @@ -1084,7 +1088,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { -commitBatchWithRetries(region, mutations, -1); +checkForRegionClosingOrSplitting(); +commitBatchWithRetries(region, mutations, blockingMemstoreSize); uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1093,