PHOENIX-3280 Automatic attempt to rebuild all disabled indexes

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a223adf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a223adf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a223adf

Branch: refs/heads/calcite
Commit: 2a223adfbeb32f598308da1dc6d3251ee0980d79
Parents: 27697b3
Author: James Taylor <jamestay...@apache.org>
Authored: Thu Sep 15 00:48:24 2016 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Thu Sep 15 00:48:24 2016 -0700

----------------------------------------------------------------------
 .../coprocessor/MetaDataRegionObserver.java     | 179 +++++++++++--------
 1 file changed, 104 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a223adf/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index f1dc982..00981f5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,8 +20,8 @@ package org.apache.phoenix.coprocessor;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -79,6 +79,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 
 /**
@@ -223,13 +224,11 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
 
-                PTable dataPTable = null;
+                Map<PTable, List<PTable>> dataTableToIndexesMap = null;
                 MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
-                List<PTable> indexesToPartiallyRebuild = 
Collections.emptyList();
                 scanner = this.env.getRegion().getScanner(scan);
-                long earliestDisableTimestamp = Long.MAX_VALUE;
 
                 do {
                     results.clear();
@@ -249,19 +248,12 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                     if (disabledTimeStampVal <= 0) {
                         continue;
                     }
-                    if (disabledTimeStampVal < earliestDisableTimestamp) {
-                        earliestDisableTimestamp = disabledTimeStampVal;
-                    }
-
                     byte[] dataTable = 
r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                    byte[] indexStat = 
r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    byte[] indexState = 
r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
-                    if ((dataTable == null || dataTable.length == 0) || 
(indexStat == null || indexStat.length == 0)
-                            || (dataPTable != null
-                                    && 
Bytes.compareTo(dataPTable.getName().getBytes(), dataTable) != 0)) {
+                    if ((dataTable == null || dataTable.length == 0) || 
(indexState == null || indexState.length == 0)) {
                         // data table name can't be empty
-                        // we need to build indexes of same data table. so 
skip other indexes for this task.
                         continue;
                     }
 
@@ -284,14 +276,19 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                        // don't run a second index populations upsert select 
                         
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); 
                         conn = QueryUtil.getConnectionOnServer(props, 
env.getConfiguration()).unwrap(PhoenixConnection.class);
-                        String dataTableFullName = 
SchemaUtil.getTableName(schemaName, dataTable);
-                        dataPTable = PhoenixRuntime.getTable(conn, 
dataTableFullName);
-                        indexesToPartiallyRebuild = 
Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
                         client = new MetaDataClient(conn);
+                        dataTableToIndexesMap = Maps.newHashMap();
                     }
+                    String dataTableFullName = 
SchemaUtil.getTableName(schemaName, dataTable);
+                    PTable dataPTable = PhoenixRuntime.getTableNoCache(conn, 
dataTableFullName);
 
                     String indexTableFullName = 
SchemaUtil.getTableName(schemaName, indexTable);
-                    PTable indexPTable = PhoenixRuntime.getTable(conn, 
indexTableFullName);
+                    PTable indexPTable = PhoenixRuntime.getTableNoCache(conn, 
indexTableFullName);
+                    // Sanity check in case index was removed from table
+                    if (!dataPTable.getIndexes().contains(indexPTable)) {
+                        continue;
+                    }
+                    
                     if 
(!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                         LOG.debug("Index rebuild has been skipped because not 
all regions of index table="
                                 + indexPTable.getName() + " are online.");
@@ -299,82 +296,114 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                     }
                     // Allow index to begin incremental maintenance as index 
is back online and we
                     // cannot transition directly from DISABLED -> ACTIVE
-                    if 
(Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+                    if 
(Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) {
                         AlterIndexStatement statement = new 
AlterIndexStatement(
                                 
NamedTableNode.create(indexPTable.getSchemaName().getString(), 
indexPTable.getTableName().getString()),
                                 dataPTable.getTableName().getString(),
                                 false, PIndexState.INACTIVE);
                         client.alterIndex(statement);
                     }
+                    List<PTable> indexesToPartiallyRebuild = 
dataTableToIndexesMap.get(dataPTable);
+                    if (indexesToPartiallyRebuild == null) {
+                        indexesToPartiallyRebuild = 
Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+                        dataTableToIndexesMap.put(dataPTable, 
indexesToPartiallyRebuild);
+                    }
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
 
-                if (!indexesToPartiallyRebuild.isEmpty()) {
+                if (dataTableToIndexesMap != null) {
                     long overlapTime = env.getConfiguration().getLong(
-                        
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
-                        
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    long timeStamp = Math.max(0, earliestDisableTimestamp - 
overlapTime);
-                    
-                    LOG.info("Starting to build indexes=" + 
indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
-                    new Scan();
-                    List<IndexMaintainer> maintainers = 
Lists.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
-                    for (PTable index : indexesToPartiallyRebuild) {
-                        maintainers.add(index.getIndexMaintainer(dataPTable, 
conn));
-                    }
-                    Scan dataTableScan = 
IndexManagementUtil.newLocalStateScan(maintainers);
-                    dataTableScan.setTimeRange(timeStamp, 
HConstants.LATEST_TIMESTAMP);
-                    byte[] physicalTableName = 
dataPTable.getPhysicalName().getBytes();
-                    try (HTableInterface dataHTable = 
conn.getQueryServices().getTable(physicalTableName)) {
-                        Result result;
-                        try (ResultScanner dataTableScanner = 
dataHTable.getScanner(dataTableScan)) {
-                            int batchSize = conn.getMutateBatchSize();
-                            List<Mutation> mutations = 
Lists.newArrayListWithExpectedSize(batchSize);
-                            ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
-                            IndexMaintainer.serializeAdditional(dataPTable, 
indexMetaDataPtr, indexesToPartiallyRebuild, conn);
-                            byte[] attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                            byte[] uuidValue = ServerCacheClient.generateId();
-        
-                            while ((result = dataTableScanner.next()) != null 
&& !result.isEmpty()) {
-                                Put put = null;
-                                Delete del = null;
-                                for (Cell cell : result.rawCells()) {
-                                    if 
(KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                        if (put == null) {
-                                            put = new 
Put(CellUtil.cloneRow(cell));
-                                            
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                            
put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                            
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
-                                            mutations.add(put);
+                            
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+                            
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+                    for (Map.Entry<PTable, List<PTable>> entry : 
dataTableToIndexesMap.entrySet()) {
+                        PTable dataPTable = entry.getKey();
+                        List<PTable> indexesToPartiallyRebuild = 
entry.getValue();
+                        try {
+                            long earliestDisableTimestamp = Long.MAX_VALUE;
+                            List<IndexMaintainer> maintainers = Lists
+                                    
.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                            for (PTable index : indexesToPartiallyRebuild) {
+                                long disabledTimeStampVal = 
index.getIndexDisableTimestamp();
+                                if (disabledTimeStampVal > 0) {
+                                    if (disabledTimeStampVal < 
earliestDisableTimestamp) {
+                                        earliestDisableTimestamp = 
disabledTimeStampVal;
+                                    }
+    
+                                    
maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+                                }
+                            }
+                            // No indexes are disabled, so skip this table
+                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
+                                continue;
+                            }
+
+                            long timeStamp = Math.max(0, 
earliestDisableTimestamp - overlapTime);
+                            LOG.info("Starting to build " + dataPTable + " 
indexes " + indexesToPartiallyRebuild
+                                    + " from timestamp=" + timeStamp);
+                            Scan dataTableScan = 
IndexManagementUtil.newLocalStateScan(maintainers);
+                            dataTableScan.setTimeRange(timeStamp, 
HConstants.LATEST_TIMESTAMP);
+                            byte[] physicalTableName = 
dataPTable.getPhysicalName().getBytes();
+                            try (HTableInterface dataHTable = 
conn.getQueryServices().getTable(physicalTableName)) {
+                                Result result;
+                                try (ResultScanner dataTableScanner = 
dataHTable.getScanner(dataTableScan)) {
+                                    int batchSize = conn.getMutateBatchSize();
+                                    List<Mutation> mutations = 
Lists.newArrayListWithExpectedSize(batchSize);
+                                    ImmutableBytesWritable indexMetaDataPtr = 
new ImmutableBytesWritable(
+                                            ByteUtil.EMPTY_BYTE_ARRAY);
+                                    
IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
+                                            indexesToPartiallyRebuild, conn);
+                                    byte[] attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                                    byte[] uuidValue = 
ServerCacheClient.generateId();
+
+                                    while ((result = dataTableScanner.next()) 
!= null && !result.isEmpty()) {
+                                        Put put = null;
+                                        Delete del = null;
+                                        for (Cell cell : result.rawCells()) {
+                                            if 
(KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                                if (put == null) {
+                                                    put = new 
Put(CellUtil.cloneRow(cell));
+                                                    
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                                    
put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                                    
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                                            
PDataType.TRUE_BYTES);
+                                                    mutations.add(put);
+                                                }
+                                                put.add(cell);
+                                            } else {
+                                                if (del == null) {
+                                                    del = new 
Delete(CellUtil.cloneRow(cell));
+                                                    
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                                    
del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                                    
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                                            
PDataType.TRUE_BYTES);
+                                                    mutations.add(del);
+                                                }
+                                                del.addDeleteMarker(cell);
+                                            }
                                         }
-                                        put.add(cell);
-                                    } else {
-                                        if (del == null) {
-                                            del = new 
Delete(CellUtil.cloneRow(cell));
-                                            
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                            
del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                            
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, 
PDataType.TRUE_BYTES);
-                                            mutations.add(del);
+                                        if (mutations.size() == batchSize) {
+                                            dataHTable.batch(mutations);
+                                            uuidValue = 
ServerCacheClient.generateId();
+                                            mutations.clear();
                                         }
-                                        del.addDeleteMarker(cell);
                                     }
-                                }
-                                if (mutations.size() == batchSize) {
-                                    dataHTable.batch(mutations);
-                                    uuidValue = ServerCacheClient.generateId();
+                                    if (!mutations.isEmpty()) {
+                                        dataHTable.batch(mutations);
+                                    }
                                 }
                             }
-                            if (!mutations.isEmpty()) {
-                                dataHTable.batch(mutations);
+                            for (PTable indexPTable : 
indexesToPartiallyRebuild) {
+                                AlterIndexStatement statement = new 
AlterIndexStatement(
+                                        
NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable
+                                                .getTableName().getString()), 
dataPTable.getTableName().getString(),
+                                        false, PIndexState.ACTIVE);
+                                client.alterIndex(statement);
                             }
+                        } catch (Exception e) { // Log, but try next table's 
indexes
+                            LOG.warn("Unable to rebuild " + dataPTable + " 
indexes " + indexesToPartiallyRebuild
+                                    + ". Will try again next on next scheduled 
invocation.", e);
                         }
                     }
-                    for (PTable indexPTable : indexesToPartiallyRebuild) {
-                        AlterIndexStatement statement = new 
AlterIndexStatement(
-                                
NamedTableNode.create(indexPTable.getSchemaName().getString(), 
indexPTable.getTableName().getString()),
-                                dataPTable.getTableName().getString(),
-                                false, PIndexState.ACTIVE);
-                        client.alterIndex(statement);
-                    }
                 }
             } catch (Throwable t) {
                 LOG.warn("ScheduledBuildIndexTask failed!", t);

Reply via email to