This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit cee2c7a66f494c3e761ccdbee6a9c1353925a961 Author: Kadir <kozde...@salesforce.com> AuthorDate: Wed Oct 23 22:38:13 2019 -0700 PHOENIX-5478 IndexTool mapper task should not timeout --- .../org/apache/phoenix/end2end/IndexToolIT.java | 2 +- .../coprocessor/BaseScannerRegionObserver.java | 2 + .../UngroupedAggregateRegionObserver.java | 220 +++++++++++---------- .../apache/phoenix/index/GlobalIndexChecker.java | 4 - .../PhoenixServerBuildIndexInputFormat.java | 2 + .../apache/phoenix/mapreduce/index/IndexTool.java | 12 -- .../org/apache/phoenix/query/QueryServices.java | 2 + .../apache/phoenix/query/QueryServicesOptions.java | 3 +- 8 files changed, 130 insertions(+), 117 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 2f12ae9..8af5295 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -80,7 +80,6 @@ import com.google.common.collect.Maps; @RunWith(Parameterized.class) public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { - private final boolean localIndex; private final boolean mutable; private final boolean transactional; @@ -118,6 +117,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5)); serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index b73615f..cb4d0af 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -75,6 +75,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String GROUP_BY_LIMIT = "_GroupByLimit"; public static final String LOCAL_INDEX = "_LocalIndex"; public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; + // The number of index rows to be rebuild in one RPC call + public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging"; /* * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation. * Needed for backward compatibility purposes. TODO: get rid of this in next major release. diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 3a03f94..0a16a68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; +import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; @@ 
-1056,116 +1057,137 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver throw new RuntimeException(e); } } - - private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, - Configuration config) throws IOException { - byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); - boolean useProto = true; - // for backward compatibility fall back to look up by the old attribute - if (indexMetaData == null) { - useProto = false; - indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + + private class IndexRebuildRegionScanner extends BaseRegionScanner { + private long pageSizeInRows = Long.MAX_VALUE; + private boolean hasMore; + private final int maxBatchSize; + private MutationList mutations; + private final long maxBatchSizeBytes; + private final long blockingMemstoreSize; + private final byte[] clientVersionBytes; + private List<Cell> results = new ArrayList<Cell>(); + private byte[] indexMetaData; + private boolean useProto = true; + private Scan scan; + private RegionScanner innerScanner; + final Region region; + + IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan, + final Configuration config) { + super(innerScanner); + if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) { + pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS); + } + + maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + mutations = new MutationList(maxBatchSize); + maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + blockingMemstoreSize = getBlockingMemstoreSize(region, config); + clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + indexMetaData = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); + if (indexMetaData == null) { + useProto = false; + indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + } + this.scan = scan; + this.innerScanner = innerScanner; + this.region = region; } - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); - boolean hasMore; - int rowCount = 0; - try { - int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); - final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); - MutationList mutations = new MutationList(maxBatchSize); - region.startRegionOperation(); - byte[] uuidValue = ServerCacheClient.generateId(); - synchronized (innerScanner) { - do { - List<Cell> results = new ArrayList<Cell>(); - hasMore = innerScanner.nextRaw(results); - if (!results.isEmpty()) { - Put put = null; - Delete del = null; - for (Cell cell : results) { - - if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { - if (put == null) { - put = new Put(CellUtil.cloneRow(cell)); - put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); - put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); - put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); - mutations.add(put); - // Since we're replaying existing mutations, it makes no sense to write them to the wal - put.setDurability(Durability.SKIP_WAL); - } - put.add(cell); - } else { - if (del == null) { - del = new Delete(CellUtil.cloneRow(cell)); - del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); - del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); - del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); - mutations.add(del); - // Since we're replaying existing mutations, it makes no sense to write them to the wal - del.setDurability(Durability.SKIP_WAL); + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { return hasMore; } + + @Override + public void close() throws IOException { innerScanner.close(); } + + private void setMutationAttributes(Mutation m, byte[] uuidValue) { + m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); + m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, + BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); + m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes); + // Since we're replaying existing mutations, it makes no sense to write them to the wal + m.setDurability(Durability.SKIP_WAL); + } + + @Override + public boolean next(List<Cell> results) throws IOException { + int rowCount = 0; + try { + byte[] uuidValue = ServerCacheClient.generateId(); + synchronized (innerScanner) { + do { + List<Cell> row = new ArrayList<Cell>(); + hasMore = innerScanner.nextRaw(row); + if (!row.isEmpty()) { + Put put = null; + Delete del = null; + for (Cell cell : row) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (put == null) { + put = new Put(CellUtil.cloneRow(cell)); + setMutationAttributes(put, uuidValue); + mutations.add(put); + } + put.add(cell); + } else { + if (del == null) { + del = new Delete(CellUtil.cloneRow(cell)); + setMutationAttributes(del, 
uuidValue); + mutations.add(del); + } + del.addDeleteMarker(cell); } - del.addDeleteMarker(cell); } + if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { + checkForRegionClosingOrSplitting(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); + uuidValue = ServerCacheClient.generateId(); + mutations.clear(); + } + rowCount++; } - if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - checkForRegionClosingOrSplitting(); - commitBatchWithRetries(region, mutations, blockingMemstoreSize); - uuidValue = ServerCacheClient.generateId(); - mutations.clear(); - } - rowCount++; + + } while (hasMore && rowCount < pageSizeInRows); + if (!mutations.isEmpty()) { + checkForRegionClosingOrSplitting(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); } - - } while (hasMore); - if (!mutations.isEmpty()) { - checkForRegionClosingOrSplitting(); - commitBatchWithRetries(region, mutations, blockingMemstoreSize); + } + } catch (IOException e) { + hasMore = false; + LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e)); + throw e; + } finally { + if (!hasMore) { + region.closeRegionOperation(); } } - } catch (IOException e) { - LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e)); - throw e; - } finally { - region.closeRegionOperation(); + byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); + final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + results.add(aggKeyValue); + return hasMore; } - byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); - final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, - SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); - - RegionScanner 
scanner = new BaseRegionScanner(innerScanner) { - @Override - public HRegionInfo getRegionInfo() { - return region.getRegionInfo(); - } - - @Override - public boolean isFilterDone() { - return true; - } - @Override - public void close() throws IOException { - innerScanner.close(); - } + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } + } - @Override - public boolean next(List<Cell> results) throws IOException { - results.add(aggKeyValue); - return false; - } + private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, + final Configuration config) throws IOException { - @Override - public long getMaxResultSize() { - return scan.getMaxResultSize(); - } - }; + region.startRegionOperation(); + RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config); return scanner; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index 1b3b9c3..80e036e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -253,10 +253,6 @@ public class GlobalIndexChecker extends BaseRegionObserver { return result.getRow(); } }; - for (Cell cell : result.rawCells()) { - String cellString = cell.toString(); - LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell))); - } byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp); if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length, indexRowKey, 0, indexRowKey.length) != 0) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java index 1abcef4..7e61b2d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName; import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; /** * {@link InputFormat} implementation from Phoenix for building index @@ -90,6 +91,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph try { scan.setTimeRange(0, scn); + scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES); } catch (IOException e) { throw new SQLException(e); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index dbaeead..22013e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -500,17 +500,6 @@ public class IndexTool extends Configured implements Tool { private Job configureJobForServerBuildIndex() throws Exception { - long indexRebuildQueryTimeoutMs = - configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, - QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT); - // Set various phoenix and hbase level timeouts and rpc retries - configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, - Long.toString(indexRebuildQueryTimeoutMs)); - configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - 
Long.toString(indexRebuildQueryTimeoutMs)); - configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, - Long.toString(indexRebuildQueryTimeoutMs)); - PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable); PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable); @@ -526,7 +515,6 @@ public class IndexTool extends Configured implements Tool { fs = outputPath.getFileSystem(configuration); fs.delete(outputPath, true); } - configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs)); final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index ffeec51..93e218e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -352,6 +352,8 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled"; // Enable support for long view index(default is false) public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled"; + // The number of index rows to be rebuild in one RPC call + public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows"; // Before 4.15 when we created a view we included the parent table column metadata in the view diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index eb4cf7c..fe51f89 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -209,7 +209,7 @@ public class QueryServicesOptions { public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins // 30 min rpc timeout * 5 tries, with 2100ms total pause time between retries - public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = 60000 * 60 * 24; // 24 hrs + public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100; public static final long DEFAULT_INDEX_REBUILD_RPC_TIMEOUT = 30000 * 60; // 30 mins public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level @@ -358,6 +358,7 @@ public class QueryServicesOptions { public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true; + public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 1024*1024; public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;