abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1474

Change subject: Use Chunked Http Response
......................................................................

Use Chunked Http Response

Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
8 files changed, 208 insertions(+), 75 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/74/1474/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index c2d1d33..f1e64b3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -193,20 +193,20 @@
                         statementExecutorFactory.create(aqlStatements, 
sessionConfig, compilationProvider);
                 translator.compileAndExecute(hcc, hds, resultDelivery);
             } catch (AsterixException | TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+                response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), 
pe);
                 String errorMessage = 
ResultUtil.buildParseExceptionMessage(pe, query);
                 ObjectNode errorResp =
                         ResultUtil.getErrorResponse(2, errorMessage, "", 
ResultUtil.extractFullStackTrace(pe));
                 sessionConfig.out().write(new 
ObjectMapper().writeValueAsString(errorResp));
-                response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             } catch (Exception e) {
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), 
e);
-                ResultUtil.apiErrorHandler(sessionConfig.out(), e);
                 response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                ResultUtil.apiErrorHandler(sessionConfig.out(), e);
             }
         } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Failure handling request", e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            LOGGER.log(Level.WARNING, "Failure handling request", e);
             return;
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 984122b..6c20ec3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
@@ -28,26 +30,24 @@
 
 public class ChunkedNettyOutputStream extends OutputStream {
 
+    private static final Logger LOGGER = 
Logger.getLogger(ChunkedNettyOutputStream.class.getName());
     private final ChannelHandlerContext ctx;
     private final ChunkedResponse response;
     private ByteBuf buffer;
 
-    public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize,
-            ChunkedResponse response) {
+    public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize, 
ChunkedResponse response) {
         this.response = response;
         this.ctx = ctx;
         buffer = ctx.alloc().buffer(chunkSize);
     }
 
     @Override
-    public synchronized void write(byte[] b, int off, int len) {
-        if ((off < 0) || (off > b.length) || (len < 0) ||
-                ((off + len) > b.length)) {
+    public void write(byte[] b, int off, int len) throws IOException {
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > 
b.length)) {
             throw new IndexOutOfBoundsException();
         } else if (len == 0) {
             return;
         }
-
         if (len > buffer.capacity()) {
             flush();
             flush(b, off, len);
@@ -64,7 +64,7 @@
     }
 
     @Override
-    public synchronized void write(int b) {
+    public void write(int b) throws IOException {
         if (buffer.writableBytes() > 0) {
             buffer.writeByte(b);
         } else {
@@ -74,35 +74,62 @@
     }
 
     @Override
-    public synchronized void close() throws IOException {
-        flush();
-        buffer.release();
+    public void close() throws IOException {
+        if (response.isHeaderSent() || response.status() != 
HttpResponseStatus.OK) {
+            flush();
+            buffer.release();
+        } else {
+            response.fullReponse(buffer);
+        }
         super.close();
     }
 
     @Override
-    public synchronized void flush() {
+    public void flush() throws IOException {
+        ensureWritable();
         if (buffer.readableBytes() > 0) {
-            int size = buffer.capacity();
             if (response.status() == HttpResponseStatus.OK) {
-                response.flush();
+                int size = buffer.capacity();
+                response.beforeFlush();
                 DefaultHttpContent content = new DefaultHttpContent(buffer);
-                ctx.write(content);
+                ctx.write(content, ctx.channel().voidPromise());
+                buffer = ctx.alloc().buffer(size);
             } else {
-                response.error(buffer);
+                ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes());
+                aBuffer.writeBytes(buffer);
+                response.error(aBuffer);
             }
-            buffer = ctx.alloc().buffer(size);
         }
     }
 
-    private synchronized void flush(byte[] buf, int offset, int len) {
+    private synchronized void ensureWritable() throws IOException {
+        while (!ctx.channel().isWritable()) {
+            try {
+                System.err.println("Stream: became unwritable");
+                ctx.flush();
+                wait();
+                System.err.println("Stream: became writable");
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOGGER.log(Level.WARNING, "Interupted while waiting for 
channel to be writable", e);
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private void flush(byte[] buf, int offset, int len) throws IOException {
+        ensureWritable();
         ByteBuf aBuffer = ctx.alloc().buffer(len);
         aBuffer.writeBytes(buf, offset, len);
         if (response.status() == HttpResponseStatus.OK) {
-            response.flush();
-            ctx.write(new DefaultHttpContent(aBuffer));
+            response.beforeFlush();
+            ctx.write(new DefaultHttpContent(aBuffer), 
ctx.channel().voidPromise());
         } else {
             response.error(aBuffer);
         }
     }
+
+    public synchronized void resume() {
+        notifyAll();
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 19c2664..aa1de0b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -18,9 +18,6 @@
  */
 package org.apache.hyracks.http.server;
 
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
@@ -28,15 +25,28 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
 
+/**
+ * A chunked http response. Here is how it is expected to work:
+ * If the response is a success aka 200 and is less than chunkSize, then it is 
sent as a single http response message
+ * If the response is larger than the chunkSize and the response status is 
200, then it is sent as chunks of chunkSize.
+ * If the response status is non 200, then it is always sent as a single http 
response message.
+ * If the response status is non 200, then output buffered before setting the 
response status is discarded.
+ * If flush() is called on the writer and even if it is less than chunkSize, 
then the response will be chunked.
+ * When chunking, an output buffer is allocated only when the previous buffer 
has been sent
+ * If an error occurs after sending the first chunk, the connection will close 
abruptly.
+ */
 public class ChunkedResponse implements IServletResponse {
     private final ChannelHandlerContext ctx;
     private final ChunkedNettyOutputStream outputStream;
@@ -45,16 +55,18 @@
     private boolean headerSent;
     private ByteBuf error;
     private ChannelFuture future;
+    private boolean done;
 
-    public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request) 
{
+    public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, 
int chunkSize) {
         this.ctx = ctx;
-        outputStream = new ChunkedNettyOutputStream(ctx, 4096, this);
+        outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
         writer = new PrintWriter(outputStream);
-        response = new DefaultHttpResponse(HTTP_1_1, OK);
+        response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
         response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, 
HttpHeaderValues.CHUNKED);
         if (HttpUtil.isKeepAlive(request)) {
             response.headers().set(HttpHeaderNames.CONNECTION, 
HttpHeaderValues.KEEP_ALIVE);
         }
+        done = false;
     }
 
     @Override
@@ -78,25 +90,42 @@
 
     @Override
     public void close() throws IOException {
-        if (error == null) {
-            writer.close();
-            future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+        writer.close();
+        if (error == null && response.status() == HttpResponseStatus.OK) {
+            if (!done) {
+                future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+            }
+        } else {
+            // There was an error
+            if (headerSent) {
+                future = ctx.channel().close();
+            } else {
+                // we didn't send anything to the user, we need to send an 
unchunked error response
+                fullResponse(response.protocolVersion(), response.status(),
+                        error == null ? ctx.alloc().buffer(0, 0) : error, 
response.headers());
+            }
         }
+        done = true;
     }
 
     public HttpResponseStatus status() {
         return response.status();
     }
 
-    public void flush() {
+    public void beforeFlush() {
         if (!headerSent && response.status() == HttpResponseStatus.OK) {
-            ctx.writeAndFlush(response);
+            ctx.write(response, ctx.channel().voidPromise());
             headerSent = true;
         }
     }
 
     public void error(ByteBuf error) {
-        this.error = error;
+        if (this.error == null) {
+            this.error = error;
+        } else {
+            this.error.capacity(this.error.capacity() + error.capacity());
+            this.error.writeBytes(error);
+        }
     }
 
     @Override
@@ -106,8 +135,28 @@
 
     @Override
     public void setStatus(HttpResponseStatus status) {
-        // update the response
-        // close the stream
-        // write the response
+        response.setStatus(status);
+    }
+
+    public boolean isHeaderSent() {
+        return headerSent;
+    }
+
+    public void fullReponse(ByteBuf buffer) {
+        fullResponse(response.protocolVersion(), response.status(), buffer, 
response.headers());
+    }
+
+    private void fullResponse(HttpVersion version, HttpResponseStatus status, 
ByteBuf buffer, HttpHeaders headers) {
+        DefaultFullHttpResponse fullResponse = new 
DefaultFullHttpResponse(version, status, buffer);
+        fullResponse.headers().set(headers);
+        fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 
buffer.readableBytes());
+        future = ctx.writeAndFlush(fullResponse);
+        headerSent = true;
+        done = true;
+    }
+
+    @Override
+    public void resume() {
+        outputStream.resume();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 245f28a..e5222eb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -93,4 +93,10 @@
     public void setStatus(HttpResponseStatus status) {
         response.setStatus(status);
     }
+
+    @Override
+    public void resume() {
+        // Do nothing.
+        // This response is sent as a single piece
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index f7f55bd..8040dee 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -23,12 +23,16 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.logging.LogLevel;
@@ -36,6 +40,8 @@
 
 public class HttpServer {
     // Constants
+    private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
+    private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
     private static final Logger LOGGER = 
Logger.getLogger(HttpServer.class.getName());
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
@@ -49,18 +55,19 @@
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final int port;
+    private final ExecutorService executor;
     // Mutable members
     private volatile int state = STOPPED;
     private Channel channel;
     private Throwable cause;
 
-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
-            int port) {
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
int port) {
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
         this.port = port;
         ctx = new ConcurrentHashMap<>();
         lets = new ArrayList<>();
+        executor = Executors.newFixedThreadPool(16);
     }
 
     public final void start() throws Exception { // NOSONAR
@@ -158,7 +165,6 @@
         lets.add(let);
     }
 
-
     protected void doStart() throws InterruptedException {
         /*
          * This is a hacky way to ensure that ILets with more specific paths 
are checked first.
@@ -172,13 +178,12 @@
          */
         Collections.sort(lets, (l1, l2) -> l2.getPaths()[0].length() - 
l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
-        b.group(bossGroup, workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .handler(new LoggingHandler(LogLevel.INFO))
-                .childHandler(new HttpServerInitializer(this));
+        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+                        new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, 
HIGH_WRITE_BUFFER_WATER_MARK))
+                .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new 
HttpServerInitializer(this));
         channel = b.bind(port).sync().channel();
     }
-
 
     protected void doStop() throws InterruptedException {
         channel.close();
@@ -212,14 +217,18 @@
                 return true;
             }
         } else if (c == '*') {
-            return path.regionMatches(path.length() - pathSpec.length() + 1,
-                    pathSpec, 1, pathSpec.length() - 1);
+            return path.regionMatches(path.length() - pathSpec.length() + 1, 
pathSpec, 1, pathSpec.length() - 1);
         }
         return false;
     }
+
     private static boolean isPathWildcardMatch(String pathSpec, String path) {
         int cpl = pathSpec.length() - 2;
         return (pathSpec.endsWith("/*") && path.regionMatches(0, pathSpec, 0, 
cpl))
                 && (path.length() == cpl || '/' == path.charAt(cpl));
     }
+
+    public ExecutorService getExecutor() {
+        return executor;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index c8ed937..4da63ed 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -40,13 +41,19 @@
 import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
 import io.netty.handler.codec.http.multipart.MixedAttribute;
 
-public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler extends SimpleChannelInboundHandler<Object> 
implements Callable<Void> {
 
     private static final Logger LOGGER = 
Logger.getLogger(HttpServerHandler.class.getName());
     protected final HttpServer server;
+    protected final int chunkSize;
+    protected IServlet servlet;
+    protected IServletRequest request;
+    protected IServletResponse response;
+    protected ChannelHandlerContext ctx;
 
-    public HttpServerHandler(HttpServer server) {
+    public HttpServerHandler(HttpServer server, int chunkSize) {
         this.server = server;
+        this.chunkSize = chunkSize;
     }
 
     @Override
@@ -55,35 +62,34 @@
     }
 
     @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
+        if (ctx.channel().isWritable()) {
+            response.resume();
+        }
+        super.channelWritabilityChanged(ctx);
+    }
+
+    @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
         try {
             FullHttpRequest http = (FullHttpRequest) msg;
-            IServlet servlet = server.getServlet(http);
+            this.ctx = ctx;
+            servlet = server.getServlet(http);
             if (servlet == null) {
-                DefaultHttpResponse response = new 
DefaultHttpResponse(http.protocolVersion(),
-                        HttpResponseStatus.NOT_FOUND);
-                ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+                DefaultHttpResponse notFound =
+                        new DefaultHttpResponse(http.protocolVersion(), 
HttpResponseStatus.NOT_FOUND);
+                ctx.write(notFound).addListener(ChannelFutureListener.CLOSE);
             } else {
                 if (http.method() != HttpMethod.GET && http.method() != 
HttpMethod.POST) {
-                    DefaultHttpResponse response = new 
DefaultHttpResponse(http.protocolVersion(),
-                            HttpResponseStatus.METHOD_NOT_ALLOWED);
-                    
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+                    DefaultHttpResponse notAllowed =
+                            new DefaultHttpResponse(http.protocolVersion(), 
HttpResponseStatus.METHOD_NOT_ALLOWED);
+                    
ctx.write(notAllowed).addListener(ChannelFutureListener.CLOSE);
                     return;
                 }
-                IServletRequest request = http.method() == HttpMethod.GET ? 
get(http) : post(http);
-                IServletResponse response = new FullResponse(ctx, http);
-                try {
-                    servlet.handle(request, response);
-                } catch (Throwable th) { // NOSONAR
-                    LOGGER.log(Level.WARNING, "Failure during handling of an 
IServLetRequest", th);
-                    
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-                } finally {
-                    response.close();
-                }
-                ChannelFuture lastContentFuture = response.future();
-                if (!HttpUtil.isKeepAlive(http)) {
-                    lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-                }
+                request = http.method() == HttpMethod.GET ? get(http) : 
post(http);
+                response = new ChunkedResponse(ctx, http, chunkSize);
+                http.retain();
+                server.getExecutor().submit(this);
             }
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", e);
@@ -128,4 +134,28 @@
         LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
         ctx.close();
     }
+
+    @Override
+    public Void call() throws Exception {
+        try {
+            try {
+                servlet.handle(request, response);
+            } catch (Throwable th) { // NOSONAR
+                LOGGER.log(Level.WARNING, "Failure during handling of an 
IServLetRequest", th);
+                response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            } finally {
+                response.close();
+            }
+            ChannelFuture lastContentFuture = response.future();
+            if (!HttpUtil.isKeepAlive(request.getHttpRequest())) {
+                lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+            }
+        } catch (Throwable th) {//NOSONAR
+            LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", th);
+            ctx.close();
+        } finally {
+            request.getHttpRequest().release();
+        }
+        return null;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index 3b32ee6..bc67865 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -27,9 +27,10 @@
 
 public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
 
-    private static final int MAX_CHUNK_SIZE = 262144;
-    private static final int MAX_HEADER_SIZE = 262144;
-    private static final int MAX_INITIAL_LINE_LENGTH = 131072;
+    private static final int MAX_REQUEST_CHUNK_SIZE = 262144;
+    private static final int MAX_REQUEST_HEADER_SIZE = 262144;
+    private static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
+    private static final int RESPONSE_CHUNK_SIZE = 4096;
     private HttpServer server;
 
     public HttpServerInitializer(HttpServer server) {
@@ -39,9 +40,10 @@
     @Override
     public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
-        p.addLast(new HttpRequestDecoder(MAX_INITIAL_LINE_LENGTH, 
MAX_HEADER_SIZE, MAX_CHUNK_SIZE));
+        p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, 
MAX_REQUEST_HEADER_SIZE,
+                MAX_REQUEST_CHUNK_SIZE));
         p.addLast(new HttpResponseEncoder());
         p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
-        p.addLast(new HttpServerHandler(server));
+        p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
index 342e643..4e32436 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
@@ -34,6 +34,7 @@
 
     /**
      * Set a response header
+     *
      * @param name
      * @param value
      * @return
@@ -43,6 +44,7 @@
 
     /**
      * Get the output writer for the response
+     *
      * @return
      * @throws Exception
      */
@@ -50,6 +52,7 @@
 
     /**
      * Send the last http response if any and return the future
+     *
      * @return
      * @throws Exception
      */
@@ -57,12 +60,14 @@
 
     /**
      * Set the status of the http response
+     *
      * @param status
      */
     void setStatus(HttpResponseStatus status);
 
     /**
      * Get the output stream for the response
+     *
      * @return
      */
     OutputStream outputStream();
@@ -74,4 +79,9 @@
     public static void setContentType(IServletResponse response, String type) 
throws IOException {
         response.setHeader(HttpHeaderNames.CONTENT_TYPE, type);
     }
+
+    /**
+     * resume streaming if stopped
+     */
+    void resume();
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1474
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to