abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2382
Change subject: [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops ...................................................................... [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops - user model changes: no - storage format changes: no - interface changes: no details: - Previously, when the HTTP server channel dropped unexpectedly, we did nothing. - After this change, the HTTP server will log the event and try to re-bind to the port until it either succeeds or server.stop() is invoked. Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd --- M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java 1 file changed, 65 insertions(+), 1 deletion(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/82/2382/1 diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 19436ab..b8ef610 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -73,6 +73,7 @@ private final ThreadPoolExecutor executor; // Mutable members private volatile int state = STOPPED; + private volatile Thread recoveryThread; private Channel channel; private Throwable cause; @@ -132,6 +133,14 @@ LOGGER.log(Level.ERROR, "Failure stopping an Http Server", e); setFailed(e); throw e; + } + } + // Should wait for the recovery thread outside synchronized block + Thread rt = recoveryThread; + if (rt != null) { + rt.join(TimeUnit.SECONDS.toMillis(5)); + if (recoveryThread != null) { + LOGGER.log(Level.ERROR, "Failure stopping recovery thread"); } } } @@ -209,6 +218,45 @@ * Note that it doesn't work for the case where multiple paths map to a single IServlet */ 
Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length()); + channel = bind(); + } + + private void triggerRecovery() { + // try to revive the channel + recoveryThread = new Thread(() -> { + synchronized (lock) { + while (true) { + if (state != STARTED) { + break; + } + try { + channel = bind(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while attempting to revive server channel", e); + setFailed(e); + Thread.currentThread().interrupt(); + break; + } catch (Throwable th) { + // sleep for 5s + LOGGER.log(Level.WARN, "Failed server recovery attempt", th); + LOGGER.log(Level.WARN, "Sleeping for 5s before starting the next attempt"); + try { + lock.wait(TimeUnit.SECONDS.toMillis(5)); // Wait on lock to allow stop request to be executed + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while attempting to revive server channel", e); + setFailed(e); + Thread.currentThread().interrupt(); + break; + } + } + } + } + recoveryThread = null; + }); + recoveryThread.start(); + } + + private Channel bind() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE)) @@ -216,10 +264,26 @@ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this)); - channel = b.bind(port).sync().channel(); + Channel channel = b.bind(port).sync().channel(); + channel.closeFuture().addListener(f -> { + // This listener is invoked from within a netty IO thread. 
Hence, we can never block it + // For simplicity, we will submit the recovery task to a different thread + synchronized (lock) { + if (state != STARTED) { + return; + } + LOGGER.log(Level.WARN, "The Http server has stopped unexpectedly. Starting server recovery"); + triggerRecovery(); + } + }); + return channel; } protected void doStop() throws InterruptedException { + // stop recovery if it was ongoing + if (recoveryThread != null) { + recoveryThread.interrupt(); + } // stop taking new requests executor.shutdown(); try { -- To view, visit https://asterix-gerrit.ics.uci.edu/2382 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>