PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra 
client embedded in the Phoenix connection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3592782a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3592782a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3592782a

Branch: refs/heads/4.9-HBase-0.98
Commit: 3592782ae2c337b696a879bc2f6610d280729dce
Parents: 4340dad
Author: Andrew Purtell <apurt...@apache.org>
Authored: Wed Jan 4 16:48:44 2017 -0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Sat Jan 7 18:54:14 2017 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 37 ++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3592782a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index fc0925d..0a1333f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -255,6 +255,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private final boolean returnSequenceValues ;
 
     private HConnection connection;
+    private ZKClientService txZKClientService;
     private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
@@ -371,15 +372,16 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
         int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
         // Create instance of the tephra zookeeper client
-        ZKClientService tephraZKClientService = new 
TephraZKClientService(zkQuorumServersString, timeOut, null, 
ArrayListMultimap.<String, byte[]>create());
-
-        ZKClientService zkClientService = ZKClientServices.delegate(
-                ZKClients.reWatchOnExpire(
-                        ZKClients.retryOnFailure(tephraZKClientService, 
RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                        )
+        txZKClientService = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, 
null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS))
+                     )
                 );
-        zkClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(txZKClientService);
         PooledClientProvider pooledClientProvider = new PooledClientProvider(
                 config, zkDiscoveryService);
         this.txServiceClient = new 
TransactionServiceClient(config,pooledClientProvider);
@@ -390,11 +392,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             boolean transactionsEnabled = props.getBoolean(
                     QueryServices.TRANSACTIONS_ENABLED,
                     QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
-            // only initialize the tx service client if needed
+            this.connection = 
HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            // only initialize the tx service client if needed and if we 
succeeded in getting a connection
+            // to HBase
             if (transactionsEnabled) {
                 initTxServiceClient();
             }
-            this.connection = 
HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
         } catch (IOException e) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
             .setRootCause(e).build().buildException();
@@ -464,14 +467,20 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
-                    if (renewLeaseExecutor != null) {
-                        renewLeaseExecutor.shutdownNow();
-                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
                     }
-                    if (connection != null) connection.close();
+                    try {
+                        // close the HBase connection
+                        if (connection != null) connection.close();
+                    } finally {
+                        if (renewLeaseExecutor != null) {
+                            renewLeaseExecutor.shutdownNow();
+                        }
+                        // shut down the tx client service if we created one 
to support transactions
+                        if (this.txZKClientService != null) 
this.txZKClientService.stopAndWait();
+                    }
                 } catch (IOException e) {
                     if (sqlE == null) {
                         sqlE = ServerUtil.parseServerException(e);

Reply via email to