abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2577
Change subject: [NO ISSUE][HTTP] Http Server Fixes ...................................................................... [NO ISSUE][HTTP] Http Server Fixes - user model changes: no - storage format changes: no - interface changes: no Details: - Since a single connection handles multiple requests, adding a close listener with each request can cause a lot of leftover objects. - To fix this, we maintain a pointer to the request currently being processed and notify when the channel becomes inactive. - When the user sends a new request before the previous one completes processing, the request is failed and the connection is closed. - When the user sends a large request, the connection is always closed. Change-Id: If547b94a2dd6cf459c1bb4626c39444a058755b2 --- M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java A hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java A hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java M hyracks-fullstack/pom.xml 14 files changed, 344 insertions(+), 56 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/77/2577/1 diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java index 1a7c65f..4897449 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java @@ -79,7 +79,12 @@ /** * Notifies the response that the channel has become writable - * became writable or unwritable. Used for flow control + * Used for flow control. */ void notifyChannelWritable(); + + /** + * Notifies the response that the channel has become Inactive + */ + void notifyChannelInactive(); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java index d5f81e5..da11a6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java @@ -43,12 +43,6 @@ this.response = response; this.ctx = ctx; buffer = ctx.alloc().buffer(chunkSize); - // register listener for channel closed - ctx.channel().closeFuture().addListener(futureListener -> { - synchronized (ChunkedNettyOutputStream.this) { - ChunkedNettyOutputStream.this.notifyAll(); - } - }); } @Override @@ -111,8 +105,10 @@ response.beforeFlush(); DefaultHttpContent content = new DefaultHttpContent(buffer); ctx.writeAndFlush(content, ctx.channel().voidPromise()); - // The responisbility of releasing the buffer is now with the netty pipeline since it is forwarded - // within the http content. We must nullify buffer before we allocate the next one to avoid + // The responisbility of releasing the buffer is now with the netty pipeline + // since it is forwarded + // within the http content. We must nullify buffer before we allocate the next + // one to avoid // releasing the buffer twice in case the allocation call fails. buffer = null; buffer = ctx.alloc().buffer(size); @@ -128,7 +124,7 @@ private synchronized void ensureWritable() throws IOException { while (!ctx.channel().isWritable()) { try { - if (!ctx.channel().isOpen()) { + if (!ctx.channel().isActive()) { throw new IOException("Closed channel"); } wait(); @@ -143,4 +139,8 @@ public synchronized void resume() { notifyAll(); } + + public synchronized void channelInactive() { + notifyAll(); + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java index 323a463..a33007c 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java @@ -65,6 +65,7 @@ private final ChannelHandlerContext ctx; private final ChunkedNettyOutputStream outputStream; private final PrintWriter writer; + private final HttpServerHandler<?> handler; private HttpResponse response; private boolean headerSent; private ByteBuf error; @@ -72,7 +73,9 @@ private boolean done; private final boolean keepAlive; - public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) { + public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request, + int chunkSize) { + this.handler = handler; this.ctx = ctx; outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this); writer = new PrintWriter(outputStream); @@ -108,7 +111,9 @@ writer.close(); if (error == null && response.status() == HttpResponseStatus.OK) { if (!done) { - future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + future = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT); + future.addListener(f -> handler.requestHandled()); + ctx.flush(); } } else { // There was an error @@ -118,11 +123,13 @@ error.release(); } future = ctx.channel().close(); + future.addListener(f -> handler.requestHandled()); } else { if (keepAlive && response.status() != HttpResponseStatus.UNAUTHORIZED) { response.headers().remove(HttpHeaderNames.CONNECTION); } - // we didn't send anything to the user, we need to send an unchunked error response + // we didn't send anything to the user, we need to send an unchunked error + // response fullResponse(response.protocolVersion(), response.status(), error == null ? ctx.alloc().buffer(0, 0) : error, response.headers()); } @@ -175,10 +182,13 @@ private void fullResponse(HttpVersion version, HttpResponseStatus status, ByteBuf buffer, HttpHeaders headers) { DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(version, status, buffer); fullResponse.headers().set(headers); - // for a full response remove chunked transfer-encoding and set the content length instead + // for a full response remove chunked transfer-encoding and set the content + // length instead fullResponse.headers().remove(HttpHeaderNames.TRANSFER_ENCODING); fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes()); - future = ctx.writeAndFlush(fullResponse); + future = ctx.write(fullResponse); + future.addListener(f -> handler.requestHandled()); + ctx.flush(); headerSent = true; done = true; } @@ -187,4 +197,9 @@ public void notifyChannelWritable() { outputStream.resume(); } + + @Override + public void notifyChannelInactive() { + outputStream.channelInactive(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java index 598048e..eb0a170 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java @@ -40,13 +40,15 @@ public class FullResponse implements IServletResponse { private final ChannelHandlerContext ctx; + private final HttpServerHandler<?> handler; private final ByteArrayOutputStream baos; private final PrintWriter writer; private final FullHttpResponse response; private final boolean keepAlive; private ChannelFuture future; - public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) { + public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request) { + this.handler = handler; this.ctx = ctx; baos = new ByteArrayOutputStream(); writer = new PrintWriter(baos); @@ -69,6 +71,7 @@ } } future = ctx.writeAndFlush(fullResponse); + future.addListener(f -> handler.requestHandled()); if (response.status() != HttpResponseStatus.OK && response.status() != HttpResponseStatus.UNAUTHORIZED) { future.addListener(ChannelFutureListener.CLOSE); } @@ -105,4 +108,10 @@ // Do nothing. // This response is sent as a single piece } + + @Override + public void notifyChannelInactive() { + // Do nothing. + // This response is sent as a single piece + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java new file mode 100644 index 0000000..6824183 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestAggregator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.server; + +import java.util.List; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; + +public class HttpRequestAggregator extends HttpObjectAggregator { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final FullHttpResponse TOO_LARGE_CLOSE = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, + HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER); + boolean failed = false; + + public HttpRequestAggregator(int maxContentLength) { + super(maxContentLength); + } + + public HttpRequestAggregator(int maxContentLength, boolean closeOnExpectationFailed) { + super(maxContentLength, closeOnExpectationFailed); + } + + @Override + protected void decode(final ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception { + if (!failed) { + super.decode(ctx, msg, out); + } + } + + @Override + protected void handleOversizedMessage(final ChannelHandlerContext ctx, HttpMessage oversized) throws Exception { + failed = true; + LOGGER.warn("A large request encountered. Closing the channel"); + if (oversized instanceof HttpRequest) { + // send back a 413 and close the connection + ChannelFuture future = ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + LOGGER.debug("Failed to send a 413 Request Entity Too Large.", future.cause()); + } + ctx.close(); + } + }); + } else { + throw new IllegalStateException("Unknown large message of class " + oversized.getClass()); + } + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java index 3ab2ab9..2a92594 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java @@ -88,15 +88,9 @@ reject(ctx); return; } + super.channelActive(ctx); // We disable auto read to avoid reading at all if we can't handle any more requests ctx.read(); - super.channelActive(ctx); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.read(); - super.channelReadComplete(ctx); } private boolean overloaded() { diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java index bf8e629..0bfdf59 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java @@ -41,12 +41,13 @@ private final IServletRequest request; private final IServletResponse response; - public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, IServletRequest request, int chunkSize) { + public HttpRequestHandler(HttpServerHandler<?> handler, ChannelHandlerContext ctx, IServlet servlet, + IServletRequest request, int chunkSize) { this.ctx = ctx; this.servlet = servlet; this.request = request; - response = chunkSize == 0 ? new FullResponse(ctx, request.getHttpRequest()) - : new ChunkedResponse(ctx, request.getHttpRequest(), chunkSize); + response = chunkSize == 0 ? new FullResponse(handler, ctx, request.getHttpRequest()) + : new ChunkedResponse(handler, ctx, request.getHttpRequest(), chunkSize); request.getHttpRequest().retain(); } @@ -57,7 +58,7 @@ if (!HttpUtil.isKeepAlive(request.getHttpRequest())) { lastContentFuture.addListener(ChannelFutureListener.CLOSE); } - } catch (Throwable th) { //NOSONAR + } catch (Throwable th) { // NOSONAR LOGGER.log(Level.ERROR, "Failure handling HTTP Request", th); ctx.close(); } finally { @@ -94,4 +95,8 @@ public IServlet getServlet() { return servlet; } + + public void notifyChannelInactive() { + response.notifyChannelWritable(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index 2787b30..911e088 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -43,44 +43,72 @@ private static final Logger LOGGER = LogManager.getLogger(); protected final T server; protected final int chunkSize; - protected HttpRequestHandler handler; + protected volatile HttpRequestHandler handler; + protected volatile IServlet servlet; + protected volatile Future<Void> task; + protected volatile IChannelClosedHandler closeHandler; public HttpServerHandler(T server, int chunkSize) { this.server = server; this.chunkSize = chunkSize; } - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); + public void requestHandled() { + handler = null; + servlet = null; + task = null; + closeHandler = null; } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - if (ctx.channel().isWritable()) { - handler.notifyChannelWritable(); + HttpRequestHandler currentHandler = handler; + if (ctx.channel().isWritable() && currentHandler != null) { + currentHandler.notifyChannelWritable(); } super.channelWritabilityChanged(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + HttpRequestHandler currentHandler = handler; + if (currentHandler != null) { + currentHandler.notifyChannelInactive(); + } + IChannelClosedHandler currentCloseHandler = closeHandler; + IServlet currentServlet = servlet; + Future<Void> currentTask = task; + if (currentCloseHandler != null && currentTask != null && currentServlet != null) { + currentCloseHandler.channelClosed(server, currentServlet, currentTask); + } + super.channelInactive(ctx); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { FullHttpRequest request = (FullHttpRequest) msg; try { - IServlet servlet = server.getServlet(request); + if (request.decoderResult().isFailure()) { + throw new Exception("Failed to decode request", request.decoderResult().cause()); + } + if (handler != null || servlet != null || closeHandler != null || task != null) { + throw new Exception("Server doesn't support pipelined requests"); + } + servlet = server.getServlet(request); if (servlet == null) { handleServletNotFound(ctx, request); } else { submit(ctx, servlet, request); } } catch (Exception e) { - LOGGER.log(Level.ERROR, "Failure Submitting HTTP Request", e); + LOGGER.log(Level.ERROR, "Failure handling HTTP request", e); respond(ctx, request.protocolVersion(), new HttpResponseStatus(500, e.getMessage())); } } protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) { DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status); + requestHandled(); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @@ -93,17 +121,14 @@ respond(ctx, request.protocolVersion(), HttpResponseStatus.BAD_REQUEST); return; } - handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize); + handler = new HttpRequestHandler(this, ctx, servlet, servletRequest, chunkSize); submit(ctx, servlet); } private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException { try { - Future<Void> task = server.getExecutor(handler).submit(handler); - final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server); - if (closeHandler != null) { - ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task)); - } + task = server.getExecutor(handler).submit(handler); + closeHandler = servlet.getChannelClosedHandler(server); } catch (RejectedExecutionException e) { // NOSONAR LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage()); handler.reject(); @@ -122,4 +147,9 @@ LOGGER.log(Level.ERROR, "Failure handling HTTP Request", cause); ctx.close(); } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java index bc173fd..5f7a219 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java @@ -21,7 +21,6 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; @@ -30,6 +29,7 @@ public static final int MAX_REQUEST_CHUNK_SIZE = 262144; public static final int MAX_REQUEST_HEADER_SIZE = 262144; public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072; + public static final int MAX_REQUEST_SIZE = 524288; public static final int RESPONSE_CHUNK_SIZE = 4096; private final HttpServer server; @@ -44,7 +44,7 @@ p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, MAX_REQUEST_HEADER_SIZE, MAX_REQUEST_CHUNK_SIZE)); p.addLast(new HttpResponseEncoder()); - p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); + p.addLast(new HttpRequestAggregator(MAX_REQUEST_SIZE, true)); p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java index 17f6f9a..d87a569 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java @@ -41,8 +41,8 @@ protected final HttpUriRequest request; - protected HttpRequestTask() throws URISyntaxException { - request = post(null); + protected HttpRequestTask(int size) throws URISyntaxException { + request = post(null, size); } @Override @@ -82,7 +82,7 @@ } } - protected HttpUriRequest get(String query) throws URISyntaxException { + protected static HttpUriRequest get(String query) throws URISyntaxException { URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH, query, null); RequestBuilder builder = RequestBuilder.get(uri); @@ -90,7 +90,7 @@ return builder.build(); } - protected HttpUriRequest post(String query) throws URISyntaxException { + protected static HttpUriRequest post(String query, int size) throws URISyntaxException { URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH, query, null); RequestBuilder builder = RequestBuilder.post(uri); @@ -102,7 +102,11 @@ String statement = str.toString(); builder.setHeader("Content-type", "application/x-www-form-urlencoded"); builder.addParameter("statement", statement); - builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8)); + for (int i = 0; i < size; i++) { + str.append("This is a string statement that will be ignored"); + str.append('\n'); + } + builder.setEntity(new StringEntity(str.toString(), StandardCharsets.UTF_8)); builder.setCharset(StandardCharsets.UTF_8); return builder.build(); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java index 71e84e5..1ff1730 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java @@ -27,6 +27,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -59,7 +60,7 @@ static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger(); static final AtomicInteger OTHER_COUNT = new AtomicInteger(); static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger(); - static final List<HttpRequestTask> TASKS = new ArrayList<>(); + static final List<Callable<Void>> TASKS = new ArrayList<>(); static final List<Future<Void>> FUTURES = new ArrayList<>(); static final ExecutorService executor = Executors.newCachedThreadPool(); @@ -204,6 +205,52 @@ } @Test + public void testLargeRequest() throws Exception { + int numExecutors = 24; + int serverQueueSize = 24; + WebManager webMgr = new WebManager(); + HttpServer server = + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); + ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH }); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + Exception failure = null; + try { + request(1, 32000); + for (Future<Void> thread : FUTURES) { + thread.get(); + } + } catch (Exception e) { + failure = e; + } finally { + webMgr.stop(); + } + Assert.assertNotNull(failure); + } + + @Test + public void testMultipleSequentialRequests() throws Exception { + int numExecutors = 24; + int serverQueueSize = 24; + WebManager webMgr = new WebManager(); + HttpServer server = + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); + ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH }); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + try { + sequentialRequest(24); + for (Future<Void> thread : FUTURES) { + thread.get(); + } + } finally { + webMgr.stop(); + } + } + + @Test public void testMalformedString() throws Exception { int numExecutors = 16; int serverQueueSize = 16; @@ -317,7 +364,7 @@ request(1); waitTillQueued(server, 1); FUTURES.remove(0); - HttpRequestTask request = TASKS.remove(0); + HttpRequestTask request = (HttpRequestTask) TASKS.remove(0); request.request.abort(); waitTillQueued(server, 0); synchronized (servlet) { @@ -342,8 +389,19 @@ } private void request(int count) throws URISyntaxException { + request(count, 32); + } + + private void sequentialRequest(int count) throws URISyntaxException { + MultipleSequentialRequestsTask requestTask = new MultipleSequentialRequestsTask(32, count); + Future<Void> next = executor.submit(requestTask); + FUTURES.add(next); + TASKS.add(requestTask); + } + + private void request(int count, int size) throws URISyntaxException { for (int i = 0; i < count; i++) { - HttpRequestTask requestTask = new HttpRequestTask(); + HttpRequestTask requestTask = new HttpRequestTask(size); Future<Void> next = executor.submit(requestTask); FUTURES.add(next); TASKS.add(requestTask); diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java new file mode 100644 index 0000000..889509c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/MultipleSequentialRequestsTask.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.test; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.util.concurrent.Callable; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.client.StandardHttpRequestRetryHandler; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public class MultipleSequentialRequestsTask implements Callable<Void> { + + private final int size; + private final int count; + + protected MultipleSequentialRequestsTask(int size, int count) throws URISyntaxException { + this.size = size; + this.count = count; + } + + @Override + public Void call() throws Exception { + try { + HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build(); + + for (int i = 0; i < count; i++) { + HttpUriRequest request = HttpRequestTask.post(null, size); + HttpResponse response = executeHttpRequest(client, request); + if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) { + HttpServerTest.SUCCESS_COUNT.incrementAndGet(); + } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE.code()) { + HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet(); + } else { + HttpServerTest.OTHER_COUNT.incrementAndGet(); + } + InputStream in = response.getEntity().getContent(); + if (HttpServerTest.PRINT_TO_CONSOLE) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line = null; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + IOUtils.closeQuietly(in); + } + } catch (Throwable th) { + th.printStackTrace(); + throw th; + } + return null; + } + + protected HttpResponse executeHttpRequest(HttpClient client, HttpUriRequest method) throws Exception { + try { + return client.execute(method); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java index 2b65106..ce20d37 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/MXHelper.java @@ -64,24 +64,24 @@ return getOpenFileDescriptorCount != null; } - public static Integer getCurrentOpenFileCount() { + public static long getCurrentOpenFileCount() { if (getOpenFileDescriptorCount == null) { return -1; } try { - return (Integer) getOpenFileDescriptorCount.invoke(osMXBean); + return (long) getOpenFileDescriptorCount.invoke(osMXBean); } catch (Throwable e) { // NOSONAR LOGGER.log(Level.WARN, "Failure invoking getOpenFileDescriptorCount", e); return -1; } } - public static Integer getMaxOpenFileCount() { + public static long getMaxOpenFileCount() { if (getMaxFileDescriptorCount == null) { return -1; } try { - return (Integer) getMaxFileDescriptorCount.invoke(osMXBean); + return (long) getMaxFileDescriptorCount.invoke(osMXBean); } catch (Throwable e) { // NOSONAR LOGGER.log(Level.WARN, "Failure invoking getMaxFileDescriptorCount", e); return -1; diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml index 393119d..710e3fa 100644 --- a/hyracks-fullstack/pom.xml +++ b/hyracks-fullstack/pom.xml @@ -78,7 +78,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> - <version>4.1.6.Final</version> + <version>4.1.11.Final</version> </dependency> <dependency> <groupId>junit</groupId> -- To view, visit https://asterix-gerrit.ics.uci.edu/2577 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: If547b94a2dd6cf459c1bb4626c39444a058755b2 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>