abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2382

Change subject: [ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops
......................................................................

[ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Previously, when the HTTP server channel dropped unexpectedly, we
  did nothing.
- After this change, the HTTP server logs the event and keeps trying
  to re-bind to the port until it either succeeds or
  server.stop() is invoked.

Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
---
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
1 file changed, 65 insertions(+), 1 deletion(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/82/2382/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 19436ab..b8ef610 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -73,6 +73,7 @@
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
+    private volatile Thread recoveryThread;
     private Channel channel;
     private Throwable cause;
 
@@ -132,6 +133,14 @@
                 LOGGER.log(Level.ERROR, "Failure stopping an Http Server", e);
                 setFailed(e);
                 throw e;
+            }
+        }
+        // Should wait for the recovery thread outside synchronized block
+        Thread rt = recoveryThread;
+        if (rt != null) {
+            rt.join(TimeUnit.SECONDS.toMillis(5));
+            if (recoveryThread != null) {
+                LOGGER.log(Level.ERROR, "Failure stopping recovery thread");
             }
         }
     }
@@ -209,6 +218,45 @@
          * Note that it doesn't work for the case where multiple paths map to 
a single IServlet
          */
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - 
l1.getPaths()[0].length());
+        channel = bind();
+    }
+
+    /**
+     * Starts a background thread that tries to revive the server channel by
+     * re-binding to the port.
+     *
+     * The thread keeps retrying while the server state is STARTED. It stops as
+     * soon as one bind attempt succeeds, the state leaves STARTED, or the
+     * thread is interrupted. An interrupt marks the server as failed and
+     * restores the interrupt flag. The reference to the running thread is
+     * published in {@code recoveryThread} and cleared when the thread finishes.
+     */
+    private void triggerRecovery() {
+        // try to revive the channel
+        recoveryThread = new Thread(() -> {
+            synchronized (lock) {
+                // Only keep trying while the server is supposed to be running
+                while (state == STARTED) {
+                    try {
+                        channel = bind();
+                        // Re-bound successfully: recovery is done. Without leaving the loop here,
+                        // the thread would try to bind the same port again on every iteration.
+                        break;
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN, "Interrupted while attempting to revive server channel", e);
+                        setFailed(e);
+                        Thread.currentThread().interrupt();
+                        break;
+                    } catch (Throwable th) {
+                        // sleep for 5s
+                        LOGGER.log(Level.WARN, "Failed server recovery attempt", th);
+                        LOGGER.log(Level.WARN, "Sleeping for 5s before starting the next attempt");
+                        try {
+                            // Wait on lock (releasing its monitor) to allow stop request to be executed
+                            lock.wait(TimeUnit.SECONDS.toMillis(5));
+                        } catch (InterruptedException e) {
+                            LOGGER.log(Level.WARN, "Interrupted while attempting to revive server channel", e);
+                            setFailed(e);
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+                }
+            }
+            // Signal that no recovery is in progress anymore
+            recoveryThread = null;
+        });
+        recoveryThread.start();
+    }
+
+    private Channel bind() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new 
FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -216,10 +264,26 @@
                 .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new 
HttpServerInitializer(this));
-        channel = b.bind(port).sync().channel();
+        Channel channel = b.bind(port).sync().channel();
+        channel.closeFuture().addListener(f -> {
+            // This listener is invoked from within a netty IO thread. Hence, 
we can never block it
+            // For simplicity, we will submit the recovery task to a different 
thread
+            synchronized (lock) {
+                if (state != STARTED) {
+                    return;
+                }
+                LOGGER.log(Level.WARN, "The Http server has stopped 
unexpectedly. Starting server recovery");
+                triggerRecovery();
+            }
+        });
+        return channel;
     }
 
     protected void doStop() throws InterruptedException {
+        // stop recovery if it was ongoing
+        if (recoveryThread != null) {
+            recoveryThread.interrupt();
+        }
         // stop taking new requests
         executor.shutdown();
         try {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2382
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to