abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2577
Advertising
Change subject: [NO ISSUE][HTTP] Http Server Fixes
......................................................................
[NO ISSUE][HTTP] Http Server Fixes
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Since a single connection handles multiple requests, adding
a close listener with each request can cause a lot of leftover
objects.
- To fix this, we maintain a pointer to the request currently being
processed and notify when the channel becomes inactive.
- When the user sends a new request before the previous one
completes processing, the request is failed and the connection is
closed.
- When the user sends a large request, the connection is always
closed.
Change-Id: If547b94a2dd6cf459c1bb4626c39444a058755b2
---
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
A
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
M
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
M
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
A
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
M
hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
M hyracks-fullstack/pom.xml
14 files changed, 344 insertions(+), 56 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/77/2577/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
index 1a7c65f..4897449 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
@@ -79,7 +79,12 @@
/**
* Notifies the response that the channel has become writable
- * became writable or unwritable. Used for flow control
+ * Used for flow control.
*/
void notifyChannelWritable();
+
+ /**
+ * Notifies the response that the channel has become Inactive
+ */
+ void notifyChannelInactive();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index d5f81e5..da11a6f 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -43,12 +43,6 @@
this.response = response;
this.ctx = ctx;
buffer = ctx.alloc().buffer(chunkSize);
- // register listener for channel closed
- ctx.channel().closeFuture().addListener(futureListener -> {
- synchronized (ChunkedNettyOutputStream.this) {
- ChunkedNettyOutputStream.this.notifyAll();
- }
- });
}
@Override
@@ -111,8 +105,10 @@
response.beforeFlush();
DefaultHttpContent content = new DefaultHttpContent(buffer);
ctx.writeAndFlush(content, ctx.channel().voidPromise());
- // The responisbility of releasing the buffer is now with the
netty pipeline since it is forwarded
- // within the http content. We must nullify buffer before we
allocate the next one to avoid
+ // The responisbility of releasing the buffer is now with the
netty pipeline
+ // since it is forwarded
+ // within the http content. We must nullify buffer before we
allocate the next
+ // one to avoid
// releasing the buffer twice in case the allocation call
fails.
buffer = null;
buffer = ctx.alloc().buffer(size);
@@ -128,7 +124,7 @@
private synchronized void ensureWritable() throws IOException {
while (!ctx.channel().isWritable()) {
try {
- if (!ctx.channel().isOpen()) {
+ if (!ctx.channel().isActive()) {
throw new IOException("Closed channel");
}
wait();
@@ -143,4 +139,8 @@
public synchronized void resume() {
notifyAll();
}
+
+ public synchronized void channelInactive() {
+ notifyAll();
+ }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 323a463..a33007c 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -65,6 +65,7 @@
private final ChannelHandlerContext ctx;
private final ChunkedNettyOutputStream outputStream;
private final PrintWriter writer;
+ private final HttpServerHandler<?> handler;
private HttpResponse response;
private boolean headerSent;
private ByteBuf error;
@@ -72,7 +73,9 @@
private boolean done;
private final boolean keepAlive;
- public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request,
int chunkSize) {
+ public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext
ctx, FullHttpRequest request,
+ int chunkSize) {
+ this.handler = handler;
this.ctx = ctx;
outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
writer = new PrintWriter(outputStream);
@@ -108,7 +111,9 @@
writer.close();
if (error == null && response.status() == HttpResponseStatus.OK) {
if (!done) {
- future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ future = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT);
+ future.addListener(f -> handler.requestHandled());
+ ctx.flush();
}
} else {
// There was an error
@@ -118,11 +123,13 @@
error.release();
}
future = ctx.channel().close();
+ future.addListener(f -> handler.requestHandled());
} else {
if (keepAlive && response.status() !=
HttpResponseStatus.UNAUTHORIZED) {
response.headers().remove(HttpHeaderNames.CONNECTION);
}
- // we didn't send anything to the user, we need to send an
unchunked error response
+ // we didn't send anything to the user, we need to send an
unchunked error
+ // response
fullResponse(response.protocolVersion(), response.status(),
error == null ? ctx.alloc().buffer(0, 0) : error,
response.headers());
}
@@ -175,10 +182,13 @@
private void fullResponse(HttpVersion version, HttpResponseStatus status,
ByteBuf buffer, HttpHeaders headers) {
DefaultFullHttpResponse fullResponse = new
DefaultFullHttpResponse(version, status, buffer);
fullResponse.headers().set(headers);
- // for a full response remove chunked transfer-encoding and set the
content length instead
+ // for a full response remove chunked transfer-encoding and set the
content
+ // length instead
fullResponse.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,
buffer.readableBytes());
- future = ctx.writeAndFlush(fullResponse);
+ future = ctx.write(fullResponse);
+ future.addListener(f -> handler.requestHandled());
+ ctx.flush();
headerSent = true;
done = true;
}
@@ -187,4 +197,9 @@
public void notifyChannelWritable() {
outputStream.resume();
}
+
+ @Override
+ public void notifyChannelInactive() {
+ outputStream.channelInactive();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 598048e..eb0a170 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -40,13 +40,15 @@
public class FullResponse implements IServletResponse {
private final ChannelHandlerContext ctx;
+ private final HttpServerHandler<?> handler;
private final ByteArrayOutputStream baos;
private final PrintWriter writer;
private final FullHttpResponse response;
private final boolean keepAlive;
private ChannelFuture future;
- public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
+ public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext
ctx, FullHttpRequest request) {
+ this.handler = handler;
this.ctx = ctx;
baos = new ByteArrayOutputStream();
writer = new PrintWriter(baos);
@@ -69,6 +71,7 @@
}
}
future = ctx.writeAndFlush(fullResponse);
+ future.addListener(f -> handler.requestHandled());
if (response.status() != HttpResponseStatus.OK && response.status() !=
HttpResponseStatus.UNAUTHORIZED) {
future.addListener(ChannelFutureListener.CLOSE);
}
@@ -105,4 +108,10 @@
// Do nothing.
// This response is sent as a single piece
}
+
+ @Override
+ public void notifyChannelInactive() {
+ // Do nothing.
+ // This response is sent as a single piece
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
new file mode 100644
index 0000000..6824183
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server;
+
+import java.util.List;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
+public class HttpRequestAggregator extends HttpObjectAggregator {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final FullHttpResponse TOO_LARGE_CLOSE = new
DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+ HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
Unpooled.EMPTY_BUFFER);
+ boolean failed = false;
+
+ public HttpRequestAggregator(int maxContentLength) {
+ super(maxContentLength);
+ }
+
+ public HttpRequestAggregator(int maxContentLength, boolean
closeOnExpectationFailed) {
+ super(maxContentLength, closeOnExpectationFailed);
+ }
+
+ @Override
+ protected void decode(final ChannelHandlerContext ctx, HttpObject msg,
List<Object> out) throws Exception {
+ if (!failed) {
+ super.decode(ctx, msg, out);
+ }
+ }
+
+ @Override
+ protected void handleOversizedMessage(final ChannelHandlerContext ctx,
HttpMessage oversized) throws Exception {
+ failed = true;
+ LOGGER.warn("A large request encountered. Closing the channel");
+ if (oversized instanceof HttpRequest) {
+ // send back a 413 and close the connection
+ ChannelFuture future =
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate());
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws
Exception {
+ if (!future.isSuccess()) {
+ LOGGER.debug("Failed to send a 413 Request Entity Too
Large.", future.cause());
+ }
+ ctx.close();
+ }
+ });
+ } else {
+ throw new IllegalStateException("Unknown large message of class "
+ oversized.getClass());
+ }
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
index 3ab2ab9..2a92594 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
@@ -88,15 +88,9 @@
reject(ctx);
return;
}
+ super.channelActive(ctx);
// We disable auto read to avoid reading at all if we can't handle any
more requests
ctx.read();
- super.channelActive(ctx);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws
Exception {
- ctx.read();
- super.channelReadComplete(ctx);
}
private boolean overloaded() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index bf8e629..0bfdf59 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -41,12 +41,13 @@
private final IServletRequest request;
private final IServletResponse response;
- public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet,
IServletRequest request, int chunkSize) {
+ public HttpRequestHandler(HttpServerHandler<?> handler,
ChannelHandlerContext ctx, IServlet servlet,
+ IServletRequest request, int chunkSize) {
this.ctx = ctx;
this.servlet = servlet;
this.request = request;
- response = chunkSize == 0 ? new FullResponse(ctx,
request.getHttpRequest())
- : new ChunkedResponse(ctx, request.getHttpRequest(),
chunkSize);
+ response = chunkSize == 0 ? new FullResponse(handler, ctx,
request.getHttpRequest())
+ : new ChunkedResponse(handler, ctx, request.getHttpRequest(),
chunkSize);
request.getHttpRequest().retain();
}
@@ -57,7 +58,7 @@
if (!HttpUtil.isKeepAlive(request.getHttpRequest())) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
- } catch (Throwable th) { //NOSONAR
+ } catch (Throwable th) { // NOSONAR
LOGGER.log(Level.ERROR, "Failure handling HTTP Request", th);
ctx.close();
} finally {
@@ -94,4 +95,8 @@
public IServlet getServlet() {
return servlet;
}
+
+ public void notifyChannelInactive() {
+ response.notifyChannelWritable();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 2787b30..911e088 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -43,44 +43,72 @@
private static final Logger LOGGER = LogManager.getLogger();
protected final T server;
protected final int chunkSize;
- protected HttpRequestHandler handler;
+ protected volatile HttpRequestHandler handler;
+ protected volatile IServlet servlet;
+ protected volatile Future<Void> task;
+ protected volatile IChannelClosedHandler closeHandler;
public HttpServerHandler(T server, int chunkSize) {
this.server = server;
this.chunkSize = chunkSize;
}
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.flush();
+ public void requestHandled() {
+ handler = null;
+ servlet = null;
+ task = null;
+ closeHandler = null;
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
- if (ctx.channel().isWritable()) {
- handler.notifyChannelWritable();
+ HttpRequestHandler currentHandler = handler;
+ if (ctx.channel().isWritable() && currentHandler != null) {
+ currentHandler.notifyChannelWritable();
}
super.channelWritabilityChanged(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ HttpRequestHandler currentHandler = handler;
+ if (currentHandler != null) {
+ currentHandler.notifyChannelInactive();
+ }
+ IChannelClosedHandler currentCloseHandler = closeHandler;
+ IServlet currentServlet = servlet;
+ Future<Void> currentTask = task;
+ if (currentCloseHandler != null && currentTask != null &&
currentServlet != null) {
+ currentCloseHandler.channelClosed(server, currentServlet,
currentTask);
+ }
+ super.channelInactive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest request = (FullHttpRequest) msg;
try {
- IServlet servlet = server.getServlet(request);
+ if (request.decoderResult().isFailure()) {
+ throw new Exception("Failed to decode request",
request.decoderResult().cause());
+ }
+ if (handler != null || servlet != null || closeHandler != null ||
task != null) {
+ throw new Exception("Server doesn't support pipelined
requests");
+ }
+ servlet = server.getServlet(request);
if (servlet == null) {
handleServletNotFound(ctx, request);
} else {
submit(ctx, servlet, request);
}
} catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failure Submitting HTTP Request", e);
+ LOGGER.log(Level.ERROR, "Failure handling HTTP request", e);
respond(ctx, request.protocolVersion(), new
HttpResponseStatus(500, e.getMessage()));
}
}
protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion,
HttpResponseStatus status) {
DefaultHttpResponse response = new DefaultHttpResponse(httpVersion,
status);
+ requestHandled();
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@@ -93,17 +121,14 @@
respond(ctx, request.protocolVersion(),
HttpResponseStatus.BAD_REQUEST);
return;
}
- handler = new HttpRequestHandler(ctx, servlet, servletRequest,
chunkSize);
+ handler = new HttpRequestHandler(this, ctx, servlet, servletRequest,
chunkSize);
submit(ctx, servlet);
}
private void submit(ChannelHandlerContext ctx, IServlet servlet) throws
IOException {
try {
- Future<Void> task = server.getExecutor(handler).submit(handler);
- final IChannelClosedHandler closeHandler =
servlet.getChannelClosedHandler(server);
- if (closeHandler != null) {
- ctx.channel().closeFuture().addListener(future ->
closeHandler.channelClosed(server, servlet, task));
- }
+ task = server.getExecutor(handler).submit(handler);
+ closeHandler = servlet.getChannelClosedHandler(server);
} catch (RejectedExecutionException e) { // NOSONAR
LOGGER.log(Level.WARN, "Request rejected by server executor
service. " + e.getMessage());
handler.reject();
@@ -122,4 +147,9 @@
LOGGER.log(Level.ERROR, "Failure handling HTTP Request", cause);
ctx.close();
}
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws
Exception {
+ ctx.read();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index bc173fd..5f7a219 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -21,7 +21,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
@@ -30,6 +29,7 @@
public static final int MAX_REQUEST_CHUNK_SIZE = 262144;
public static final int MAX_REQUEST_HEADER_SIZE = 262144;
public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
+ public static final int MAX_REQUEST_SIZE = 524288;
public static final int RESPONSE_CHUNK_SIZE = 4096;
private final HttpServer server;
@@ -44,7 +44,7 @@
p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH,
MAX_REQUEST_HEADER_SIZE,
MAX_REQUEST_CHUNK_SIZE));
p.addLast(new HttpResponseEncoder());
- p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
+ p.addLast(new HttpRequestAggregator(MAX_REQUEST_SIZE, true));
p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE));
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
index 17f6f9a..d87a569 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
@@ -41,8 +41,8 @@
protected final HttpUriRequest request;
- protected HttpRequestTask() throws URISyntaxException {
- request = post(null);
+ protected HttpRequestTask(int size) throws URISyntaxException {
+ request = post(null, size);
}
@Override
@@ -82,7 +82,7 @@
}
}
- protected HttpUriRequest get(String query) throws URISyntaxException {
+ protected static HttpUriRequest get(String query) throws
URISyntaxException {
URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST,
HttpServerTest.PORT, HttpServerTest.PATH,
query, null);
RequestBuilder builder = RequestBuilder.get(uri);
@@ -90,7 +90,7 @@
return builder.build();
}
- protected HttpUriRequest post(String query) throws URISyntaxException {
+ protected static HttpUriRequest post(String query, int size) throws
URISyntaxException {
URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST,
HttpServerTest.PORT, HttpServerTest.PATH,
query, null);
RequestBuilder builder = RequestBuilder.post(uri);
@@ -102,7 +102,11 @@
String statement = str.toString();
builder.setHeader("Content-type", "application/x-www-form-urlencoded");
builder.addParameter("statement", statement);
- builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+ for (int i = 0; i < size; i++) {
+ str.append("This is a string statement that will be ignored");
+ str.append('\n');
+ }
+ builder.setEntity(new StringEntity(str.toString(),
StandardCharsets.UTF_8));
builder.setCharset(StandardCharsets.UTF_8);
return builder.build();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 71e84e5..1ff1730 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -27,6 +27,7 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -59,7 +60,7 @@
static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
static final AtomicInteger OTHER_COUNT = new AtomicInteger();
static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger();
- static final List<HttpRequestTask> TASKS = new ArrayList<>();
+ static final List<Callable<Void>> TASKS = new ArrayList<>();
static final List<Future<Void>> FUTURES = new ArrayList<>();
static final ExecutorService executor = Executors.newCachedThreadPool();
@@ -204,6 +205,52 @@
}
@Test
+ public void testLargeRequest() throws Exception {
+ int numExecutors = 24;
+ int serverQueueSize = 24;
+ WebManager webMgr = new WebManager();
+ HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, serverQueueSize);
+ ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] {
PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ Exception failure = null;
+ try {
+ request(1, 32000);
+ for (Future<Void> thread : FUTURES) {
+ thread.get();
+ }
+ } catch (Exception e) {
+ failure = e;
+ } finally {
+ webMgr.stop();
+ }
+ Assert.assertNotNull(failure);
+ }
+
+ @Test
+ public void testMultipleSequentialRequests() throws Exception {
+ int numExecutors = 24;
+ int serverQueueSize = 24;
+ WebManager webMgr = new WebManager();
+ HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, serverQueueSize);
+ ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] {
PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ sequentialRequest(24);
+ for (Future<Void> thread : FUTURES) {
+ thread.get();
+ }
+ } finally {
+ webMgr.stop();
+ }
+ }
+
+ @Test
public void testMalformedString() throws Exception {
int numExecutors = 16;
int serverQueueSize = 16;
@@ -317,7 +364,7 @@
request(1);
waitTillQueued(server, 1);
FUTURES.remove(0);
- HttpRequestTask request = TASKS.remove(0);
+ HttpRequestTask request = (HttpRequestTask) TASKS.remove(0);
request.request.abort();
waitTillQueued(server, 0);
synchronized (servlet) {
@@ -342,8 +389,19 @@
}
private void request(int count) throws URISyntaxException {
+ request(count, 32);
+ }
+
+ private void sequentialRequest(int count) throws URISyntaxException {
+ MultipleSequentialRequestsTask requestTask = new
MultipleSequentialRequestsTask(32, count);
+ Future<Void> next = executor.submit(requestTask);
+ FUTURES.add(next);
+ TASKS.add(requestTask);
+ }
+
+ private void request(int count, int size) throws URISyntaxException {
for (int i = 0; i < count; i++) {
- HttpRequestTask requestTask = new HttpRequestTask();
+ HttpRequestTask requestTask = new HttpRequestTask(size);
Future<Void> next = executor.submit(requestTask);
FUTURES.add(next);
TASKS.add(requestTask);
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
new file mode 100644
index 0000000..889509c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class MultipleSequentialRequestsTask implements Callable<Void> {
+
+ private final int size;
+ private final int count;
+
+ protected MultipleSequentialRequestsTask(int size, int count) throws
URISyntaxException {
+ this.size = size;
+ this.count = count;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ HttpClient client =
HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+
+ for (int i = 0; i < count; i++) {
+ HttpUriRequest request = HttpRequestTask.post(null, size);
+ HttpResponse response = executeHttpRequest(client, request);
+ if (response.getStatusLine().getStatusCode() ==
HttpResponseStatus.OK.code()) {
+ HttpServerTest.SUCCESS_COUNT.incrementAndGet();
+ } else if (response.getStatusLine().getStatusCode() ==
HttpResponseStatus.SERVICE_UNAVAILABLE.code()) {
+ HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet();
+ } else {
+ HttpServerTest.OTHER_COUNT.incrementAndGet();
+ }
+ InputStream in = response.getEntity().getContent();
+ if (HttpServerTest.PRINT_TO_CONSOLE) {
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(in));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ IOUtils.closeQuietly(in);
+ }
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ return null;
+ }
+
+ protected HttpResponse executeHttpRequest(HttpClient client,
HttpUriRequest method) throws Exception {
+ try {
+ return client.execute(method);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
index 2b65106..ce20d37 100644
---
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
+++
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java
@@ -64,24 +64,24 @@
return getOpenFileDescriptorCount != null;
}
- public static Integer getCurrentOpenFileCount() {
+ public static long getCurrentOpenFileCount() {
if (getOpenFileDescriptorCount == null) {
return -1;
}
try {
- return (Integer) getOpenFileDescriptorCount.invoke(osMXBean);
+ return (long) getOpenFileDescriptorCount.invoke(osMXBean);
} catch (Throwable e) { // NOSONAR
LOGGER.log(Level.WARN, "Failure invoking
getOpenFileDescriptorCount", e);
return -1;
}
}
- public static Integer getMaxOpenFileCount() {
+ public static long getMaxOpenFileCount() {
if (getMaxFileDescriptorCount == null) {
return -1;
}
try {
- return (Integer) getMaxFileDescriptorCount.invoke(osMXBean);
+ return (long) getMaxFileDescriptorCount.invoke(osMXBean);
} catch (Throwable e) { // NOSONAR
LOGGER.log(Level.WARN, "Failure invoking
getMaxFileDescriptorCount", e);
return -1;
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index 393119d..710e3fa 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -78,7 +78,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.1.6.Final</version>
+ <version>4.1.11.Final</version>
</dependency>
<dependency>
<groupId>junit</groupId>
--
To view, visit https://asterix-gerrit.ics.uci.edu/2577
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If547b94a2dd6cf459c1bb4626c39444a058755b2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>