This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f6e96  PHOENIX-5137 check region close before commiting a batch for index rebuild
d9f6e96 is described below

commit d9f6e969e66b99604a654a3ba51084b622723a74
Author: Kiran Kumar Maturi <maturi.ki...@gmail.com>
AuthorDate: Tue Feb 26 17:09:41 2019 +0530

    PHOENIX-5137 check region close before commiting a batch for index rebuild
---
 .../UngroupedAggregateRegionObserver.java          | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6b27a88..40b6faa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -392,7 +392,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             super.clear();
         }
     }
-    
+
+    private long getBlockingMemstoreSize(Region region, Configuration conf) {
+        long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
+
+        if (flushSize <= 0) {
+            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+                    TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
+        }
+        return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+                        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+    }
+
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
         RegionCoprocessorEnvironment env = c.getEnvironment();
@@ -524,12 +535,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         MutationList mutations = new MutationList();
         boolean needToWrite = false;
         Configuration conf = env.getConfiguration();
-        long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
-
-        if (flushSize <= 0) {
-            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
-                    TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
-        }
 
         /**
          * Slow down the writes if the memstore size more than
@@ -537,9 +542,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
          * bytes. This avoids flush storm to hdfs for cases like index building where reads and
          * write happen to all the table regions in the server.
          */
-        final long blockingMemStoreSize = flushSize * (
-                conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
-                        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+        final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf);
 
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if(buildLocalIndex) {
@@ -1101,6 +1104,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
             long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
             MutationList mutations = new MutationList(maxBatchSize);
             region.startRegionOperation();
             byte[] uuidValue = ServerCacheClient.generateId();
@@ -1142,7 +1146,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                         }
                         if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commitBatchWithRetries(region, mutations, -1);
+                            checkForRegionClosing();
+                            commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                             uuidValue = ServerCacheClient.generateId();
                             mutations.clear();
                         }
@@ -1151,7 +1156,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
-                    commitBatchWithRetries(region, mutations, -1);
+                    checkForRegionClosing();
+                    commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                 }
             }
         } catch (IOException e) {

Reply via email to