abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1972
Change subject: [NO ISSUE][OTH] Interrupt Http Executor on Client Close
......................................................................
[NO ISSUE][OTH] Interrupt Http Executor on Client Close
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Previously, we didn't know that an Http client closed the
connection until we try to write and find that the channel has
been closed.
- After this change, the moment the channel is closed, the http
task is interrupted.
Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5
---
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
M
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
A
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
M
hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
4 files changed, 187 insertions(+), 92 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/72/1972/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 46b693b..154cff6 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.http.server;
import java.io.IOException;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -91,12 +92,13 @@
return;
}
handler = new HttpRequestHandler(ctx, servlet, servletRequest,
chunkSize);
- submit();
+ submit(ctx);
}
- private void submit() throws IOException {
+ private void submit(ChannelHandlerContext ctx) throws IOException {
try {
- server.getExecutor().submit(handler);
+ Future<Void> task = server.getExecutor().submit(handler);
+ ctx.channel().closeFuture().addListener(future ->
task.cancel(true));
} catch (RejectedExecutionException e) { // NOSONAR
LOGGER.log(Level.WARNING, "Request rejected by server executor
service. " + e.getMessage());
handler.reject();
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
index 6bfa0cf..2a5a0a9 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
@@ -31,6 +31,7 @@
public class SleepyServlet extends AbstractServlet {
private volatile boolean sleep = true;
+ private int numSlept = 0;
public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
super(ctx, paths);
@@ -46,8 +47,11 @@
response.setStatus(HttpResponseStatus.OK);
if (sleep) {
synchronized (this) {
- while (sleep) {
- this.wait();
+ if (sleep) {
+ incrementSleptCount();
+ while (sleep) {
+ this.wait();
+ }
}
}
}
@@ -55,6 +59,15 @@
response.outputStream().write("I am playing hard to
get".getBytes(StandardCharsets.UTF_8));
}
+ private void incrementSleptCount() {
+ numSlept++;
+ notifyAll();
+ }
+
+ public int getNumSlept() {
+ return numSlept;
+ }
+
public synchronized void wakeUp() {
sleep = false;
notifyAll();
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
new file mode 100644
index 0000000..17f6f9a
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpRequestTask implements Callable<Void> {
+
+ protected final HttpUriRequest request;
+
+ protected HttpRequestTask() throws URISyntaxException {
+ request = post(null);
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ HttpResponse response = executeHttpRequest(request);
+ if (response.getStatusLine().getStatusCode() ==
HttpResponseStatus.OK.code()) {
+ HttpServerTest.SUCCESS_COUNT.incrementAndGet();
+ } else if (response.getStatusLine().getStatusCode() ==
HttpResponseStatus.SERVICE_UNAVAILABLE.code()) {
+ HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet();
+ } else {
+ HttpServerTest.OTHER_COUNT.incrementAndGet();
+ }
+ InputStream in = response.getEntity().getContent();
+ if (HttpServerTest.PRINT_TO_CONSOLE) {
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(in));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ IOUtils.closeQuietly(in);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ return null;
+ }
+
+ protected HttpResponse executeHttpRequest(HttpUriRequest method) throws
Exception {
+ HttpClient client =
HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+ try {
+ return client.execute(method);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ protected HttpUriRequest get(String query) throws URISyntaxException {
+ URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST,
HttpServerTest.PORT, HttpServerTest.PATH,
+ query, null);
+ RequestBuilder builder = RequestBuilder.get(uri);
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
+
+ protected HttpUriRequest post(String query) throws URISyntaxException {
+ URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST,
HttpServerTest.PORT, HttpServerTest.PATH,
+ query, null);
+ RequestBuilder builder = RequestBuilder.post(uri);
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < 32; i++) {
+ str.append("This is a string statement that will be ignored");
+ str.append('\n');
+ }
+ String statement = str.toString();
+ builder.setHeader("Content-type", "application/x-www-form-urlencoded");
+ builder.addParameter("statement", statement);
+ builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 66d1b77..ef26534 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -19,15 +19,12 @@
package org.apache.hyracks.http.test;
import java.io.BufferedReader;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.Socket;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -35,14 +32,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
import org.apache.hyracks.http.servlet.ChattyServlet;
@@ -63,6 +52,7 @@
static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
static final AtomicInteger OTHER_COUNT = new AtomicInteger();
static final List<Future<Void>> FUTURES = new ArrayList<>();
+ static final List<HttpRequestTask> TASKS = new ArrayList<>();
static final ExecutorService executor = Executors.newCachedThreadPool();
@Before
@@ -70,25 +60,26 @@
SUCCESS_COUNT.set(0);
UNAVAILABLE_COUNT.set(0);
OTHER_COUNT.set(0);
+ FUTURES.clear();
+ TASKS.clear();
}
@Test
public void testOverloadingServer() throws Exception {
WebManager webMgr = new WebManager();
int numExecutors = 16;
- int serverQueueSize = 16;
+ int queueSize = 16;
int numRequests = 48;
- HttpServer server =
- new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, serverQueueSize);
+ HttpServer server = new HttpServer(webMgr.getBosses(),
webMgr.getWorkers(), PORT, numExecutors, queueSize);
SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] {
PATH });
server.addServlet(servlet);
webMgr.add(server);
webMgr.start();
- int expectedSuccess = numExecutors + serverQueueSize;
+ int expectedSuccess = numExecutors + queueSize;
int expectedUnavailable = numRequests - expectedSuccess;
try {
request(expectedSuccess);
- waitTillQueued(server, serverQueueSize);
+ waitTillQueued(server, queueSize);
ArrayList<Future<Void>> successSet = started();
request(expectedUnavailable);
ArrayList<Future<Void>> rejectedSet = started();
@@ -111,17 +102,55 @@
}
private void waitTillQueued(HttpServer server, int expectedQueued) throws
Exception {
- int maxAttempts = 5;
+ int maxAttempts = 10;
int attempt = 0;
int queued = server.getWorkQueueSize();
while (queued != expectedQueued) {
attempt++;
if (attempt > maxAttempts) {
throw new Exception("Number of queued requests (" + queued +
") didn't match the expected number ("
- + expectedQueued + ")");
+ + expectedQueued + ") during " + maxAttempts + "s");
}
Thread.sleep(1000); // NOSONAR polling is the clean way
queued = server.getWorkQueueSize();
+ }
+ }
+
+ @Test
+ public void testInterruptOnClientClose() throws Exception {
+ WebManager webMgr = new WebManager();
+ int numExecutors = 1;
+ int queueSize = 1;
+ HttpServer server = new HttpServer(webMgr.getBosses(),
webMgr.getWorkers(), PORT, numExecutors, queueSize);
+ SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] {
PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ request(1);
+ synchronized (servlet) {
+ while (servlet.getNumSlept() == 0) {
+ servlet.wait();
+ }
+ }
+ request(1);
+ waitTillQueued(server, 1);
+ FUTURES.remove(0);
+ HttpRequestTask request = TASKS.remove(0);
+ request.request.abort();
+ waitTillQueued(server, 0);
+ synchronized (servlet) {
+ while (servlet.getNumSlept() == 1) {
+ servlet.wait();
+ }
+ }
+ servlet.wakeUp();
+ for (Future<Void> f : FUTURES) {
+ f.get();
+ }
+ FUTURES.clear();
+ } finally {
+ webMgr.stop();
}
}
@@ -130,17 +159,16 @@
WebManager webMgr = new WebManager();
int numRequests = 64;
int numExecutors = 2;
- int serverQueueSize = 2;
+ int queueSize = 2;
int numPatches = 60;
- HttpServer server =
- new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, serverQueueSize);
+ HttpServer server = new HttpServer(webMgr.getBosses(),
webMgr.getWorkers(), PORT, numExecutors, queueSize);
SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] {
PATH });
server.addServlet(servlet);
webMgr.add(server);
webMgr.start();
- request(numExecutors + serverQueueSize);
+ request(numExecutors + queueSize);
ArrayList<Future<Void>> stuck = started();
- waitTillQueued(server, serverQueueSize);
+ waitTillQueued(server, queueSize);
try {
try {
for (int i = 0; i < numPatches; i++) {
@@ -166,6 +194,7 @@
private ArrayList<Future<Void>> started() {
ArrayList<Future<Void>> started = new ArrayList<>(FUTURES);
FUTURES.clear();
+ TASKS.clear();
return started;
}
@@ -198,10 +227,9 @@
@Test
public void testMalformedString() throws Exception {
int numExecutors = 16;
- int serverQueueSize = 16;
+ int queueSize = 16;
WebManager webMgr = new WebManager();
- HttpServer server =
- new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT,
numExecutors, serverQueueSize);
+ HttpServer server = new HttpServer(webMgr.getBosses(),
webMgr.getWorkers(), PORT, numExecutors, queueSize);
SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] {
PATH });
server.addServlet(servlet);
webMgr.add(server);
@@ -237,69 +265,12 @@
f.set(obj, value);
}
- private void request(int count) {
+ private void request(int count) throws URISyntaxException {
for (int i = 0; i < count; i++) {
- Future<Void> next = executor.submit(() -> {
- try {
- HttpUriRequest request = post(null);
- HttpResponse response = executeHttpRequest(request);
- if (response.getStatusLine().getStatusCode() ==
HttpResponseStatus.OK.code()) {
- SUCCESS_COUNT.incrementAndGet();
- } else if (response.getStatusLine().getStatusCode() ==
HttpResponseStatus.SERVICE_UNAVAILABLE
- .code()) {
- UNAVAILABLE_COUNT.incrementAndGet();
- } else {
- OTHER_COUNT.incrementAndGet();
- }
- InputStream in = response.getEntity().getContent();
- if (PRINT_TO_CONSOLE) {
- BufferedReader reader = new BufferedReader(new
InputStreamReader(in));
- String line = null;
- while ((line = reader.readLine()) != null) {
- System.out.println(line);
- }
- }
- IOUtils.closeQuietly(in);
- } catch (Throwable th) {
- th.printStackTrace();
- throw th;
- }
- return null;
- });
+ HttpRequestTask requestTask = new HttpRequestTask();
+ Future<Void> next = executor.submit(requestTask);
FUTURES.add(next);
+ TASKS.add(requestTask);
}
- }
-
- protected HttpResponse executeHttpRequest(HttpUriRequest method) throws
Exception {
- HttpClient client =
HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
- try {
- return client.execute(method);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- protected HttpUriRequest get(String query) throws URISyntaxException {
- URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
- RequestBuilder builder = RequestBuilder.get(uri);
- builder.setCharset(StandardCharsets.UTF_8);
- return builder.build();
- }
-
- protected HttpUriRequest post(String query) throws URISyntaxException {
- URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
- RequestBuilder builder = RequestBuilder.post(uri);
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < 32; i++) {
- str.append("This is a string statement that will be ignored");
- str.append('\n');
- }
- String statement = str.toString();
- builder.setHeader("Content-type", "application/x-www-form-urlencoded");
- builder.addParameter("statement", statement);
- builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
- builder.setCharset(StandardCharsets.UTF_8);
- return builder.build();
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1972
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>