This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f2e325edee Fix the error that the process does not exit after proxy 
startup fail (#32339)
3f2e325edee is described below

commit 3f2e325edee8eb26af0407c306daf2a0e8b3899b
Author: jiangML <[email protected]>
AuthorDate: Wed Jul 31 10:26:59 2024 +0800

    Fix the error that the process does not exit after proxy startup fail 
(#32339)
---
 ...ticsCollectContextManagerLifecycleListener.java |  7 +++-
 .../collect/StatisticsCollectJobWorker.java        | 48 +++++++++++++---------
 .../dispatch/ListenerAssistedSubscriber.java       |  5 ++-
 .../ResourceMetaDataChangedSubscriber.java         |  5 ++-
 .../org/apache/shardingsphere/proxy/Bootstrap.java | 11 +++++
 5 files changed, 53 insertions(+), 23 deletions(-)

diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
index 058118639c6..271996343e9 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectContextManagerLifecycleListener.java
@@ -29,7 +29,12 @@ public final class 
StatisticsCollectContextManagerLifecycleListener implements C
     @Override
     public void onInitialized(final ContextManager contextManager) {
         if (contextManager.getComputeNodeInstanceContext().isCluster() && 
InstanceType.PROXY == 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType())
 {
-            StatisticsCollectJobWorker.initialize(contextManager);
+            new StatisticsCollectJobWorker(contextManager).initialize();
         }
     }
+    
+    @Override
+    public void onDestroyed(final ContextManager contextManager) {
+        new StatisticsCollectJobWorker(contextManager).destroy();
+    }
 }
diff --git 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
index 0c8448d4282..0fce7189905 100644
--- 
a/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
+++ 
b/kernel/schedule/core/src/main/java/org/apache/shardingsphere/schedule/core/job/statistics/collect/StatisticsCollectJobWorker.java
@@ -17,8 +17,7 @@
 
 package org.apache.shardingsphere.schedule.core.job.statistics.collect;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
@@ -30,13 +29,14 @@ import 
org.apache.shardingsphere.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Statistics collect job worker.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
 @Slf4j
 public final class StatisticsCollectJobWorker {
     
@@ -46,28 +46,26 @@ public final class StatisticsCollectJobWorker {
     
     private static final AtomicBoolean WORKER_INITIALIZED = new 
AtomicBoolean(false);
     
+    private static ScheduleJobBootstrap scheduleJobBootstrap;
+    
+    private final ContextManager contextManager;
+    
     /**
      * Initialize job worker.
-     *
-     * @param contextManager context manager
      */
-    public static void initialize(final ContextManager contextManager) {
+    public void initialize() {
         if (WORKER_INITIALIZED.compareAndSet(false, true)) {
-            start(contextManager);
-        }
-    }
-    
-    private static void start(final ContextManager contextManager) {
-        ModeConfiguration modeConfig = 
contextManager.getComputeNodeInstanceContext().getModeConfiguration();
-        if ("ZooKeeper".equals(modeConfig.getRepository().getType())) {
-            ScheduleJobBootstrap bootstrap = new 
ScheduleJobBootstrap(createRegistryCenter(modeConfig), new 
StatisticsCollectJob(contextManager), createJobConfiguration());
-            bootstrap.schedule();
-            return;
+            ModeConfiguration modeConfig = 
contextManager.getComputeNodeInstanceContext().getModeConfiguration();
+            if ("ZooKeeper".equals(modeConfig.getRepository().getType())) {
+                scheduleJobBootstrap = new 
ScheduleJobBootstrap(createRegistryCenter(modeConfig), new 
StatisticsCollectJob(contextManager), createJobConfiguration());
+                scheduleJobBootstrap.schedule();
+                return;
+            }
+            log.warn("Can not collect statistics because of unsupported 
cluster type: {}", modeConfig.getRepository().getType());
         }
-        log.warn("Can not collect statistics because of unsupported cluster 
type: {}", modeConfig.getRepository().getType());
     }
     
-    private static CoordinatorRegistryCenter createRegistryCenter(final 
ModeConfiguration modeConfig) {
+    private CoordinatorRegistryCenter createRegistryCenter(final 
ModeConfiguration modeConfig) {
         ClusterPersistRepositoryConfiguration repositoryConfig = 
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
         String namespace = String.join("/", repositoryConfig.getNamespace(), 
ShardingSphereDataNode.getJobPath());
         CoordinatorRegistryCenter result = new 
ZookeeperRegistryCenter(getZookeeperConfiguration(repositoryConfig, namespace));
@@ -75,7 +73,7 @@ public final class StatisticsCollectJobWorker {
         return result;
     }
     
-    private static ZookeeperConfiguration getZookeeperConfiguration(final 
ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace) 
{
+    private ZookeeperConfiguration getZookeeperConfiguration(final 
ClusterPersistRepositoryConfiguration repositoryConfig, final String namespace) 
{
         // TODO Merge registry center code in ElasticJob and ShardingSphere 
mode; Use SPI to load impl
         ZookeeperConfiguration result = new 
ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace);
         Properties props = repositoryConfig.getProps();
@@ -96,7 +94,17 @@ public final class StatisticsCollectJobWorker {
         return result;
     }
     
-    private static JobConfiguration createJobConfiguration() {
+    private JobConfiguration createJobConfiguration() {
         return JobConfiguration.newBuilder(JOB_NAME, 
1).cron(CRON_EXPRESSION).overwrite(true).build();
     }
+    
+    /**
+     * Destroy job worker.
+     */
+    public void destroy() {
+        if (WORKER_INITIALIZED.compareAndSet(true, false)) {
+            
Optional.ofNullable(scheduleJobBootstrap).ifPresent(ScheduleJobBootstrap::shutdown);
+            scheduleJobBootstrap = null;
+        }
+    }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
index 65efe051f07..90180a3fa8e 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ListenerAssistedSubscriber.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.event.subscriber.dispatch
 
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import 
org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
@@ -76,7 +77,9 @@ public final class ListenerAssistedSubscriber implements 
EventSubscriber {
     
     private void refreshShardingSphereStatisticsData() {
         PersistRepository repository = 
contextManager.getPersistServiceFacade().getRepository();
-        if (repository instanceof ClusterPersistRepository) {
+        if (contextManager.getComputeNodeInstanceContext().isCluster()
+                && InstanceType.PROXY == 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()
+                && repository instanceof ClusterPersistRepository) {
             new ShardingSphereStatisticsRefreshEngine(contextManager, new 
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) 
repository))).asyncRefresh();
         }
     }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
index 5fcc2ada39b..4791169367c 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ResourceMetaDataChangedSubscriber.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.event.subscriber.dispatch
 
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
@@ -128,7 +129,9 @@ public final class ResourceMetaDataChangedSubscriber 
implements EventSubscriber
     
     private void refreshShardingSphereStatisticsData() {
         PersistRepository repository = 
contextManager.getPersistServiceFacade().getRepository();
-        if (repository instanceof ClusterPersistRepository) {
+        if (contextManager.getComputeNodeInstanceContext().isCluster()
+                && InstanceType.PROXY == 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()
+                && repository instanceof ClusterPersistRepository) {
             new ShardingSphereStatisticsRefreshEngine(contextManager, new 
GlobalLockContext(new GlobalLockPersistService((ClusterPersistRepository) 
repository))).asyncRefresh();
         }
     }
diff --git 
a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java 
b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
index 6c5492fcc1c..ca206a36203 100644
--- 
a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
+++ 
b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
@@ -30,6 +30,8 @@ import 
org.apache.shardingsphere.proxy.frontend.ssl.ProxySSLContext;
 import org.apache.shardingsphere.proxy.initializer.BootstrapInitializer;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Optional;
@@ -52,6 +54,7 @@ public final class Bootstrap {
         YamlProxyConfiguration yamlConfig = 
ProxyConfigurationLoader.load(bootstrapArgs.getConfigurationPath());
         int port = bootstrapArgs.getPort().orElseGet(() -> new 
ConfigurationProperties(yamlConfig.getServerConfiguration().getProps()).getValue(ConfigurationPropertyKey.PROXY_DEFAULT_PORT));
         List<String> addresses = bootstrapArgs.getAddresses();
+        checkPort(addresses, port);
         new BootstrapInitializer().init(yamlConfig, port, 
bootstrapArgs.isForce());
         Optional.ofNullable((Integer) 
yamlConfig.getServerConfiguration().getProps().get(ConfigurationPropertyKey.CDC_SERVER_PORT.getKey()))
                 .ifPresent(optional -> new Thread(new CDCServer(addresses, 
optional)).start());
@@ -60,4 +63,12 @@ public final class Bootstrap {
         bootstrapArgs.getSocketPath().ifPresent(proxy::start);
         proxy.start(port, addresses);
     }
+    
+    private static void checkPort(final List<String> addresses, final int 
port) throws IOException {
+        for (String each : addresses) {
+            try (ServerSocket socket = new ServerSocket()) {
+                socket.bind(new InetSocketAddress(each, port));
+            }
+        }
+    }
 }

Reply via email to