abdullah alamoudi has submitted this change and it was merged. Change subject: [ASTERIXDB-2052][OTH] Release resources on http request rejection ......................................................................
[ASTERIXDB-2052][OTH] Release resources on http request rejection - user model changes: no - storage format changes: no - interface changes: no details: - When a request is rejected, we release its resources. - A test case was added which sends 3500+ rejected requests and causes the server to throw out of memory error prior to this fix. Change-Id: Ia0e3f3e6e2f94a31f296b3491a07f624a4fea604 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1955 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M hyracks-fullstack/hyracks/hyracks-http/pom.xml M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java R hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java 8 files changed, 174 insertions(+), 31 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 1cec616..1f1d282 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -62,6 +62,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import io.netty.handler.codec.http.HttpResponseStatus; public class QueryServiceServlet extends AbstractQueryApiServlet { @@ -93,6 +94,12 @@ // Servlet methods should not throw exceptions // http://cwe.mitre.org/data/definitions/600.html GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); + } catch (Throwable th) {// NOSONAR: Logging and re-throwing + try { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, th.getMessage(), th); + } catch (Throwable ignored) { // NOSONAR: Logging failure + } + throw th; } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml index 6439adb..cb69caa 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml @@ -27,6 +27,9 @@ <artifactId>hyracks-http</artifactId> <properties> <root.dir>${basedir}/../..</root.dir> + <direct.mem>-XX:MaxDirectMemorySize</direct.mem> + <num.arenas>-Dio.netty.allocator.numDirectArenas</num.arenas> + <max.order>-Dio.netty.allocator.maxOrder</max.order> </properties> <build> <plugins> @@ -34,7 +37,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> - <argLine>-XX:MaxDirectMemorySize=16M</argLine> + <argLine>${direct.mem}=16M ${num.arenas}=4 ${max.order}=7</argLine> </configuration> </plugin> </plugins> diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java index 64be051..47714ae 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java @@ -113,7 +113,7 @@ } else { // There was an error if (headerSent) { - LOGGER.log(Level.WARNING,"Error after header write of chunked response"); + LOGGER.log(Level.WARNING, "Error after header write of chunked response"); if (error != null) { error.release(); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java index aa0f32a..cabb01f 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java @@ -82,7 +82,11 @@ } public void reject() throws IOException { - response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); - response.close(); + try { + response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); + response.close(); + } finally { + request.getHttpRequest().release(); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 6ceafc6..45634ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -60,6 +60,7 @@ private final Object lock = new Object(); private final AtomicInteger threadId = new AtomicInteger(); private final ConcurrentMap<String, Object> ctx; + private final LinkedBlockingQueue<Runnable> workQueue; private final List<IServlet> servlets; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; @@ -81,8 +82,8 @@ this.port = port; ctx = new ConcurrentHashMap<>(); servlets = new ArrayList<>(); - executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(requestQueueSize), + workQueue = new LinkedBlockingQueue<>(requestQueueSize); + executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue, runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement())); long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK + numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE; @@ -270,4 +271,8 @@ protected EventLoopGroup getWorkerGroup() { return workerGroup; } + + public int getWorkQueueSize() { + return workQueue.size(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java index 863eddd..bf0452b 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java @@ -54,6 +54,11 @@ } @Override + protected void post(IServletRequest request, IServletResponse response) throws Exception { + get(request, response); + } + + @Override protected void get(IServletRequest request, IServletResponse response) throws Exception { response.setStatus(HttpResponseStatus.OK); HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8); diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java similarity index 71% rename from hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java rename to hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java index 065d803..6bfa0cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java @@ -28,16 +28,39 @@ import io.netty.handler.codec.http.HttpResponseStatus; -public class SlowServlet extends AbstractServlet { - public SlowServlet(ConcurrentMap<String, Object> ctx, String[] paths) { +public class SleepyServlet extends AbstractServlet { + + private volatile boolean sleep = true; + + public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) { super(ctx, paths); + } + + @Override + protected void post(IServletRequest request, IServletResponse response) throws Exception { + get(request, response); } @Override protected void get(IServletRequest request, IServletResponse response) throws Exception { response.setStatus(HttpResponseStatus.OK); - Thread.sleep(5000); // NOSONAR + if (sleep) { + synchronized (this) { + while (sleep) { + this.wait(); + } + } + } HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8); response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8)); } + + public synchronized void wakeUp() { + sleep = false; + notifyAll(); + } + + public void sleep() { + sleep = true; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java index 6512dc1..66d1b77 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java @@ -30,6 +30,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; @@ -37,12 +40,13 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.StandardHttpRequestRetryHandler; import org.apache.hyracks.http.server.HttpServer; import org.apache.hyracks.http.server.WebManager; import org.apache.hyracks.http.servlet.ChattyServlet; -import org.apache.hyracks.http.servlet.SlowServlet; +import org.apache.hyracks.http.servlet.SleepyServlet; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -52,16 +56,14 @@ public class HttpServerTest { static final boolean PRINT_TO_CONSOLE = false; static final int PORT = 9898; - static final int NUM_EXECUTOR_THREADS = 16; - static final int SERVER_QUEUE_SIZE = 16; - static final int NUM_OF_REQUESTS = 48; static final String HOST = "localhost"; static final String PROTOCOL = "http"; static final String PATH = "/"; static final AtomicInteger SUCCESS_COUNT = new AtomicInteger(); static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger(); static final AtomicInteger OTHER_COUNT = new AtomicInteger(); - static final List<Thread> THREADS = new ArrayList<>(); + static final List<Future<Void>> FUTURES = new ArrayList<>(); + static final ExecutorService executor = Executors.newCachedThreadPool(); @Before public void setUp() { @@ -73,31 +75,106 @@ @Test public void testOverloadingServer() throws Exception { WebManager webMgr = new WebManager(); + int numExecutors = 16; + int serverQueueSize = 16; + int numRequests = 48; HttpServer server = - new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE); - SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH }); + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); + SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH }); server.addServlet(servlet); webMgr.add(server); webMgr.start(); + int expectedSuccess = numExecutors + serverQueueSize; + int expectedUnavailable = numRequests - expectedSuccess; try { - request(NUM_OF_REQUESTS); - for (Thread thread : THREADS) { - thread.join(); + request(expectedSuccess); + waitTillQueued(server, serverQueueSize); + ArrayList<Future<Void>> successSet = started(); + request(expectedUnavailable); + ArrayList<Future<Void>> rejectedSet = started(); + for (Future<Void> f : rejectedSet) { + f.get(); } - Assert.assertEquals(32, SUCCESS_COUNT.get()); - Assert.assertEquals(16, UNAVAILABLE_COUNT.get()); + servlet.wakeUp(); + for (Future<Void> f : successSet) { + f.get(); + } + Assert.assertEquals(expectedSuccess, SUCCESS_COUNT.get()); + Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get()); Assert.assertEquals(0, OTHER_COUNT.get()); + } catch (Throwable th) { + th.printStackTrace(); + throw th; } finally { webMgr.stop(); } } + private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception { + int maxAttempts = 5; + int attempt = 0; + int queued = server.getWorkQueueSize(); + while (queued != expectedQueued) { + attempt++; + if (attempt > maxAttempts) { + throw new Exception("Number of queued requests (" + queued + ") didn't match the expected number (" + + expectedQueued + ")"); + } + Thread.sleep(1000); // NOSONAR polling is the clean way + queued = server.getWorkQueueSize(); + } + } + + @Test + public void testReleaseRejectedRequest() throws Exception { + WebManager webMgr = new WebManager(); + int numRequests = 64; + int numExecutors = 2; + int serverQueueSize = 2; + int numPatches = 60; + HttpServer server = + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); + SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH }); + server.addServlet(servlet); + webMgr.add(server); + webMgr.start(); + request(numExecutors + serverQueueSize); + ArrayList<Future<Void>> stuck = started(); + waitTillQueued(server, serverQueueSize); + try { + try { + for (int i = 0; i < numPatches; i++) { + ChattyServlet.printMemUsage(); + request(numRequests); + for (Future<Void> f : FUTURES) { + f.get(); + } + FUTURES.clear(); + } + } finally { + ChattyServlet.printMemUsage(); + servlet.wakeUp(); + for (Future<Void> f : stuck) { + f.get(); + } + } + } finally { + webMgr.stop(); + } + } + + private ArrayList<Future<Void>> started() { + ArrayList<Future<Void>> started = new ArrayList<>(FUTURES); + FUTURES.clear(); + return started; + } + @Test public void testChattyServer() throws Exception { - ChattyServlet.printMemUsage(); int numRequests = 64; int numExecutors = 32; int serverQueueSize = 32; + ChattyServlet.printMemUsage(); WebManager webMgr = new WebManager(); HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); @@ -107,8 +184,8 @@ webMgr.start(); try { request(numRequests); - for (Thread thread : THREADS) { - thread.join(); + for (Future<Void> thread : FUTURES) { + thread.get(); } Assert.assertEquals(numRequests, SUCCESS_COUNT.get()); Assert.assertEquals(0, UNAVAILABLE_COUNT.get()); @@ -120,10 +197,12 @@ @Test public void testMalformedString() throws Exception { + int numExecutors = 16; + int serverQueueSize = 16; WebManager webMgr = new WebManager(); HttpServer server = - new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE); - SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH }); + new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize); + SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH }); server.addServlet(servlet); webMgr.add(server); webMgr.start(); @@ -160,9 +239,9 @@ private void request(int count) { for (int i = 0; i < count; i++) { - Thread next = new Thread(() -> { + Future<Void> next = executor.submit(() -> { try { - HttpUriRequest request = request(null); + HttpUriRequest request = post(null); HttpResponse response = executeHttpRequest(request); if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) { SUCCESS_COUNT.incrementAndGet(); @@ -183,10 +262,11 @@ IOUtils.closeQuietly(in); } catch (Throwable th) { th.printStackTrace(); + throw th; } + return null; }); - THREADS.add(next); - next.start(); + FUTURES.add(next); } } @@ -200,10 +280,26 @@ } } - protected HttpUriRequest request(String query) throws URISyntaxException { + protected HttpUriRequest get(String query) throws URISyntaxException { URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null); RequestBuilder builder = RequestBuilder.get(uri); builder.setCharset(StandardCharsets.UTF_8); return builder.build(); } + + protected HttpUriRequest post(String query) throws URISyntaxException { + URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null); + RequestBuilder builder = RequestBuilder.post(uri); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < 32; i++) { + str.append("This is a string statement that will be ignored"); + str.append('\n'); + } + String statement = str.toString(); + builder.setHeader("Content-type", "application/x-www-form-urlencoded"); + builder.addParameter("statement", statement); + builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8)); + builder.setCharset(StandardCharsets.UTF_8); + return builder.build(); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1955 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ia0e3f3e6e2f94a31f296b3491a07f624a4fea604 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
