This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3f2e325edee Fix the error that the process does not exit after proxy
startup fail (#32339)
3f2e325edee is described below
commit 3f2e325edee8eb26af0407c306daf2a0e8b3899b
Author: jiangML <[email protected]>
AuthorDate: Wed Jul 31 10:26:59 2024 +0800
Fix the error that the process does not exit after proxy startup fail
(#32339)
---
...ticsCollectContextManagerLifecycleListener.java | 7 +++-
.../collect/StatisticsCollectJobWorker.java | 48 +++++++++++++---------
.../dispatch/ListenerAssistedSubscriber.java | 5 ++-
.../ResourceMetaDataChangedSubscriber.java | 5 ++-
.../org/apache/shardingsphere/proxy/Bootstrap.java | 11 +++++
5 files changed, 53 insertions(+), 23 deletions(-)
diff --git
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
index 058118639c6..271996343e9 100644
---
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
+++
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
@@ -29,7 +29,12 @@ public final class
StatisticsCollectContextManagerLifecycleListener implements C
@Override
public void onInitialized(final ContextManager contextManager) {
if (contextManager.getComputeNodeInstanceContext().isCluster() &&
InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType())
{
- StatisticsCollectJobWorker.initialize(contextManager);
+ new StatisticsCollectJobWorker(contextManager).initialize();
}
}
+
+ @Override
+ public void onDestroyed(final ContextManager contextManager) {
+ new StatisticsCollectJobWorker(contextManager).destroy();
+ }
}
diff --git
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
index 0c8448d4282..0fce7189905 100644
---
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
+++
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
@@ -17,8 +17,7 @@
package org.apache.shardingsphere.schedule.core.job.statistics.collect;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
@@ -30,13 +29,14 @@ import
org.apache.shardingsphere.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Statistics collect job worker.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
@Slf4j
public final class StatisticsCollectJobWorker {
@@ -46,28 +46,26 @@ public final class StatisticsCollectJobWorker {
private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
+ private static ScheduleJobBootstrap scheduleJobBootstrap;
+
+ private final ContextManager contextManager;
+
/**
* Initialize job worker.
- *
- * @param contextManager context manager
*/
- public static void initialize(final ContextManager contextManager) {
+ public void initialize() {
if (WORKER_INITIALIZED.compareAndSet(false, true)) {
- start(contextManager);
- }
- }
-
- private static void start(final ContextManager contextManager) {
- ModeConfiguration modeConfig =
contextManager.getComputeNodeInstanceContext().getModeConfiguration();
- if ("ZooKeeper".equals(modeConfig.getRepository().getType())) {
- ScheduleJobBootstrap bootstrap = new
ScheduleJobBootstrap(createRegistryCenter(modeConfig), new
StatisticsCollectJob(contextManager), createJobConfiguration());
- bootstrap.schedule();
- return;
+ ModeConfiguration modeConfig =
contextManager.getComputeNodeInstanceContext().getModeConfiguration();
+ if ("ZooKeeper".equals(modeConfig.getRepository().getType())) {
+ scheduleJobBootstrap = new
ScheduleJobBootstrap(createRegistryCenter(modeConfig), new
StatisticsCollectJob(contextManager), createJobConfiguration());
+ scheduleJobBootstrap.schedule();
+ return;
+ }
+ log.warn("Can not collect statistics because of unsupported
cluster type: {}", modeConfig.getRepository().getType());
}
- log.warn("Can not collect statistics because of unsupported cluster
type: {}", modeConfig.getRepository().getType());
}
- private static CoordinatorRegistryCenter createRegistryCenter(final
ModeConfiguration modeConfig) {
+ private CoordinatorRegistryCenter createRegistryCenter(final
ModeConfiguration modeConfig) {
ClusterPersistRepositoryConfiguration repositoryConfig =
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
String namespace = String.join("/", repositoryConfig.getNamespace(),
ShardingSphereDataNode.getJobPath());
CoordinatorRegistryCenter result = new
ZookeeperRegistryCenter(getZookeeperConfiguration(repositoryConfig, namespace));
@@ -75,7 +73,7 @@ public final class StatisticsCollectJobWorker {
return result;
}
- private static ZookeeperConfiguration getZookeeperConfiguration(final
ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace)
{
+ private ZookeeperConfiguration getZookeeperConfiguration(final
ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace)
{
// TODO Merge registry center code in ElasticJob and ShardingSphere
mode; Use SPI to load impl
ZookeeperConfiguration result = new
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace);
Properties props = repositoryConfig.getProps();
@@ -96,7 +94,17 @@ public final class StatisticsCollectJobWorker {
return result;
}
- private static JobConfiguration createJobConfiguration() {
+ private JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder(JOB_NAME,
1).cron(CRON_EXPRESSION).overwrite(true).build();
}
+
+ /**
+ * Destroy job worker.
+ */
+ public void destroy() {
+ if (WORKER_INITIALIZED.compareAndSet(true, false)) {
+
Optional.ofNullable(scheduleJobBootstrap).ifPresent(ScheduleJobBootstrap::shutdown);
+ scheduleJobBootstrap = null;
+ }
+ }
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index 65efe051f07..90180a3fa8e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.event.subscriber.dispatch
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import
org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
@@ -76,7 +77,9 @@ public final class ListenerAssistedSubscriber implements
EventSubscriber {
private void refreshShardingSphereStatisticsData() {
PersistRepository repository =
contextManager.getPersistServiceFacade().getRepository();
- if (repository instanceof ClusterPersistRepository) {
+ if (contextManager.getComputeNodeInstanceContext().isCluster()
+ && InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()
+ && repository instanceof ClusterPersistRepository) {
new ShardingSphereStatisticsRefreshEngine(contextManager, new
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository)
repository))).asyncRefresh();
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
index 5fcc2ada39b..4791169367c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.event.subscriber.dispatch
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
@@ -128,7 +129,9 @@ public final class ResourceMetaDataChangedSubscriber
implements EventSubscriber
private void refreshShardingSphereStatisticsData() {
PersistRepository repository =
contextManager.getPersistServiceFacade().getRepository();
- if (repository instanceof ClusterPersistRepository) {
+ if (contextManager.getComputeNodeInstanceContext().isCluster()
+ && InstanceType.PROXY ==
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()
+ && repository instanceof ClusterPersistRepository) {
new ShardingSphereStatisticsRefreshEngine(contextManager, new
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository)
repository))).asyncRefresh();
}
}
diff --git
a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
index 6c5492fcc1c..ca206a36203 100644
---
a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
+++
b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
@@ -30,6 +30,8 @@ import
org.apache.shardingsphere.proxy.frontend.ssl.ProxySSLContext;
import org.apache.shardingsphere.proxy.initializer.BootstrapInitializer;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
@@ -52,6 +54,7 @@ public final class Bootstrap {
YamlProxyConfiguration yamlConfig =
ProxyConfigurationLoader.load(bootstrapArgs.getConfigurationPath());
int port = bootstrapArgs.getPort().orElseGet(() -> new
ConfigurationProperties(yamlConfig.getServerConfiguration().getProps()).getValue(ConfigurationPropertyKey.PROXY_DEFAULT_PORT));
List<String> addresses = bootstrapArgs.getAddresses();
+ checkPort(addresses, port);
new BootstrapInitializer().init(yamlConfig, port,
bootstrapArgs.isForce());
Optional.ofNullable((Integer)
yamlConfig.getServerConfiguration().getProps().get(ConfigurationPropertyKey.CDC_SERVER_PORT.getKey()))
.ifPresent(optional -> new Thread(new CDCServer(addresses,
optional)).start());
@@ -60,4 +63,12 @@ public final class Bootstrap {
bootstrapArgs.getSocketPath().ifPresent(proxy::start);
proxy.start(port, addresses);
}
+
+ private static void checkPort(final List<String> addresses, final int
port) throws IOException {
+ for (String each : addresses) {
+ try (ServerSocket socket = new ServerSocket()) {
+ socket.bind(new InetSocketAddress(each, port));
+ }
+ }
+ }
}