From Peeyush Gupta <peeyush.gu...@couchbase.com>: Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257 )
Change subject: [ASTERIXDB-3391][OTH] Make DML statements cancellable ...................................................................... [ASTERIXDB-3391][OTH] Make DML statements cancellable - user model changes: no - storage format changes: no - interface changes: no Details: With this patch, INSERT/UPSERT/DELETE and COPY statements are marked cancellable. An atomic statement can be cancelled only while its ingestion job is still running, i.e., while ACKs from all nodes/partitions have not yet been received and the CC has not yet decided to commit the statement. Change-Id: I16410ab9353c24597f77ab38ce06996fc5dfacd0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257 Reviewed-by: Murtadha Hubail <mhub...@apache.org> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Murtadha Hubail <mhub...@apache.org> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java 5 files changed, 169 insertions(+), 6 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved; Verified Anon. E. 
Moose #1000171: Jenkins: Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index 0436ea8..84a6488 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -54,9 +54,9 @@ if (complete) { return; } - complete(); - state = State.CANCELLED; if (cancellable) { + complete(); + state = State.CANCELLED; doCancel(appCtx); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java index 0786895..9d1e108 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java @@ -215,6 +215,7 @@ @Override public void abortTransaction(JobId jobId) throws Exception { IGlobalTransactionContext context = getTransactionContext(jobId); + context.resetAcksReceived(); if (context.getTxnStatus() == TransactionStatus.PREPARED) { sendJobRollbackMessages(context); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 34648dd..3035496 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -4009,10 +4009,12 @@ final ClientRequest clientRequest = (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); clientRequest.setJobId(jobId); + clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { 
Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); hcc.waitForCompletion(jobId); + ensureNotCancelled(clientRequest); } finally { Thread.currentThread().setName(nameBefore); } @@ -4151,7 +4153,7 @@ ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid()); if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - reqParams, false, stmt, clientRequest); + reqParams, true, stmt, clientRequest); } else { locker.lock(); JobId jobId = null; @@ -4176,10 +4178,12 @@ } jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); clientRequest.setJobId(jobId); + clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); hcc.waitForCompletion(jobId); + ensureNotCancelled(clientRequest); } finally { Thread.currentThread().setName(nameBefore); } @@ -4246,10 +4250,12 @@ final ClientRequest clientRequest = (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); clientRequest.setJobId(jobId); + clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); hcc.waitForCompletion(jobId); + ensureNotCancelled(clientRequest); } finally { Thread.currentThread().setName(nameBefore); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java new file mode 100644 index 0000000..1cb87d8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.atomic_statements; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter.CLIENT_ID; +import static org.apache.hyracks.util.file.FileUtil.joinPath; + +import java.io.File; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.api.common.LocalCloudUtil; +import org.apache.asterix.common.TestDataUtil; +import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.asterix.testframework.xml.ParameterTypeEnum; +import org.apache.asterix.testframework.xml.TestCase; +import org.apache.http.HttpResponse; +import org.junit.After; +import org.junit.Assert; +import 
org.junit.Before; +import org.junit.Test; + +public class AtomicStatementsCancellationTest { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + private static final String TEST_CONFIG_FILE_NAME = "cc-cloud-storage.conf"; + private static final TestExecutor TEST_EXECUTOR = new TestExecutor(); + + private static final String TEST_CONFIG_PATH = + joinPath(System.getProperty("user.dir"), "src", "test", "resources");; + private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME; + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + + private static final String DATASET_NAME = "ds_0"; + private static final int BATCH_SIZE = 20000; + private static final int NUM_UPSERTS = 100; + + @Before + public void setUp() throws Exception { + boolean cleanStart = true; + LocalCloudUtil.startS3CloudEnvironment(cleanStart); + integrationUtil.setGracefulShutdown(true); + integrationUtil.init(true, TEST_CONFIG_FILE_PATH); + createDatasets(); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + private void createDatasets() throws Exception { + TestDataUtil.createDatasetWithoutType(DATASET_NAME, Map.of("id", "uuid"), true); + TestDataUtil.createSecondaryBTreeIndex(DATASET_NAME, DATASET_NAME + "_sidx", "name:string"); + } + + public String generateInsertStatement(String dataset, long count) throws Exception { + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < count; i++) { + stringBuilder.append("{\"name\": \"name_" + i + "\"},"); + } + stringBuilder.deleteCharAt(stringBuilder.length() - 1); + return "INSERT INTO " + dataset + "([" + stringBuilder + "]);"; + } + + private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception { + HttpResponse response = TEST_EXECUTOR.executeHttpRequest(TestExecutor.constructDeleteMethod(uri, params)); + return 
response.getStatusLine().getStatusCode(); + } + + @Test + public void testAtomicityWithCancellation() throws Exception { + Random rnd = new Random(); + for (int j = 0; j < NUM_UPSERTS; j++) { + String clientContextId = UUID.randomUUID().toString(); + final List<TestCase.CompilationUnit.Parameter> params = new ArrayList<>(); + TestCase.CompilationUnit.Parameter newParam = new TestCase.CompilationUnit.Parameter(); + newParam.setName(CLIENT_ID.str()); + newParam.setType(ParameterTypeEnum.STRING); + newParam.setValue(clientContextId); + params.add(newParam); + String statement = generateInsertStatement(DATASET_NAME, BATCH_SIZE); + Callable<InputStream> upsert = () -> { + try { + return TEST_EXECUTOR.executeQueryService(statement, TestCaseContext.OutputFormat.CLEAN_JSON, + TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), params, false, UTF_8); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + }; + Future<InputStream> future = executor.submit(upsert); + if (!future.isDone()) { + Thread.sleep(rnd.nextInt(900) + 800); + // Cancels the query request while the query is executing. 
+ int rc = cancelQuery(TEST_EXECUTOR.getEndpoint(Servlets.RUNNING_REQUESTS), params); + Assert.assertTrue(rc == 200 || rc == 404 || rc == 403); + } + while (!future.isDone()) { + Thread.sleep(100); + } + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index e6b4662..f95111e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -692,7 +692,7 @@ return checkResponse(executeBasicAuthHttpRequest(method, credentials), responseCodeValidator); } - protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception { + public HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception { // https://issues.apache.org/jira/browse/ASTERIXDB-2315 ExecutorService executor = Executors.newSingleThreadExecutor(); CloseableHttpClient client = HttpClients.custom().addInterceptorFirst(new PreemptiveAuthInterceptor()) @@ -933,7 +933,7 @@ return false; } - protected List<Parameter> upsertParam(List<Parameter> params, String name, ParameterTypeEnum type, String value) { + public List<Parameter> upsertParam(List<Parameter> params, String name, ParameterTypeEnum type, String value) { boolean replaced = false; List<Parameter> result = new ArrayList<>(); for (Parameter param : params) { @@ -2983,7 +2983,7 @@ } // adapted from https://stackoverflow.com/questions/2014700/preemptive-basic-authentication-with-apache-httpclient-4 - static class PreemptiveAuthInterceptor implements HttpRequestInterceptor { + public static class PreemptiveAuthInterceptor implements HttpRequestInterceptor { public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException { AuthState authState = (AuthState) context.getAttribute(HttpClientContext.TARGET_AUTH_STATE); -- To 
view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I16410ab9353c24597f77ab38ce06996fc5dfacd0 Gerrit-Change-Number: 18257 Gerrit-PatchSet: 6 Gerrit-Owner: Peeyush Gupta <peeyush.gu...@couchbase.com> Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Peeyush Gupta <peeyush.gu...@couchbase.com> Gerrit-MessageType: merged