abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1474
Change subject: Use Chunked Http Response
......................................................................
Use Chunked Http Response
Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
8 files changed, 208 insertions(+), 75 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/74/1474/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index c2d1d33..f1e64b3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -193,20 +193,20 @@
statementExecutorFactory.create(aqlStatements,
sessionConfig, compilationProvider);
translator.compileAndExecute(hcc, hds, resultDelivery);
} catch (AsterixException | TokenMgrError |
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(),
pe);
String errorMessage =
ResultUtil.buildParseExceptionMessage(pe, query);
ObjectNode errorResp =
ResultUtil.getErrorResponse(2, errorMessage, "",
ResultUtil.extractFullStackTrace(pe));
sessionConfig.out().write(new
ObjectMapper().writeValueAsString(errorResp));
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
} catch (Exception e) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(),
e);
- ResultUtil.apiErrorHandler(sessionConfig.out(), e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ ResultUtil.apiErrorHandler(sessionConfig.out(), e);
}
} catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failure handling request", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ LOGGER.log(Level.WARNING, "Failure handling request", e);
return;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 984122b..6c20ec3 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -28,26 +30,24 @@
public class ChunkedNettyOutputStream extends OutputStream {
+ private static final Logger LOGGER =
Logger.getLogger(ChunkedNettyOutputStream.class.getName());
private final ChannelHandlerContext ctx;
private final ChunkedResponse response;
private ByteBuf buffer;
- public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize,
- ChunkedResponse response) {
+ public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize,
ChunkedResponse response) {
this.response = response;
this.ctx = ctx;
buffer = ctx.alloc().buffer(chunkSize);
}
@Override
- public synchronized void write(byte[] b, int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length)) {
+ public void write(byte[] b, int off, int len) throws IOException {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) >
b.length)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
-
if (len > buffer.capacity()) {
flush();
flush(b, off, len);
@@ -64,7 +64,7 @@
}
@Override
- public synchronized void write(int b) {
+ public void write(int b) throws IOException {
if (buffer.writableBytes() > 0) {
buffer.writeByte(b);
} else {
@@ -74,35 +74,62 @@
}
@Override
- public synchronized void close() throws IOException {
- flush();
- buffer.release();
+ public void close() throws IOException {
+ if (response.isHeaderSent() || response.status() !=
HttpResponseStatus.OK) {
+ flush();
+ buffer.release();
+ } else {
+ response.fullReponse(buffer);
+ }
super.close();
}
@Override
- public synchronized void flush() {
+ public void flush() throws IOException {
+ ensureWritable();
if (buffer.readableBytes() > 0) {
- int size = buffer.capacity();
if (response.status() == HttpResponseStatus.OK) {
- response.flush();
+ int size = buffer.capacity();
+ response.beforeFlush();
DefaultHttpContent content = new DefaultHttpContent(buffer);
- ctx.write(content);
+ ctx.write(content, ctx.channel().voidPromise());
+ buffer = ctx.alloc().buffer(size);
} else {
- response.error(buffer);
+ ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes());
+ aBuffer.writeBytes(buffer);
+ response.error(aBuffer);
}
- buffer = ctx.alloc().buffer(size);
}
}
- private synchronized void flush(byte[] buf, int offset, int len) {
+ private synchronized void ensureWritable() throws IOException {
+ while (!ctx.channel().isWritable()) {
+ try {
+ System.err.println("Stream: became unwritable");
+ ctx.flush();
+ wait();
+ System.err.println("Stream: became writable");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.log(Level.WARNING, "Interupted while waiting for
channel to be writable", e);
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private void flush(byte[] buf, int offset, int len) throws IOException {
+ ensureWritable();
ByteBuf aBuffer = ctx.alloc().buffer(len);
aBuffer.writeBytes(buf, offset, len);
if (response.status() == HttpResponseStatus.OK) {
- response.flush();
- ctx.write(new DefaultHttpContent(aBuffer));
+ response.beforeFlush();
+ ctx.write(new DefaultHttpContent(aBuffer),
ctx.channel().voidPromise());
} else {
response.error(aBuffer);
}
}
+
+ public synchronized void resume() {
+ notifyAll();
+ }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 19c2664..aa1de0b 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -18,9 +18,6 @@
*/
package org.apache.hyracks.http.server;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
@@ -28,15 +25,28 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
+/**
+ * A chunked http response. Here is how it is expected to work:
+ * If the response is a success aka 200 and is less than chunkSize, then it is
sent as a single http response message
+ * If the response is larger than the chunkSize and the response status is
200, then it is sent as chunks of chunkSize.
+ * If the response status is non 200, then it is always sent as a single http
response message.
+ * If the response status is non 200, then output buffered before setting the
response status is discarded.
+ * If flush() is called on the writer and even if it is less than chunkSize,
then the response will be chunked.
+ * When chunking, an output buffer is allocated only when the previous buffer
has been sent
+ * If an error occurs after sending the first chunk, the connection will close
abruptly.
+ */
public class ChunkedResponse implements IServletResponse {
private final ChannelHandlerContext ctx;
private final ChunkedNettyOutputStream outputStream;
@@ -45,16 +55,18 @@
private boolean headerSent;
private ByteBuf error;
private ChannelFuture future;
+ private boolean done;
- public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request)
{
+ public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request,
int chunkSize) {
this.ctx = ctx;
- outputStream = new ChunkedNettyOutputStream(ctx, 4096, this);
+ outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
writer = new PrintWriter(outputStream);
- response = new DefaultHttpResponse(HTTP_1_1, OK);
+ response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING,
HttpHeaderValues.CHUNKED);
if (HttpUtil.isKeepAlive(request)) {
response.headers().set(HttpHeaderNames.CONNECTION,
HttpHeaderValues.KEEP_ALIVE);
}
+ done = false;
}
@Override
@@ -78,25 +90,42 @@
@Override
public void close() throws IOException {
- if (error == null) {
- writer.close();
- future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ writer.close();
+ if (error == null && response.status() == HttpResponseStatus.OK) {
+ if (!done) {
+ future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+ } else {
+ // There was an error
+ if (headerSent) {
+ future = ctx.channel().close();
+ } else {
+ // we didn't send anything to the user, we need to send an
unchunked error response
+ fullResponse(response.protocolVersion(), response.status(),
+ error == null ? ctx.alloc().buffer(0, 0) : error,
response.headers());
+ }
}
+ done = true;
}
public HttpResponseStatus status() {
return response.status();
}
- public void flush() {
+ public void beforeFlush() {
if (!headerSent && response.status() == HttpResponseStatus.OK) {
- ctx.writeAndFlush(response);
+ ctx.write(response, ctx.channel().voidPromise());
headerSent = true;
}
}
public void error(ByteBuf error) {
- this.error = error;
+ if (this.error == null) {
+ this.error = error;
+ } else {
+ this.error.capacity(this.error.capacity() + error.capacity());
+ this.error.writeBytes(error);
+ }
}
@Override
@@ -106,8 +135,28 @@
@Override
public void setStatus(HttpResponseStatus status) {
- // update the response
- // close the stream
- // write the response
+ response.setStatus(status);
+ }
+
+ public boolean isHeaderSent() {
+ return headerSent;
+ }
+
+ public void fullReponse(ByteBuf buffer) {
+ fullResponse(response.protocolVersion(), response.status(), buffer,
response.headers());
+ }
+
+ private void fullResponse(HttpVersion version, HttpResponseStatus status,
ByteBuf buffer, HttpHeaders headers) {
+ DefaultFullHttpResponse fullResponse = new
DefaultFullHttpResponse(version, status, buffer);
+ fullResponse.headers().set(headers);
+ fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,
buffer.readableBytes());
+ future = ctx.writeAndFlush(fullResponse);
+ headerSent = true;
+ done = true;
+ }
+
+ @Override
+ public void resume() {
+ outputStream.resume();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 245f28a..e5222eb 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -93,4 +93,10 @@
public void setStatus(HttpResponseStatus status) {
response.setStatus(status);
}
+
+ @Override
+ public void resume() {
+ // Do nothing.
+ // This response is sent as a single piece
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index f7f55bd..8040dee 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -23,12 +23,16 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.logging.LogLevel;
@@ -36,6 +40,8 @@
public class HttpServer {
// Constants
+ private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
+ private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
private static final Logger LOGGER =
Logger.getLogger(HttpServer.class.getName());
private static final int FAILED = -1;
private static final int STOPPED = 0;
@@ -49,18 +55,19 @@
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final int port;
+ private final ExecutorService executor;
// Mutable members
private volatile int state = STOPPED;
private Channel channel;
private Throwable cause;
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
- int port) {
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
int port) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.port = port;
ctx = new ConcurrentHashMap<>();
lets = new ArrayList<>();
+ executor = Executors.newFixedThreadPool(16);
}
public final void start() throws Exception { // NOSONAR
@@ -158,7 +165,6 @@
lets.add(let);
}
-
protected void doStart() throws InterruptedException {
/*
* This is a hacky way to ensure that ILets with more specific paths
are checked first.
@@ -172,13 +178,12 @@
*/
Collections.sort(lets, (l1, l2) -> l2.getPaths()[0].length() -
l1.getPaths()[0].length());
ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new HttpServerInitializer(this));
+ b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+ new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK,
HIGH_WRITE_BUFFER_WATER_MARK))
+ .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new
HttpServerInitializer(this));
channel = b.bind(port).sync().channel();
}
-
protected void doStop() throws InterruptedException {
channel.close();
@@ -212,14 +217,18 @@
return true;
}
} else if (c == '*') {
- return path.regionMatches(path.length() - pathSpec.length() + 1,
- pathSpec, 1, pathSpec.length() - 1);
+ return path.regionMatches(path.length() - pathSpec.length() + 1,
pathSpec, 1, pathSpec.length() - 1);
}
return false;
}
+
private static boolean isPathWildcardMatch(String pathSpec, String path) {
int cpl = pathSpec.length() - 2;
return (pathSpec.endsWith("/*") && path.regionMatches(0, pathSpec, 0,
cpl))
&& (path.length() == cpl || '/' == path.charAt(cpl));
}
+
+ public ExecutorService getExecutor() {
+ return executor;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index c8ed937..4da63ed 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -40,13 +41,19 @@
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
import io.netty.handler.codec.http.multipart.MixedAttribute;
-public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler extends SimpleChannelInboundHandler<Object>
implements Callable<Void> {
private static final Logger LOGGER =
Logger.getLogger(HttpServerHandler.class.getName());
protected final HttpServer server;
+ protected final int chunkSize;
+ protected IServlet servlet;
+ protected IServletRequest request;
+ protected IServletResponse response;
+ protected ChannelHandlerContext ctx;
- public HttpServerHandler(HttpServer server) {
+ public HttpServerHandler(HttpServer server, int chunkSize) {
this.server = server;
+ this.chunkSize = chunkSize;
}
@Override
@@ -55,35 +62,34 @@
}
@Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
+ if (ctx.channel().isWritable()) {
+ response.resume();
+ }
+ super.channelWritabilityChanged(ctx);
+ }
+
+ @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
try {
FullHttpRequest http = (FullHttpRequest) msg;
- IServlet servlet = server.getServlet(http);
+ this.ctx = ctx;
+ servlet = server.getServlet(http);
if (servlet == null) {
- DefaultHttpResponse response = new
DefaultHttpResponse(http.protocolVersion(),
- HttpResponseStatus.NOT_FOUND);
- ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+ DefaultHttpResponse notFound =
+ new DefaultHttpResponse(http.protocolVersion(),
HttpResponseStatus.NOT_FOUND);
+ ctx.write(notFound).addListener(ChannelFutureListener.CLOSE);
} else {
if (http.method() != HttpMethod.GET && http.method() !=
HttpMethod.POST) {
- DefaultHttpResponse response = new
DefaultHttpResponse(http.protocolVersion(),
- HttpResponseStatus.METHOD_NOT_ALLOWED);
-
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
+ DefaultHttpResponse notAllowed =
+ new DefaultHttpResponse(http.protocolVersion(),
HttpResponseStatus.METHOD_NOT_ALLOWED);
+
ctx.write(notAllowed).addListener(ChannelFutureListener.CLOSE);
return;
}
- IServletRequest request = http.method() == HttpMethod.GET ?
get(http) : post(http);
- IServletResponse response = new FullResponse(ctx, http);
- try {
- servlet.handle(request, response);
- } catch (Throwable th) { // NOSONAR
- LOGGER.log(Level.WARNING, "Failure during handling of an
IServLetRequest", th);
-
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- } finally {
- response.close();
- }
- ChannelFuture lastContentFuture = response.future();
- if (!HttpUtil.isKeepAlive(http)) {
- lastContentFuture.addListener(ChannelFutureListener.CLOSE);
- }
+ request = http.method() == HttpMethod.GET ? get(http) :
post(http);
+ response = new ChunkedResponse(ctx, http, chunkSize);
+ http.retain();
+ server.getExecutor().submit(this);
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", e);
@@ -128,4 +134,28 @@
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
ctx.close();
}
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ try {
+ servlet.handle(request, response);
+ } catch (Throwable th) { // NOSONAR
+ LOGGER.log(Level.WARNING, "Failure during handling of an
IServLetRequest", th);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ } finally {
+ response.close();
+ }
+ ChannelFuture lastContentFuture = response.future();
+ if (!HttpUtil.isKeepAlive(request.getHttpRequest())) {
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ } catch (Throwable th) {//NOSONAR
+ LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", th);
+ ctx.close();
+ } finally {
+ request.getHttpRequest().release();
+ }
+ return null;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index 3b32ee6..bc67865 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -27,9 +27,10 @@
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
- private static final int MAX_CHUNK_SIZE = 262144;
- private static final int MAX_HEADER_SIZE = 262144;
- private static final int MAX_INITIAL_LINE_LENGTH = 131072;
+ private static final int MAX_REQUEST_CHUNK_SIZE = 262144;
+ private static final int MAX_REQUEST_HEADER_SIZE = 262144;
+ private static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
+ private static final int RESPONSE_CHUNK_SIZE = 4096;
private HttpServer server;
public HttpServerInitializer(HttpServer server) {
@@ -39,9 +40,10 @@
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
- p.addLast(new HttpRequestDecoder(MAX_INITIAL_LINE_LENGTH,
MAX_HEADER_SIZE, MAX_CHUNK_SIZE));
+ p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH,
MAX_REQUEST_HEADER_SIZE,
+ MAX_REQUEST_CHUNK_SIZE));
p.addLast(new HttpResponseEncoder());
p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
- p.addLast(new HttpServerHandler(server));
+ p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE));
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
index 342e643..4e32436 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java
@@ -34,6 +34,7 @@
/**
* Set a response header
+ *
* @param name
* @param value
* @return
@@ -43,6 +44,7 @@
/**
* Get the output writer for the response
+ *
* @return
* @throws Exception
*/
@@ -50,6 +52,7 @@
/**
* Send the last http response if any and return the future
+ *
* @return
* @throws Exception
*/
@@ -57,12 +60,14 @@
/**
* Set the status of the http response
+ *
* @param status
*/
void setStatus(HttpResponseStatus status);
/**
* Get the output stream for the response
+ *
* @return
*/
OutputStream outputStream();
@@ -74,4 +79,9 @@
public static void setContentType(IServletResponse response, String type)
throws IOException {
response.setHeader(HttpHeaderNames.CONTENT_TYPE, type);
}
+
+ /**
+ * resume streaming if stopped
+ */
+ void resume();
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1474
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>