PHOENIX-4601 Perform server-side retries if client version < 4.14

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d7afec21
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d7afec21
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d7afec21

Branch: refs/heads/system-catalog
Commit: d7afec21c500ccd7f5334e9c0958c40fcc95df14
Parents: ccd41de
Author: Vincent Poon <vincentp...@apache.org>
Authored: Thu Apr 19 17:07:56 2018 -0700
Committer: Vincent Poon <vincentp...@apache.org>
Committed: Fri Apr 20 14:22:43 2018 -0700

----------------------------------------------------------------------
 .../hbase/index/write/IndexWriterUtils.java     | 36 ++++++++++++++++----
 .../write/ParallelWriterIndexCommitter.java     | 12 +++++--
 .../TrackingParallelWriterIndexCommitter.java   | 13 ++++---
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  1 +
 4 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7afec21/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 76d6800..0d3004f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -58,15 +59,19 @@ public class IndexWriterUtils {
    public static final String INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY = 
"phoenix.index.writes.threads.max";
    public static final String HTABLE_KEEP_ALIVE_KEY = 
"hbase.htable.threads.keepalivetime";
 
+   @Deprecated
    public static final String INDEX_WRITER_RPC_RETRIES_NUMBER = 
"phoenix.index.writes.rpc.retries.number";
-    /**
-     * Retry server-server index write rpc only once, and let the client retry 
the data write
-     * instead to avoid typing up the handler
-     */
-   // note in HBase 2+, numTries = numRetries + 1
-   // in prior versions, numTries = numRetries
-   public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 1;
+   /**
+    * Based on the logic in HBase's AsyncProcess, a default of 11 retries with 
a pause of 100ms
+    * approximates 48 sec total retry time (factoring in backoffs).  The total 
time should be less
+    * than HBase's rpc timeout (default of 60 sec) or else the client will 
retry before receiving
+    * the response
+    */
+   @Deprecated
+   public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 11;
+   @Deprecated
    public static final String INDEX_WRITER_RPC_PAUSE = 
"phoenix.index.writes.rpc.pause";
+   @Deprecated
    public static final int DEFAULT_INDEX_WRITER_RPC_PAUSE = 100;
 
   private IndexWriterUtils() {
@@ -76,12 +81,29 @@ public class IndexWriterUtils {
     public static HTableFactory 
getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
         // create a simple delegate factory, setup the way we need
         Configuration conf = 
PropertiesUtil.cloneConfig(env.getConfiguration());
+        setHTableThreads(conf);
+        return ServerUtil.getDelegateHTableFactory(env, conf);
+    }
+
+    private static void setHTableThreads(Configuration conf) {
         // set the number of threads allowed per table.
         int htableThreads =
                 
conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
                     IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
         LOG.trace("Creating HTableFactory with " + htableThreads + " threads 
for each HTable.");
         IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, 
htableThreads);
+    }
+
+    /**
+     * Retry server-server index write rpc only once, and let the client retry 
the data write
+     * instead to avoid tying up the handler
+     */
+    public static HTableFactory 
getNoRetriesHTableFactory(CoprocessorEnvironment env) {
+        Configuration conf = 
PropertiesUtil.cloneConfig(env.getConfiguration());
+        setHTableThreads(conf);
+        // note in HBase 2+, numTries = numRetries + 1
+        // in prior versions, numTries = numRetries
+        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
         return ServerUtil.getDelegateHTableFactory(env, conf);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7afec21/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index aba2678..cd196d3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.util.IndexUtil;
 
 import com.google.common.collect.Multimap;
@@ -57,7 +58,8 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
     public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = 
"index.writer.threads.keepalivetime";
     private static final Log LOG = 
LogFactory.getLog(ParallelWriterIndexCommitter.class);
 
-    private HTableFactory factory;
+    private HTableFactory retryingFactory;
+    private HTableFactory noRetriesfactory;
     private Stoppable stopped;
     private QuickFailingTaskRunner pool;
     private KeyValueBuilder kvBuilder;
@@ -80,6 +82,7 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
                                 
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
                                 INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), 
env.getRegionServerServices(), parent, env);
         this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+        this.noRetriesfactory = 
IndexWriterUtils.getNoRetriesHTableFactory(env);
     }
 
     /**
@@ -88,7 +91,7 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
      * Exposed for TESTING
      */
     void setup(HTableFactory factory, ExecutorService pool, Abortable 
abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
-        this.factory = factory;
+        this.retryingFactory = factory;
         this.pool = new QuickFailingTaskRunner(pool);
         this.stopped = stop;
     }
@@ -162,6 +165,8 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
                                 }
                             }
                         }
+                     // if the client can retry index writes, then we don't 
need to retry here
+                        HTableFactory factory = clientVersion < 
PhoenixDatabaseMetaData.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : 
noRetriesfactory;
                         table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         table.batch(mutations);
@@ -226,7 +231,8 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
     public void stop(String why) {
         LOG.info("Shutting down " + this.getClass().getSimpleName() + " 
because " + why);
         this.pool.stop(why);
-        this.factory.shutdown();
+        this.retryingFactory.shutdown();
+        this.noRetriesfactory.shutdown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7afec21/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 4dbad63..1727fff 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.util.IndexUtil;
 
 import com.google.common.collect.Multimap;
@@ -74,7 +75,8 @@ public class TrackingParallelWriterIndexCommitter implements 
IndexCommitter {
     private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = 
"index.writer.threads.keepalivetime";
 
     private TaskRunner pool;
-    private HTableFactory factory;
+    private HTableFactory retryingFactory;
+    private HTableFactory noRetriesFactory;
     private CapturingAbortable abortable;
     private Stoppable stopped;
     private RegionCoprocessorEnvironment env;
@@ -98,6 +100,7 @@ public class TrackingParallelWriterIndexCommitter implements 
IndexCommitter {
                                 
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
                                 INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), 
env.getRegionServerServices(), parent, env);
         this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+        this.noRetriesFactory = 
IndexWriterUtils.getNoRetriesHTableFactory(env);
     }
 
     /**
@@ -108,7 +111,7 @@ public class TrackingParallelWriterIndexCommitter 
implements IndexCommitter {
     void setup(HTableFactory factory, ExecutorService pool, Abortable 
abortable, Stoppable stop,
             RegionCoprocessorEnvironment env) {
         this.pool = new WaitForCompletionTaskRunner(pool);
-        this.factory = factory;
+        this.retryingFactory = factory;
         this.abortable = new CapturingAbortable(abortable);
         this.stopped = stop;
         this.env = env;
@@ -175,7 +178,8 @@ public class TrackingParallelWriterIndexCommitter 
implements IndexCommitter {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Writing index update:" + mutations + " 
to table: " + tableReference);
                         }
-
+                        // if the client can retry index writes, then we don't 
need to retry here
+                        HTableFactory factory = clientVersion < 
PhoenixDatabaseMetaData.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : 
noRetriesFactory;
                         table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         table.batch(mutations);
@@ -238,7 +242,8 @@ public class TrackingParallelWriterIndexCommitter 
implements IndexCommitter {
     public void stop(String why) {
         LOG.info("Shutting down " + this.getClass().getSimpleName());
         this.pool.stop(why);
-        this.factory.shutdown();
+        this.retryingFactory.shutdown();
+        this.noRetriesFactory.shutdown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7afec21/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index add0628..d56628a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -327,6 +327,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = 
VersionUtil.encodeVersion("4", "8", "0");
     public static final int MIN_PENDING_ACTIVE_INDEX = 
VersionUtil.encodeVersion("4", "12", "0");
     public static final int MIN_PENDING_DISABLE_INDEX = 
VersionUtil.encodeVersion("4", "14", "0");
+    public static final int MIN_CLIENT_RETRY_INDEX_WRITES = 
VersionUtil.encodeVersion("4", "14", "0");
     public static final int MIN_TX_CLIENT_SIDE_MAINTENANCE = 
VersionUtil.encodeVersion("4", "14", "0");
     
     // Version below which we should turn off essential column family.

Reply via email to