From Michael Blow <[email protected]>:
Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233?usp=email )
Change subject: [NO ISSUE][*DB][RT] Guard against cancelled request removed from tracker
......................................................................
[NO ISSUE][*DB][RT] Guard against cancelled request removed from tracker
Ext-ref: MB-71890
Change-Id: Ib5c93b1607444612b0c485f649ab9424a004b68c
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
1 file changed, 21 insertions(+), 12 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/33/21233/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b930ce1..08907f2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4264,7 +4264,7 @@
try {
Thread.currentThread().setName(nameBefore + " :
WaitForCompletionForJobId: " + jobId);
hcc.waitForCompletion(jobId);
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
} finally {
Thread.currentThread().setName(nameBefore);
}
@@ -4291,8 +4291,9 @@
Stats stats) throws Exception {
CopyToStatement copyTo = (CopyToStatement) stmt;
final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ String reqId = requestParameters.getRequestReference().getUuid();
final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
+ (ClientRequest) requestTracker.get(reqId);
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws RuntimeDataException,
InterruptedException {
@@ -4300,7 +4301,7 @@
compilationLock.readLock().lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
throw e;
}
}
@@ -4469,7 +4470,7 @@
try {
Thread.currentThread().setName(nameBefore + " :
WaitForCompletionForJobId: " + jobId);
hcc.waitForCompletion(jobId);
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
} finally {
Thread.currentThread().setName(nameBefore);
}
@@ -4541,7 +4542,7 @@
try {
Thread.currentThread().setName(nameBefore + " :
WaitForCompletionForJobId: " + jobId);
hcc.waitForCompletion(jobId);
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
} finally {
Thread.currentThread().setName(nameBefore);
}
@@ -4565,6 +4566,9 @@
private static JobId runTrackJob(IHyracksClientConnection hcc,
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
String reqId, String clientCtxId, ClientRequest clientRequest,
JobKind jobKind) throws Exception {
+ // Guard before submitting the job: if the request was cancelled and removed from the tracker,
+ // clientRequest will be null here; treat that as a cancellation rather than NPE on setJobId.
+ ensureNotCancelled(clientRequest, reqId);
jobSpec.setRequestId(reqId);
jobSpec.setProperty(JOB_KIND, jobKind);
JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
@@ -5540,8 +5544,9 @@
IRequestParameters requestParameters, Map<String, IAObject>
stmtParams, IStatementRewriter stmtRewriter)
throws Exception {
final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ String reqId = requestParameters.getRequestReference().getUuid();
final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
+ (ClientRequest) requestTracker.get(reqId);
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws RuntimeDataException,
InterruptedException {
@@ -5549,7 +5554,7 @@
compilationLock.readLock().lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
throw e;
}
}
@@ -5789,6 +5794,8 @@
String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
+ // Guard before markCancellable: request may have been cancelled and deregistered already.
+ ensureNotCancelled(clientRequest, reqId);
if (cancellable) {
clientRequest.markCancellable();
}
@@ -5804,7 +5811,7 @@
SchedulableClientRequest.of(clientRequest,
requestParameters, metadataProvider, jobSpec);
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
if (atomicStatement != null) {
Dataset ds = metadataProvider.findDataset(((InsertStatement)
atomicStatement).getDatabaseName(),
((InsertStatement) atomicStatement).getDataverseName(),
@@ -5838,7 +5845,7 @@
hcc.waitForCompletion(jobId);
} else {
hcc.waitForCompletion(jobId);
- ensureNotCancelled(clientRequest);
+ ensureNotCancelled(clientRequest, reqId);
printer.print(jobId);
}
if (atomic) {
@@ -6096,9 +6103,11 @@
validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(),
dataset, sourceLoc);
}
- private static void ensureNotCancelled(ClientRequest clientRequest) throws
RuntimeDataException {
- if (clientRequest.isCancelled()) {
- throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED,
clientRequest.getId());
+ private static void ensureNotCancelled(ClientRequest clientRequest, String
reqId) throws RuntimeDataException {
+ // clientRequest may be null if the request was cancelled and deregistered from the tracker
+ // between when it was looked up and when this check is reached; treat that as cancelled.
+ if (clientRequest == null || clientRequest.isCancelled()) {
+ throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, reqId);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21233?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: Ib5c93b1607444612b0c485f649ab9424a004b68c
Gerrit-Change-Number: 21233
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>