This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.0 by this push: new aabe05c Retry new Bulk import on merge. Fixes #471 (#1367) aabe05c is described below commit aabe05c19ec39b30908a9093a1f5090c01cf3fc1 Author: Mike Miller <mmil...@apache.org> AuthorDate: Thu Oct 10 14:55:14 2019 -0400 Retry new Bulk import on merge. Fixes #471 (#1367) * Created AccumuloBulkMergeException for handling merges with Retry maxWait up to 2 Minutes --- .../clientImpl/AccumuloBulkMergeException.java | 32 +++++++++++ .../core/clientImpl/TableOperationsImpl.java | 2 + .../accumulo/core/clientImpl/bulk/BulkImport.java | 64 ++++++++++++++++++---- .../thrift/TableOperationExceptionType.java | 5 +- core/src/main/thrift/client.thrift | 1 + .../master/tableOps/bulkVer2/PrepBulkImport.java | 7 +-- .../tableOps/bulkVer2/PrepBulkImportTest.java | 6 +- 7 files changed, 97 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java new file mode 100644 index 0000000..2a7527e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import org.apache.accumulo.core.client.AccumuloException; + +/** + * Internal class indicating a concurrent merge occurred during the new bulk import. + */ +public class AccumuloBulkMergeException extends AccumuloException { + + private static final String MSG = "Concurrent merge happened"; + + public AccumuloBulkMergeException(final Throwable cause) { + super(MSG, cause); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index fbd5f74..762b85c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -382,6 +382,8 @@ public class TableOperationsImpl extends TableOperationsHelper { case OFFLINE: throw new TableOfflineException( Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName))); + case BULK_CONCURRENT_MERGE: + throw new AccumuloBulkMergeException(e); default: throw new AccumuloException(e.description, e); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 48a7f93..52860b7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -17,6 +17,8 @@ package org.apache.accumulo.core.clientImpl.bulk; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.groupingBy; import java.io.FileNotFoundException; @@ -41,6 +43,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -49,6 +52,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments; import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; +import org.apache.accumulo.core.clientImpl.AccumuloBulkMergeException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.clientImpl.Tables; @@ -72,6 +76,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.volume.VolumeConfiguration; +import org.apache.accumulo.fate.util.Retry; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -125,21 +130,56 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti Path srcPath = checkPath(fs, dir); SortedMap<KeyExtent,Bulk.Files> mappings; - if (plan == null) { - mappings = computeMappingFromFiles(fs, tableId, srcPath); - } else { - mappings = computeMappingFromPlan(fs, tableId, srcPath); - } + TableOperationsImpl tableOps = new TableOperationsImpl(context); + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5) + .logInterval(3, TimeUnit.MINUTES).createRetry(); + + // retry if a merge occurs + boolean shouldRetry = true; + while (shouldRetry) { + if (plan == null) { + mappings = computeMappingFromFiles(fs, tableId, srcPath); + } else { + mappings = computeMappingFromPlan(fs, tableId, srcPath); + } - if (mappings.isEmpty()) - throw new IllegalArgumentException("Attempted to import zero files from " + srcPath); + if (mappings.isEmpty()) + throw new IllegalArgumentException("Attempted to import zero files from " + srcPath); - BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create); + BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create); + + List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.canonical().getBytes(UTF_8)), + ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)), + ByteBuffer.wrap((setTime + "").getBytes(UTF_8))); + try { + tableOps.doBulkFateOperation(args, tableName); + shouldRetry = false; + } catch (AccumuloBulkMergeException ae) { + if (plan != null) { + checkPlanForSplits(ae); + } + try { + retry.waitForNextAttempt(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + log.info(ae.getMessage() + ". Retrying bulk import to " + tableName); + } + } + } - List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.canonical().getBytes(UTF_8)), - ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)), - ByteBuffer.wrap((setTime + "").getBytes(UTF_8))); - new TableOperationsImpl(context).doBulkFateOperation(args, tableName); + /** + * Check if splits were specified in plan when a concurrent merge occurred. If so, throw error + * back to user since retrying won't help. If not, then retry. + */ + private void checkPlanForSplits(AccumuloBulkMergeException abme) throws AccumuloException { + for (Destination des : plan.getDestinations()) { + if (des.getRangeType().equals(RangeType.TABLE)) { + throw new AccumuloException("The splits provided in Load Plan do not exist in " + tableName, + abme); + } + } } /** diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java index 7764e3a..bd57e1c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java @@ -34,7 +34,8 @@ public enum TableOperationExceptionType implements org.apache.thrift.TEnum { NAMESPACE_EXISTS(7), NAMESPACE_NOTFOUND(8), INVALID_NAME(9), - BULK_BAD_LOAD_MAPPING(10); + BULK_BAD_LOAD_MAPPING(10), + BULK_CONCURRENT_MERGE(11); private final int value; @@ -78,6 +79,8 @@ public enum TableOperationExceptionType implements org.apache.thrift.TEnum { return INVALID_NAME; case 10: return BULK_BAD_LOAD_MAPPING; + case 11: + return BULK_CONCURRENT_MERGE; default: return null; } diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift index 91aec2e..85c93b2 100644 --- a/core/src/main/thrift/client.thrift +++ b/core/src/main/thrift/client.thrift @@ -52,6 +52,7 @@ enum TableOperationExceptionType { NAMESPACE_NOTFOUND INVALID_NAME BULK_BAD_LOAD_MAPPING + BULK_CONCURRENT_MERGE } enum ConfigurationType { diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java index b215774..2199e99 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java @@ -146,12 +146,9 @@ public class PrepBulkImport extends MasterRepo { } if (currRange != null || lmi.hasNext()) { - // a merge happened between the time the mapping was generated and the table lock was - // acquired + // merge happened after the mapping was generated and before the table lock was acquired throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, - TableOperationExceptionType.OTHER, "Concurrent merge happened"); // TODO need to handle - // this on the client - // side + TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened"); } } diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java index 0b6f8e7..80d1615 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -103,7 +104,8 @@ public class PrepBulkImportTest { .collect(Collectors.joining(",")); } - public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) { + public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) + throws AccumuloException { try { runTest(loadRanges, tabletRanges); fail("expected " + toRangeStrings(loadRanges) + " to fail against " @@ -144,7 +146,7 @@ public class PrepBulkImportTest { } @Test - public void testException() { + public void testException() throws Exception { for (List<KeyExtent> loadRanges : powerSet(nke(null, "b"), nke("b", "m"), nke("m", "r"), nke("r", "v"), nke("v", null))) {