abdullah alamoudi has submitted this change and it was merged.

Change subject: Use Chunked Http Response
......................................................................

Use Chunked Http Response Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1474 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java R hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java R hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java R hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java A hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java A hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java 35 files changed, 583 
insertions(+), 312 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index c38e0a9..ab05f10 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -52,10 +52,11 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; @@ -107,7 +108,7 @@ String printJob = request.getParameter("print-job"); String executeQuery = request.getParameter("execute-query"); try { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure setting content type", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); @@ -158,7 +159,7 @@ if ("/".equals(requestURI)) { try { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, 
IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure setting content type", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); @@ -177,7 +178,7 @@ // Special handler for font files and .png resources if (resourcePath.endsWith(".png")) { BufferedImage img = ImageIO.read(is); - IServletResponse.setContentType(response, IServlet.ContentType.IMG_PNG); + ServletUtils.setContentType(response, IServlet.ContentType.IMG_PNG); OutputStream outputStream = response.outputStream(); String formatName = "png"; ImageIO.write(img, formatName, outputStream); @@ -185,7 +186,7 @@ return; } String type = IServlet.ContentType.mime(QueryWebInterfaceServlet.extension(resourcePath)); - IServletResponse.setContentType(response, "".equals(type) ? IServlet.ContentType.TEXT_PLAIN : type, + ServletUtils.setContentType(response, "".equals(type) ? IServlet.ContentType.TEXT_PLAIN : type, IServlet.Encoding.UTF8); writeOutput(response, is, resourcePath); } catch (IOException e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index 6fd6c47..038ed2f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -32,10 +32,11 @@ import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.utils.JSONUtil; import org.apache.asterix.runtime.util.ClusterStateManager; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import 
org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -66,7 +67,7 @@ } protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException { - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); PrintWriter responseWriter = response.writer(); try { ObjectNode json; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java index 4419e8a..3d9167e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java @@ -27,9 +27,10 @@ import java.util.logging.Logger; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -57,7 +58,7 @@ } else { json = processNode(request, hcc); } - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); 
responseWriter.write(new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json)); } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown response.setStatus(HttpResponseStatus.NOT_FOUND); @@ -69,8 +70,7 @@ responseWriter.flush(); } - private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc) - throws Exception { + private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc) throws Exception { String pathInfo = path(request); if (pathInfo.endsWith("/")) { throw new IllegalArgumentException(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java index d832672..caa00f1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java @@ -38,10 +38,11 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -71,7 +72,7 @@ } response.setStatus(HttpResponseStatus.OK); try { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, 
IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure setting content type", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); @@ -107,8 +108,8 @@ return; } boolean temp = dataset.getDatasetDetails().isTemp(); - FileSplit[] fileSplits = metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, - datasetName, temp); + FileSplit[] fileSplits = + metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp); ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); List<List<String>> primaryKeys = DatasetUtils.getPartitioningKeys(dataset); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java index bcc6914..be6e280 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java @@ -23,7 +23,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.hyracks.http.server.IServletRequest; +import org.apache.hyracks.http.api.IServletRequest; public class DdlApiServlet extends RestApiServlet { private static final byte ALLOWED_CATEGORIES = diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java index d91352d..e6a32a3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java @@ -37,9 
+37,10 @@ import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.runtime.util.AppContextInfo; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -56,7 +57,7 @@ @Override protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException { - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); PrintWriter responseWriter = response.writer(); ObjectNode json; ObjectMapper om = new ObjectMapper(); @@ -95,7 +96,8 @@ Map<String, Map<String, Future<ObjectNode>>> ncDataMap = new HashMap<>(); for (String nc : AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames()) { Map<String, Future<ObjectNode>> ncData = new HashMap<>(); - ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc))))); + ncData.put("threaddump", + executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc))))); ncData.put("config", executor .submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(nc, false, true))))); ncData.put("stats", executor.submit(() -> fixupKeys(processNodeStats(hcc, nc)))); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java index ac79088..7fe7370 100644 
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java @@ -31,10 +31,11 @@ import javax.imageio.ImageIO; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import io.netty.handler.codec.http.HttpResponseStatus; @@ -53,7 +54,7 @@ String requestURI = request.getHttpRequest().uri(); if ("/".equals(requestURI)) { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML); resourcePath = "/feed/home.html"; } else { resourcePath = requestURI; @@ -71,12 +72,12 @@ BufferedImage img = ImageIO.read(is); OutputStream outputStream = response.outputStream(); String formatName = "png"; - IServletResponse.setContentType(response, IServlet.ContentType.IMG_PNG); + ServletUtils.setContentType(response, IServlet.ContentType.IMG_PNG); ImageIO.write(img, formatName, outputStream); return; } - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); InputStreamReader isr = new InputStreamReader(is); StringBuilder sb = new StringBuilder(); BufferedReader br = new BufferedReader(isr); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java index 7788136..9c08fbd 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java @@ -23,7 +23,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.hyracks.http.server.IServletRequest; +import org.apache.hyracks.http.api.IServletRequest; public class FullApiServlet extends RestApiServlet { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java index c1423e7..2dbaa54 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java @@ -31,9 +31,10 @@ import org.apache.asterix.runtime.util.ClusterStateManager; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -66,7 +67,7 @@ } else { json = processNode(request, hcc); } - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); 
responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json)); } catch (IllegalStateException e) { // NOSONAR - exception not logged or rethrown response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE); @@ -80,8 +81,7 @@ responseWriter.flush(); } - private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc) - throws Exception { + private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc) throws Exception { String pathInfo = path(request); if (pathInfo.endsWith("/")) { throw new IllegalArgumentException(); @@ -172,7 +172,7 @@ } final JsonNode value = valueArray.get(index); json.remove(key); - json.set(key.replaceAll("s$",""), value); + json.set(key.replaceAll("s$", ""), value); } } } @@ -205,8 +205,7 @@ String dump = hcc.getThreadDump(node); if (dump == null) { // check to see if this is a node that is simply down - throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null - ? new IllegalStateException() + throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null ? 
new IllegalStateException() : new IllegalArgumentException(); } return (ObjectNode) om.readTree(dump); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java index 917d9a8..160c801 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java @@ -23,7 +23,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.hyracks.http.server.IServletRequest; +import org.apache.hyracks.http.api.IServletRequest; public class QueryApiServlet extends RestApiServlet { private static final byte ALLOWED_CATEGORIES = Statement.Category.QUERY; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 6240f51..06ddf44 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -39,10 +39,11 @@ import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import 
org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -67,7 +68,7 @@ response.setStatus(HttpResponseStatus.OK); // TODO this seems wrong ... try { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure setting content type", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index b302bab..43530ea 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -53,10 +53,11 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; @@ -74,8 +75,7 @@ private final IStatementExecutorFactory statementExecutorFactory; public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, - ILangCompilationProvider compilationProvider, - 
IStatementExecutorFactory statementExecutorFactory) { + ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory) { super(ctx, paths); this.compilationProvider = compilationProvider; this.statementExecutorFactory = statementExecutorFactory; @@ -325,8 +325,8 @@ SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(",\n"); SessionConfig.OutputFormat format = getFormat(param.format); - SessionConfig sessionConfig = new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, - handlePrefix, handlePostfix); + SessionConfig sessionConfig = + new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, handlePrefix, handlePostfix); sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true); sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty); sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD, @@ -482,7 +482,7 @@ QueryTranslator.ResultDelivery delivery = parseResultDelivery(param.mode); SessionConfig sessionConfig = createSessionConfig(param, resultWriter); - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); HttpResponseStatus status = HttpResponseStatus.OK; Stats stats = new Stats(); @@ -517,8 +517,8 @@ IParser parser = compilationProvider.getParserFactory().createParser(param.statement); List<Statement> statements = parser.parse(); MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(statements, sessionConfig, - compilationProvider); + IStatementExecutor translator = + statementExecutorFactory.create(statements, sessionConfig, compilationProvider); execStart = System.nanoTime(); translator.compileAndExecute(hcc, hds, delivery, stats); execEnd = System.nanoTime(); diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java index 197a39c..039c740 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java @@ -33,10 +33,11 @@ import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -60,7 +61,7 @@ } response.setStatus(HttpResponseStatus.OK); try { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure setting content type", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java index d924cf1..fac5883 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryWebInterfaceServlet.java @@ -29,10 +29,11 @@ import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.runtime.util.AppContextInfo; import org.apache.commons.io.IOUtils; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -70,7 +71,7 @@ response.setStatus(HttpResponseStatus.OK); if ("/".equals(requestURI)) { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_HTML); resourcePath = "/queryui/queryui.html"; } else { resourcePath = requestURI; @@ -87,7 +88,7 @@ String mime = IServlet.ContentType.mime(extension); if (mime != null) { OutputStream out = response.outputStream(); - IServletResponse.setContentType(response, mime); + ServletUtils.setContentType(response, mime); try { IOUtils.copy(is, out); } catch (Exception e) { @@ -106,7 +107,7 @@ } private void doPost(IServletResponse response) throws IOException { - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); ExternalProperties externalProperties = AppContextInfo.INSTANCE.getExternalProperties(); response.setStatus(HttpResponseStatus.OK); ObjectMapper om = new ObjectMapper(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index c2d1d33..787ff47 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -40,6 +40,7 @@ import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; @@ -47,11 +48,13 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -78,17 +81,13 @@ * based on the Accept: header and other servlet parameters. 
*/ static SessionConfig initResponse(IServletRequest request, IServletResponse response) throws IOException { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8); - + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8); // CLEAN_JSON output is the default; most generally useful for a // programmatic HTTP API OutputFormat format = OutputFormat.CLEAN_JSON; // First check the "output" servlet parameter. String output = request.getParameter("output"); - String accept = request.getHeader("Accept"); - if (accept == null) { - accept = ""; - } + String accept = request.getHeader("Accept", ""); if (output != null) { if ("CSV".equals(output)) { format = OutputFormat.CSV; @@ -114,22 +113,12 @@ SessionConfig.ResultDecorator handlePrefix = (AlgebricksAppendable app) -> app.append("{ \"").append("handle").append("\": "); SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(" }"); - SessionConfig sessionConfig = new SessionConfig(response.writer(), format, null, null, handlePrefix, handlePostfix); // If it's JSON or ADM, check for the "wrapper-array" flag. Default is // "true" for JSON and "false" for ADM. (Not applicable for CSV.) 
- boolean wrapperArray; - switch (format) { - case CLEAN_JSON: - case LOSSLESS_JSON: - wrapperArray = true; - break; - default: - wrapperArray = false; - break; - } + boolean wrapperArray = format == OutputFormat.CLEAN_JSON || format == OutputFormat.LOSSLESS_JSON; String wrapperParam = request.getParameter("wrapper-array"); if (wrapperParam != null) { wrapperArray = Boolean.valueOf(wrapperParam); @@ -142,22 +131,24 @@ // Now that format is set, output the content-type switch (format) { case ADM: - IServletResponse.setContentType(response, "application/x-adm"); + ServletUtils.setContentType(response, "application/x-adm"); break; case CLEAN_JSON: // No need to reflect "clean-ness" in output type; fall through case LOSSLESS_JSON: - IServletResponse.setContentType(response, "application/json"); + ServletUtils.setContentType(response, "application/json"); break; case CSV: // Check for header parameter or in Accept:. if ("present".equals(request.getParameter("header")) || accept.contains("header=present")) { - IServletResponse.setContentType(response, "text/csv; header=present"); + ServletUtils.setContentType(response, "text/csv; header=present"); sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, true); } else { - IServletResponse.setContentType(response, "text/csv; header=absent"); + ServletUtils.setContentType(response, "text/csv; header=absent"); } break; + default: + throw new IOException("Unknown format " + format); } return sessionConfig; } @@ -169,45 +160,49 @@ // enable cross-origin resource sharing response.setHeader("Access-Control-Allow-Origin", "*"); response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept"); - SessionConfig sessionConfig = initResponse(request, response); QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request); - try { - response.setStatus(HttpResponseStatus.OK); - IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); - IHyracksDataset hds = 
(IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); - if (hds == null) { - synchronized (ctx) { - hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); - if (hds == null) { - hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS); - ctx.put(HYRACKS_DATASET_ATTR, hds); - } + doHandle(response, query, sessionConfig, resultDelivery); + } catch (Exception e) { + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + LOGGER.log(Level.WARNING, "Failure handling request", e); + return; + } + } + + private void doHandle(IServletResponse response, String query, SessionConfig sessionConfig, + ResultDelivery resultDelivery) throws JsonProcessingException { + try { + response.setStatus(HttpResponseStatus.OK); + IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); + IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); + if (hds == null) { + synchronized (ctx) { + hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); + if (hds == null) { + hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS); + ctx.put(HYRACKS_DATASET_ATTR, hds); } } - IParser parser = parserFactory.createParser(query); - List<Statement> aqlStatements = parser.parse(); - validate(aqlStatements); - MetadataManager.INSTANCE.init(); - IStatementExecutor translator = - statementExecutorFactory.create(aqlStatements, sessionConfig, compilationProvider); - translator.compileAndExecute(hcc, hds, resultDelivery); - } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe); - String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query); - ObjectNode errorResp = - ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe)); - sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp)); - 
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); - } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); - ResultUtil.apiErrorHandler(sessionConfig.out(), e); - response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Failure handling request", e); + IParser parser = parserFactory.createParser(query); + List<Statement> aqlStatements = parser.parse(); + validate(aqlStatements); + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = + statementExecutorFactory.create(aqlStatements, sessionConfig, compilationProvider); + translator.compileAndExecute(hcc, hds, resultDelivery); + } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); - return; + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe); + String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query); + ObjectNode errorResp = + ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe)); + sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp)); + } catch (Exception e) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + ResultUtil.apiErrorHandler(sessionConfig.out(), e); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java index 25be651..dc48288 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java @@ -30,10 +30,11 @@ import org.apache.asterix.common.config.GlobalConfig; import 
org.apache.asterix.runtime.util.ClusterStateManager; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -70,8 +71,7 @@ }, "Shutdown Servlet Worker"); try { - IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, - IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure handling request", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java index 1c173d2..0a0e680 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UpdateApiServlet.java @@ -23,7 +23,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.hyracks.http.server.IServletRequest; +import org.apache.hyracks.http.api.IServletRequest; public class UpdateApiServlet extends RestApiServlet { private static final byte ALLOWED_CATEGORIES = Statement.Category.QUERY | Statement.Category.UPDATE; diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java index e5ceccc..5899660 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java @@ -28,10 +28,11 @@ import java.util.logging.Logger; import org.apache.asterix.runtime.util.AppContextInfo; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.http.server.IServlet; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.server.util.ServletUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -61,7 +62,7 @@ responseObject.put(e.getKey(), e.getValue()); } try { - IServletResponse.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8); + ServletUtils.setContentType(response, IServlet.ContentType.TEXT_PLAIN, IServlet.Encoding.UTF8); } catch (IOException e) { LOGGER.log(Level.WARNING, "Failure handling request", e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index 3a8852e..b17a722 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -72,8 +72,8 @@ import 
org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.http.api.IServlet; import org.apache.hyracks.http.server.HttpServer; -import org.apache.hyracks.http.server.IServlet; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletMapping; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java index b1fdab5..d34a9cf 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiLetTest.java @@ -43,8 +43,8 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; -import org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.junit.Assert; import org.junit.Test; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java index 936b717..b482948 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiLetTest.java @@ -35,8 +35,8 @@ import org.apache.asterix.runtime.util.AppContextInfo; import org.apache.asterix.test.runtime.SqlppExecutionTest; import org.apache.hyracks.api.client.IHyracksClientConnection; -import 
org.apache.hyracks.http.server.IServletRequest; -import org.apache.hyracks.http.server.IServletResponse; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; import org.junit.Assert; import org.junit.Test; diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java similarity index 98% rename from hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServlet.java rename to hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java index 5691fd9..b079d36 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.http.server; +package org.apache.hyracks.http.api; import java.util.concurrent.ConcurrentMap; diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java similarity index 67% rename from hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletRequest.java rename to hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java index 8aebd07..610c3d1 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletRequest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletRequest.java @@ -16,10 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.hyracks.http.server; - -import java.util.List; -import java.util.Map; +package org.apache.hyracks.http.api; import io.netty.handler.codec.http.FullHttpRequest; @@ -34,6 +31,7 @@ /** * Get a request parameter + * * @param name * @return the parameter or null if not found */ @@ -41,23 +39,21 @@ /** * Get a request header + * * @param name * @return the header or null if not found */ String getHeader(CharSequence name); - static String getParameter(Map<String, List<String>> parameters, CharSequence name) { - List<String> parameter = parameters.get(name); - if (parameter == null) { - return null; - } else if (parameter.size() == 1) { - return parameter.get(0); - } else { - StringBuilder aString = new StringBuilder(parameter.get(0)); - for (int i = 1; i < parameter.size(); i++) { - aString.append(",").append(parameter.get(i)); - } - return aString.toString(); - } + /** + * Get a request header if found; otherwise, return the default value + * + * @param name + * @param defaultValue + * @return the header or defaultValue if not found + */ + default String getHeader(CharSequence name, String defaultValue) { + String value = getHeader(name); + return value == null ?
defaultValue : value; } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java similarity index 60% rename from hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java rename to hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java index 342e643..1a7c65f 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/IServletResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.http.server; +package org.apache.hyracks.http.api; import java.io.Closeable; import java.io.IOException; @@ -24,54 +24,62 @@ import java.io.PrintWriter; import io.netty.channel.ChannelFuture; -import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponseStatus; /** - * A response to an instance of IServLetRequest + * A response to an instance of IServletRequest */ public interface IServletResponse extends Closeable { /** * Set a response header + * * @param name + * the name of the header * @param value + * the value of the header * @return - * @throws Exception + * the servlet response with the header set + * @throws IOException */ IServletResponse setHeader(CharSequence name, Object value) throws IOException; /** - * Get the output writer for the response - * @return - * @throws Exception - */ - PrintWriter writer(); - - /** - * Send the last http response if any and return the future - * @return - * @throws Exception - */ - ChannelFuture future() throws IOException; - - /** * Set the status of the http response + * * @param status */ void 
setStatus(HttpResponseStatus status); /** + * Get the output writer for the response which writes to the response output stream + * + * @return the response writer + */ + PrintWriter writer(); + + /** * Get the output stream for the response - * @return + * + * @return the response output stream */ OutputStream outputStream(); - public static void setContentType(IServletResponse response, String type, String charset) throws IOException { - response.setHeader(HttpHeaderNames.CONTENT_TYPE, type + "; charset=" + charset); - } + /** + * Get last content future + * Must only be called after the servlet response has been closed + * Used to listen to events about the last content sent through the network + * For example, to close the connection after the event has been completed + * lastContentFuture().addListener(ChannelFutureListener.CLOSE); + * + * @return + * @throws IOException + */ + ChannelFuture lastContentFuture() throws IOException; - public static void setContentType(IServletResponse response, String type) throws IOException { - response.setHeader(HttpHeaderNames.CONTENT_TYPE, type); - } + /** + * Notifies the response that the channel has become writable. + *
Used for flow control + */ + void notifyChannelWritable(); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java index 22bbc50..7d24994 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java @@ -20,6 +20,9 @@ import java.util.concurrent.ConcurrentMap; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; + public abstract class AbstractServlet implements IServlet { protected final String[] paths; protected final ConcurrentMap<String, Object> ctx; diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java index 984122b..65cdd52 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -28,26 +30,24 @@ public class ChunkedNettyOutputStream extends OutputStream { + private static final Logger LOGGER = Logger.getLogger(ChunkedNettyOutputStream.class.getName()); private final ChannelHandlerContext ctx; private final ChunkedResponse response; private ByteBuf buffer; - public ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize, - ChunkedResponse response) { + public 
ChunkedNettyOutputStream(ChannelHandlerContext ctx, int chunkSize, ChunkedResponse response) { this.response = response; this.ctx = ctx; buffer = ctx.alloc().buffer(chunkSize); } @Override - public synchronized void write(byte[] b, int off, int len) { - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length)) { + public void write(byte[] b, int off, int len) throws IOException { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } - if (len > buffer.capacity()) { flush(); flush(b, off, len); @@ -64,45 +64,68 @@ } @Override - public synchronized void write(int b) { - if (buffer.writableBytes() > 0) { - buffer.writeByte(b); - } else { + public void write(int b) throws IOException { + if (!buffer.isWritable()) { flush(); - buffer.writeByte(b); } + buffer.writeByte(b); } @Override - public synchronized void close() throws IOException { - flush(); - buffer.release(); + public void close() throws IOException { + if (response.isHeaderSent() || response.status() != HttpResponseStatus.OK) { + flush(); + buffer.release(); + } else { + response.fullReponse(buffer); + } super.close(); } @Override - public synchronized void flush() { + public void flush() throws IOException { + ensureWritable(); if (buffer.readableBytes() > 0) { - int size = buffer.capacity(); if (response.status() == HttpResponseStatus.OK) { - response.flush(); + int size = buffer.capacity(); + response.beforeFlush(); DefaultHttpContent content = new DefaultHttpContent(buffer); - ctx.write(content); + ctx.write(content, ctx.channel().voidPromise()); + buffer = ctx.alloc().buffer(size); } else { - response.error(buffer); + ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes()); + aBuffer.writeBytes(buffer); + response.error(aBuffer); } - buffer = ctx.alloc().buffer(size); } } - private synchronized void flush(byte[] buf, int offset, int len) { + private void flush(byte[] buf, int offset, 
int len) throws IOException { + ensureWritable(); ByteBuf aBuffer = ctx.alloc().buffer(len); aBuffer.writeBytes(buf, offset, len); if (response.status() == HttpResponseStatus.OK) { - response.flush(); - ctx.write(new DefaultHttpContent(aBuffer)); + response.beforeFlush(); + ctx.write(new DefaultHttpContent(aBuffer), ctx.channel().voidPromise()); } else { response.error(aBuffer); } } + + private synchronized void ensureWritable() throws IOException { + while (!ctx.channel().isWritable()) { + try { + ctx.flush(); + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.log(Level.WARNING, "Interupted while waiting for channel to be writable", e); + throw new IOException(e); + } + } + } + + public synchronized void resume() { + notifyAll(); + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java index 19c2664..cb0cd80 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java @@ -18,25 +18,44 @@ */ package org.apache.hyracks.http.server; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import org.apache.hyracks.http.api.IServletResponse; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import 
io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; +/** + * A chunked http response. Here is how it is expected to work: + * If the response is a success aka 200 and is less than chunkSize, then it is sent as a single http response message + * If the response is larger than the chunkSize and the response status is 200, then it is sent as chunks of chunkSize. + * If the response status is non 200, then it is always sent as a single http response message. + * If the response status is non 200, then output buffered before setting the response status is discarded. + * If flush() is called on the writer and even if it is less than chunkSize, then the initial response will be sent + * with headers, followed by the buffered bytes as the first chunk. + * When chunking, an output buffer is allocated only when the previous buffer has been sent + * If an error occurs after sending the first chunk, the connection will close abruptly. + * + * Here is a breakdown of the possible cases. + * 1. smaller than chunkSize, no error -> full response + * 2. smaller than chunkSize, error -> full response + * 3. larger than chunkSize, error after header-> close connection. release buffer and release error + * 4. larger than chunkSize, no error. 
-> header, data, empty response + */ public class ChunkedResponse implements IServletResponse { private final ChannelHandlerContext ctx; private final ChunkedNettyOutputStream outputStream; @@ -45,12 +64,13 @@ private boolean headerSent; private ByteBuf error; private ChannelFuture future; + private boolean done; - public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request) { + public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) { this.ctx = ctx; - outputStream = new ChunkedNettyOutputStream(ctx, 4096, this); + outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this); writer = new PrintWriter(outputStream); - response = new DefaultHttpResponse(HTTP_1_1, OK); + response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); if (HttpUtil.isKeepAlive(request)) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); @@ -67,7 +87,7 @@ } @Override - public ChannelFuture future() { + public ChannelFuture lastContentFuture() { return future; } @@ -78,25 +98,45 @@ @Override public void close() throws IOException { - if (error == null) { - writer.close(); - future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + writer.close(); + if (error == null && response.status() == HttpResponseStatus.OK) { + if (!done) { + future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } + } else { + // There was an error + if (headerSent) { + if (error != null) { + error.release(); + } + future = ctx.channel().close(); + } else { + // we didn't send anything to the user, we need to send an unchunked error response + fullResponse(response.protocolVersion(), response.status(), + error == null ? 
ctx.alloc().buffer(0, 0) : error, response.headers()); + } } + done = true; } public HttpResponseStatus status() { return response.status(); } - public void flush() { + public void beforeFlush() { if (!headerSent && response.status() == HttpResponseStatus.OK) { - ctx.writeAndFlush(response); + ctx.write(response, ctx.channel().voidPromise()); headerSent = true; } } public void error(ByteBuf error) { - this.error = error; + if (this.error == null) { + this.error = error; + } else { + this.error.capacity(this.error.capacity() + error.capacity()); + this.error.writeBytes(error); + } } @Override @@ -106,8 +146,28 @@ @Override public void setStatus(HttpResponseStatus status) { - // update the response - // close the stream - // write the response + response.setStatus(status); + } + + public boolean isHeaderSent() { + return headerSent; + } + + public void fullReponse(ByteBuf buffer) { + fullResponse(response.protocolVersion(), response.status(), buffer, response.headers()); + } + + private void fullResponse(HttpVersion version, HttpResponseStatus status, ByteBuf buffer, HttpHeaders headers) { + DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(version, status, buffer); + fullResponse.headers().set(headers); + fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes()); + future = ctx.writeAndFlush(fullResponse); + headerSent = true; + done = true; + } + + @Override + public void notifyChannelWritable() { + outputStream.resume(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java index 245f28a..1023686 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java @@ -23,6 +23,8 @@ import 
java.io.OutputStream; import java.io.PrintWriter; +import org.apache.hyracks.http.api.IServletResponse; + import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -80,7 +82,7 @@ } @Override - public ChannelFuture future() throws IOException { + public ChannelFuture lastContentFuture() throws IOException { return future; } @@ -93,4 +95,10 @@ public void setStatus(HttpResponseStatus status) { response.setStatus(status); } + + @Override + public void notifyChannelWritable() { + // Do nothing. + // This response is sent as a single piece + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java index 0b80a78..8d308d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/GetRequest.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Map; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.server.util.ServletUtils; + import io.netty.handler.codec.http.FullHttpRequest; public class GetRequest implements IServletRequest { @@ -39,12 +42,11 @@ @Override public String getParameter(CharSequence name) { - return IServletRequest.getParameter(parameters, name); + return ServletUtils.getParameter(parameters, name); } @Override public String getHeader(CharSequence name) { return request.headers().get(name); } - } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java new file mode 100644 index 0000000..418cd26 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.server; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; + +public class HttpRequestHandler implements Callable<Void> { + private static final Logger LOGGER = Logger.getLogger(HttpRequestHandler.class.getName()); + private final ChannelHandlerContext ctx; + private final IServlet servlet; + private final IServletRequest request; + private final IServletResponse response; + + public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, IServletRequest request, int chunkSize) { + this.ctx = ctx; + this.servlet = servlet; + 
this.request = request; + response = chunkSize == 0 ? new FullResponse(ctx, request.getHttpRequest()) + : new ChunkedResponse(ctx, request.getHttpRequest(), chunkSize); + request.getHttpRequest().retain(); + } + + @Override + public Void call() throws Exception { + try { + ChannelFuture lastContentFuture = handle(); + if (!HttpUtil.isKeepAlive(request.getHttpRequest())) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } catch (Throwable th) { //NOSONAR + LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", th); + ctx.close(); + } finally { + request.getHttpRequest().release(); + } + return null; + } + + private ChannelFuture handle() throws IOException { + try { + servlet.handle(request, response); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Failure during handling of an IServletRequest", e); + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } finally { + response.close(); + } + return response.lastContentFuture(); + } + + public void notifyChannelWritable() { + response.notifyChannelWritable(); + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index f7f55bd..302e5f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -23,12 +23,19 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.http.api.IServlet; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import 
io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.logging.LogLevel; @@ -36,6 +43,8 @@ public class HttpServer { // Constants + private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024; + private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024; private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName()); private static final int FAILED = -1; private static final int STOPPED = 0; @@ -44,23 +53,26 @@ private static final int STOPPING = 3; // Final members private final Object lock = new Object(); + private final AtomicInteger threadId = new AtomicInteger(); private final ConcurrentMap<String, Object> ctx; private final List<IServlet> lets; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private final int port; + private final ExecutorService executor; // Mutable members private volatile int state = STOPPED; private Channel channel; private Throwable cause; - public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, - int port) { + public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) { this.bossGroup = bossGroup; this.workerGroup = workerGroup; this.port = port; ctx = new ConcurrentHashMap<>(); lets = new ArrayList<>(); + executor = Executors.newFixedThreadPool(16, + runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement())); } public final void start() throws Exception { // NOSONAR @@ -158,7 +170,6 @@ lets.add(let); } - protected void doStart() throws InterruptedException { /* * This is a hacky way to ensure that ILets with more specific paths are checked first. 
@@ -172,13 +183,12 @@ */ Collections.sort(lets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length()); ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new HttpServerInitializer(this)); + b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK)) + .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpServerInitializer(this)); channel = b.bind(port).sync().channel(); } - protected void doStop() throws InterruptedException { channel.close(); @@ -212,14 +222,18 @@ return true; } } else if (c == '*') { - return path.regionMatches(path.length() - pathSpec.length() + 1, - pathSpec, 1, pathSpec.length() - 1); + return path.regionMatches(path.length() - pathSpec.length() + 1, pathSpec, 1, pathSpec.length() - 1); } return false; } + private static boolean isPathWildcardMatch(String pathSpec, String path) { int cpl = pathSpec.length() - 2; return (pathSpec.endsWith("/*") && path.regionMatches(0, pathSpec, 0, cpl)) && (path.length() == cpl || '/' == path.charAt(cpl)); } + + public ExecutorService getExecutor() { + return executor; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index c8ed937..2268c2c 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -18,13 +18,12 @@ */ package org.apache.hyracks.http.server; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import 
java.util.logging.Level; import java.util.logging.Logger; -import io.netty.channel.ChannelFuture; +import org.apache.hyracks.http.api.IServlet; +import org.apache.hyracks.http.server.util.ServletUtils; + import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -32,21 +31,17 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.QueryStringDecoder; -import io.netty.handler.codec.http.multipart.Attribute; -import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; -import io.netty.handler.codec.http.multipart.InterfaceHttpData; -import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType; -import io.netty.handler.codec.http.multipart.MixedAttribute; public class HttpServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger LOGGER = Logger.getLogger(HttpServerHandler.class.getName()); protected final HttpServer server; + protected final int chunkSize; + protected HttpRequestHandler handler; - public HttpServerHandler(HttpServer server) { + public HttpServerHandler(HttpServer server, int chunkSize) { this.server = server; + this.chunkSize = chunkSize; } @Override @@ -55,72 +50,34 @@ } @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().isWritable()) { + handler.notifyChannelWritable(); + } + super.channelWritabilityChanged(ctx); + } + + @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { try { - FullHttpRequest http = (FullHttpRequest) msg; - IServlet servlet = server.getServlet(http); + FullHttpRequest request = (FullHttpRequest) msg; + IServlet servlet = server.getServlet(request); if (servlet == null) { - DefaultHttpResponse response = new 
DefaultHttpResponse(http.protocolVersion(), - HttpResponseStatus.NOT_FOUND); - ctx.write(response).addListener(ChannelFutureListener.CLOSE); + DefaultHttpResponse notFound = + new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.NOT_FOUND); + ctx.write(notFound).addListener(ChannelFutureListener.CLOSE); + } else if (request.method() != HttpMethod.GET && request.method() != HttpMethod.POST) { + DefaultHttpResponse notAllowed = + new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED); + ctx.write(notAllowed).addListener(ChannelFutureListener.CLOSE); } else { - if (http.method() != HttpMethod.GET && http.method() != HttpMethod.POST) { - DefaultHttpResponse response = new DefaultHttpResponse(http.protocolVersion(), - HttpResponseStatus.METHOD_NOT_ALLOWED); - ctx.write(response).addListener(ChannelFutureListener.CLOSE); - return; - } - IServletRequest request = http.method() == HttpMethod.GET ? get(http) : post(http); - IServletResponse response = new FullResponse(ctx, http); - try { - servlet.handle(request, response); - } catch (Throwable th) { // NOSONAR - LOGGER.log(Level.WARNING, "Failure during handling of an IServLetRequest", th); - response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); - } finally { - response.close(); - } - ChannelFuture lastContentFuture = response.future(); - if (!HttpUtil.isKeepAlive(http)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); - } + handler = new HttpRequestHandler(ctx, servlet, ServletUtils.toServletRequest(request), chunkSize); + server.getExecutor().submit(handler); } } catch (Exception e) { LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", e); ctx.close(); } - } - - public static IServletRequest post(FullHttpRequest request) throws IOException { - List<String> names = new ArrayList<>(); - List<String> values = new ArrayList<>(); - HttpPostRequestDecoder decoder = null; - try { - decoder = new HttpPostRequestDecoder(request); - } catch (Exception 
e) { - //ignore. this means that the body of the POST request does not have key value pairs - LOGGER.log(Level.WARNING, "Failed to decode a post message. Fix the API not to have queries as POST body", - e); - } - if (decoder != null) { - try { - List<InterfaceHttpData> bodyHttpDatas = decoder.getBodyHttpDatas(); - for (InterfaceHttpData data : bodyHttpDatas) { - if (data.getHttpDataType().equals(HttpDataType.Attribute)) { - Attribute attr = (MixedAttribute) data; - names.add(data.getName()); - values.add(attr.getValue()); - } - } - } finally { - decoder.destroy(); - } - } - return new PostRequest(request, new QueryStringDecoder(request.uri()).parameters(), names, values); - } - - public static IServletRequest get(FullHttpRequest request) throws IOException { - return new GetRequest(request, new QueryStringDecoder(request.uri()).parameters()); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java index 3b32ee6..bc67865 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java @@ -27,9 +27,10 @@ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { - private static final int MAX_CHUNK_SIZE = 262144; - private static final int MAX_HEADER_SIZE = 262144; - private static final int MAX_INITIAL_LINE_LENGTH = 131072; + private static final int MAX_REQUEST_CHUNK_SIZE = 262144; + private static final int MAX_REQUEST_HEADER_SIZE = 262144; + private static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072; + private static final int RESPONSE_CHUNK_SIZE = 4096; private HttpServer server; public HttpServerInitializer(HttpServer server) { @@ -39,9 +40,10 @@ @Override public void 
initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); - p.addLast(new HttpRequestDecoder(MAX_INITIAL_LINE_LENGTH, MAX_HEADER_SIZE, MAX_CHUNK_SIZE)); + p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, MAX_REQUEST_HEADER_SIZE, + MAX_REQUEST_CHUNK_SIZE)); p.addLast(new HttpResponseEncoder()); p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); - p.addLast(new HttpServerHandler(server)); + p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java index 99f338c..338ef40 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/PostRequest.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Map; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.server.util.ServletUtils; + import io.netty.handler.codec.http.FullHttpRequest; public class PostRequest implements IServletRequest { @@ -49,7 +52,7 @@ return values.get(i); } } - return IServletRequest.getParameter(parameters, name); + return ServletUtils.getParameter(parameters, name); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java new file mode 100644 index 0000000..1ffab6d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/util/ServletUtils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.http.server.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.GetRequest; +import org.apache.hyracks.http.server.PostRequest; + +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.codec.http.multipart.Attribute; +import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; +import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType; +import io.netty.handler.codec.http.multipart.MixedAttribute; + +public class ServletUtils { + private static final Logger LOGGER = Logger.getLogger(ServletUtils.class.getName()); + + private ServletUtils() { + } + + public static String getParameter(Map<String, List<String>> parameters, CharSequence name) { + List<String> parameter = parameters.get(name); + if (parameter 
== null) { + return null; + } else if (parameter.size() == 1) { + return parameter.get(0); + } else { + StringBuilder aString = new StringBuilder(parameter.get(0)); + for (int i = 1; i < parameter.size(); i++) { + aString.append(",").append(parameter.get(i)); + } + return aString.toString(); + } + } + + public static IServletRequest post(FullHttpRequest request) throws IOException { + List<String> names = new ArrayList<>(); + List<String> values = new ArrayList<>(); + HttpPostRequestDecoder decoder = null; + try { + decoder = new HttpPostRequestDecoder(request); + } catch (Exception e) { + //ignore. this means that the body of the POST request does not have key value pairs + LOGGER.log(Level.WARNING, "Failed to decode a post message. Fix the API not to have queries as POST body", + e); + } + if (decoder != null) { + try { + List<InterfaceHttpData> bodyHttpDatas = decoder.getBodyHttpDatas(); + for (InterfaceHttpData data : bodyHttpDatas) { + if (data.getHttpDataType().equals(HttpDataType.Attribute)) { + Attribute attr = (MixedAttribute) data; + names.add(data.getName()); + values.add(attr.getValue()); + } + } + } finally { + decoder.destroy(); + } + } + return new PostRequest(request, new QueryStringDecoder(request.uri()).parameters(), names, values); + } + + public static IServletRequest get(FullHttpRequest request) throws IOException { + return new GetRequest(request, new QueryStringDecoder(request.uri()).parameters()); + } + + public static IServletRequest toServletRequest(FullHttpRequest request) throws IOException { + return request.method() == HttpMethod.GET ? 
ServletUtils.get(request) : ServletUtils.post(request); + } + + public static void setContentType(IServletResponse response, String type, String charset) throws IOException { + response.setHeader(HttpHeaderNames.CONTENT_TYPE, type + "; charset=" + charset); + } + + public static void setContentType(IServletResponse response, String type) throws IOException { + response.setHeader(HttpHeaderNames.CONTENT_TYPE, type); + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1474 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I249180f58e92058dd3b264ea17c4196b4baf4348 Gerrit-PatchSet: 8 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
