From Michael Blow <mb...@apache.org>: Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17904 )
Change subject: [NO ISSUE][HYR][HTTP] Only bind to unique addresses, report correct bind failure address(es) ...................................................................... [NO ISSUE][HYR][HTTP] Only bind to unique addresses, report correct bind failure address(es) (cherry picked from commit 08f27221e518) Change-Id: I5d3feba0c9a35287b07e6d78d52b8e832cbf8725 --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java 2 files changed, 58 insertions(+), 21 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/17904/1 diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java index 7147542..5c1946a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java @@ -100,7 +100,7 @@ * the subsequent failure * @return the root exception, or null if both parameters are null */ - public static Throwable suppress(Throwable first, Throwable second) { + public static <T extends Throwable> T suppress(T first, T second) { if (second instanceof InterruptedException) { Thread.currentThread().interrupt(); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 97b8859..271de53 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -21,7 +21,9 @@ import 
java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -30,7 +32,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.http.api.IChannelClosedHandler; import org.apache.hyracks.http.api.IServlet; import org.apache.hyracks.util.MXHelper; @@ -80,8 +85,7 @@ private final ServletRegistry servlets; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; - private final InetSocketAddress defaultAddress; - private final List<InetSocketAddress> addresses; + private final Set<InetSocketAddress> addresses; private final ThreadPoolExecutor executor; // Mutable members private volatile int state = STOPPED; @@ -114,15 +118,14 @@ this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler); } - public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses, + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, Collection<InetSocketAddress> addresses, HttpServerConfig config, IChannelClosedHandler closeHandler) { if (addresses.isEmpty()) { throw new IllegalArgumentException("no addresses specified"); } this.bossGroup = bossGroup; this.workerGroup = workerGroup; - this.addresses = addresses; - defaultAddress = addresses.get(0); + this.addresses = new LinkedHashSet<>(addresses); this.closedHandler = closeHandler; this.config = config; channels = new ArrayList<>(); @@ -130,9 +133,16 @@ servlets = new ServletRegistry(); workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize()); int numExecutorThreads = 
config.getThreadCount(); + int[] ports = this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray(); + String desc; + if (ports.length > 1) { + desc = this.addresses.stream().map(a -> a.getAddress().getHostAddress() + ":" + a.getPort()) + .collect(Collectors.joining(",", "[", "]")); + } else { + desc = "port:" + ports[0]; + } executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue, - runnable -> new Thread(runnable, - "HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement())); + runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-" + threadId.getAndIncrement())); long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK + numExecutorThreads * config.getMaxResponseChunkSize(); LOGGER.log(Level.DEBUG, @@ -156,7 +166,6 @@ doStart(); setStarted(); } catch (Throwable e) { // NOSONAR - LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e); setFailed(e); throw e; } @@ -255,14 +264,19 @@ return servlets.getServlets(); } - protected void doStart() throws InterruptedException, IOException { + protected void doStart() throws Exception { for (IServlet servlet : servlets.getServlets()) { - servlet.init(); + try { + servlet.init(); + } catch (IOException e) { + LOGGER.error("Failure initializing servlet {} on http server {}", servlet, addresses, e); + throw e; + } } bind(); } - private void bind() throws InterruptedException { + private void bind() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE)) @@ -270,17 +284,28 @@ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer()); - 
List<ChannelFuture> channelFutures = new ArrayList<>(); + List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new ArrayList<>(); for (InetSocketAddress address : addresses) { - channelFutures.add(b.bind(address)); + channelFutures.add(org.apache.commons.lang3.tuple.Pair.of(address, b.bind(address))); } - for (ChannelFuture future : channelFutures) { - Channel channel = future.sync().channel(); - channel.closeFuture().addListener(channelCloseListener); - synchronized (lock) { - channels.add(channel); + Exception failure = null; + for (Pair<InetSocketAddress, ChannelFuture> addressFuture : channelFutures) { + try { + Channel channel = addressFuture.getRight().sync().channel(); + channel.closeFuture().addListener(channelCloseListener); + synchronized (lock) { + channels.add(channel); + } + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + LOGGER.error("Bind failure starting http server at {}", addressFuture.getLeft(), e); + failure = ExceptionUtils.suppress(failure, e); } } + if (failure != null) { + throw failure; + } } private void triggerRecovery() throws InterruptedException { @@ -395,7 +420,7 @@ @Override public String toString() { - return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\"" + return "{\"class\":\"" + getClass().getSimpleName() + "\",\"addresses\":" + addresses + ",\"state\":\"" + getState() + "\"}"; } @@ -403,8 +428,9 @@ return config; } + @Deprecated // this returns an arbitrary (the first supplied if collection is ordered) address public InetSocketAddress getAddress() { - return defaultAddress; + return addresses.iterator().next(); } private void closeChannels() throws InterruptedException { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17904 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: stabilization-667a908755 Gerrit-Change-Id: 
I5d3feba0c9a35287b07e6d78d52b8e832cbf8725 Gerrit-Change-Number: 17904 Gerrit-PatchSet: 1 Gerrit-Owner: Michael Blow <mb...@apache.org> Gerrit-MessageType: newchange