>From Michael Blow <[email protected]>:
Michael Blow has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18000 )
Change subject: [NO ISSUE][HYR][HTTP] Only bind to unique addresses, report
correct bind failure address(es)
......................................................................
[NO ISSUE][HYR][HTTP] Only bind to unique addresses, report correct bind
failure address(es)
Change-Id: I5d3feba0c9a35287b07e6d78d52b8e832cbf8725
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
2 files changed, 56 insertions(+), 21 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/00/18000/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index e07cdd4..d616eb5 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -100,7 +100,7 @@
* the subsequent failure
* @return the root exception, or null if both parameters are null
*/
- public static Throwable suppress(Throwable first, Throwable second) {
+ public static <T extends Throwable> T suppress(T first, T second) {
if (second instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 97b8859..7215f27 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,7 +32,9 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.util.MXHelper;
@@ -56,6 +60,7 @@
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+import it.unimi.dsi.fastutil.Pair;
public class HttpServer {
// Constants
@@ -80,8 +85,7 @@
private final ServletRegistry servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
- private final InetSocketAddress defaultAddress;
- private final List<InetSocketAddress> addresses;
+ private final Set<InetSocketAddress> addresses;
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
@@ -114,15 +118,14 @@
this(bossGroup, workerGroup, Collections.singletonList(address),
config, closeHandler);
}
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
List<InetSocketAddress> addresses,
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
Collection<InetSocketAddress> addresses,
HttpServerConfig config, IChannelClosedHandler closeHandler) {
if (addresses.isEmpty()) {
throw new IllegalArgumentException("no addresses specified");
}
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
- this.addresses = addresses;
- defaultAddress = addresses.get(0);
+ this.addresses = new LinkedHashSet<>(addresses);
this.closedHandler = closeHandler;
this.config = config;
channels = new ArrayList<>();
@@ -130,9 +133,16 @@
servlets = new ServletRegistry();
workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
int numExecutorThreads = config.getThreadCount();
+ int[] ports =
this.addresses.stream().mapToInt(InetSocketAddress::getPort).distinct().toArray();
+ String desc;
+ if (ports.length > 1) {
+ desc = this.addresses.stream().map(a ->
a.getAddress().getHostAddress() + ":" + a.getPort())
+ .collect(Collectors.joining(",", "[", "]"));
+ } else {
+ desc = "port:" + ports[0];
+ }
executor = new ThreadPoolExecutor(numExecutorThreads,
numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
- runnable -> new Thread(runnable,
- "HttpExecutor(port:" + defaultAddress.getPort() + ")-"
+ threadId.getAndIncrement()));
+ runnable -> new Thread(runnable, "HttpExecutor(" + desc + ")-"
+ threadId.getAndIncrement()));
long directMemoryBudget = numExecutorThreads * (long)
HIGH_WRITE_BUFFER_WATER_MARK
+ numExecutorThreads * config.getMaxResponseChunkSize();
LOGGER.log(Level.DEBUG,
@@ -156,7 +166,6 @@
doStart();
setStarted();
} catch (Throwable e) { // NOSONAR
- LOGGER.error("Failure starting an Http Server at: {}",
defaultAddress, e);
setFailed(e);
throw e;
}
@@ -255,14 +264,19 @@
return servlets.getServlets();
}
- protected void doStart() throws InterruptedException, IOException {
+ protected void doStart() throws Exception {
for (IServlet servlet : servlets.getServlets()) {
- servlet.init();
+ try {
+ servlet.init();
+ } catch (IOException e) {
+ LOGGER.error("Failure initializing servlet {} on http server
{}", servlet, addresses, e);
+ throw e;
+ }
}
bind();
}
- private void bind() throws InterruptedException {
+ private void bind() throws Exception {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -270,17 +284,28 @@
.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
WRITE_BUFFER_WATER_MARK)
.handler(new
LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
- List<ChannelFuture> channelFutures = new ArrayList<>();
+ List<Pair<InetSocketAddress, ChannelFuture>> channelFutures = new
ArrayList<>();
for (InetSocketAddress address : addresses) {
- channelFutures.add(b.bind(address));
+ channelFutures.add(Pair.of(address, b.bind(address)));
}
- for (ChannelFuture future : channelFutures) {
- Channel channel = future.sync().channel();
- channel.closeFuture().addListener(channelCloseListener);
- synchronized (lock) {
- channels.add(channel);
+ Exception failure = null;
+ for (Pair<InetSocketAddress, ChannelFuture> addressFuture :
channelFutures) {
+ try {
+ Channel channel = addressFuture.right().sync().channel();
+ channel.closeFuture().addListener(channelCloseListener);
+ synchronized (lock) {
+ channels.add(channel);
+ }
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error("Bind failure starting http server at {}",
addressFuture.left(), e);
+ failure = ExceptionUtils.suppress(failure, e);
}
}
+ if (failure != null) {
+ throw failure;
+ }
}
private void triggerRecovery() throws InterruptedException {
@@ -395,7 +420,7 @@
@Override
public String toString() {
- return "{\"class\":\"" + getClass().getSimpleName() +
"\",\"address\":" + defaultAddress + ",\"state\":\""
+ return "{\"class\":\"" + getClass().getSimpleName() +
"\",\"addresses\":" + addresses + ",\"state\":\""
+ getState() + "\"}";
}
@@ -403,8 +428,9 @@
return config;
}
+ @Deprecated // this returns an arbitrary (the first supplied if collection
is ordered) address
public InetSocketAddress getAddress() {
- return defaultAddress;
+ return addresses.iterator().next();
}
private void closeChannels() throws InterruptedException {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18000
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I5d3feba0c9a35287b07e6d78d52b8e832cbf8725
Gerrit-Change-Number: 18000
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-MessageType: newchange