This is an automated email from the ASF dual-hosted git repository.

zhangzicheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 1278156a2 polish brpc generic (#4402)
1278156a2 is described below

commit 1278156a275fb97248040138e7f71b1ddecaa448
Author: mahaitao <[email protected]>
AuthorDate: Thu Feb 23 22:19:02 2023 +0800

    polish brpc generic (#4402)
    
    * polish brpc generic
    
    * feat:fix
    
    ---------
    
    Co-authored-by: mahaitao617 <[email protected]>
    Co-authored-by: dragon-zhang <[email protected]>
---
 .../apache/shenyu/common/constant/Constants.java   |   5 +
 .../plugin/brpc/cache/ApplicationConfigCache.java  |   5 +-
 .../shenyu/plugin/brpc/proxy/BrpcProxyService.java |   1 -
 .../plugin/brpc/spi/SharedThreadPoolFactory.java   | 116 +++++++++++++++++++++
 ....starlight.api.rpc.threadpool.ThreadPoolFactory |  17 +++
 5 files changed, 141 insertions(+), 3 deletions(-)

diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 68c933077..eb93692a2 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -687,6 +687,11 @@ public interface Constants {
      */
     String NAMESPACE = "namespace";
 
+    /**
+     * Brpc SPI bizThreadPoolName.
+     */
+    String SHARED_BIZTHREADPOOLNAME = "shared";
+
     /**
      * String q.
      */
diff --git 
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
index 242b9e1a3..0283c38ce 100644
--- 
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
+++ 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
@@ -109,7 +109,9 @@ public final class ApplicationConfigCache {
         StarlightClient client = CLIENT_CACHE.get(serviceConfig);
         if (Objects.isNull(client)) {
             BrpcParamExtInfo brpcParamExtInfo = 
GsonUtils.getInstance().fromJson(metaData.getRpcExt(), BrpcParamExtInfo.class);
-            client = new SingleStarlightClient(brpcParamExtInfo.getHost(), 
brpcParamExtInfo.getPort(), new TransportConfig());
+            TransportConfig transportConfig = new TransportConfig();
+            
transportConfig.setBizThreadPoolName(Constants.SHARED_BIZTHREADPOOLNAME);
+            client = new SingleStarlightClient(brpcParamExtInfo.getHost(), 
brpcParamExtInfo.getPort(), transportConfig);
             client.init();
             CLIENT_CACHE.put(serviceConfig, client);
         }
@@ -281,7 +283,6 @@ public final class ApplicationConfigCache {
             this.methodName = methodName;
         }
 
-
         /**
          * Gets paramTypes.
          *
diff --git 
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
index 7a8b779c7..4fbe7b997 100644
--- 
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
+++ 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
@@ -83,7 +83,6 @@ public class BrpcProxyService {
             }
         }
         initThreadPool();
-        //todo use 
com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory impl it
         CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> 
getValue(metaData, params), threadPool);
         return Mono.fromFuture(future.thenApply(ret -> {
             if (Objects.isNull(ret)) {
diff --git 
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/spi/SharedThreadPoolFactory.java
 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/spi/SharedThreadPoolFactory.java
new file mode 100644
index 000000000..2303fc7f4
--- /dev/null
+++ 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/spi/SharedThreadPoolFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.brpc.spi;
+
+import com.baidu.cloud.starlight.api.common.URI;
+import com.baidu.cloud.starlight.api.rpc.RpcService;
+import com.baidu.cloud.starlight.api.rpc.config.ServiceConfig;
+import com.baidu.cloud.starlight.api.rpc.threadpool.NamedThreadFactory;
+import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  shared threadpool factory.
+ */
+public class SharedThreadPoolFactory implements ThreadPoolFactory {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SharedThreadPoolFactory.class);
+
+    private ThreadPoolExecutor defaultThreadPool;
+
+    private final Map<RpcService, ThreadPoolExecutor> threadPoolMap = new 
ConcurrentHashMap();
+
+    public SharedThreadPoolFactory() {
+    }
+
+    @Override
+    public void initDefaultThreadPool(final URI uri, final String 
threadPrefix) {
+        this.defaultThreadPool = 
SpringBeanUtils.getInstance().getBean(ShenyuThreadPoolExecutor.class);
+    }
+
+    @Override
+    public ThreadPoolExecutor getThreadPool(final RpcService rpcService) {
+        if (rpcService == null) {
+            return this.defaultThreadPool;
+        } else if (this.threadPoolMap.get(rpcService) != null) {
+            return (ThreadPoolExecutor) this.threadPoolMap.get(rpcService);
+        } else {
+            ServiceConfig serviceConfig = rpcService.getServiceConfig();
+            if (serviceConfig == null) {
+                return this.defaultThreadPool;
+            } else if (serviceConfig.getCustomizeThreadPool() != null && 
serviceConfig.getCustomizeThreadPool()) {
+                Integer corePoolSize = serviceConfig.getThreadPoolSize();
+                Integer maxThreadPoolSize = 
serviceConfig.getMaxThreadPoolSize();
+                Integer keepAliveTime = 
serviceConfig.getIdleThreadKeepAliveSecond();
+                Integer maxQueueSize = serviceConfig.getMaxRunnableQueueSize();
+
+                try {
+                    ThreadPoolExecutor threadPool;
+                    synchronized (this) {
+                        if (this.threadPoolMap.get(rpcService) != null) {
+                            return (ThreadPoolExecutor) 
this.threadPoolMap.get(rpcService);
+                        }
+
+                        threadPool = new ThreadPoolExecutor(corePoolSize, 
maxThreadPoolSize, (long) keepAliveTime,
+                                TimeUnit.SECONDS, new 
LinkedBlockingQueue(maxQueueSize),
+                                new NamedThreadFactory("service-biz-work"));
+                        this.threadPoolMap.put(rpcService, threadPool);
+                    }
+
+                    return threadPool;
+                } catch (Exception e) {
+                    LOGGER.warn("Create service thread pool failed, will use 
default thread pool");
+                    return this.defaultThreadPool;
+                }
+            } else {
+                return this.defaultThreadPool;
+            }
+        }
+    }
+
+    @Override
+    public ThreadPoolExecutor defaultThreadPool() {
+        return this.defaultThreadPool;
+    }
+
+    @Override
+    public void close() {
+        Iterator var1 = this.threadPoolMap.values().iterator();
+
+        while (var1.hasNext()) {
+            ThreadPoolExecutor threadPool = (ThreadPoolExecutor) var1.next();
+            if (!threadPool.isShutdown()) {
+                threadPool.shutdown();
+            }
+        }
+
+        this.threadPoolMap.clear();
+        if (this.defaultThreadPool != null) {
+            this.defaultThreadPool.shutdown();
+        }
+
+    }
+}
diff --git 
a/shenyu-plugin/shenyu-plugin-brpc/src/main/resources/META-INF/services/com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory
 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/resources/META-INF/services/com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory
new file mode 100644
index 000000000..ffcdc2705
--- /dev/null
+++ 
b/shenyu-plugin/shenyu-plugin-brpc/src/main/resources/META-INF/services/com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.shenyu.plugin.brpc.spi.SharedThreadPoolFactory

Reply via email to