Yingyi Bu has submitted this change and it was merged. Change subject: Extensible exception handling in QueryServiceServlet ......................................................................
Extensible exception handling in QueryServiceServlet Change-Id: If8037a97f3d0b0febb8caf68e099f1fd24e0ac49 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1836 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/pom.xml M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java M asterixdb/pom.xml M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java M hyracks-fullstack/hyracks/hyracks-ipc/pom.xml M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java 12 files changed, 128 insertions(+), 35 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index 01afdcf..beac085 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -433,6 +433,10 @@ <artifactId>hyracks-net</artifactId> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + </dependency> + <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <scope>test</scope> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index ba98f73..dea5259 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -202,9 +202,7 @@ stopNcTheard.join(); } - if (cc != null) { - cc.stop(); - } + stopCC(false); if (deleteOldInstanceData) { deleteTransactionLogs(); @@ -212,6 +210,13 @@ } } + public void stopCC(boolean terminateNCService) throws Exception { + if (cc != null) { + cc.stop(terminateNCService); + cc = null; + } + } + protected String getDefaultStoragePath() { return joinPath("target", "io", "dir"); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 2b70685..9547514 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -20,12 +20,16 @@ package org.apache.asterix.api.http.server; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.app.message.ExecuteStatementRequestMessage; import org.apache.asterix.app.message.ExecuteStatementResponseMessage; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.om.types.ARecordType; @@ -35,6 +39,7 @@ import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.ipc.exceptions.IPCException; /** * Query service servlet that can run on NC nodes. @@ -91,4 +96,14 @@ sessionOutput.out().append(responseMsg.getResult()); } } + + @Override + protected HttpResponseStatus handleExecuteStatementException(Throwable t) { + if (t instanceof IPCException || t instanceof TimeoutException) { + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t); + return HttpResponseStatus.SERVICE_UNAVAILABLE; + } else { + return super.handleExecuteStatementException(t); + } + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index ec90ae9..9ee064e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -420,21 +420,10 @@ ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS); } errorCount = 0; - } catch (AlgebricksException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { - GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, pe.getMessage(), pe); - ResultUtil.printError(resultWriter, pe); - ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); - status = HttpResponseStatus.BAD_REQUEST; - } catch (HyracksException pe) { - GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); - ResultUtil.printError(resultWriter, pe); - ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); - status = HttpResponseStatus.INTERNAL_SERVER_ERROR; - } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); + } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) { + status = handleExecuteStatementException(e); ResultUtil.printError(resultWriter, e); ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); - status = HttpResponseStatus.INTERNAL_SERVER_ERROR; } finally { if (execStartEnd[0] == -1) { execStartEnd[1] = -1; @@ -475,4 +464,18 @@ param.clientContextID, queryCtx); outExecStartEnd[1] = System.nanoTime(); } + + protected HttpResponseStatus handleExecuteStatementException(Throwable t) { + if (t instanceof org.apache.asterix.aqlplus.parser.TokenMgrError || t instanceof TokenMgrError + || t instanceof AlgebricksException) { + GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, t.getMessage(), t); + return HttpResponseStatus.BAD_REQUEST; + } else if (t instanceof HyracksException) { + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.getMessage(), t); + return HttpResponseStatus.INTERNAL_SERVER_ERROR; + } else { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", t); + return HttpResponseStatus.INTERNAL_SERVER_ERROR; + } + } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java index 8b1bbe0..3d6543b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java @@ -48,7 +48,11 @@ import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,10 +65,20 @@ public class ConnectorApiServletTest { - @Test - public void testGet() throws Exception { + @BeforeClass + public static void setup() throws Exception { // Starts test asterixdb cluster. SqlppExecutionTest.setUp(); + } + + @AfterClass + public static void teardown() throws Exception { + // Tears down the asterixdb cluster. + SqlppExecutionTest.tearDown(); + } + + @Test + public void testGet() throws Exception { // Configures a test connector api servlet. ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }, (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext()); @@ -113,9 +127,6 @@ ArrayNode splits = (ArrayNode) actualResponse.get("splits"); String path = (splits.get(0)).get("path").asText(); Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset")); - - // Tears down the asterixdb cluster. - SqlppExecutionTest.tearDown(); } @Test diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index 1744836..50c2986 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.function.Predicate; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest; @@ -46,14 +47,15 @@ @Override public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri, - List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) - throws Exception { + List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, + Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception { String clientContextId = UUID.randomUUID().toString(); final List<TestCase.CompilationUnit.Parameter> newParams = cancellable ? upsertParam(params, "client_context_id", clientContextId) : params; Callable<InputStream> query = () -> { try { - return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded, true); + return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded, + responseCodeValidator, true); } catch (Exception e) { e.printStackTrace(); throw e; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 35ab1df..ed6a77a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -518,17 +518,27 @@ public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception { - return executeQueryService(str, fmt, uri, params, jsonEncoded, false); + return executeQueryService(str, fmt, uri, params, jsonEncoded, null, false); + } + + public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, + List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator) + throws Exception { + return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false); } protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, - List<CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) throws Exception { + List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator, + boolean cancellable) throws Exception { final List<CompilationUnit.Parameter> newParams = upsertParam(params, "format", fmt.mimeType()); HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams) : constructPostMethodUrl(str, uri, "statement", newParams); // Set accepted output response type method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType()); HttpResponse response = executeHttpRequest(method); + if (responseCodeValidator != null) { + checkResponse(response, responseCodeValidator); + } return response.getEntity().getContent(); } @@ -637,8 +647,13 @@ } public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception { + return executeJSONGet(fmt, uri, code -> code == HttpStatus.SC_OK); + } + + public InputStream executeJSONGet(OutputFormat fmt, URI uri, Predicate<Integer> responseCodeValidator) + throws Exception { HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>()); - HttpResponse response = executeAndCheckHttpRequest(request); + HttpResponse response = executeAndCheckHttpRequest(request, responseCodeValidator); return response.getEntity().getContent(); } @@ -1101,7 +1116,7 @@ } final URI uri = getEndpoint(Servlets.QUERY_SERVICE); if (DELIVERY_IMMEDIATE.equals(delivery)) { - resultStream = executeQueryService(statement, fmt, uri, params, true, true); + resultStream = executeQueryService(statement, fmt, uri, params, true, null, true); resultStream = ResultExtractor.extract(resultStream); } else { String handleVar = getHandleVariable(statement); @@ -1355,7 +1370,7 @@ return uri; } - protected URI getEndpoint(String servlet) throws URISyntaxException { + public URI getEndpoint(String servlet) throws URISyntaxException { return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null); } diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index e49f9aa..dd86bea 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -826,6 +826,11 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + <version>${hyracks.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> <artifactId>algebricks-compiler</artifactId> <version>${algebricks.version}</version> </dependency> diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index 00b3cb6..e89ed56 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -78,7 +78,7 @@ protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) { DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status); - ctx.write(response).addListener(ChannelFutureListener.CLOSE); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private void submit(ChannelHandlerContext ctx, IServlet servlet, FullHttpRequest request) throws IOException { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml index 45c5e04..cc3a513 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml @@ -41,6 +41,10 @@ </properties> <dependencies> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 9efd70e..d1659a8 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -23,6 +23,7 @@ import java.net.ServerSocket; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; +import java.nio.channels.Channel; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; @@ -32,12 +33,16 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; + +import org.apache.commons.io.IOUtils; public class IPCConnectionManager { private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName()); @@ -88,9 +93,10 @@ networkThread.start(); } - void stop() throws IOException { + void stop() { stopped = true; - serverSocketChannel.close(); + IOUtils.closeQuietly(serverSocketChannel); + networkThread.selector.wakeup(); } IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException { @@ -174,6 +180,8 @@ private class NetworkThread extends Thread { private final Selector selector; + private final Set<SocketChannel> openChannels = new HashSet<>(); + public NetworkThread() { super("IPC Network Listener Thread [" + address + "]"); setDaemon(true); @@ -186,6 +194,14 @@ @Override public void run() { + try { + doRun(); + } finally { + cleanup(); + } + } + + private void doRun() { try { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { @@ -204,6 +220,7 @@ if (!workingPendingConnections.isEmpty()) { for (IPCHandle handle : workingPendingConnections) { SocketChannel channel = SocketChannel.open(); + openChannels.add(channel); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.configureBlocking(false); SelectionKey cKey; @@ -269,7 +286,8 @@ system.getPerformanceCounters().addMessageBytesReceived(len); if (len < 0) { key.cancel(); - channel.close(); + IOUtils.closeQuietly(channel); + openChannels.remove(channel); handle.close(); } else { handle.processIncomingMessages(); @@ -285,7 +303,8 @@ system.getPerformanceCounters().addMessageBytesSent(len); if (len < 0) { key.cancel(); - channel.close(); + IOUtils.closeQuietly(channel); + openChannels.remove(channel); handle.close(); } else if (!writeBuffer.hasRemaining()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); @@ -297,6 +316,7 @@ } else if (key.isAcceptable()) { assert sc == serverSocketChannel; SocketChannel channel = serverSocketChannel.accept(); + openChannels.add(channel); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.configureBlocking(false); IPCHandle handle = new IPCHandle(system, null); @@ -334,6 +354,14 @@ } } + private void cleanup() { + for (Channel channel : openChannels) { + IOUtils.closeQuietly(channel); + } + openChannels.clear(); + IOUtils.closeQuietly(selector); + } + private boolean finishConnect(SocketChannel channel) { boolean connectFinished = false; try { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index 0997e57..d9ab210 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -59,7 +59,8 @@ public void start() { cMgr.start(); } - public void stop() throws IOException{ + + public void stop() { cMgr.stop(); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1836 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: If8037a97f3d0b0febb8caf68e099f1fd24e0ac49 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
