This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 05e3e2bd197 Revise #24401 (#24528)
05e3e2bd197 is described below
commit 05e3e2bd197e632b91654f05f2f1dd43f0f9904d
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Mar 9 23:28:02 2023 +0800
Revise #24401 (#24528)
---
hbase/pom.xml | 16 ++--
.../hbase/backend/bean/HBaseCluster.java | 4 +-
.../backend/config/YamlHBaseConfiguration.java | 3 +-
.../backend/connector/HBaseAdminCallback.java | 11 ++-
.../connector/HBaseBackgroundExecutorManager.java | 20 ++---
.../backend/connector/HBaseConnectionFactory.java | 22 ++---
.../hbase/backend/connector/HBaseExecutor.java | 76 ++++++++--------
.../backend/connector/HBaseQueryCallback.java | 12 +--
.../connector/HBaseTaskExecutorManager.java | 15 +---
.../backend/connector/HBaseUpdateCallback.java | 9 +-
.../hbase/backend/context/HBaseContext.java | 100 ++++++++++-----------
...aRefresher.java => HBaseMetaDataRefresher.java} | 13 ++-
.../backend/context/HBaseRegionWarmUpContext.java | 35 +++++---
.../backend/exception/HBaseOperationException.java | 3 +-
infra/common/pom.xml | 1 -
pom.xml | 17 ++--
proxy/backend/core/pom.xml | 1 -
.../backend/config/ProxyConfigurationLoader.java | 4 +-
.../backend/config/YamlProxyConfiguration.java | 1 -
.../proxy/initializer/BootstrapInitializer.java | 1 -
20 files changed, 165 insertions(+), 199 deletions(-)
diff --git a/hbase/pom.xml b/hbase/pom.xml
index 1f95da34eb2..ab28c1f8ec0 100644
--- a/hbase/pom.xml
+++ b/hbase/pom.xml
@@ -20,31 +20,27 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere</artifactId>
+ <groupId>org.apache.shardingsphere</groupId>
<version>5.3.2-SNAPSHOT</version>
</parent>
<artifactId>shardingsphere-hbase</artifactId>
<dependencies>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-shaded-client</artifactId>
- <scope>compile</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-infra-util</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-infra-executor</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-shaded-client</artifactId>
</dependency>
</dependencies>
-
</project>
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/bean/HBaseCluster.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/bean/HBaseCluster.java
index 8557fef4bfa..2ca29907689 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/bean/HBaseCluster.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/bean/HBaseCluster.java
@@ -21,6 +21,9 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.hadoop.hbase.client.Connection;
+/**
+ * HBase cluster.
+ */
@RequiredArgsConstructor
@Getter
public final class HBaseCluster {
@@ -28,5 +31,4 @@ public final class HBaseCluster {
private final String clusterName;
private final Connection connection;
-
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/config/YamlHBaseConfiguration.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/config/YamlHBaseConfiguration.java
index 824790d53dd..de4035d6974 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/config/YamlHBaseConfiguration.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/config/YamlHBaseConfiguration.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.Properties;
/**
- * Rule configuration for YAML.
+ * YAML rule configuration for HBase.
*/
@Getter
@Setter
@@ -38,5 +38,4 @@ public final class YamlHBaseConfiguration implements
YamlConfiguration {
private Map<String, YamlHBaseParameter> dataSources = new HashMap<>();
private Properties props;
-
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseAdminCallback.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseAdminCallback.java
index 43006792465..812d3bdfa7a 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseAdminCallback.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseAdminCallback.java
@@ -23,17 +23,16 @@ import java.io.IOException;
/**
* Call back for HBase operation.
*
- * @param <T> return type
+ * @param <T> type of result
*/
public interface HBaseAdminCallback<T> {
/**
- * Do operation in HBase.
- *
- * @param admin execute in HBase Table
- * @return result
- * @throws IOException exception
+ * Execute in HBase.
*
+ * @param admin execute in HBase table
+ * @return execute result
+ * @throws IOException IO exception
*/
T executeInHBase(Admin admin) throws IOException;
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseBackgroundExecutorManager.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseBackgroundExecutorManager.java
index 383624ae1c1..c051ec3b1b8 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseBackgroundExecutorManager.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseBackgroundExecutorManager.java
@@ -17,35 +17,29 @@
package org.apache.shardingsphere.hbase.backend.connector;
-import lombok.Getter;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
- * Background executor.
+ * HBase background executor manager.
*/
-@Getter
public final class HBaseBackgroundExecutorManager implements Closeable {
private final ScheduledExecutorService executorService;
public HBaseBackgroundExecutorManager() {
- executorService = getExecutorService();
- }
-
- private ScheduledExecutorService getExecutorService() {
- ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("background");
- return Executors.newScheduledThreadPool(1, threadFactory);
+ executorService = Executors.newScheduledThreadPool(1,
ExecutorThreadFactoryBuilder.build("background"));
}
/**
- * Submit background task.
- * @param runnable background task
- * @param interval Running interval
+ * Submit task.
+ *
+ * @param runnable task
+ * @param interval running interval
*/
public void submit(final Runnable runnable, final int interval) {
executorService.scheduleWithFixedDelay(runnable, interval, interval,
TimeUnit.SECONDS);
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseConnectionFactory.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseConnectionFactory.java
index 9801b6d13c3..d01293f02f5 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseConnectionFactory.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseConnectionFactory.java
@@ -17,16 +17,17 @@
package org.apache.shardingsphere.hbase.backend.connector;
+import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.shardingsphere.hbase.backend.config.YamlHBaseConfiguration;
import org.apache.shardingsphere.hbase.backend.config.YamlHBaseParameter;
import
org.apache.shardingsphere.hbase.backend.exception.HBaseOperationException;
+
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -39,7 +40,8 @@ public final class HBaseConnectionFactory {
/**
* Create HBase connection.
- * @param yamlProxyHBaseConfiguration HBase config
+ *
+ * @param yamlProxyHBaseConfiguration HBase configuration
* @return A connection for per HBase cluster
*/
public static Map<String, Connection> createHBaseConnections(final
YamlHBaseConfiguration yamlProxyHBaseConfiguration) {
@@ -53,20 +55,14 @@ public final class HBaseConnectionFactory {
private static Connection createConnection(final YamlHBaseParameter
parameter) {
Configuration config = createConfiguration(parameter);
try {
- if (StringUtils.isEmpty(parameter.getAccessUser())) {
- return ConnectionFactory.createConnection(config);
- } else {
- return ConnectionFactory.createConnection(config,
createUser(parameter.getAccessUser()));
- }
- } catch (IOException e) {
- throw new HBaseOperationException(e.getMessage());
+ return Strings.isNullOrEmpty(parameter.getAccessUser())
+ ? ConnectionFactory.createConnection(config)
+ : ConnectionFactory.createConnection(config,
User.create(UserGroupInformation.createRemoteUser(parameter.getAccessUser())));
+ } catch (final IOException ex) {
+ throw new HBaseOperationException(ex.getMessage());
}
}
- private static User createUser(final String accessUser) {
- return User.create(UserGroupInformation.createRemoteUser(accessUser));
- }
-
private static Configuration createConfiguration(final YamlHBaseParameter
parameter) {
Configuration result = HBaseConfiguration.create();
result.set("fs.defaultFS", parameter.getFsDefaultFs());
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseExecutor.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseExecutor.java
index 5db33adc4f2..326351aac92 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseExecutor.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseExecutor.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.hbase.backend.connector;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -27,16 +29,18 @@ import
org.apache.shardingsphere.hbase.backend.exception.HBaseOperationException
import java.io.IOException;
/**
- * Execute HBase operation.
+ * HBase executor.
+ *
+ * <p>Do not cache table here.</p>
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class HBaseExecutor {
/**
- * Do operation in HBase, wrapper HBase Exception.
- * <p>If we need cache Table, do that in here.</p>
+ * Execute update.
*
- * @param tableName tableName
+ * @param tableName table name
* @param operation operation
*/
public static void executeUpdate(final String tableName, final
HBaseUpdateCallback operation) {
@@ -44,62 +48,56 @@ public final class HBaseExecutor {
try (Table table =
HBaseContext.getInstance().getConnection(tableName).getTable(backendTableName))
{
try {
operation.executeInHBase(table);
- } catch (IOException e) {
- log.info(String.format("query hbase table: %s, execute hbase
fail", tableName));
- log.error(e.toString());
- throw new HBaseOperationException(e.getMessage());
+ } catch (final IOException ex) {
+ log.info(String.format("Query HBase table: %s, execute HBase
fail.", tableName));
+ log.error(ex.toString());
+ throw new HBaseOperationException(ex.getMessage());
}
- } catch (IOException e) {
- log.info(String.format("query hbase table: %s, execute hbase
fail", tableName));
- log.error(e.toString());
- throw new HBaseOperationException(e.getMessage());
+ } catch (final IOException ex) {
+ log.info(String.format("Query HBase table: %s, execute HBase
fail.", tableName));
+ log.error(ex.toString());
+ throw new HBaseOperationException(ex.getMessage());
}
}
/**
- * Do operation in HBase, wrapper HBase Exception.
- * <p>If we need cache Table, do that in here.</p>
- *
- * @param tableName tableName
+ * Execute query.
+ *
+ * @param tableName table name
* @param operation operation
- * @param <R> Result Type
- * @return result
+ * @param <T> type of result
+ * @return query result
*/
- public static <R> R executeQuery(final String tableName, final
HBaseQueryCallback<R> operation) {
+ public static <T> T executeQuery(final String tableName, final
HBaseQueryCallback<T> operation) {
TableName backendTableName = TableName.valueOf(tableName);
try (Table table =
HBaseContext.getInstance().getConnection(tableName).getTable(backendTableName))
{
- R result;
try {
- result = operation.executeInHBase(table);
- } catch (IOException e) {
- throw new HBaseOperationException(e.getMessage());
+ return operation.executeInHBase(table);
+ } catch (final IOException ex) {
+ throw new HBaseOperationException(ex.getMessage());
}
- return result;
- } catch (IOException e) {
- throw new HBaseOperationException(e.getMessage());
+ } catch (final IOException ex) {
+ throw new HBaseOperationException(ex.getMessage());
}
}
/**
- * Do operation in HBase, wrapper HBase Exception.
- * <p>If we need cache Table, do that in here.</p>
- *
+ * Execute admin.
+ *
* @param connection HBase connection
* @param operation operation
- * @param <R> Result Type
- * @return result.
+ * @param <T> type of result
+ * @return admin result
*/
- public static <R> R executeAdmin(final Connection connection, final
HBaseAdminCallback<R> operation) {
+ public static <T> T executeAdmin(final Connection connection, final
HBaseAdminCallback<T> operation) {
try (Admin admin = connection.getAdmin()) {
- R result;
try {
- result = operation.executeInHBase(admin);
- } catch (IOException e) {
- throw new HBaseOperationException(e.getMessage());
+ return operation.executeInHBase(admin);
+ } catch (final IOException ex) {
+ throw new HBaseOperationException(ex.getMessage());
}
- return result;
- } catch (IOException e) {
- throw new HBaseOperationException(e.getMessage());
+ } catch (final IOException ex) {
+ throw new HBaseOperationException(ex.getMessage());
}
}
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseQueryCallback.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseQueryCallback.java
index c366fa7422f..dfe14c81b53 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseQueryCallback.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseQueryCallback.java
@@ -21,18 +21,18 @@ import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;
/**
- * Call back for HBase operation.
+ * HBase query callback.
*
- * @param <T> return type.
+ * @param <T> type of result
*/
public interface HBaseQueryCallback<T> {
/**
- * Do operation in HBase.
+ * Execute in HBase.
*
- * @param table execute in HBase Table
- * @return result
- * @throws IOException exception
+ * @param table table
+ * @return execute result
+ * @throws IOException IO exception
*
*/
T executeInHBase(Table table) throws IOException;
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseTaskExecutorManager.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseTaskExecutorManager.java
index a99b2ae5ec9..bf2e667e9af 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseTaskExecutorManager.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseTaskExecutorManager.java
@@ -34,28 +34,19 @@ public final class HBaseTaskExecutorManager implements
Closeable {
* @param poolSize pool size
*/
public HBaseTaskExecutorManager(final int poolSize) {
- executorService = getExecutorService(poolSize);
- }
-
- private ThreadPoolExecutor getExecutorService(final int poolSize) {
- return new ThreadPoolExecutor(
- poolSize, poolSize, 10,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(20000),
- new ThreadPoolExecutor.CallerRunsPolicy());
+ executorService = new ThreadPoolExecutor(poolSize, poolSize, 10L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(20000), new
ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* Submit task.
+ *
* @param runnable task
*/
public void submit(final Runnable runnable) {
executorService.submit(runnable);
}
- /**
- * Close executor.
- */
+ @Override
public void close() {
executorService.shutdown();
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseUpdateCallback.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseUpdateCallback.java
index 99d0c33eb55..7ef1afe966b 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseUpdateCallback.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/connector/HBaseUpdateCallback.java
@@ -21,16 +21,15 @@ import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;
/**
- * Call back for HBase operation.
+ * HBase update callback.
*/
public interface HBaseUpdateCallback {
/**
- * Do operation in HBase.
+ * Execute in HBase.
*
- * @param table execute in HBase Table
- * @throws IOException exception
+ * @param table table
+ * @throws IOException IO exception
*/
void executeInHBase(Table table) throws IOException;
-
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseContext.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseContext.java
index d4530738c8f..a665587ffbb 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseContext.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseContext.java
@@ -31,18 +31,20 @@ import
org.apache.shardingsphere.hbase.backend.connector.HBaseExecutor;
import
org.apache.shardingsphere.hbase.backend.exception.HBaseOperationException;
import org.apache.shardingsphere.hbase.backend.props.HBaseProperties;
import org.apache.shardingsphere.hbase.backend.props.HBasePropertyKey;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
- * HBase context, parser config and create connection.
+ * HBase context.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@@ -64,7 +66,7 @@ public final class HBaseContext implements AutoCloseable {
private boolean isSyncWarmUp;
- private Map<String, HBaseCluster> tableConnectionMap = new
ConcurrentHashMap<>();
+ private final Map<String, HBaseCluster> tableConnectionMap = new
ConcurrentHashMap<>();
/**
* Get instance of HBase context.
@@ -76,79 +78,72 @@ public final class HBaseContext implements AutoCloseable {
}
/**
- * Init hbase context.
+ * Initialize HBase context.
+ *
* @param connections A connection for per HBase cluster
*/
public void init(final Map<String, Connection> connections) {
this.connections = new ArrayList<>(connections.size());
- this.warmUpContext = HBaseRegionWarmUpContext.getInstance();
- this.warmUpContext.init(getWarmUpThreadSize());
- this.isSyncWarmUp = isSyncWarm();
- for (Map.Entry<String, Connection> entry : connections.entrySet()) {
+ warmUpContext = HBaseRegionWarmUpContext.getInstance();
+ warmUpContext.init(getWarmUpThreadSize());
+ isSyncWarmUp =
HBaseContext.getInstance().getProps().<Boolean>getValue(HBasePropertyKey.IS_SYNC_WARM_UP);
+ for (Entry<String, Connection> entry : connections.entrySet()) {
HBaseCluster cluster = new HBaseCluster(entry.getKey(),
entry.getValue());
- loadTablesFromHBase(cluster);
+ loadTables(cluster);
this.connections.add(cluster);
}
- log.info("{} tables loaded from {} clusters",
tableConnectionMap.size(), connections.size());
- }
-
- private boolean isSyncWarm() {
- return
HBaseContext.getInstance().getProps().<Boolean>getValue(HBasePropertyKey.IS_SYNC_WARM_UP);
+ log.info("{} tables loaded from {} clusters.",
tableConnectionMap.size(), connections.size());
}
/**
- * Get warmUpThreadSize.
- * @return warmUpThreadSize, 0 < warmUpThreadSize <= 30
+ * Get warm up thread size.
+ *
+ * @return warm up thread size, value should be in (0, 30]
*/
private int getWarmUpThreadSize() {
int warmUpThreadSize =
HBaseContext.getInstance().getProps().<Integer>getValue(HBasePropertyKey.WARM_UP_THREAD_NUM);
- if (warmUpThreadSize < 0) {
- return 1;
- }
-
- return Math.min(warmUpThreadSize, 30);
+ return warmUpThreadSize < 0 ? 1 : Math.min(warmUpThreadSize, 30);
}
/**
- * Load tables from HBase database.
- * @param hbaseCluster hbase cluster object
+ * Load tables.
+ *
+ * @param hbaseCluster HBase cluster
*/
- public synchronized void loadTablesFromHBase(final HBaseCluster
hbaseCluster) {
+ public synchronized void loadTables(final HBaseCluster hbaseCluster) {
+ warmUpContext.initStatisticsInfo(System.currentTimeMillis());
HTableDescriptor[] hTableDescriptor =
HBaseExecutor.executeAdmin(hbaseCluster.getConnection(), Admin::listTables);
- List<String> tableNames =
Arrays.stream(hTableDescriptor).map(HTableDescriptor::getNameAsString).collect(Collectors.toList());
- this.warmUpContext.initStatisticsInfo(System.currentTimeMillis());
- for (String tableName : tableNames) {
- if (!tableConnectionMap.containsKey(tableName)) {
- this.warmUpContext.addNeedWarmCount();
- log.info("Load table `{}` from cluster `{}`", tableName,
hbaseCluster.getClusterName());
- tableConnectionMap.put(tableName, hbaseCluster);
- this.warmUpContext.submitWarmUpTask(tableName, hbaseCluster);
+ for (String each :
Arrays.stream(hTableDescriptor).map(HTableDescriptor::getNameAsString).collect(Collectors.toList()))
{
+ if (tableConnectionMap.containsKey(each)) {
+ continue;
}
+ warmUpContext.addNeedWarmCount();
+ log.info("Load table `{}` from cluster `{}`.", each,
hbaseCluster.getClusterName());
+ tableConnectionMap.put(each, hbaseCluster);
+ warmUpContext.submitWarmUpTask(each, hbaseCluster);
}
if (isSyncWarmUp) {
-
this.warmUpContext.syncExecuteWarmUp(hbaseCluster.getClusterName());
- this.warmUpContext.clear();
+ warmUpContext.syncExecuteWarmUp(hbaseCluster.getClusterName());
+ warmUpContext.clear();
}
}
/**
- * get connection via table name.
- * @param tableName table name.
- *
- * @return HBase Connection.
+ * Get connection via table name.
+ *
+ * @param tableName table name
+ * @return HBase connection
*/
public Connection getConnection(final String tableName) {
- if (tableConnectionMap.containsKey(tableName)) {
- return tableConnectionMap.get(tableName).getConnection();
- } else {
- throw new HBaseOperationException(String.format("Table `%s` is not
exists", tableName));
- }
+
ShardingSpherePreconditions.checkState(tableConnectionMap.containsKey(tableName),
() -> new HBaseOperationException(String.format("Table `%s` is not exists",
tableName)));
+ return tableConnectionMap.get(tableName).getConnection();
}
/**
* Is table exists.
- * @param tableName tableName
- * @return result
+ *
+ * @param tableName table name
+ * @return table exists or not
*/
public boolean isTableExists(final String tableName) {
return tableConnectionMap.containsKey(tableName);
@@ -156,16 +151,14 @@ public final class HBaseContext implements AutoCloseable {
/**
* Get connection via cluster name.
+ *
* @param clusterName cluster name
- * @return HBase Connection
+ * @return HBase connection
*/
public Connection getConnectionByClusterName(final String clusterName) {
Optional<HBaseCluster> cluster = connections.stream().filter(each ->
each.getClusterName().equalsIgnoreCase(clusterName)).findFirst();
- if (cluster.isPresent()) {
- return cluster.get().getConnection();
- } else {
- throw new HBaseOperationException(String.format("Cluster `%s` is
not exists", clusterName));
- }
+ ShardingSpherePreconditions.checkState(cluster.isPresent(), () -> new
HBaseOperationException(String.format("Cluster `%s` is not exists",
clusterName)));
+ return cluster.get().getConnection();
}
@Override
@@ -176,8 +169,9 @@ public final class HBaseContext implements AutoCloseable {
for (Connection connection :
connections.stream().map(HBaseCluster::getConnection).collect(Collectors.toList()))
{
try {
connection.close();
- } catch (IOException e) {
- e.printStackTrace();
+ } catch (final IOException ex) {
+ // TODO define new exception, do not use RuntimeException
+ throw new RuntimeException(ex);
}
}
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseMetaRefresher.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseMetaDataRefresher.java
similarity index 83%
rename from
hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseMetaRefresher.java
rename to
hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseMetaDataRefresher.java
index b340a4883cb..625e12e7f32 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseMetaRefresher.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseMetaDataRefresher.java
@@ -17,25 +17,24 @@
package org.apache.shardingsphere.hbase.backend.context;
-import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.hbase.backend.exception.HBaseOperationException;
/**
- * background thread to refresh metadata.
+ * HBase meta data refresher.
*/
-@AllArgsConstructor
+@RequiredArgsConstructor
@Slf4j
-public class HBaseMetaRefresher implements Runnable {
+public class HBaseMetaDataRefresher implements Runnable {
private final HBaseContext context;
@Override
public void run() {
try {
- context.getConnections().forEach(context::loadTablesFromHBase);
- } catch (HBaseOperationException e) {
- // ignored
+ context.getConnections().forEach(context::loadTables);
+ } catch (final HBaseOperationException ignored) {
}
}
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseRegionWarmUpContext.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseRegionWarmUpContext.java
index 740fd877e6d..e50b9f931f6 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseRegionWarmUpContext.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/context/HBaseRegionWarmUpContext.java
@@ -54,7 +54,8 @@ public final class HBaseRegionWarmUpContext {
/**
* Init.
- * @param poolSize mul execute size
+ *
+ * @param poolSize execute pool size
*/
public void init(final int poolSize) {
executorManager = new HBaseTaskExecutorManager(poolSize);
@@ -62,8 +63,9 @@ public final class HBaseRegionWarmUpContext {
/**
* Submit region warm up task.
- * @param tableName tableName
- * @param hbaseCluster hbaseCluster
+ *
+ * @param tableName table name
+ * @param hbaseCluster HBase cluster
*/
public void submitWarmUpTask(final String tableName, final HBaseCluster
hbaseCluster) {
executorManager.submit(() -> loadRegionInfo(tableName, hbaseCluster));
@@ -72,15 +74,20 @@ public final class HBaseRegionWarmUpContext {
private void loadRegionInfo(final String tableName, final HBaseCluster
hbaseCluster) {
try {
RegionLocator regionLocator =
hbaseCluster.getConnection().getRegionLocator(TableName.valueOf(tableName));
- regionLocator.getAllRegionLocations();
+ warmUpRegion(regionLocator);
HBaseRegionWarmUpContext.getInstance().addExecuteCount();
- } catch (IOException e) {
- log.error(String.format("table: %s warm up error, getRegionLocator
execute error reason is %s", tableName, e));
+ } catch (final IOException ex) {
+ log.error(String.format("Table: `%s` load region info error,
reason is %s", tableName, ex));
}
}
+ private static void warmUpRegion(final RegionLocator regionLocator) throws
IOException {
+ regionLocator.getAllRegionLocations();
+ }
+
/**
* Init statistics info.
+ *
* @param startWarmUpTime start warm up time
*/
public void initStatisticsInfo(final long startWarmUpTime) {
@@ -91,26 +98,26 @@ public final class HBaseRegionWarmUpContext {
* Execute count add one.
*/
public void addExecuteCount() {
- this.executeCount.incrementAndGet();
+ executeCount.incrementAndGet();
}
/**
* All need warm up table add one.
*/
public void addNeedWarmCount() {
- this.tableCount.incrementAndGet();
+ tableCount.incrementAndGet();
}
/**
* Sync execute.
+ *
* @param clusterName clusterName
*/
public void syncExecuteWarmUp(final String clusterName) {
- while (this.executeCount.get() < tableCount.get()) {
+ while (executeCount.get() < tableCount.get()) {
try {
- Thread.sleep(100);
- } catch (InterruptedException ignore) {
-
+ Thread.sleep(100L);
+ } catch (final InterruptedException ignore) {
}
}
log.info(String.format("%s cluster end warm up, execute time: %dms,
warm table: %d", clusterName, System.currentTimeMillis() - startWarmUpTime,
executeCount.get()));
@@ -120,8 +127,8 @@ public final class HBaseRegionWarmUpContext {
* Clear statistics info.
*/
public void clear() {
- this.tableCount.set(0);
- this.executeCount.set(0);
+ tableCount.set(0);
+ executeCount.set(0);
}
}
diff --git
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/exception/HBaseOperationException.java
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/exception/HBaseOperationException.java
index cbfaf70d6a7..d4956edb1e5 100644
---
a/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/exception/HBaseOperationException.java
+++
b/hbase/src/main/java/org/apache/shardingsphere/hbase/backend/exception/HBaseOperationException.java
@@ -21,10 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * HBase Operation exception.
+ * HBase operation exception.
*/
@RequiredArgsConstructor
@Getter
+// TODO should extend to ShardingSphereExternalException
public final class HBaseOperationException extends RuntimeException {
private static final long serialVersionUID = -2361593557266150170L;
diff --git a/infra/common/pom.xml b/infra/common/pom.xml
index cbf61609f12..c2cee816ea1 100644
--- a/infra/common/pom.xml
+++ b/infra/common/pom.xml
@@ -66,7 +66,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
-
</dependencies>
<build>
diff --git a/pom.xml b/pom.xml
index 63a202d5875..879eb9b0111 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,7 @@
<module>jdbc</module>
<module>proxy</module>
+ <module>hbase</module>
<module>features</module>
<module>agent</module>
@@ -49,7 +50,6 @@
<module>test</module>
<module>distribution</module>
- <module>hbase</module>
</modules>
<properties>
@@ -101,6 +101,7 @@
<mariadb-java-client.version>2.4.2</mariadb-java-client.version>
<h2.version>2.1.214</h2.version>
<mssql.version>6.1.7.jre8-preview</mssql.version>
+ <hbase.client.version>1.7.1</hbase.client.version>
<hikari-cp.version>4.0.3</hikari-cp.version>
<commons-dbcp2.version>2.9.0</commons-dbcp2.version>
@@ -113,8 +114,6 @@
<protobuf-java.version>3.21.12</protobuf-java.version>
<awaitility.version>4.2.0</awaitility.version>
- <hbase.client.version>1.7.1</hbase.client.version>
-
<!-- Plugin versions -->
<apache-rat-plugin.version>0.15</apache-rat-plugin.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
@@ -498,6 +497,11 @@
<version>${opengauss.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-shaded-client</artifactId>
+ <version>${hbase.client.version}</version>
+ </dependency>
<dependency>
<groupId>com.zaxxer</groupId>
@@ -618,12 +622,6 @@
<version>${apollo-client.version}</version>
<scope>provided</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-shaded-client</artifactId>
- <version>${hbase.client.version}</version>
- </dependency>
</dependencies>
</dependencyManagement>
@@ -687,7 +685,6 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
-
</dependencies>
<build>
diff --git a/proxy/backend/core/pom.xml b/proxy/backend/core/pom.xml
index 10f2804eb3f..a579b464f8d 100644
--- a/proxy/backend/core/pom.xml
+++ b/proxy/backend/core/pom.xml
@@ -228,6 +228,5 @@
<artifactId>HikariCP</artifactId>
<scope>compile</scope>
</dependency>
-
</dependencies>
</project>
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
index 6aeae9a77ee..357e137245b 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
@@ -66,8 +66,7 @@ public final class ProxyConfigurationLoader {
File configPath = getResourceFile(path);
Collection<YamlProxyDatabaseConfiguration> databaseConfigs =
loadDatabaseConfigurations(configPath);
return new YamlProxyConfiguration(serverConfig,
databaseConfigs.stream().collect(Collectors.toMap(
- YamlProxyDatabaseConfiguration::getDatabaseName, each -> each,
(oldValue, currentValue) -> oldValue,
- LinkedHashMap::new)));
+ YamlProxyDatabaseConfiguration::getDatabaseName, each -> each,
(oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
}
@SneakyThrows(URISyntaxException.class)
@@ -151,5 +150,4 @@ public final class ProxyConfigurationLoader {
private static File[] findRuleConfigurationFiles(final File path) {
return path.listFiles(each ->
SCHEMA_CONFIG_FILE_PATTERN.matcher(each.getName()).matches());
}
-
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/YamlProxyConfiguration.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/YamlProxyConfiguration.java
index 692aa4eee2a..8ab671b2b76 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/YamlProxyConfiguration.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/config/YamlProxyConfiguration.java
@@ -34,5 +34,4 @@ public final class YamlProxyConfiguration {
private final YamlProxyServerConfiguration serverConfiguration;
private final Map<String, YamlProxyDatabaseConfiguration>
databaseConfigurations;
-
}
diff --git
a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index d272da70732..0457026e9c5 100644
---
a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++
b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -86,5 +86,4 @@ public final class BootstrapInitializer {
}
}
}
-
}