abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2382
Change subject: [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel
drops
......................................................................
[ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Previously, when the http server channel drops unexpectedly, we
did nothing.
- After this change, the http server will log the event and try
to re-bind to the port until it either succeeds or
server.stop() is invoked.
Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
---
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
1 file changed, 65 insertions(+), 1 deletion(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/82/2382/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 19436ab..b8ef610 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -73,6 +73,7 @@
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
+ private volatile Thread recoveryThread;
private Channel channel;
private Throwable cause;
@@ -132,6 +133,14 @@
LOGGER.log(Level.ERROR, "Failure stopping an Http Server", e);
setFailed(e);
throw e;
+ }
+ }
+ // Should wait for the recovery thread outside synchronized block
+ Thread rt = recoveryThread;
+ if (rt != null) {
+ rt.join(TimeUnit.SECONDS.toMillis(5));
+ if (recoveryThread != null) {
+ LOGGER.log(Level.ERROR, "Failure stopping recovery thread");
}
}
}
@@ -209,6 +218,45 @@
* Note that it doesn't work for the case where multiple paths map to
a single IServlet
*/
Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() -
l1.getPaths()[0].length());
+ channel = bind();
+ }
+
+ private void triggerRecovery() {
+ // try to revive the channel
+ recoveryThread = new Thread(() -> {
+ synchronized (lock) {
+ while (true) {
+ if (state != STARTED) {
+ break;
+ }
+ try {
+ channel = bind();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while attempting
to revive server channel", e);
+ setFailed(e);
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable th) {
+ // sleep for 5s
+ LOGGER.log(Level.WARN, "Failed server recovery
attempt", th);
+ LOGGER.log(Level.WARN, "Sleeping for 5s before
starting the next attempt");
+ try {
+ lock.wait(TimeUnit.SECONDS.toMillis(5)); // Wait
on lock to allow stop request to be executed
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while
attempting to revive server channel", e);
+ setFailed(e);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+ recoveryThread = null;
+ });
+ recoveryThread.start();
+ }
+
+ private Channel bind() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -216,10 +264,26 @@
.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new
HttpServerInitializer(this));
- channel = b.bind(port).sync().channel();
+ Channel channel = b.bind(port).sync().channel();
+ channel.closeFuture().addListener(f -> {
+ // This listener is invoked from within a netty IO thread. Hence,
we can never block it
+ // For simplicity, we will submit the recovery task to a different
thread
+ synchronized (lock) {
+ if (state != STARTED) {
+ return;
+ }
+ LOGGER.log(Level.WARN, "The Http server has stopped
unexpectedly. Starting server recovery");
+ triggerRecovery();
+ }
+ });
+ return channel;
}
protected void doStop() throws InterruptedException {
+ // stop recovery if it was ongoing
+ if (recoveryThread != null) {
+ recoveryThread.interrupt();
+ }
// stop taking new requests
executor.shutdown();
try {
--
To view, visit https://asterix-gerrit.ics.uci.edu/2382
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>