From Michael Blow <mb...@apache.org>:

Michael Blow has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17904 )


Change subject: [NO ISSUE][HYR][HTTP] Only bind to unique addresses, report 
correct bind failure address(es)
......................................................................

[NO ISSUE][HYR][HTTP] Only bind to unique addresses, report correct bind 
failure address(es)

(cherry picked from commit 08f27221e518)

Change-Id: I5d3feba0c9a35287b07e6d78d52b8e832cbf8725
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
2 files changed, 58 insertions(+), 21 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/04/17904/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index 7147542..5c1946a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -100,7 +100,7 @@
      *            the subsequent failure
      * @return the root exception, or null if both parameters are null
      */
-    public static Throwable suppress(Throwable first, Throwable second) {
+    public static <T extends Throwable> T suppress(T first, T second) {
         if (second instanceof InterruptedException) {
             Thread.currentThread().interrupt();
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 97b8859..271de53 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -21,7 +21,9 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,7 +32,10 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;

+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.http.api.IChannelClosedHandler;
 import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.util.MXHelper;
@@ -80,8 +85,7 @@
     private final ServletRegistry servlets;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
-    private final InetSocketAddress defaultAddress;
-    private final List<InetSocketAddress> addresses;
+    private final Set<InetSocketAddress> addresses;
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
@@ -114,15 +118,14 @@
         this(bossGroup, workerGroup, Collections.singletonList(address), 
config, closeHandler);
     }

-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
List<InetSocketAddress> addresses,
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
Collection<InetSocketAddress> addresses,
             HttpServerConfig config, IChannelClosedHandler closeHandler) {
         if (addresses.isEmpty()) {
             throw new IllegalArgumentException("no addresses specified");
         }
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
-        this.addresses = addresses;
-        defaultAddress = addresses.get(0);
+        this.addresses = new LinkedHashSet<>(addresses);
         this.closedHandler = closeHandler;
         this.config = config;
         channels = new ArrayList<>();
@@ -130,9 +133,16 @@
         servlets = new ServletRegistry();
         workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
         int numExecutorThreads = config.getThreadCount();
+        int[] ports = 
this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
+        String desc;
+        if (ports.length > 1) {
+            desc = this.addresses.stream().map(a -> 
a.getAddress().getHostAddress() + ":" + a.getPort())
+                    .collect(Collectors.joining(",", "[", "]"));
+        } else {
+            desc = "port:" + ports[0];
+        }
         executor = new ThreadPoolExecutor(numExecutorThreads, 
numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
-                runnable -> new Thread(runnable,
-                        "HttpExecutor(port:" + defaultAddress.getPort() + ")-" 
+ threadId.getAndIncrement()));
+                runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-" 
+ threadId.getAndIncrement()));
         long directMemoryBudget = numExecutorThreads * (long) 
HIGH_WRITE_BUFFER_WATER_MARK
                 + numExecutorThreads * config.getMaxResponseChunkSize();
         LOGGER.log(Level.DEBUG,
@@ -156,7 +166,6 @@
                 doStart();
                 setStarted();
             } catch (Throwable e) { // NOSONAR
-                LOGGER.error("Failure starting an Http Server at: {}", 
defaultAddress, e);
                 setFailed(e);
                 throw e;
             }
@@ -255,14 +264,19 @@
         return servlets.getServlets();
     }

-    protected void doStart() throws InterruptedException, IOException {
+    protected void doStart() throws Exception {
         for (IServlet servlet : servlets.getServlets()) {
-            servlet.init();
+            try {
+                servlet.init();
+            } catch (IOException e) {
+                LOGGER.error("Failure initializing servlet {} on http server 
{}", servlet, addresses, e);
+                throw e;
+            }
         }
         bind();
     }

-    private void bind() throws InterruptedException {
+    private void bind() throws Exception {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new 
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -270,17 +284,28 @@
                 .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new 
LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
-        List<ChannelFuture> channelFutures = new ArrayList<>();
+        List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new 
ArrayList<>();
         for (InetSocketAddress address : addresses) {
-            channelFutures.add(b.bind(address));
+            channelFutures.add(org.apache.commons.lang3.tuple.Pair.of(address, 
b.bind(address)));
         }
-        for (ChannelFuture future : channelFutures) {
-            Channel channel = future.sync().channel();
-            channel.closeFuture().addListener(channelCloseListener);
-            synchronized (lock) {
-                channels.add(channel);
+        Exception failure = null;
+        for (Pair<InetSocketAddress, ChannelFuture> addressFuture : 
channelFutures) {
+            try {
+                Channel channel = addressFuture.getRight().sync().channel();
+                channel.closeFuture().addListener(channelCloseListener);
+                synchronized (lock) {
+                    channels.add(channel);
+                }
+            } catch (InterruptedException e) {
+                throw e;
+            } catch (Exception e) {
+                LOGGER.error("Bind failure starting http server at {}", 
addressFuture.getLeft(), e);
+                failure = ExceptionUtils.suppress(failure, e);
             }
         }
+        if (failure != null) {
+            throw failure;
+        }
     }

     private void triggerRecovery() throws InterruptedException {
@@ -395,7 +420,7 @@

     @Override
     public String toString() {
-        return "{\"class\":\"" + getClass().getSimpleName() + 
"\",\"address\":" + defaultAddress + ",\"state\":\""
+        return "{\"class\":\"" + getClass().getSimpleName() + 
"\",\"addresses\":" + addresses + ",\"state\":\""
                 + getState() + "\"}";
     }

@@ -403,8 +428,9 @@
         return config;
     }

+    @Deprecated // this returns an arbitrary (the first supplied if collection 
is ordered) address
     public InetSocketAddress getAddress() {
-        return defaultAddress;
+        return addresses.iterator().next();
     }

     private void closeChannels() throws InterruptedException {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17904
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-667a908755
Gerrit-Change-Id: I5d3feba0c9a35287b07e6d78d52b8e832cbf8725
Gerrit-Change-Number: 17904
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <mb...@apache.org>
Gerrit-MessageType: newchange

Reply via email to