abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2577

Change subject: [NO ISSUE][HTTP] Http Server Fixes
......................................................................

[NO ISSUE][HTTP] Http Server Fixes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Since a single connection handles multiple requests, adding
  a close listener for each request leaves behind listener objects
  that accumulate for the lifetime of the connection.
- To fix this, we maintain a reference to the request currently being
  processed and notify it when the channel becomes inactive.
- When the user sends a new request before the previous one
  completes processing, the new request fails and the connection is
  closed.
- When the user sends a request that exceeds the maximum request
  size, the server replies with 413 and always closes the connection.

Change-Id: If547b94a2dd6cf459c1bb4626c39444a058755b2
---
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
A 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
A 
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
M 
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
M hyracks-fullstack/pom.xml
14 files changed, 344 insertions(+), 56 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/77/2577/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
index 1a7c65f..4897449 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
@@ -79,7 +79,12 @@
 
     /**
      * Notifies the response that the channel has become writable
-     * became writable or unwritable. Used for flow control
+     * Used for flow control.
      */
     void notifyChannelWritable();
+
+    /**
+     * Notifies the response that the channel has become inactive
+     */
+    void notifyChannelInactive();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index d5f81e5..da11a6f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -43,12 +43,6 @@
         this.response = response;
         this.ctx = ctx;
         buffer = ctx.alloc().buffer(chunkSize);
-        // register listener for channel closed
-        ctx.channel().closeFuture().addListener(futureListener -> {
-            synchronized (ChunkedNettyOutputStream.this) {
-                ChunkedNettyOutputStream.this.notifyAll();
-            }
-        });
     }
 
     @Override
@@ -111,8 +105,10 @@
                 response.beforeFlush();
                 DefaultHttpContent content = new DefaultHttpContent(buffer);
                 ctx.writeAndFlush(content, ctx.channel().voidPromise());
-                // The responisbility of releasing the buffer is now with the 
netty pipeline since it is forwarded
-                // within the http content. We must nullify buffer before we 
allocate the next one to avoid
+                // The responsibility of releasing the buffer is now with the 
netty pipeline
+                // since it is forwarded
+                // within the http content. We must nullify buffer before we 
allocate the next
+                // one to avoid
                 // releasing the buffer twice in case the allocation call 
fails.
                 buffer = null;
                 buffer = ctx.alloc().buffer(size);
@@ -128,7 +124,7 @@
     private synchronized void ensureWritable() throws IOException {
         while (!ctx.channel().isWritable()) {
             try {
-                if (!ctx.channel().isOpen()) {
+                if (!ctx.channel().isActive()) {
                     throw new IOException("Closed channel");
                 }
                 wait();
@@ -143,4 +139,8 @@
     public synchronized void resume() {
         notifyAll();
     }
+
+    public synchronized void channelInactive() {
+        notifyAll();
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 323a463..a33007c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -65,6 +65,7 @@
     private final ChannelHandlerContext ctx;
     private final ChunkedNettyOutputStream outputStream;
     private final PrintWriter writer;
+    private final HttpServerHandler<?> handler;
     private HttpResponse response;
     private boolean headerSent;
     private ByteBuf error;
@@ -72,7 +73,9 @@
     private boolean done;
     private final boolean keepAlive;
 
-    public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, 
int chunkSize) {
+    public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext 
ctx, FullHttpRequest request,
+            int chunkSize) {
+        this.handler = handler;
         this.ctx = ctx;
         outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
         writer = new PrintWriter(outputStream);
@@ -108,7 +111,9 @@
         writer.close();
         if (error == null && response.status() == HttpResponseStatus.OK) {
             if (!done) {
-                future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+                future = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT);
+                future.addListener(f -> handler.requestHandled());
+                ctx.flush();
             }
         } else {
             // There was an error
@@ -118,11 +123,13 @@
                     error.release();
                 }
                 future = ctx.channel().close();
+                future.addListener(f -> handler.requestHandled());
             } else {
                 if (keepAlive && response.status() != 
HttpResponseStatus.UNAUTHORIZED) {
                     response.headers().remove(HttpHeaderNames.CONNECTION);
                 }
-                // we didn't send anything to the user, we need to send an 
unchunked error response
+                // we didn't send anything to the user, we need to send an 
unchunked error
+                // response
                 fullResponse(response.protocolVersion(), response.status(),
                         error == null ? ctx.alloc().buffer(0, 0) : error, 
response.headers());
             }
@@ -175,10 +182,13 @@
     private void fullResponse(HttpVersion version, HttpResponseStatus status, 
ByteBuf buffer, HttpHeaders headers) {
         DefaultFullHttpResponse fullResponse = new 
DefaultFullHttpResponse(version, status, buffer);
         fullResponse.headers().set(headers);
-        // for a full response remove chunked transfer-encoding and set the 
content length instead
+        // for a full response remove chunked transfer-encoding and set the 
content
+        // length instead
         fullResponse.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
         fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 
buffer.readableBytes());
-        future = ctx.writeAndFlush(fullResponse);
+        future = ctx.write(fullResponse);
+        future.addListener(f -> handler.requestHandled());
+        ctx.flush();
         headerSent = true;
         done = true;
     }
@@ -187,4 +197,9 @@
     public void notifyChannelWritable() {
         outputStream.resume();
     }
+
+    @Override
+    public void notifyChannelInactive() {
+        outputStream.channelInactive();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 598048e..eb0a170 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -40,13 +40,15 @@
 
 public class FullResponse implements IServletResponse {
     private final ChannelHandlerContext ctx;
+    private final HttpServerHandler<?> handler;
     private final ByteArrayOutputStream baos;
     private final PrintWriter writer;
     private final FullHttpResponse response;
     private final boolean keepAlive;
     private ChannelFuture future;
 
-    public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
+    public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext 
ctx, FullHttpRequest request) {
+        this.handler = handler;
         this.ctx = ctx;
         baos = new ByteArrayOutputStream();
         writer = new PrintWriter(baos);
@@ -69,6 +71,7 @@
             }
         }
         future = ctx.writeAndFlush(fullResponse);
+        future.addListener(f -> handler.requestHandled());
         if (response.status() != HttpResponseStatus.OK && response.status() != 
HttpResponseStatus.UNAUTHORIZED) {
             future.addListener(ChannelFutureListener.CLOSE);
         }
@@ -105,4 +108,10 @@
         // Do nothing.
         // This response is sent as a single piece
     }
+
+    @Override
+    public void notifyChannelInactive() {
+        // Do nothing.
+        // This response is sent as a single piece
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
new file mode 100644
index 0000000..6824183
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server;
+
+import java.util.List;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
+public class HttpRequestAggregator extends HttpObjectAggregator {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final FullHttpResponse TOO_LARGE_CLOSE = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+            HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, 
Unpooled.EMPTY_BUFFER);
+    boolean failed = false;
+
+    public HttpRequestAggregator(int maxContentLength) {
+        super(maxContentLength);
+    }
+
+    public HttpRequestAggregator(int maxContentLength, boolean 
closeOnExpectationFailed) {
+        super(maxContentLength, closeOnExpectationFailed);
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, HttpObject msg, 
List<Object> out) throws Exception {
+        if (!failed) {
+            super.decode(ctx, msg, out);
+        }
+    }
+
+    @Override
+    protected void handleOversizedMessage(final ChannelHandlerContext ctx, 
HttpMessage oversized) throws Exception {
+        failed = true;
+        LOGGER.warn("A large request encountered. Closing the channel");
+        if (oversized instanceof HttpRequest) {
+            // send back a 413 and close the connection
+            ChannelFuture future = 
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate());
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws 
Exception {
+                    if (!future.isSuccess()) {
+                        LOGGER.debug("Failed to send a 413 Request Entity Too 
Large.", future.cause());
+                    }
+                    ctx.close();
+                }
+            });
+        } else {
+            throw new IllegalStateException("Unknown large message of class " 
+ oversized.getClass());
+        }
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
index 3ab2ab9..2a92594 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
@@ -88,15 +88,9 @@
             reject(ctx);
             return;
         }
+        super.channelActive(ctx);
         // We disable auto read to avoid reading at all if we can't handle any 
more requests
         ctx.read();
-        super.channelActive(ctx);
-    }
-
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) throws 
Exception {
-        ctx.read();
-        super.channelReadComplete(ctx);
     }
 
     private boolean overloaded() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index bf8e629..0bfdf59 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -41,12 +41,13 @@
     private final IServletRequest request;
     private final IServletResponse response;
 
-    public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, 
IServletRequest request, int chunkSize) {
+    public HttpRequestHandler(HttpServerHandler<?> handler, 
ChannelHandlerContext ctx, IServlet servlet,
+            IServletRequest request, int chunkSize) {
         this.ctx = ctx;
         this.servlet = servlet;
         this.request = request;
-        response = chunkSize == 0 ? new FullResponse(ctx, 
request.getHttpRequest())
-                : new ChunkedResponse(ctx, request.getHttpRequest(), 
chunkSize);
+        response = chunkSize == 0 ? new FullResponse(handler, ctx, 
request.getHttpRequest())
+                : new ChunkedResponse(handler, ctx, request.getHttpRequest(), 
chunkSize);
         request.getHttpRequest().retain();
     }
 
@@ -57,7 +58,7 @@
             if (!HttpUtil.isKeepAlive(request.getHttpRequest())) {
                 lastContentFuture.addListener(ChannelFutureListener.CLOSE);
             }
-        } catch (Throwable th) { //NOSONAR
+        } catch (Throwable th) { // NOSONAR
             LOGGER.log(Level.ERROR, "Failure handling HTTP Request", th);
             ctx.close();
         } finally {
@@ -94,4 +95,8 @@
     public IServlet getServlet() {
         return servlet;
     }
+
+    public void notifyChannelInactive() {
+        response.notifyChannelInactive(); // wakes a blocked chunked writer, if any
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 2787b30..911e088 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -43,44 +43,72 @@
     private static final Logger LOGGER = LogManager.getLogger();
     protected final T server;
     protected final int chunkSize;
-    protected HttpRequestHandler handler;
+    protected volatile HttpRequestHandler handler;
+    protected volatile IServlet servlet;
+    protected volatile Future<Void> task;
+    protected volatile IChannelClosedHandler closeHandler;
 
     public HttpServerHandler(T server, int chunkSize) {
         this.server = server;
         this.chunkSize = chunkSize;
     }
 
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) {
-        ctx.flush();
+    public void requestHandled() {
+        handler = null;
+        servlet = null;
+        task = null;
+        closeHandler = null;
     }
 
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
-        if (ctx.channel().isWritable()) {
-            handler.notifyChannelWritable();
+        HttpRequestHandler currentHandler = handler;
+        if (ctx.channel().isWritable() && currentHandler != null) {
+            currentHandler.notifyChannelWritable();
         }
         super.channelWritabilityChanged(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        HttpRequestHandler currentHandler = handler;
+        if (currentHandler != null) {
+            currentHandler.notifyChannelInactive();
+        }
+        IChannelClosedHandler currentCloseHandler = closeHandler;
+        IServlet currentServlet = servlet;
+        Future<Void> currentTask = task;
+        if (currentCloseHandler != null && currentTask != null && 
currentServlet != null) {
+            currentCloseHandler.channelClosed(server, currentServlet, 
currentTask);
+        }
+        super.channelInactive(ctx);
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
         FullHttpRequest request = (FullHttpRequest) msg;
         try {
-            IServlet servlet = server.getServlet(request);
+            if (request.decoderResult().isFailure()) {
+                throw new Exception("Failed to decode request", 
request.decoderResult().cause());
+            }
+            if (handler != null || servlet != null || closeHandler != null || 
task != null) {
+                throw new Exception("Server doesn't support pipelined 
requests");
+            }
+            servlet = server.getServlet(request);
             if (servlet == null) {
                 handleServletNotFound(ctx, request);
             } else {
                 submit(ctx, servlet, request);
             }
         } catch (Exception e) {
-            LOGGER.log(Level.ERROR, "Failure Submitting HTTP Request", e);
+            LOGGER.log(Level.ERROR, "Failure handling HTTP request", e);
             respond(ctx, request.protocolVersion(), new 
HttpResponseStatus(500, e.getMessage()));
         }
     }
 
     protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, 
HttpResponseStatus status) {
         DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, 
status);
+        requestHandled();
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
@@ -93,17 +121,14 @@
             respond(ctx, request.protocolVersion(), 
HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        handler = new HttpRequestHandler(ctx, servlet, servletRequest, 
chunkSize);
+        handler = new HttpRequestHandler(this, ctx, servlet, servletRequest, 
chunkSize);
         submit(ctx, servlet);
     }
 
     private void submit(ChannelHandlerContext ctx, IServlet servlet) throws 
IOException {
         try {
-            Future<Void> task = server.getExecutor(handler).submit(handler);
-            final IChannelClosedHandler closeHandler = 
servlet.getChannelClosedHandler(server);
-            if (closeHandler != null) {
-                ctx.channel().closeFuture().addListener(future -> 
closeHandler.channelClosed(server, servlet, task));
-            }
+            task = server.getExecutor(handler).submit(handler);
+            closeHandler = servlet.getChannelClosedHandler(server);
         } catch (RejectedExecutionException e) { // NOSONAR
             LOGGER.log(Level.WARN, "Request rejected by server executor 
service. " + e.getMessage());
             handler.reject();
@@ -122,4 +147,9 @@
         LOGGER.log(Level.ERROR, "Failure handling HTTP Request", cause);
         ctx.close();
     }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws 
Exception {
+        ctx.read();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index bc173fd..5f7a219 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -21,7 +21,6 @@
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 
@@ -30,6 +29,7 @@
     public static final int MAX_REQUEST_CHUNK_SIZE = 262144;
     public static final int MAX_REQUEST_HEADER_SIZE = 262144;
     public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
+    public static final int MAX_REQUEST_SIZE = 524288;
     public static final int RESPONSE_CHUNK_SIZE = 4096;
     private final HttpServer server;
 
@@ -44,7 +44,7 @@
         p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, 
MAX_REQUEST_HEADER_SIZE,
                 MAX_REQUEST_CHUNK_SIZE));
         p.addLast(new HttpResponseEncoder());
-        p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
+        p.addLast(new HttpRequestAggregator(MAX_REQUEST_SIZE, true));
         p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
index 17f6f9a..d87a569 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
@@ -41,8 +41,8 @@
 
     protected final HttpUriRequest request;
 
-    protected HttpRequestTask() throws URISyntaxException {
-        request = post(null);
+    protected HttpRequestTask(int size) throws URISyntaxException {
+        request = post(null, size);
     }
 
     @Override
@@ -82,7 +82,7 @@
         }
     }
 
-    protected HttpUriRequest get(String query) throws URISyntaxException {
+    protected static HttpUriRequest get(String query) throws 
URISyntaxException {
         URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, 
HttpServerTest.PORT, HttpServerTest.PATH,
                 query, null);
         RequestBuilder builder = RequestBuilder.get(uri);
@@ -90,7 +90,7 @@
         return builder.build();
     }
 
-    protected HttpUriRequest post(String query) throws URISyntaxException {
+    protected static HttpUriRequest post(String query, int size) throws 
URISyntaxException {
         URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, 
HttpServerTest.PORT, HttpServerTest.PATH,
                 query, null);
         RequestBuilder builder = RequestBuilder.post(uri);
@@ -102,7 +102,11 @@
         String statement = str.toString();
         builder.setHeader("Content-type", "application/x-www-form-urlencoded");
         builder.addParameter("statement", statement);
-        builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+        for (int i = 0; i < size; i++) {
+            str.append("This is a string statement that will be ignored");
+            str.append('\n');
+        }
+        builder.setEntity(new StringEntity(str.toString(), 
StandardCharsets.UTF_8));
         builder.setCharset(StandardCharsets.UTF_8);
         return builder.build();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 71e84e5..1ff1730 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -27,6 +27,7 @@
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -59,7 +60,7 @@
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
     static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger();
-    static final List<HttpRequestTask> TASKS = new ArrayList<>();
+    static final List<Callable<Void>> TASKS = new ArrayList<>();
     static final List<Future<Void>> FUTURES = new ArrayList<>();
     static final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -204,6 +205,52 @@
     }
 
     @Test
+    public void testLargeRequest() throws Exception {
+        int numExecutors = 24;
+        int serverQueueSize = 24;
+        WebManager webMgr = new WebManager();
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, 
numExecutors, serverQueueSize);
+        ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { 
PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        Exception failure = null;
+        try {
+            request(1, 32000);
+            for (Future<Void> thread : FUTURES) {
+                thread.get();
+            }
+        } catch (Exception e) {
+            failure = e;
+        } finally {
+            webMgr.stop();
+        }
+        Assert.assertNotNull(failure);
+    }
+
+    @Test
+    public void testMultipleSequentialRequests() throws Exception {
+        int numExecutors = 24;
+        int serverQueueSize = 24;
+        WebManager webMgr = new WebManager();
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, 
numExecutors, serverQueueSize);
+        ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { 
PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            sequentialRequest(24);
+            for (Future<Void> thread : FUTURES) {
+                thread.get();
+            }
+        } finally {
+            webMgr.stop();
+        }
+    }
+
+    @Test
     public void testMalformedString() throws Exception {
         int numExecutors = 16;
         int serverQueueSize = 16;
@@ -317,7 +364,7 @@
             request(1);
             waitTillQueued(server, 1);
             FUTURES.remove(0);
-            HttpRequestTask request = TASKS.remove(0);
+            HttpRequestTask request = (HttpRequestTask) TASKS.remove(0);
             request.request.abort();
             waitTillQueued(server, 0);
             synchronized (servlet) {
@@ -342,8 +389,19 @@
     }
 
     private void request(int count) throws URISyntaxException {
+        request(count, 32);
+    }
+
+    private void sequentialRequest(int count) throws URISyntaxException {
+        MultipleSequentialRequestsTask requestTask = new 
MultipleSequentialRequestsTask(32, count);
+        Future<Void> next = executor.submit(requestTask);
+        FUTURES.add(next);
+        TASKS.add(requestTask);
+    }
+
+    private void request(int count, int size) throws URISyntaxException {
         for (int i = 0; i < count; i++) {
-            HttpRequestTask requestTask = new HttpRequestTask();
+            HttpRequestTask requestTask = new HttpRequestTask(size);
             Future<Void> next = executor.submit(requestTask);
             FUTURES.add(next);
             TASKS.add(requestTask);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
new file mode 100644
index 0000000..889509c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class MultipleSequentialRequestsTask implements Callable<Void> {
+
+    private final int size;
+    private final int count;
+
+    protected MultipleSequentialRequestsTask(int size, int count) throws URISyntaxException {
+        this.size = size;
+        this.count = count;
+    }
+
+    @Override
+    public Void call() throws Exception {
+        try {
+            HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+
+            for (int i = 0; i < count; i++) {
+                HttpUriRequest request = HttpRequestTask.post(null, size);
+                HttpResponse response = executeHttpRequest(client, request);
+                if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
+                    HttpServerTest.SUCCESS_COUNT.incrementAndGet();
+                } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE.code()) {
+                    HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet();
+                } else {
+                    HttpServerTest.OTHER_COUNT.incrementAndGet();
+                }
+                InputStream in = response.getEntity().getContent();
+                if (HttpServerTest.PRINT_TO_CONSOLE) {
+                    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+                    String line = null;
+                    while ((line = reader.readLine()) != null) {
+                        System.out.println(line);
+                    }
+                }
+                IOUtils.closeQuietly(in);
+            }
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        return null;
+    }
+
+    protected HttpResponse executeHttpRequest(HttpClient client, HttpUriRequest method) throws Exception {
+        try {
+            return client.execute(method);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
index 2b65106..ce20d37 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
@@ -64,24 +64,24 @@
         return getOpenFileDescriptorCount != null;
     }
 
-    public static Integer getCurrentOpenFileCount() {
+    public static long getCurrentOpenFileCount() {
         if (getOpenFileDescriptorCount == null) {
             return -1;
         }
         try {
-            return (Integer) getOpenFileDescriptorCount.invoke(osMXBean);
+            return (long) getOpenFileDescriptorCount.invoke(osMXBean);
         } catch (Throwable e) { // NOSONAR
             LOGGER.log(Level.WARN, "Failure invoking getOpenFileDescriptorCount", e);
             return -1;
         }
     }
 
-    public static Integer getMaxOpenFileCount() {
+    public static long getMaxOpenFileCount() {
         if (getMaxFileDescriptorCount == null) {
             return -1;
         }
         try {
-            return (Integer) getMaxFileDescriptorCount.invoke(osMXBean);
+            return (long) getMaxFileDescriptorCount.invoke(osMXBean);
         } catch (Throwable e) { // NOSONAR
             LOGGER.log(Level.WARN, "Failure invoking getMaxFileDescriptorCount", e);
             return -1;
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index 393119d..710e3fa 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -78,7 +78,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.1.6.Final</version>
+        <version>4.1.11.Final</version>
       </dependency>
       <dependency>
         <groupId>junit</groupId>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2577
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If547b94a2dd6cf459c1bb4626c39444a058755b2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to