http://git-wip-us.apache.org/repos/asf/hbase/blob/ff23c022/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index a826f8c..579d547 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.Message; import com.google.protobuf.RpcChannel; @@ -484,23 +485,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) { CompletableFuture<TableDescriptor> future = new CompletableFuture<>(); - this.<List<TableSchema>> newMasterCaller() - .action( - (controller, stub) -> this - .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( - controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s, - c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp - .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - if (!tableSchemas.isEmpty()) { - future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); - } else { - future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); - } - }); + addListener(this.<List<TableSchema>> newMasterCaller() + .action((controller, stub) -> this + .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call( + controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), + (s, c, req, done) -> s.getTableDescriptors(c, req, done), + (resp) -> resp.getTableSchemaList())) + .call(), (tableSchemas, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!tableSchemas.isEmpty()) { + future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0))); + } else { + future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); + } + }); return future; } @@ -583,7 +584,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { CompletableFuture<Boolean> future = new CompletableFuture<>(); - AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> { + addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -600,7 +601,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Boolean> isTableDisabled(TableName tableName) { CompletableFuture<Boolean> future = new CompletableFuture<>(); - AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> { + addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -629,40 +630,37 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture<Boolean> isTableAvailable(TableName tableName, Optional<byte[][]> splitKeys) { CompletableFuture<Boolean> future = new CompletableFuture<>(); - isTableEnabled(tableName).whenComplete( - (enabled, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - if (!enabled) { - future.complete(false); - } else { - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)) - .whenComplete( - (locations, error1) -> { - if (error1 != null) { - future.completeExceptionally(error1); - return; - } - List<HRegionLocation> notDeployedRegions = - locations.stream().filter(loc -> loc.getServerName() == null) - .collect(Collectors.toList()); - if (notDeployedRegions.size() > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() - + " regions"); - } - future.complete(false); - return; - } + addListener(isTableEnabled(tableName), (enabled, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!enabled) { + future.complete(false); + } else { + addListener( + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)), + (locations, error1) -> { + if (error1 != null) { + future.completeExceptionally(error1); + return; + } + List<HRegionLocation> notDeployedRegions = locations.stream() + .filter(loc -> loc.getServerName() == null).collect(Collectors.toList()); + if (notDeployedRegions.size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions"); + } + future.complete(false); + return; + } - Optional<Boolean> available = - splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); - future.complete(available.orElse(true)); - }); - } - }); + Optional<Boolean> available = + splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys)); + future.complete(available.orElse(true)); + }); + } + }); return future; } @@ -784,20 +782,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> flush(TableName tableName) { CompletableFuture<Void> future = new CompletableFuture<>(); - tableExists(tableName).whenComplete((exists, err) -> { + addListener(tableExists(tableName), (exists, err) -> { if (err != null) { future.completeExceptionally(err); } else if (!exists) { future.completeExceptionally(new TableNotFoundException(tableName)); } else { - isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> { + addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else if (!tableEnabled) { future.completeExceptionally(new TableNotEnabledException(tableName)); } else { - execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), - new HashMap<>()).whenComplete((ret, err3) -> { + addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), + new HashMap<>()), (ret, err3) -> { if (err3 != null) { future.completeExceptionally(err3); } else { @@ -814,27 +812,25 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> flushRegion(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionLocation(regionName).whenComplete( - (location, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - ServerName serverName = location.getServerName(); - if (serverName == null) { - future.completeExceptionally(new NoServerForRegionException(Bytes - .toStringBinary(regionName))); - return; + addListener(getRegionLocation(regionName), (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future + .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + addListener(flush(serverName, location.getRegion()), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); } - flush(serverName, location.getRegion()) - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); }); + }); return future; } @@ -852,7 +848,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> flushRegionServer(ServerName sn) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegions(sn).whenComplete((hRegionInfos, err) -> { + addListener(getRegions(sn), (hRegionInfos, err) -> { if (err != null) { future.completeExceptionally(err); return; @@ -861,9 +857,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { if (hRegionInfos != null) { hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); } - CompletableFuture - .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])) - .whenComplete((ret, err2) -> { + addListener(CompletableFuture.allOf( + compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { @@ -936,7 +931,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegions(sn).whenComplete((hRegionInfos, err) -> { + addListener(getRegions(sn), (hRegionInfos, err) -> { if (err != null) { future.completeExceptionally(err); return; @@ -945,15 +940,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { if (hRegionInfos != null) { hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); } - CompletableFuture - .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])) - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); + addListener(CompletableFuture.allOf( + compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); }); return future; } @@ -961,28 +955,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, boolean major) { CompletableFuture<Void> future = new CompletableFuture<>(); - - getRegionLocation(regionName).whenComplete( - (location, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - ServerName serverName = location.getServerName(); - if (serverName == null) { - future.completeExceptionally(new NoServerForRegionException(Bytes - .toStringBinary(regionName))); - return; - } - compact(location.getServerName(), location.getRegion(), major, columnFamily) - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + addListener(getRegionLocation(regionName), (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future + .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @@ -994,19 +986,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); // For meta table, we use zk to fetch all locations. AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); - registry.getMetaRegionLocation().whenComplete( - (metaRegions, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (metaRegions == null || metaRegions.isEmpty() - || metaRegions.getDefaultRegionLocation() == null) { - future.completeExceptionally(new IOException("meta region does not found")); - } else { - future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); - } - // close the registry. - IOUtils.closeQuietly(registry); - }); + addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (metaRegions == null || metaRegions.isEmpty() || + metaRegions.getDefaultRegionLocation() == null) { + future.completeExceptionally(new IOException("meta region does not found")); + } else { + future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); + } + // close the registry. + IOUtils.closeQuietly(registry); + }); return future; } else { // For non-meta table, we fetch all locations by scanning hbase:meta table @@ -1017,40 +1008,40 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { /** * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() */ - private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, - boolean major, CompactType compactType) { + private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, + CompactType compactType) { CompletableFuture<Void> future = new CompletableFuture<>(); switch (compactType) { case MOB: - connection.registry.getMasterAddress().whenComplete((serverName, err) -> { + addListener(connection.registry.getMasterAddress(), (serverName, err) -> { if (err != null) { future.completeExceptionally(err); return; } RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); - compact(serverName, regionInfo, major, columnFamily) - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); + addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); }); break; case NORMAL: - getTableHRegionLocations(tableName).whenComplete((locations, err) -> { + addListener(getTableHRegionLocations(tableName), (locations, err) -> { if (err != null) { future.completeExceptionally(err); return; } - CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null) + CompletableFuture<?>[] compactFutures = + locations.stream().filter(l -> l.getRegion() != null) .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) .toArray(CompletableFuture<?>[]::new); // future complete unless all of the compact futures are completed. - CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> { + addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { @@ -1091,29 +1082,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, CompletableFuture<TableName> result) { - getRegionLocation(encodeRegionName).whenComplete( - (location, err) -> { - if (err != null) { - result.completeExceptionally(err); - return; - } - RegionInfo regionInfo = location.getRegion(); - if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { - result.completeExceptionally(new IllegalArgumentException( - "Can't invoke merge on non-default regions directly")); - return; - } - if (!tableName.compareAndSet(null, regionInfo.getTable())) { - if (!tableName.get().equals(regionInfo.getTable())) { - // tables of this two region should be same. - result.completeExceptionally(new IllegalArgumentException( - "Cannot merge regions from two different tables " + tableName.get() + " and " - + regionInfo.getTable())); - } else { - result.complete(tableName.get()); - } + addListener(getRegionLocation(encodeRegionName), (location, err) -> { + if (err != null) { + result.completeExceptionally(err); + return; + } + RegionInfo regionInfo = location.getRegion(); + if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { + result.completeExceptionally( + new IllegalArgumentException("Can't invoke merge on non-default regions directly")); + return; + } + if (!tableName.compareAndSet(null, regionInfo.getTable())) { + if (!tableName.get().equals(regionInfo.getTable())) { + // tables of this two region should be same. + result.completeExceptionally( + new IllegalArgumentException("Cannot merge regions from two different tables " + + tableName.get() + " and " + regionInfo.getTable())); + } else { + result.complete(tableName.get()); } - }); + } + }); } private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA, @@ -1178,41 +1168,42 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA); final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB); - checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB) - .whenComplete((tableName, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } + addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB), + (tableName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } - MergeTableRegionsRequest request = null; - try { - request = RequestConverter.buildMergeTableRegionsRequest( - new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(), - ng.newNonce()); - } catch (DeserializationException e) { - future.completeExceptionally(e); - return; - } + MergeTableRegionsRequest request = null; + try { + request = RequestConverter.buildMergeTableRegionsRequest( + new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(), + ng.newNonce()); + } catch (DeserializationException e) { + future.completeExceptionally(e); + return; + } + addListener( this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request, (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), - new MergeTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - - }); + new MergeTableRegionProcedureBiConsumer(tableName)), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @Override public CompletableFuture<Void> split(TableName tableName) { CompletableFuture<Void> future = new CompletableFuture<>(); - tableExists(tableName).whenComplete((exist, error) -> { + addListener(tableExists(tableName), (exist, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -1221,45 +1212,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(new TableNotFoundException(tableName)); return; } - metaTable + addListener( + metaTable .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY) - .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) - .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))) - .whenComplete((results, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - return; - } - if (results != null && !results.isEmpty()) { - List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); - for (Result r : results) { - if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) continue; - RegionLocations rl = MetaTableAccessor.getRegionLocations(r); - if (rl != null) { - for (HRegionLocation h : rl.getRegionLocations()) { - if (h != null && h.getServerName() != null) { - RegionInfo hri = h.getRegion(); - if (hri == null || hri.isSplitParent() - || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) - continue; - splitFutures.add(split(hri, null)); + .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION)) + .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))), + (results, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (results != null && !results.isEmpty()) { + List<CompletableFuture<Void>> splitFutures = new ArrayList<>(); + for (Result r : results) { + if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) { + continue; + } + RegionLocations rl = MetaTableAccessor.getRegionLocations(r); + if (rl != null) { + for (HRegionLocation h : rl.getRegionLocations()) { + if (h != null && h.getServerName() != null) { + RegionInfo hri = h.getRegion(); + if (hri == null || hri.isSplitParent() || + hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { + continue; } + splitFutures.add(split(hri, null)); } } } - CompletableFuture - .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])) - .whenComplete((ret, exception) -> { - if (exception != null) { - future.completeExceptionally(exception); - return; - } - future.complete(ret); - }); - } else { - future.complete(null); } - }); + addListener( + CompletableFuture + .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])), + (ret, exception) -> { + if (exception != null) { + future.completeExceptionally(exception); + return; + } + future.complete(ret); + }); + } else { + future.complete(null); + } + }); }); return future; } @@ -1270,54 +1266,52 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { if (splitPoint == null) { return failedFuture(new IllegalArgumentException("splitPoint can not be null.")); } - connection.getRegionLocator(tableName).getRegionLocation(splitPoint) - .whenComplete((loc, err) -> { - if (err != null) { - result.completeExceptionally(err); - } else if (loc == null || loc.getRegion() == null) { - result.completeExceptionally(new IllegalArgumentException( - "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); - } else { - splitRegion(loc.getRegion().getRegionName(), splitPoint) - .whenComplete((ret, err2) -> { - if (err2 != null) { - result.completeExceptionally(err2); - } else { - result.complete(ret); - } + addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint), + (loc, err) -> { + if (err != null) { + result.completeExceptionally(err); + } else if (loc == null || loc.getRegion() == null) { + result.completeExceptionally(new IllegalArgumentException( + "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); + } else { + addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> { + if (err2 != null) { + result.completeExceptionally(err2); + } else { + result.complete(ret); + } - }); - } - }); + }); + } + }); return result; } @Override public CompletableFuture<Void> splitRegion(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionLocation(regionName).whenComplete( - (location, err) -> { - RegionInfo regionInfo = location.getRegion(); - if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { - future.completeExceptionally(new IllegalArgumentException( - "Can't split replicas directly. " - + "Replicas are auto-split when their primary is split.")); - return; - } - ServerName serverName = location.getServerName(); - if (serverName == null) { - future.completeExceptionally(new NoServerForRegionException(Bytes - .toStringBinary(regionName))); - return; + addListener(getRegionLocation(regionName), (location, err) -> { + RegionInfo regionInfo = location.getRegion(); + if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { + future + .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + + "Replicas are auto-split when their primary is split.")); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future + .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + addListener(split(regionInfo, null), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); } - split(regionInfo, null).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); }); + }); return future; } @@ -1326,35 +1320,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { Preconditions.checkNotNull(splitPoint, "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead"); CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionLocation(regionName).whenComplete( - (location, err) -> { - RegionInfo regionInfo = location.getRegion(); - if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { - future.completeExceptionally(new IllegalArgumentException( - "Can't split replicas directly. " - + "Replicas are auto-split when their primary is split.")); - return; - } - ServerName serverName = location.getServerName(); - if (serverName == null) { - future.completeExceptionally(new NoServerForRegionException(Bytes - .toStringBinary(regionName))); - return; - } - if (regionInfo.getStartKey() != null - && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) { - future.completeExceptionally(new IllegalArgumentException( - "should not give a splitkey which equals to startkey!")); - return; + addListener(getRegionLocation(regionName), (location, err) -> { + RegionInfo regionInfo = location.getRegion(); + if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { + future + .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " + + "Replicas are auto-split when their primary is split.")); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future + .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + if (regionInfo.getStartKey() != null && + Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) { + future.completeExceptionally( + new IllegalArgumentException("should not give a splitkey which equals to startkey!")); + return; + } + addListener(split(regionInfo, splitPoint), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); } - split(regionInfo, splitPoint).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); }); + }); return future; } @@ -1363,121 +1356,119 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { TableName tableName = hri.getTable(); SplitTableRegionRequest request = null; try { - request = RequestConverter - .buildSplitTableRegionRequest(hri, splitPoint, - ng.getNonceGroup(), ng.newNonce()); + request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(), + ng.newNonce()); } catch (DeserializationException e) { future.completeExceptionally(e); return future; } - this.<SplitTableRegionRequest, SplitTableRegionResponse>procedureCall(request, - (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), - new SplitTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); + addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request, + (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), + new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); return future; } @Override public CompletableFuture<Void> assign(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionInfo(regionName).whenComplete( - (regionInfo, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - this.<Void> newMasterCaller() - .action( - ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( - controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done), - resp -> null))).call().whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + addListener(getRegionInfo(regionName), (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + addListener(this.<Void> newMasterCaller() + .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( + controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), + (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) + .call(), (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @Override public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionInfo(regionName).whenComplete( - (regionInfo, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } + addListener(getRegionInfo(regionName), (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + addListener( this.<Void> newMasterCaller() - .action( - ((controller, stub) -> this - .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, - RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), - (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call() - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + .action(((controller, stub) -> this + .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, + RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), + (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) + .call(), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @Override public CompletableFuture<Void> offline(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionInfo(regionName).whenComplete( - (regionInfo, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } + addListener(getRegionInfo(regionName), (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + addListener( this.<Void> newMasterCaller() - .action( - ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( - controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done), - resp -> null))).call().whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + .action(((controller, stub) -> this + .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub, + RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), + (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) + .call(), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @Override public CompletableFuture<Void> move(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionInfo(regionName).whenComplete( - (regionInfo, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } + addListener(getRegionInfo(regionName), (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + addListener( moveRegion( - RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)) - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @@ -1486,20 +1477,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { Preconditions.checkNotNull(destServerName, "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead"); CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionInfo(regionName).whenComplete((regionInfo, err) -> { + addListener(getRegionInfo(regionName), (regionInfo, err) -> { if (err != null) { future.completeExceptionally(err); return; } - moveRegion( - RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)) - .whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); + addListener(moveRegion(RequestConverter + .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), + (ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); }); return future; } @@ -1634,11 +1625,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } CompletableFuture<Void> future = new CompletableFuture<Void>(); - getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { + addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = - ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); - updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> { + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); } @@ -1656,24 +1647,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } CompletableFuture<Void> future = new CompletableFuture<Void>(); - getReplicationPeerConfig(id).whenComplete( - (peerConfig, error) -> { - if (!completeExceptionally(future, error)) { - ReplicationPeerConfig newPeerConfig = null; - try { - newPeerConfig = ReplicationPeerConfigUtil - .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); - } catch (ReplicationException e) { - future.completeExceptionally(e); - return; - } - updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> { - if (!completeExceptionally(future, error)) { - future.complete(result); - } - }); + addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + ReplicationPeerConfig newPeerConfig = null; + try { + newPeerConfig = ReplicationPeerConfigUtil + .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); + } catch (ReplicationException e) { + future.completeExceptionally(e); + return; } - }); + addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); return future; } @@ -1708,31 +1698,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() { CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>(); - listTableDescriptors().whenComplete( - (tables, error) -> { - if (!completeExceptionally(future, error)) { - List<TableCFs> replicatedTableCFs = new ArrayList<>(); - tables.forEach(table -> { - Map<String, Integer> cfs = new HashMap<>(); - Stream.of(table.getColumnFamilies()) - .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) - .forEach(column -> { - cfs.put(column.getNameAsString(), column.getScope()); - }); - if (!cfs.isEmpty()) { - replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); - } - }); - future.complete(replicatedTableCFs); - } - }); + addListener(listTableDescriptors(), (tables, error) -> { + if (!completeExceptionally(future, error)) { + List<TableCFs> replicatedTableCFs = new ArrayList<>(); + tables.forEach(table -> { + Map<String, Integer> cfs = new HashMap<>(); + Stream.of(table.getColumnFamilies()) + .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) + .forEach(column -> { + cfs.put(column.getNameAsString(), column.getScope()); + }); + if (!cfs.isEmpty()) { + replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); + } + }); + future.complete(replicatedTableCFs); + } + }); return future; } @Override public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { - SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil - .createHBaseProtosSnapshotDesc(snapshotDesc); + SnapshotProtos.SnapshotDescription snapshot = + ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); try { ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); } catch (IllegalArgumentException e) { @@ -1740,47 +1729,47 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } CompletableFuture<Void> future = new CompletableFuture<>(); final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); - this.<Long> newMasterCaller() - .action( - (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, - stub, request, (s, c, req, done) -> s.snapshot(c, req, done), - resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - TimerTask pollingTask = new TimerTask() { - int tries = 0; - long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + expectedTimeout; - long maxPauseTime = expectedTimeout / maxAttempts; - - @Override - public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() < endTime) { - isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else if (done) { - future.complete(null); - } else { - // retry again after pauseTime. - long pauseTime = ConnectionUtils.getPauseTime( - TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + addListener(this.<Long> newMasterCaller() + .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, + stub, request, (s, c, req, done) -> s.snapshot(c, req, done), + resp -> resp.getExpectedTimeout())) + .call(), (expectedTimeout, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (done) { + future.complete(null); + } else { + // retry again after pauseTime. + long pauseTime = + ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); pauseTime = Math.min(pauseTime, maxPauseTime); - AsyncConnectionImpl.RETRY_TIMER - .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); + AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, + TimeUnit.MILLISECONDS); } - } ); - } else { - future.completeExceptionally(new SnapshotCreationException("Snapshot '" - + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout - + " ms", snapshotDesc)); - } + }); + } else { + future.completeExceptionally( + new SnapshotCreationException("Snapshot '" + snapshot.getName() + + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc)); } - }; - AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); - }); + } + }; + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + }); return future; } @@ -1806,52 +1795,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) { + public CompletableFuture<Void> restoreSnapshot(String snapshotName, + boolean takeFailSafeSnapshot) { CompletableFuture<Void> future = new CompletableFuture<>(); - listSnapshots(Pattern.compile(snapshotName)).whenComplete( - (snapshotDescriptions, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - TableName tableName = null; - if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { - for (SnapshotDescription snap : snapshotDescriptions) { - if (snap.getName().equals(snapshotName)) { - tableName = snap.getTableName(); - break; - } + addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TableName tableName = null; + if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { + for (SnapshotDescription snap : snapshotDescriptions) { + if (snap.getName().equals(snapshotName)) { + tableName = snap.getTableName(); + break; } } - if (tableName == null) { - future.completeExceptionally(new RestoreSnapshotException( - "Unable to find the table name for snapshot=" + snapshotName)); - return; - } - final TableName finalTableName = tableName; - tableExists(finalTableName) - .whenComplete((exists, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else if (!exists) { - // if table does not exist, then just clone snapshot into new table. - completeConditionalOnFuture(future, - internalRestoreSnapshot(snapshotName, finalTableName)); + } + if (tableName == null) { + future.completeExceptionally(new RestoreSnapshotException( + "Unable to find the table name for snapshot=" + snapshotName)); + return; + } + final TableName finalTableName = tableName; + addListener(tableExists(finalTableName), (exists, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!exists) { + // if table does not exist, then just clone snapshot into new table. + completeConditionalOnFuture(future, + internalRestoreSnapshot(snapshotName, finalTableName)); + } else { + addListener(isTableDisabled(finalTableName), (disabled, err4) -> { + if (err4 != null) { + future.completeExceptionally(err4); + } else if (!disabled) { + future.completeExceptionally(new TableNotDisabledException(finalTableName)); } else { - isTableDisabled(finalTableName).whenComplete( - (disabled, err4) -> { - if (err4 != null) { - future.completeExceptionally(err4); - } else if (!disabled) { - future.completeExceptionally(new TableNotDisabledException(finalTableName)); - } else { - completeConditionalOnFuture(future, - restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot)); - } - }); + completeConditionalOnFuture(future, + restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot)); } - } ); + }); + } }); + }); return future; } @@ -1860,49 +1847,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { if (takeFailSafeSnapshot) { CompletableFuture<Void> future = new CompletableFuture<>(); // Step.1 Take a snapshot of the current state - String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get( - HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, - HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); - final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat - .replace("{snapshot.name}", snapshotName) + String failSafeSnapshotSnapshotNameFormat = + this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, + HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); + final String failSafeSnapshotSnapshotName = + failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); - snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> { + addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> { if (err != null) { future.completeExceptionally(err); } else { // Step.2 Restore snapshot - internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> { - if (err2 != null) { - // Step.3.a Something went wrong during the restore and try to rollback. - internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete( - (void3, err3) -> { - if (err3 != null) { - future.completeExceptionally(err3); - } else { - String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" - + failSafeSnapshotSnapshotName + " succeeded."; - future.completeExceptionally(new RestoreSnapshotException(msg)); - } - }); - } else { - // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. - LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); - deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete( - (ret3, err3) -> { - if (err3 != null) { - LOG.error( - "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3); - future.completeExceptionally(err3); - } else { - future.complete(ret3); - } - }); + addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> { + if (err2 != null) { + // Step.3.a Something went wrong during the restore and try to rollback. + addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName), + (void3, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + String msg = + "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + + failSafeSnapshotSnapshotName + " succeeded."; + future.completeExceptionally(new RestoreSnapshotException(msg)); + } + }); + } else { + // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. + LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); + addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> { + if (err3 != null) { + LOG.error( + "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, + err3); + future.completeExceptionally(err3); + } else { + future.complete(ret3); + } + }); + } + }); } - } ); - } - } ); + }); return future; } else { return internalRestoreSnapshot(snapshotName, tableName); @@ -1911,7 +1899,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, CompletableFuture<T> parentFuture) { - parentFuture.whenComplete((res, err) -> { + addListener(parentFuture, (res, err) -> { if (err != null) { dependentFuture.completeExceptionally(err); } else { @@ -1923,7 +1911,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) { CompletableFuture<Void> future = new CompletableFuture<>(); - tableExists(tableName).whenComplete((exists, err) -> { + addListener(tableExists(tableName), (exists, err) -> { if (err != null) { future.completeExceptionally(err); } else if (exists) { @@ -1993,31 +1981,29 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots( Pattern tableNamePattern, Pattern snapshotNamePattern) { CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); - listTableNames(tableNamePattern, false).whenComplete( - (tableNames, err) -> { - if (err != null) { - future.completeExceptionally(err); + addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (tableNames == null || tableNames.size() <= 0) { + future.complete(Collections.emptyList()); + return; + } + addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); return; } - if (tableNames == null || tableNames.size() <= 0) { + if (snapshotDescList == null || snapshotDescList.isEmpty()) { future.complete(Collections.emptyList()); return; } - getCompletedSnapshots(snapshotNamePattern).whenComplete( - (snapshotDescList, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - return; - } - if (snapshotDescList == null || snapshotDescList.isEmpty()) { - future.complete(Collections.emptyList()); - return; - } - future.complete(snapshotDescList.stream() - .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) - .collect(Collectors.toList())); - }); + future.complete(snapshotDescList.stream() + .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) + .collect(Collectors.toList())); }); + }); return future; } @@ -2064,7 +2050,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern); } CompletableFuture<Void> future = new CompletableFuture<>(); - listSnapshotsFuture.whenComplete(((snapshotDescriptions, err) -> { + addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> { if (err != null) { future.completeExceptionally(err); return; @@ -2073,12 +2059,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.complete(null); return; } - List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>(); - snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures - .add(internalDeleteSnapshot(snapDesc))); - CompletableFuture.allOf( - deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()])) - .thenAccept(v -> future.complete(v)); + addListener(CompletableFuture.allOf(snapshotDescriptions.stream() + .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> { + if (e != null) { + future.completeExceptionally(e); + } else { + future.complete(v); + } + }); })); return future; } @@ -2100,50 +2088,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { Map<String, String> props) { CompletableFuture<Void> future = new CompletableFuture<>(); ProcedureDescription procDesc = - ProtobufUtil.buildProcedureDescription(signature, instance, props); - this.<Long> newMasterCaller() - .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( - controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), - (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) - .call().whenComplete((expectedTimeout, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - TimerTask pollingTask = new TimerTask() { - int tries = 0; - long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + expectedTimeout; - long maxPauseTime = expectedTimeout / maxAttempts; - - @Override - public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() < endTime) { - isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - return; - } - if (done) { - future.complete(null); - } else { - // retry again after pauseTime. - long pauseTime = ConnectionUtils - .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); - pauseTime = Math.min(pauseTime, maxPauseTime); - AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, - TimeUnit.MICROSECONDS); - } - }); - } else { - future.completeExceptionally(new IOException("Procedure '" + signature + " : " - + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); - } + ProtobufUtil.buildProcedureDescription(signature, instance, props); + addListener(this.<Long> newMasterCaller() + .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call( + controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(), + (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout())) + .call(), (expectedTimeout, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + addListener(isProcedureFinished(signature, instance, props), (done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (done) { + future.complete(null); + } else { + // retry again after pauseTime. + long pauseTime = + ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, + TimeUnit.MICROSECONDS); + } + }); + } else { + future.completeExceptionally(new IOException("Procedure '" + signature + " : " + + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms")); } - }; - // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. - AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); - }); + } + }; + // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + }); return future; } @@ -2262,15 +2250,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); - future.whenComplete((location, err) -> { + addListener(future, (location, err) -> { if (err != null) { returnedFuture.completeExceptionally(err); return; } if (!location.isPresent() || location.get().getRegion() == null) { - returnedFuture.completeExceptionally(new UnknownRegionException( - "Invalid region name or encoded region name: " - + Bytes.toStringBinary(regionNameOrEncodedRegionName))); + returnedFuture.completeExceptionally( + new UnknownRegionException("Invalid region name or encoded region name: " + + Bytes.toStringBinary(regionNameOrEncodedRegionName))); } else { returnedFuture.complete(location.get()); } @@ -2294,14 +2282,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } if (Bytes.equals(regionNameOrEncodedRegionName, - RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) - || Bytes.equals(regionNameOrEncodedRegionName, + RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) || + Bytes.equals(regionNameOrEncodedRegionName, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO); } CompletableFuture<RegionInfo> future = new CompletableFuture<>(); - getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, err) -> { + addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> { if (err != null) { future.completeExceptionally(err); } else { @@ -2343,7 +2331,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { + private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { abstract void onFinished(); @@ -2359,7 +2347,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { + private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { protected final TableName tableName; TableProcedureBiConsumer(TableName tableName) { @@ -2384,7 +2372,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { + private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { protected final String namespaceName; NamespaceProcedureBiConsumer(String namespaceName) { @@ -2408,7 +2396,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { + private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { CreateTableProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2420,7 +2408,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { + private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { super(tableName); @@ -2450,7 +2438,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { + private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { TruncateTableProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2462,7 +2450,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { + private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer { EnableTableProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2474,7 +2462,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { + private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer { DisableTableProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2486,7 +2474,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { + private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { AddColumnFamilyProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2498,7 +2486,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { + private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { DeleteColumnFamilyProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2510,7 +2498,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { + private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer { ModifyColumnFamilyProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2522,7 +2510,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { + private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { CreateNamespaceProcedureBiConsumer(String namespaceName) { super(namespaceName); @@ -2534,7 +2522,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { + private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { DeleteNamespaceProcedureBiConsumer(String namespaceName) { super(namespaceName); @@ -2546,7 +2534,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { + private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { ModifyNamespaceProcedureBiConsumer(String namespaceName) { super(namespaceName); @@ -2558,7 +2546,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { + private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { MergeTableRegionProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2570,7 +2558,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { + private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer { SplitTableRegionProcedureBiConsumer(TableName tableName) { super(tableName); @@ -2584,7 +2572,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { CompletableFuture<Void> future = new CompletableFuture<>(); - procFuture.whenComplete((procId, error) -> { + addListener(procFuture, (procId, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -2595,30 +2583,33 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { - this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this - .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( - controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), - (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) - .call().whenComplete((response, error) -> { - if (error != null) { - LOG.warn("failed to get the procedure result procId={}", procId, - ConnectionUtils.translateException(error)); - retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), - ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); - return; - } - if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), - ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); - return; - } - if (response.hasException()) { - IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); - future.completeExceptionally(ioe); - } else { - future.complete(null); - } - }); + addListener( + this.<GetProcedureResultResponse> newMasterCaller() + .action((controller, stub) -> this + .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call( + controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), + (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) + .call(), + (response, error) -> { + if (error != null) { + LOG.warn("failed to get the procedure result procId={}", procId, + ConnectionUtils.translateException(error)); + retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + return; + } + if (response.getState() == GetProcedureResultResponse.State.RUNNING) { + retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + return; + } + if (response.hasException()) { + IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); + future.completeExceptionally(ioe); + } else { + future.complete(null); + } + }); } private <T> CompletableFuture<T> failedFuture(Throwable error) { @@ -2700,24 +2691,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> updateConfiguration() { CompletableFuture<Void> future = new CompletableFuture<Void>(); - getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)) - .whenComplete((status, err) -> { + addListener( + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)), + (status, err) -> { if (err != null) { future.completeExceptionally(err); } else { List<CompletableFuture<Void>> futures = new ArrayList<>(); status.getLiveServerMetrics().keySet() - .forEach(server -> futures.add(updateConfiguration(server))); + .forEach(server -> futures.add(updateConfiguration(server))); futures.add(updateConfiguration(status.getMasterName())); status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master))); - CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) - .whenComplete((result, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(result); - } - }); + addListener( + CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), + (result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); } }); return future; @@ -2800,88 +2793,87 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { switch (compactType) { case MOB: - connection.registry.getMasterAddress().whenComplete((serverName, err) -> { + addListener(connection.registry.getMasterAddress(), (serverName, err) -> { if (err != null) { future.completeExceptionally(err); return; } RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); - this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action( - (controller, stub) -> this - .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( + addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName) + .action((controller, stub) -> this + .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( controller, stub, RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), - (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp) - ).call().whenComplete((resp2, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - if (resp2.hasCompactionState()) { - future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); + (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)) + .call(), (resp2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); } else { - future.complete(CompactionState.NONE); + if (resp2.hasCompactionState()) { + future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); + } else { + future.complete(CompactionState.NONE); + } } - } - }); + }); }); break; case NORMAL: - getTableHRegionLocations(tableName).whenComplete( - (locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - List<CompactionState> regionStates = new ArrayList<>(); - List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); - locations.stream().filter(loc -> loc.getServerName() != null) - .filter(loc -> loc.getRegion() != null) - .filter(loc -> !loc.getRegion().isOffline()) - .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { - futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { - // If any region compaction state is MAJOR_AND_MINOR - // the table compaction state is MAJOR_AND_MINOR, too. - if (err2 != null) { - future.completeExceptionally(err2); - } else if (regionState == CompactionState.MAJOR_AND_MINOR) { - future.complete(regionState); - } else { - regionStates.add(regionState); - } - })); - }); - CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) - .whenComplete((ret, err3) -> { - // If future not completed, check all regions's compaction state - if (!future.isCompletedExceptionally() && !future.isDone()) { - CompactionState state = CompactionState.NONE; - for (CompactionState regionState : regionStates) { - switch (regionState) { - case MAJOR: - if (state == CompactionState.MINOR) { - future.complete(CompactionState.MAJOR_AND_MINOR); - } else { - state = CompactionState.MAJOR; - } - break; - case MINOR: - if (state == CompactionState.MAJOR) { - future.complete(CompactionState.MAJOR_AND_MINOR); - } else { - state = CompactionState.MINOR; - } - break; - case NONE: - default: + addListener(getTableHRegionLocations(tableName), (locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List<CompactionState> regionStates = new ArrayList<>(); + List<CompletableFuture<CompactionState>> futures = new ArrayList<>(); + locations.stream().filter(loc -> loc.getServerName() != null) + .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline()) + .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { + futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { + // If any region compaction state is MAJOR_AND_MINOR + // the table compaction state is MAJOR_AND_MINOR, too. + if (err2 != null) { + future.completeExceptionally(err2); + } else if (regionState == CompactionState.MAJOR_AND_MINOR) { + future.complete(regionState); + } else { + regionStates.add(regionState); + } + })); + }); + addListener( + CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])), + (ret, err3) -> { + // If future not completed, check all regions's compaction state + if (!future.isCompletedExceptionally() && !future.isDone()) { + CompactionState state = CompactionState.NONE; + for (CompactionState regionState : regionStates) { + switch (regionState) { + case MAJOR: + if (state == CompactionState.MINOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MAJOR; } - if (!future.isDone()) { - future.complete(state); + break; + case MINOR: + if (state == CompactionState.MAJOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MINOR; } - } + break; + case NONE: + default: } - }); - }); + if (!future.isDone()) { + future.complete(state); + } + } + } + }); + }); break; default: throw new IllegalArgumentException("Unknown compactType: " + compactType); @@ -2893,37 +2885,38 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) { CompletableFuture<CompactionState> future = new CompletableFuture<>(); - getRegionLocation(regionName).whenComplete( - (location, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - ServerName serverName = location.getServerName(); - if (serverName == null) { - future.completeExceptionally(new NoServerForRegionException(Bytes - .toStringBinary(regionName))); - return; - } + addListener(getRegionLocation(regionName), (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future + .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + addListener( this.<GetRegionInfoResponse> newAdminCaller() - .action( - (controller, stub) -> this - .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall( - controller, stub, RequestConverter.buildGetRegionInfoRequest(location - .getRegion().getRegionName(), true), (s, c, req, done) -> s - .getRegionInfo(controller, req, done), resp -> resp)) - .serverName(serverName).call().whenComplete((resp2, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - if (resp2.hasCompactionState()) { - future.complete(
<TRUNCATED>