>From Peeyush Gupta <peeyush.gu...@couchbase.com>:

Peeyush Gupta has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257 )

Change subject: [ASTERIXDB-3391][OTH] Make DML statements cancellable
......................................................................

[ASTERIXDB-3391][OTH] Make DML statements cancellable

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
With this patch INSERT/UPSERT/DELETE and COPY statements are marked
cancellable. For atomic statements, the statements are cancelled as
long as the ingestion job is running i.e., ACKs from all nodes/partitions
are not received and the CC has not decided to commit the statement yet.

Change-Id: I16410ab9353c24597f77ab38ce06996fc5dfacd0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Murtadha Hubail <mhub...@apache.org>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
5 files changed, 169 insertions(+), 6 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved; Verified
  Anon. E. Moose #1000171:
  Jenkins: Verified




diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 0436ea8..84a6488 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -54,9 +54,9 @@
         if (complete) {
             return;
         }
-        complete();
-        state = State.CANCELLED;
         if (cancellable) {
+            complete();
+            state = State.CANCELLED;
             doCancel(appCtx);
         }
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index 0786895..9d1e108 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -215,6 +215,7 @@
     @Override
     public void abortTransaction(JobId jobId) throws Exception {
         IGlobalTransactionContext context = getTransactionContext(jobId);
+        context.resetAcksReceived();
         if (context.getTxnStatus() == TransactionStatus.PREPARED) {
             sendJobRollbackMessages(context);
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 34648dd..3035496 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4009,10 +4009,12 @@
                 final ClientRequest clientRequest =
                         (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
                 clientRequest.setJobId(jobId);
+                clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
                     Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
+                    ensureNotCancelled(clientRequest);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4151,7 +4153,7 @@
         ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqParams.getRequestReference().getUuid());
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
-                    reqParams, false, stmt, clientRequest);
+                    reqParams, true, stmt, clientRequest);
         } else {
             locker.lock();
             JobId jobId = null;
@@ -4176,10 +4178,12 @@
                 }
                 jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
                 clientRequest.setJobId(jobId);
+                clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
                     Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
+                    ensureNotCancelled(clientRequest);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
@@ -4246,10 +4250,12 @@
                 final ClientRequest clientRequest =
                         (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
                 clientRequest.setJobId(jobId);
+                clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
                     Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
                     hcc.waitForCompletion(jobId);
+                    ensureNotCancelled(clientRequest);
                 } finally {
                     Thread.currentThread().setName(nameBefore);
                 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
new file mode 100644
index 0000000..1cb87d8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsCancellationTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter.CLIENT_ID;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ParameterTypeEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.http.HttpResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicStatementsCancellationTest {
+    private final ExecutorService executor = 
Executors.newSingleThreadExecutor();
+
+    private static final String TEST_CONFIG_FILE_NAME = 
"cc-cloud-storage.conf";
+    private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+
+    private static final String TEST_CONFIG_PATH =
+            joinPath(System.getProperty("user.dir"), "src", "test", 
"resources");;
+    private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + 
File.separator + TEST_CONFIG_FILE_NAME;
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
+
+    private static final String DATASET_NAME = "ds_0";
+    private static final int BATCH_SIZE = 20000;
+    private static final int NUM_UPSERTS = 100;
+
+    @Before
+    public void setUp() throws Exception {
+        boolean cleanStart = true;
+        LocalCloudUtil.startS3CloudEnvironment(cleanStart);
+        integrationUtil.setGracefulShutdown(true);
+        integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+        createDatasets();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    private void createDatasets() throws Exception {
+        TestDataUtil.createDatasetWithoutType(DATASET_NAME, Map.of("id", 
"uuid"), true);
+        TestDataUtil.createSecondaryBTreeIndex(DATASET_NAME, DATASET_NAME + 
"_sidx", "name:string");
+    }
+
+    public String generateInsertStatement(String dataset, long count) throws 
Exception {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < count; i++) {
+            stringBuilder.append("{\"name\": \"name_" + i + "\"},");
+        }
+        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+        return "INSERT INTO " + dataset + "([" + stringBuilder + "]);";
+    }
+
+    private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> 
params) throws Exception {
+        HttpResponse response = 
TEST_EXECUTOR.executeHttpRequest(TestExecutor.constructDeleteMethod(uri, 
params));
+        return response.getStatusLine().getStatusCode();
+    }
+
+    @Test
+    public void testAtomicityWithCancellation() throws Exception {
+        Random rnd = new Random();
+        for (int j = 0; j < NUM_UPSERTS; j++) {
+            String clientContextId = UUID.randomUUID().toString();
+            final List<TestCase.CompilationUnit.Parameter> params = new 
ArrayList<>();
+            TestCase.CompilationUnit.Parameter newParam = new 
TestCase.CompilationUnit.Parameter();
+            newParam.setName(CLIENT_ID.str());
+            newParam.setType(ParameterTypeEnum.STRING);
+            newParam.setValue(clientContextId);
+            params.add(newParam);
+            String statement = generateInsertStatement(DATASET_NAME, 
BATCH_SIZE);
+            Callable<InputStream> upsert = () -> {
+                try {
+                    return TEST_EXECUTOR.executeQueryService(statement, 
TestCaseContext.OutputFormat.CLEAN_JSON,
+                            TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), 
params, false, UTF_8);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                }
+            };
+            Future<InputStream> future = executor.submit(upsert);
+            if (!future.isDone()) {
+                Thread.sleep(rnd.nextInt(900) + 800);
+                // Cancels the query request while the query is executing.
+                int rc = 
cancelQuery(TEST_EXECUTOR.getEndpoint(Servlets.RUNNING_REQUESTS), params);
+                Assert.assertTrue(rc == 200 || rc == 404 || rc == 403);
+            }
+            while (!future.isDone()) {
+                Thread.sleep(100);
+            }
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index e6b4662..f95111e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -692,7 +692,7 @@
         return checkResponse(executeBasicAuthHttpRequest(method, credentials), 
responseCodeValidator);
     }

-    protected HttpResponse executeHttpRequest(HttpUriRequest method) throws 
Exception {
+    public HttpResponse executeHttpRequest(HttpUriRequest method) throws 
Exception {
         // https://issues.apache.org/jira/browse/ASTERIXDB-2315
         ExecutorService executor = Executors.newSingleThreadExecutor();
         CloseableHttpClient client = 
HttpClients.custom().addInterceptorFirst(new PreemptiveAuthInterceptor())
@@ -933,7 +933,7 @@
         return false;
     }

-    protected List<Parameter> upsertParam(List<Parameter> params, String name, 
ParameterTypeEnum type, String value) {
+    public List<Parameter> upsertParam(List<Parameter> params, String name, 
ParameterTypeEnum type, String value) {
         boolean replaced = false;
         List<Parameter> result = new ArrayList<>();
         for (Parameter param : params) {
@@ -2983,7 +2983,7 @@
     }

     // adapted from 
https://stackoverflow.com/questions/2014700/preemptive-basic-authentication-with-apache-httpclient-4
-    static class PreemptiveAuthInterceptor implements HttpRequestInterceptor {
+    public static class PreemptiveAuthInterceptor implements 
HttpRequestInterceptor {

         public void process(final HttpRequest request, final HttpContext 
context) throws HttpException, IOException {
             AuthState authState = (AuthState) 
context.getAttribute(HttpClientContext.TARGET_AUTH_STATE);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18257
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I16410ab9353c24597f77ab38ce06996fc5dfacd0
Gerrit-Change-Number: 18257
Gerrit-PatchSet: 6
Gerrit-Owner: Peeyush Gupta <peeyush.gu...@couchbase.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Peeyush Gupta <peeyush.gu...@couchbase.com>
Gerrit-MessageType: merged

Reply via email to