From Peeyush Gupta <[email protected]>:

Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725 )
Change subject: [NO ISSUE][TX] Concurrently write checkpoints for atomic statements
......................................................................

[NO ISSUE][TX] Concurrently write checkpoints for atomic statements

- user model changes: no
- storage format changes: no
- interface changes: no

Change-Id: I3846bfa534ebe4077f55f3a9acccd3dc3d8d0cda
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
1 file changed, 39 insertions(+), 1 deletion(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Peeyush Gupta: Looks good to me, but someone else must approve
  Jenkins: Verified; Verified

Objections:
  Anon. E. Moose #1000171: Violations found

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
index fac023c..e653b7a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobCommitMessage.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.app.message;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -46,13 +50,29 @@
     @Override
     public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
         IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        ForkJoinPool commonPool = ForkJoinPool.commonPool();
+        List<Future> futures = new ArrayList<>();
         for (Integer datasetId : datasetIds) {
             for (IndexInfo indexInfo : datasetLifecycleManager.getDatasetInfo(datasetId).getIndexes().values()) {
                 if (indexInfo.getIndex().isPrimaryIndex()) {
-                    ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+                    futures.add(commonPool.submit(() -> {
+                        try {
+                            ((PrimaryIndexOperationTracker) indexInfo.getIndex().getOperationTracker()).commit();
+                        } catch (HyracksDataException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }));
                 }
             }
         }
+        for (Future f : futures) {
+            try {
+                f.get();
+            } catch (ExecutionException e) {
+                futures.forEach(future -> future.cancel(true));
+                throw HyracksDataException.create(e);
+            }
+        }
         AtomicJobCompletionMessage message =
                 new AtomicJobCompletionMessage(jobId, appCtx.getServiceContext().getNodeId());
         NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17725
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I3846bfa534ebe4077f55f3a9acccd3dc3d8d0cda
Gerrit-Change-Number: 17725
Gerrit-PatchSet: 9
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Al Hubail <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Peeyush Gupta <[email protected]>
Gerrit-MessageType: merged
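
For readers who want to try the pattern from the diff outside AsterixDB, here is a minimal standalone sketch of the same fan-out/join shape: submit one task per commit to the common ForkJoinPool, wait on every Future, and on the first failure cancel the rest and rethrow as a checked exception. The names ConcurrentCommitSketch, CommitTask, CommitFailedException, and commitAll are hypothetical stand-ins and are not AsterixDB types. CommitFailedException only plays the role that HyracksDataException plays in the change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class ConcurrentCommitSketch {

    /** Hypothetical stand-in for one primary index's commit; not an AsterixDB type. */
    interface CommitTask {
        void commit() throws Exception;
    }

    /** Hypothetical checked exception; stands in for HyracksDataException in the diff. */
    static final class CommitFailedException extends Exception {
        CommitFailedException(Throwable cause) {
            super("commit failed", cause);
        }
    }

    /** Runs all commits concurrently and returns only after every one has finished. */
    static void commitAll(List<CommitTask> tasks) throws CommitFailedException, InterruptedException {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        List<Future<?>> futures = new ArrayList<>();
        for (CommitTask task : tasks) {
            // A Runnable cannot throw checked exceptions, so wrap them as unchecked.
            futures.add(pool.submit(() -> {
                try {
                    task.commit();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }));
        }
        for (Future<?> f : futures) {
            try {
                f.get(); // blocks until this commit completes or fails
            } catch (ExecutionException e) {
                // On the first failure, cancel the remaining tasks and surface the error.
                futures.forEach(other -> other.cancel(true));
                throw new CommitFailedException(e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger committed = new AtomicInteger();
        List<CommitTask> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            tasks.add(committed::incrementAndGet);
        }
        commitAll(tasks);
        System.out.println("committed: " + committed.get());

        // Add a task that fails to exercise the error path.
        tasks.add(() -> { throw new java.io.IOException("simulated flush failure"); });
        try {
            commitAll(tasks);
        } catch (CommitFailedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One point worth keeping in mind when reading the change: per the ForkJoinTask documentation, the mayInterruptIfRunning argument to cancel has no effect in the default implementation, so cancel(true) can stop tasks that have not started yet but does not interrupt a commit that is already running.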
