This is an automated email from the ASF dual-hosted git repository.

pboado pushed a commit to branch 5.x-cdh6
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit e62d06126733ccf71c256448dd7e0580e25ce411
Author: Gokcen Iskender <gisken...@salesforce.com>
AuthorDate: Fri Feb 15 22:07:01 2019 +0000

    PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool
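    
    An illustrative invocation with the new option (the -s/-dt/-it/-tenant
    flags are taken from this patch's argument handling; the launcher
    command is assumed and may vary by install):
    
        hbase org.apache.phoenix.mapreduce.index.IndexScrutinyTool \
            -s MY_SCHEMA -dt MY_TABLE -it MY_INDEX -tenant tenant1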
    
    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
---
 .../phoenix/end2end/IndexScrutinyToolIT.java       | 1398 +++++++++++---------
 .../mapreduce/index/IndexScrutinyMapper.java       |    2 +
 .../mapreduce/index/IndexScrutinyTableOutput.java  |    1 +
 .../phoenix/mapreduce/index/IndexScrutinyTool.java |   61 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |    5 +-
 .../phoenix/mapreduce/util/IndexColumnNames.java   |    3 +
 6 files changed, 820 insertions(+), 650 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 692a98c..046c3f0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -46,9 +46,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
@@ -58,6 +61,7 @@ import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;
 import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -70,6 +74,7 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -81,681 +86,850 @@ import com.google.common.collect.Sets;
  * Tests for the {@link IndexScrutinyTool}
  */
 @Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class IndexScrutinyToolIT extends BaseTest {
+@RunWith(Enclosed.class)
+public class IndexScrutinyToolIT {
+
+    abstract public static class SharedIndexToolIT extends BaseTest {
+        protected String outputDir;
+
+        @BeforeClass public static void doSetup() throws Exception {
+            Map<String, String> serverProps = Maps.newHashMap();
+            //disable major compactions
+            serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+            Map<String, String> clientProps = Maps.newHashMap();
+            setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                    new ReadOnlyProps(clientProps.entrySet().iterator()));
+        }
 
-    private String dataTableDdl;
-    private String indexTableDdl;
+        protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
+            IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            scrutiny.setConf(conf);
+            int status = scrutiny.run(cmdArgs);
+            assertEquals(0, status);
+            for (Job job : scrutiny.getJobs()) {
+                assertTrue(job.waitForCompletion(true));
+            }
+            return scrutiny.getJobs();
+        }
 
-    private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
+        protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+                SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) {
+            final List<String> args = Lists.newArrayList();
+            if (schemaName != null) {
+                args.add("-s");
+                args.add(schemaName);
+            }
+            args.add("-dt");
+            args.add(dataTable);
+            args.add("-it");
+            args.add(indxTable);
+
+            // TODO test snapshot reads
+            // if(useSnapshot) {
+            // args.add("-snap");
+            // }
+
+            if (OutputFormat.FILE.equals(outputFormat)) {
+                args.add("-op");
+                outputDir = "/tmp/" + UUID.randomUUID().toString();
+                args.add(outputDir);
+            }
 
-    private static final String INDEX_UPSERT_SQL =
-        "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") 
values (?,?,?,?)";
+            args.add("-t");
+            args.add(String.valueOf(scrutinyTs));
+            args.add("-run-foreground");
+            if (batchSize != null) {
+                args.add("-b");
+                args.add(String.valueOf(batchSize));
+            }
 
-    private static final String DELETE_SQL = "DELETE FROM %s ";
+            // default to using data table as the source table
+            args.add("-src");
+            if (sourceTable == null) {
+                args.add(SourceTable.DATA_TABLE_SOURCE.name());
+            } else {
+                args.add(sourceTable.name());
+            }
+            if (outputInvalidRows) {
+                args.add("-o");
+            }
+            if (outputFormat != null) {
+                args.add("-of");
+                args.add(outputFormat.name());
+            }
+            if (maxOutputRows != null) {
+                args.add("-om");
+                args.add(maxOutputRows.toString());
+            }
+            if (tenantId != null) {
+                args.add("-tenant");
+                args.add(tenantId);
+            }
+            return args.toArray(new String[0]);
+        }
 
-    private String schemaName;
-    private String dataTableName;
-    private String dataTableFullName;
-    private String indexTableName;
-    private String indexTableFullName;
-    private String outputDir;
+        protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
+            return counters.findCounter(counter).getValue();
+        }
+    }
 
-    private Connection conn;
+    @RunWith(Parameterized.class) public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT {
 
-    private PreparedStatement dataTableUpsertStmt;
+        private String dataTableDdl;
+        private String indexTableDdl;
 
-    private PreparedStatement indexTableUpsertStmt;
+        private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
 
-    private long testTime;
-    private Properties props;
+        private static final String
+                INDEX_UPSERT_SQL =
+                "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", 
\"0:EMPLOY_DATE\") values (?,?,?,?)";
 
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s 
ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE 
INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE 
LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }
-            });
-    }
+        private static final String DELETE_SQL = "DELETE FROM %s ";
 
-    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
-        this.dataTableDdl = dataTableDdl;
-        this.indexTableDdl = indexTableDdl;
-    }
+        private String schemaName;
+        private String dataTableName;
+        private String dataTableFullName;
+        private String indexTableName;
+        private String indexTableFullName;
 
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMap();
-        //disable major compactions
-        serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
-        Map<String, String> clientProps = Maps.newHashMap();
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
-    }
+        private Connection conn;
 
-    /**
-     * Create the test data and index tables
-     */
-    @Before
-    public void setup() throws SQLException {
-        generateUniqueTableNames();
-        createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
-        createTestTable(getUrl(),
-            String.format(indexTableDdl, indexTableName, dataTableFullName));
-        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl(), props);
-        String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
-        dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
-        String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
-        indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
-        conn.setAutoCommit(false);
-        testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+        private PreparedStatement dataTableUpsertStmt;
 
-    }
+        private PreparedStatement indexTableUpsertStmt;
 
-    @After
-    public void teardown() throws SQLException {
-        if (conn != null) {
-            conn.close();
+        private long testTime;
+        private Properties props;
+
+        @Parameterized.Parameters public static Collection<Object[]> data() {
+            return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
+                    "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+                    "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+                    "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
         }
-    }
 
-    /**
-     * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
-     */
-    @Test
-    public void testValidIndex() throws Exception {
-        // insert two rows
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        int numDataRows = countRows(dataTableFullName);
-        int numIndexRows = countRows(indexTableFullName);
-
-        // scrutiny should report everything as ok
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
-        // make sure row counts weren't modified by scrutiny
-        assertEquals(numDataRows, countRows(dataTableFullName));
-        assertEquals(numIndexRows, countRows(indexTableFullName));
-    }
+        public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) {
+            this.dataTableDdl = dataTableDdl;
+            this.indexTableDdl = indexTableDdl;
+        }
 
-    /**
-     * Tests running a scrutiny while updates and deletes are happening.
-     * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
-     */
-    @Test
-    @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS 
scanner")
-    public void testScrutinyWhileTakingWrites() throws Exception {
-        int id = 0;
-        while (id < 1000) {
-            int index = 1;
-            dataTableUpsertStmt.setInt(index++, id);
-            dataTableUpsertStmt.setString(index++, "name-" + id);
-            dataTableUpsertStmt.setInt(index++, id);
-            dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
-            dataTableUpsertStmt.executeUpdate();
-            id++;
-        }
-        conn.commit();
-
-        //CURRENT_SCN for scrutiny
-        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
-
-        // launch background upserts and deletes
-        final Random random = new Random(0);
-        Runnable backgroundUpserts = new Runnable() {
-            @Override
-            public void run() {
-                int idToUpsert = random.nextInt(1000);
-                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                    PreparedStatement dataPS =
-                            conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
-                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
-                    conn.commit();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+        /**
+         * Create the test data and index tables
+         */
+        @Before public void setup() throws SQLException {
+            generateUniqueTableNames();
+            createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
+            createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName));
+            props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+            String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
+            dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
+            String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
+            indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
+            conn.setAutoCommit(false);
+            testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
+        }
+
+        @After public void teardown() throws SQLException {
+            if (conn != null) {
+                conn.close();
             }
-        };
-        Runnable backgroundDeletes = new Runnable() {
-            @Override
-            public void run() {
-                int idToDelete = random.nextInt(1000);
-                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                    String deleteSql =
-                            String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
-                                    + idToDelete;
-                    conn.createStatement().executeUpdate(deleteSql);
-                    conn.commit();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+        }
+
+        /**
+         * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
+         */
+        @Test public void testValidIndex() throws Exception {
+            // insert two rows
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            int numDataRows = countRows(dataTableFullName);
+            int numIndexRows = countRows(indexTableFullName);
+
+            // scrutiny should report everything as ok
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+            // make sure row counts weren't modified by scrutiny
+            assertEquals(numDataRows, countRows(dataTableFullName));
+            assertEquals(numIndexRows, countRows(indexTableFullName));
+        }
+
+        /**
+         * Tests running a scrutiny while updates and deletes are happening.
+         * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+         */
+        @Test @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true 
on RS scanner")
+        public void testScrutinyWhileTakingWrites() throws Exception {
+            int id = 0;
+            while (id < 1000) {
+                int index = 1;
+                dataTableUpsertStmt.setInt(index++, id);
+                dataTableUpsertStmt.setString(index++, "name-" + id);
+                dataTableUpsertStmt.setInt(index++, id);
+                dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+                dataTableUpsertStmt.executeUpdate();
+                id++;
             }
-        };
-        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
-        scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
-            TimeUnit.MILLISECONDS);
-        scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
-            TimeUnit.MILLISECONDS);
-
-        // scrutiny should report everything as ok
-        List<Job> completedJobs =
-                runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
-                    scrutinyTS);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-        scheduledThreadPool.shutdown();
-        scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
-    }
+            conn.commit();
+
+            //CURRENT_SCN for scrutiny
+            long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+            // launch background upserts and deletes
+            final Random random = new Random(0);
+            Runnable backgroundUpserts = new Runnable() {
+                @Override public void run() {
+                    int idToUpsert = random.nextInt(1000);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        PreparedStatement dataPS =
+                                conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
+                        upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+                        conn.commit();
+                    } catch (SQLException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Runnable backgroundDeletes = new Runnable() {
+                @Override public void run() {
+                    int idToDelete = random.nextInt(1000);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        String deleteSql =
+                                String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
+                                        + idToDelete;
+                        conn.createStatement().executeUpdate(deleteSql);
+                        conn.commit();
+                    } catch (SQLException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS);
+
+            // scrutiny should report everything as ok
+            List<Job> completedJobs =
+                    runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            scheduledThreadPool.shutdown();
+            scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+        }
 
-    /**
-     * Tests an index with the same # of rows as the data table, but one of the index rows is
-     * incorrect Scrutiny should report the invalid rows.
-     */
-    @Test
-    public void testEqualRowCountIndexIncorrect() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad row into the index
-        upsertIndexRow("badName", 2, 9999);
-        conn.commit();
-
-        // scrutiny should report the bad row
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+        /**
+         * Tests an index with the same # of rows as the data table, but one of the index rows is
+         * incorrect Scrutiny should report the invalid rows.
+         */
+        @Test public void testEqualRowCountIndexIncorrect() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad row into the index
+            upsertIndexRow("badName", 2, 9999);
+            conn.commit();
+
+            // scrutiny should report the bad row
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
 
-    /**
-     * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
-     * a covered index value is incorrect. Scrutiny should report the invalid row
-     */
-    @Test
-    public void testCoveredValueIncorrect() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable index and insert another data row
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad index row for the above data row
-        upsertIndexRow("name-2", 2, 9999);
-        conn.commit();
-
-        // scrutiny should report the bad row
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
-    }
+        /**
+         * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
+         * a covered index value is incorrect. Scrutiny should report the invalid row
+         */
+        @Test public void testCoveredValueIncorrect() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable index and insert another data row
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad index row for the above data row
+            upsertIndexRow("name-2", 2, 9999);
+            conn.commit();
+
+            // scrutiny should report the bad row
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
+        }
 
-    /**
-     * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
-     * scrutiny with batchsize of 10,
-     */
-    @Test
-    public void testBatching() throws Exception {
-        // insert 1001 data and index rows
-        int numTestRows = 1001;
-        for (int i = 0; i < numTestRows; i++) {
-            upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
-        }
-        conn.commit();
-
-        disableIndex();
-
-        // randomly delete some rows from the index
-        Random random = new Random();
-        for (int i = 0; i < 100; i++) {
-            int idToDelete = random.nextInt(numTestRows);
-            deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
-        }
-        conn.commit();
-        int numRows = countRows(indexTableFullName);
-        int numDeleted = numTestRows - numRows;
-
-        // run scrutiny with batch size of 10
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
-        assertEquals(numTestRows / 10 + numTestRows % 10,
-            getCounterValue(counters, BATCHES_PROCESSED_COUNT));
-    }
+        /**
+         * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
+         * scrutiny with batchsize of 10,
+         */
+        @Test public void testBatching() throws Exception {
+            // insert 1001 data and index rows
+            int numTestRows = 1001;
+            for (int i = 0; i < numTestRows; i++) {
+                upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
+            }
+            conn.commit();
 
-    /**
-     * Tests when there are more data table rows than index table rows Scrutiny should report the
-     * number of incorrect rows
-     */
-    @Test
-    public void testMoreDataRows() throws Exception {
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
-        conn.commit();
-        disableIndex();
-        // these rows won't have a corresponding index row
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
-        upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
-        conn.commit();
-
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+            disableIndex();
 
-    /**
-     * Tests when there are more index table rows than data table rows Scrutiny should report the
-     * number of incorrect rows when run with the index as the source table
-     */
-    @Test
-    public void testMoreIndexRows() throws Exception {
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
-        conn.commit();
-        disableIndex();
-        // these index rows won't have a corresponding data row
-        upsertIndexRow("name-2", 2, 95124);
-        upsertIndexRow("name-3", 3, 95125);
-        conn.commit();
-
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.INDEX_TABLE_SOURCE);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+            // randomly delete some rows from the index
+            Random random = new Random();
+            for (int i = 0; i < 100; i++) {
+                int idToDelete = random.nextInt(numTestRows);
+                deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
+            }
+            conn.commit();
+            int numRows = countRows(indexTableFullName);
+            int numDeleted = numTestRows - numRows;
 
-    /**
-     * Tests running with both the index and data tables as the source table If we have an
-     * incorrectly indexed row, it should be reported in each direction
-     */
-    @Test
-    public void testBothDataAndIndexAsSource() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad row into the index
-        upsertIndexRow("badName", 2, 9999);
-        conn.commit();
-
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.BOTH);
-        assertEquals(2, completedJobs.size());
-        for (Job job : completedJobs) {
+            // run scrutiny with batch size of 10
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(numTestRows / 10 + numTestRows % 10,
+                    getCounterValue(counters, BATCHES_PROCESSED_COUNT));
+        }
+
+        /**
+         * Tests when there are more data table rows than index table rows Scrutiny should report the
+         * number of incorrect rows
+         */
+        @Test public void testMoreDataRows() throws Exception {
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+            conn.commit();
+            disableIndex();
+            // these rows won't have a corresponding index row
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
+            upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
             assertTrue(job.isSuccessful());
             Counters counters = job.getCounters();
             assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+
+        /**
+         * Tests when there are more index table rows than data table rows Scrutiny should report the
+         * number of incorrect rows when run with the index as the source table
+         */
+        @Test public void testMoreIndexRows() throws Exception {
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+            conn.commit();
+            disableIndex();
+            // these index rows won't have a corresponding data row
+            upsertIndexRow("name-2", 2, 95124);
+            upsertIndexRow("name-3", 3, 95125);
+            conn.commit();
+
+            List<Job> completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+
+        /**
+         * Tests running with both the index and data tables as the source table If we have an
+         * incorrectly indexed row, it should be reported in each direction
+         */
+        @Test public void testBothDataAndIndexAsSource() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad row into the index
+            upsertIndexRow("badName", 2, 9999);
+            conn.commit();
+
+            List<Job> completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH);
+            assertEquals(2, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
         }
-    }
 
-    /**
-     * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
-     */
-    @Test
-    public void testOutputInvalidRowsToFile() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
-        runScrutiny(argValues);
-
-        // check the output files
-        Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
-        DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
-        List<Path> paths = Lists.newArrayList();
-        Path firstPart = null;
-        for (FileStatus outputFile : fs.listStatus(outputPath)) {
-            if (outputFile.getPath().getName().startsWith("part")) {
-                if (firstPart == null) {
-                    firstPart = outputFile.getPath();
-                } else {
-                    paths.add(outputFile.getPath());
+        /**
+         * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
+         */
+        @Test public void testOutputInvalidRowsToFile() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+
+            String[] argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
+            runScrutiny(argValues);
+
+            // check the output files
+            Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
+            DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
+            List<Path> paths = Lists.newArrayList();
+            Path firstPart = null;
+            for (FileStatus outputFile : fs.listStatus(outputPath)) {
+                if (outputFile.getPath().getName().startsWith("part")) {
+                    if (firstPart == null) {
+                        firstPart = outputFile.getPath();
+                    } else {
+                        paths.add(outputFile.getPath());
+                    }
                 }
             }
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                fs.concat(firstPart, paths.toArray(new Path[0]));
+            }
+            Path outputFilePath = firstPart;
+            assertTrue(fs.exists(outputFilePath));
+            FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+            TreeSet<String> lines = Sets.newTreeSet();
+            try {
+                String line = null;
+                while ((line = reader.readLine()) != null) {
+                    lines.add(line);
+                }
+            } finally {
+                IOUtils.closeQuietly(reader);
+                IOUtils.closeQuietly(fsDataInputStream);
+            }
+            Iterator<String> lineIterator = lines.iterator();
+            assertEquals(
+                    "[2, name-2, " + new Timestamp(testTime).toString() + ", 
95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", 
lineIterator.next());
+            assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + 
", 95123]\tTarget row not found",
+                    lineIterator.next());
+
+        }
+
+        /**
+         * Tests writing of results to the output table
+         */
+        @Test public void testOutputInvalidRowsToTable() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+            String[] argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            // check that the output table contains the invalid rows
+            long scrutinyTimeMillis =
+                    PhoenixConfigurationUtil
+                            .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+            String invalidRowsQuery =
+                    IndexScrutinyTableOutput
+                            .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+            ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
+            assertTrue(rs.next());
+            assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+            assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+            assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
+            assertEquals(2, rs.getInt("ID"));
+            assertEquals(2, rs.getInt(":ID"));
+            assertEquals(95123, rs.getInt("ZIP"));
+            assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+            assertTrue(rs.next());
+            assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+            assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+            assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+            assertEquals(3, rs.getInt("ID"));
+            assertEquals(null, rs.getObject(":ID")); // null for missing target row
+            assertFalse(rs.next());
+
+            // check that the job results were written correctly to the metadata table
+            assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
         }
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
-            fs.concat(firstPart, paths.toArray(new Path[0]));
-        }
-        Path outputFilePath = firstPart;
-        assertTrue(fs.exists(outputFilePath));
-        FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
-        TreeSet<String> lines = Sets.newTreeSet();
-        try {
-            String line = null;
-            while ((line = reader.readLine()) != null) {
-                lines.add(line);
+
+        /**
+         * Tests that the config for max number of output rows is observed
+         */
+        @Test public void testMaxOutputRows() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+            // set max to 1.  There are two bad rows, but only 1 should get written to output table
+            String[] argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
+            List<Job> completedJobs = runScrutiny(argValues);
+            long scrutinyTimeMillis =
+                    PhoenixConfigurationUtil
+                            .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+            String invalidRowsQuery =
+                    IndexScrutinyTableOutput
+                            .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+            ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
+            assertTrue(rs.next());
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                assertTrue(rs.next());
+                assertFalse(rs.next());
+            } else {
+                assertFalse(rs.next());
             }
-        } finally {
-            IOUtils.closeQuietly(reader);
-            IOUtils.closeQuietly(fsDataInputStream);
         }
-        Iterator<String> lineIterator = lines.iterator();
-        assertEquals(
-            "[2, name-2, " + new Timestamp(testTime).toString() + ", 
95123]\t[2, name-2, " + new Timestamp(testTime)
-                .toString() + ", 9999]", lineIterator.next());
-        assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 
95123]\tTarget row not found",
-            lineIterator.next());
 
-    }
+        private SourceTargetColumnNames getColNames() throws SQLException {
+            PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+            PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+            SourceTargetColumnNames columnNames =
+                    new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
+            return columnNames;
+        }
 
-    /**
-     * Tests writing of results to the output table
-     */
-    @Test
-    public void testOutputInvalidRowsToTable() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
-        List<Job> completedJobs = runScrutiny(argValues);
-
-        // check that the output table contains the invalid rows
-        long scrutinyTimeMillis =
-                PhoenixConfigurationUtil
-                        .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
-        String invalidRowsQuery =
-                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
-                    scrutinyTimeMillis);
-        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
-        assertTrue(rs.next());
-        assertEquals(dataTableFullName,
-            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
-        assertEquals(indexTableFullName,
-            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
-        assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
-        assertEquals(2, rs.getInt("ID"));
-        assertEquals(2, rs.getInt(":ID"));
-        assertEquals(95123, rs.getInt("ZIP"));
-        assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
-        assertTrue(rs.next());
-        assertEquals(dataTableFullName,
-            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
-        assertEquals(indexTableFullName,
-            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
-        assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
-        assertEquals(3, rs.getInt("ID"));
-        assertEquals(null, rs.getObject(":ID")); // null for missing target row
-        assertFalse(rs.next());
-
-        // check that the job results were written correctly to the metadata table
-        assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
-    }
+        // inserts one valid data/index row, one data row with a missing index row,
+        // and one data row with an index row that has a bad covered col val
+        private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
+            conn.commit();
+
+            // insert a bad index row for one of the above data rows
+            upsertIndexRow("name-2", 2, 9999);
+            conn.commit();
+        }
+
+        private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
+                String invalidRowsQuery) throws SQLException {
+            ResultSet rs;
+            ResultSet metadataRs =
+                    IndexScrutinyTableOutput
+                            .queryAllMetadata(conn, dataTableFullName, indexTableFullName,
+                                    scrutinyTimeMillis);
+            assertTrue(metadataRs.next());
+            List<? extends Object>
+                    expected =
+                    Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                            SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                            2L, 1L, 1L,
+                            "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                            "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+                            invalidRowsQuery);
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                expected =
+                        Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                                2L, 1L, 2L,
+                                "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+                                invalidRowsQuery);
+            }
 
-    /**
-     * Tests that the config for max number of output rows is observed
-     */
-    @Test
-    public void testMaxOutputRows() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-        // set max to 1.  There are two bad rows, but only 1 should get written to output table
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
-        List<Job> completedJobs = runScrutiny(argValues);
-        long scrutinyTimeMillis =
-                PhoenixConfigurationUtil
-                        .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
-        String invalidRowsQuery =
-                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
-                    scrutinyTimeMillis);
-        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
-        assertTrue(rs.next());
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            assertRsValues(metadataRs, expected);
+            String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
+            rs = conn.createStatement().executeQuery(missingTargetQuery);
             assertTrue(rs.next());
+            assertEquals(3, rs.getInt("ID"));
             assertFalse(rs.next());
-        } else {
+            String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
+            rs = conn.createStatement().executeQuery(badCoveredColQuery);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt("ID"));
             assertFalse(rs.next());
         }
-    }
 
-    private SourceTargetColumnNames getColNames() throws SQLException {
-        PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
-        PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
-        SourceTargetColumnNames columnNames =
-                new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
-        return columnNames;
-    }
+        // assert the result set contains the expected values in the given order
+        private void assertRsValues(ResultSet rs, List<? extends Object> expected)
+                throws SQLException {
+            for (int i = 0; i < expected.size(); i++) {
+                assertEquals(expected.get(i), rs.getObject(i + 1));
+            }
+        }
 
-    // inserts one valid data/index row, one data row with a missing index row,
-    // and one data row with an index row that has a bad covered col val
-    private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
-        conn.commit();
-
-        // insert a bad index row for one of the above data rows
-        upsertIndexRow("name-2", 2, 9999);
-        conn.commit();
-    }
+        private void generateUniqueTableNames() {
+            schemaName = generateUniqueName();
+            dataTableName = generateUniqueName();
+            dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            indexTableName = generateUniqueName();
+            indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        }
 
-    private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
-            String invalidRowsQuery) throws SQLException {
-        ResultSet rs;
-        ResultSet metadataRs =
-                IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName,
-                    indexTableFullName, scrutinyTimeMillis);
-        assertTrue(metadataRs.next());
-        List<? extends Object> expected =
-            Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
-            expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
-        }
-
-        assertRsValues(metadataRs, expected);
-        String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
-        rs = conn.createStatement().executeQuery(missingTargetQuery);
-        assertTrue(rs.next());
-        assertEquals(3, rs.getInt("ID"));
-        assertFalse(rs.next());
-        String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
-        rs = conn.createStatement().executeQuery(badCoveredColQuery);
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt("ID"));
-        assertFalse(rs.next());
-    }
+        private int countRows(String tableFullName) throws SQLException {
+            ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+            count.next();
+            int numRows = count.getInt(1);
+            return numRows;
+        }
 
-    // assert the result set contains the expected values in the given order
-    private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
-        for (int i = 0; i < expected.size(); i++) {
-            assertEquals(expected.get(i), rs.getObject(i + 1));
+        private void upsertIndexRow(String name, int id, int zip) throws SQLException {
+            indexTableUpsertStmt.setString(1, name);
+            indexTableUpsertStmt.setInt(2, id); // id
+            indexTableUpsertStmt.setInt(3, zip); // bad zip
+            indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
+            indexTableUpsertStmt.executeUpdate();
         }
-    }
 
-    private void generateUniqueTableNames() {
-        schemaName = generateUniqueName();
-        dataTableName = generateUniqueName();
-        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        indexTableName = generateUniqueName();
-        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-    }
+        private void disableIndex() throws SQLException {
+            conn.createStatement().execute(
+                    String.format("ALTER INDEX %s ON %S disable", 
indexTableName, dataTableFullName));
+            conn.commit();
+        }
 
-    private int countRows(String tableFullName) throws SQLException {
-        ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
-        count.next();
-        int numRows = count.getInt(1);
-        return numRows;
-    }
+        private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+                SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) {
+            return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+                    outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE);
+        }
 
-    private void upsertIndexRow(String name, int id, int zip) throws SQLException {
-        indexTableUpsertStmt.setString(1, name);
-        indexTableUpsertStmt.setInt(2, id); // id
-        indexTableUpsertStmt.setInt(3, zip); // bad zip
-        indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
-        indexTableUpsertStmt.executeUpdate();
-    }
+        private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+            return runScrutiny(
+                    getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
+                            false, null, null, null, scrutinyTS));
+        }
 
-    private void disableIndex() throws SQLException {
-        conn.createStatement().execute(
-            String.format("ALTER INDEX %s ON %S disable", indexTableName, 
dataTableFullName));
-        conn.commit();
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
+            return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
+        }
 
-    private long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
-        return counters.findCounter(counter).getValue();
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                Long batchSize) throws Exception {
+            return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
+        }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
-            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
-            Long maxOutputRows) {
-        return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
-            outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE);
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                Long batchSize, SourceTable sourceTable) throws Exception {
+            final String[] cmdArgs =
+                    getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+                            false, null, null, null, Long.MAX_VALUE);
+            return runScrutiny(cmdArgs);
+        }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
-            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
-            Long maxOutputRows, Long scrutinyTs) {
-        final List<String> args = Lists.newArrayList();
-        if (schemaName != null) {
-            args.add("-s");
-            args.add(schemaName);
-        }
-        args.add("-dt");
-        args.add(dataTable);
-        args.add("-it");
-        args.add(indxTable);
-
-        // TODO test snapshot reads
-        // if(useSnapshot) {
-        // args.add("-snap");
-        // }
-
-        if (OutputFormat.FILE.equals(outputFormat)) {
-            args.add("-op");
-            outputDir = "/tmp/" + UUID.randomUUID().toString();
-            args.add(outputDir);
-        }
-        args.add("-t");
-        args.add(String.valueOf(scrutinyTs));
-        args.add("-run-foreground");
-        if (batchSize != null) {
-            args.add("-b");
-            args.add(String.valueOf(batchSize));
-        }
-
-        // default to using data table as the source table
-        args.add("-src");
-        if (sourceTable == null) {
-            args.add(SourceTable.DATA_TABLE_SOURCE.name());
-        } else {
-            args.add(sourceTable.name());
-        }
-        if (outputInvalidRows) {
-            args.add("-o");
-        }
-        if (outputFormat != null) {
-            args.add("-of");
-            args.add(outputFormat.name());
-        }
-        if (maxOutputRows != null) {
-            args.add("-om");
-            args.add(maxOutputRows.toString());
-        }
-        return args.toArray(new String[0]);
-    }
+        private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+                throws SQLException {
+            int index = 1;
+            // insert row
+            stmt.setInt(index++, id);
+            stmt.setString(index++, name);
+            stmt.setInt(index++, zip);
+            stmt.setTimestamp(index++, new Timestamp(testTime));
+            stmt.executeUpdate();
+        }
 
-    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
-        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS));
-    }
+        private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
+            String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
+            PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
+            return deleteStmt.executeUpdate();
+        }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
     }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
-            Long batchSize) throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
-    }
+    public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT {
+        private Connection connGlobal = null;
+        private Connection connTenant = null;
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
-            Long batchSize, SourceTable sourceTable) throws Exception {
-        final String[] cmdArgs =
-                getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
-                    null, null, Long.MAX_VALUE);
-        return runScrutiny(cmdArgs);
-    }
+        private String tenantId;
+        private String tenantViewName;
+        private String indexNameTenant;
+        private String multiTenantTable;
+        private String viewIndexTableName;
+
+        private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+        private final String
+                upsertQueryStr =
+                "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')";
+        private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+        /**
+         * Create the test data
+         */
+        @Before public void setup() throws SQLException {
+            tenantId = generateUniqueName();
+            tenantViewName = generateUniqueName();
+            indexNameTenant = generateUniqueName();
+            multiTenantTable = generateUniqueName();
+            viewIndexTableName = "_IDX_" + multiTenantTable;
+
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            connGlobal = DriverManager.getConnection(getUrl(), props);
+
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            connTenant = DriverManager.getConnection(getUrl(), props);
+            String
+                    createTblStr =
+                    "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT 
NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) 
MULTI_TENANT=true";
+
+            createTestTable(getUrl(), String.format(createTblStr, multiTenantTable));
+
+            connTenant.createStatement().execute(
+                    String.format(createViewStr, tenantViewName, multiTenantTable));
 
-    private List<Job> runScrutiny(String[] cmdArgs) throws Exception {
-        IndexScrutinyTool scrutiny = new IndexScrutinyTool();
-        Configuration conf = new Configuration(getUtility().getConfiguration());
-        scrutiny.setConf(conf);
-        int status = scrutiny.run(cmdArgs);
-        assertEquals(0, status);
-        for (Job job : scrutiny.getJobs()) {
-            assertTrue(job.waitForCompletion(true));
+            String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
+            connTenant.createStatement().execute(idxStmtTenant);
         }
-        return scrutiny.getJobs();
-    }
 
-    private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
-            throws SQLException {
-        int index = 1;
-        // insert row
-        stmt.setInt(index++, id);
-        stmt.setString(index++, name);
-        stmt.setInt(index++, zip);
-        stmt.setTimestamp(index++, new Timestamp(testTime));
-        stmt.executeUpdate();
-    }
+        @After public void teardown() throws SQLException {
+            if (connGlobal != null) {
+                connGlobal.close();
+            }
+            if (connTenant != null) {
+                connTenant.close();
+            }
+        }
 
-    private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
-        String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
-        PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
-        return deleteStmt.executeUpdate();
-    }
+        /**
+         * Tests the sunny case where the tenant view and its index are in sync
+         */
+        @Test public void testTenantViewAndIndexEqual() throws Exception {
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+            connTenant.commit();
+
+            String[] argValues =
+                    getArgValues("", tenantViewName, indexNameTenant, 10L,
+                            SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
+                            EnvironmentEdgeManager.currentTimeMillis());
+
+            List<Job> completedJobs = runScrutiny(argValues);
+            // Sunny case, both index and view are equal. 1 row
+            assertEquals(1, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
 
+        /**
+         * Tests that a global view on a multi-tenant table works as well
+        **/
+        @Test public void testGlobalViewOnMultiTenantTable() throws Exception {
+            String globalViewName = generateUniqueName();
+            String indexNameGlobal = generateUniqueName();
+
+            connGlobal.createStatement().execute(
+                    String.format(createViewStr, globalViewName, multiTenantTable));
+
+            String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName);
+            connGlobal.createStatement().execute(idxStmtGlobal);
+            connGlobal.createStatement()
+                    .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x"));
+            connGlobal.commit();
+            String[] argValues =
+                    getArgValues("", globalViewName, indexNameGlobal, 10L,
+                            SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
+                            EnvironmentEdgeManager.currentTimeMillis());
+            List<Job> completedJobs = runScrutiny(argValues);
+            // Sunny case, both index and view are equal. 1 row
+            assertEquals(1, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
+
+        /**
+         * Uses BOTH as the source. Adds 1 row to the tenant view and disables the index,
+         * then adds 1 more row to the view and a bad row to the index.
+         * Each direction then sees 1 valid row and 1 invalid row.
+         **/
+        @Test
+        public void testOneValidOneInvalidUsingBothAsSource() throws Exception {
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+            connTenant.commit();
+            connTenant.createStatement().execute(
+                    String.format("ALTER INDEX %s ON %S disable", 
indexNameTenant, tenantViewName));
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+
+            connTenant.createStatement().execute(String.format("UPSERT INTO %s 
(\":ID\", \"0:NAME\") values (%d, '%s')",
+                    indexNameTenant, 5555, "wrongName"));
+            connTenant.commit();
+
+            String[]
+                    argValues =
+                    getArgValues("", tenantViewName, indexNameTenant, 10L, 
SourceTable.BOTH, false,
+                            null, null, tenantId, 
EnvironmentEdgeManager.currentTimeMillis());
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            assertEquals(2, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
+
+        /**
+        * Adds 3 rows to the tenant view, then empties the index table so view and index no longer match.
+        * Uses the data table as the source and outputs the invalid rows to a file.
+        * Output to a table doesn't work for a tenant connection because the scrutiny
+        * output table can't be created as a tenant.
+        **/
+        @Test public void testWithEmptyIndexTableOutputToFile() throws Exception {
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3"));
+            connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+                    indexNameTenant, 5555, "wrongName"));
+            connTenant.commit();
+
+            ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(viewIndexTableName);
+            admin.disableTable(tableName);
+            admin.truncateTable(tableName, false);
+
+            String[]
+                    argValues =
+                    getArgValues("", tenantViewName, indexNameTenant, 10L, 
SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null,
+                            tenantId, 
EnvironmentEdgeManager.currentTimeMillis());
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            assertEquals(1, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 81081bf..c424787 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.base.Strings;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Pair;
@@ -93,6 +94,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             final Properties overrideProps = new Properties();
             String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
             overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+
             connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
             connection.setAutoCommit(false);
             batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
index 411214e..ac69bd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index c321b3a..26d7336 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -54,7 +55,10 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -66,6 +70,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.MetaDataUtil.VIEW_INDEX_TABLE_PREFIX;
+
 /**
  * An MR job to verify that the index table is in sync with the data table.
  *
@@ -111,6 +117,8 @@ public class IndexScrutinyTool extends Configured implements Tool {
     private static final Option OUTPUT_PATH_OPTION =
             new Option("op", "output-path", true, "Output path where the files 
are written");
     private static final Option OUTPUT_MAX = new Option("om", "output-max", 
true, "Max number of invalid rows to output per mapper.  Defaults to 1M");
+    private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+            "If specified, uses Tenant connection for tenant view index scrutiny (optional)");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
 
     /**
@@ -145,6 +153,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
         options.addOption(TIMESTAMP);
         options.addOption(BATCH_SIZE_OPTION);
         options.addOption(SOURCE_TABLE_OPTION);
+        options.addOption(TENANT_ID_OPTION);
         return options;
     }
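
For reference, the new option can be exercised end to end with a small driver along the following lines. This is a sketch only, not part of the commit: the schema, view, index, and tenant names are placeholders, while the flags themselves (-s, -dt, -it, -src, -run-foreground, and the new -tenant) match the option definitions above.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;

    public class TenantScrutinyDriver {
        public static void main(String[] unused) throws Exception {
            // All table/tenant names are placeholders; the flags mirror the tool's options.
            String[] args = {
                    "-s", "MY_SCHEMA",
                    "-dt", "MY_TENANT_VIEW",
                    "-it", "MY_VIEW_INDEX",
                    "-src", "INDEX_TABLE_SOURCE",
                    "-tenant", "tenant1",      // new option added by this commit
                    "-run-foreground"
            };
            // main() dispatches the same way, via ToolRunner
            System.exit(ToolRunner.run(new IndexScrutinyTool(), args));
        }
    }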
 
@@ -202,10 +211,11 @@ public class IndexScrutinyTool extends Configured implements Tool {
         private String basePath;
         private long scrutinyExecuteTime;
         private long outputMaxRows; // per mapper
+        private String tenantId;
 
         public JobFactory(Connection connection, Configuration configuration, long batchSize,
                 boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
-                String basePath, long outputMaxRows) {
+                String basePath, long outputMaxRows, String tenantId) {
             this.outputInvalidRows = outputInvalidRows;
             this.outputFormat = outputFormat;
             this.basePath = basePath;
@@ -214,12 +224,16 @@ public class IndexScrutinyTool extends Configured implements Tool {
             this.connection = connection;
             this.configuration = configuration;
             this.useSnapshot = useSnapshot;
+            this.tenantId = tenantId;
             this.ts = ts; // CURRENT_SCN to set
             scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis(); // time at which scrutiny was run.
                                                               // Same for
             // all jobs created from this factory
             PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
                 scrutinyExecuteTime);
+            if (!Strings.isNullOrEmpty(tenantId)) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
         }
 
         public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
@@ -362,10 +376,17 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 printHelpAndExit(e.getMessage(), getOptions());
             }
             final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+            boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
+            String tenantId = null;
+            if (useTenantId) {
+                tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+                configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                LOG.info(String.format("IndexScrutinyTool uses a tenantId %s", 
tenantId));
+            }
             connection = ConnectionUtil.getInputConnection(configuration);
             final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -388,7 +409,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                             : EnvironmentEdgeManager.currentTimeMillis() - 60000;
 
             if (indexTable != null) {
-                if (!isValidIndexTable(connection, qDataTable, indexTable)) {
+                if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
                     throw new IllegalArgumentException(String
                             .format(" %s is not an index table for %s ", 
indexTable, qDataTable));
                 }
@@ -420,7 +441,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 outputFormat, outputMaxRows));
             JobFactory jobFactory =
                     new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
-                            outputInvalidRows, outputFormat, basePath, outputMaxRows);
+                            outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId);
             // If we are running the scrutiny with both tables as the source, run two separate jobs,
             // one for each direction
             if (SourceTable.BOTH.equals(sourceTable)) {
@@ -481,38 +502,6 @@ public class IndexScrutinyTool extends Configured implements Tool {
         return jobs;
     }
 
-    /**
-     * Checks for the validity of the index table passed to the job.
-     * @param connection
-     * @param masterTable
-     * @param indexTable
-     * @return
-     * @throws SQLException
-     */
-    private boolean isValidIndexTable(final Connection connection, final String masterTable,
-            final String indexTable) throws SQLException {
-        final DatabaseMetaData dbMetaData = connection.getMetaData();
-        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
-        final String tableName =
-                SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
-
-        ResultSet rs = null;
-        try {
-            rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, 
false);
-            while (rs.next()) {
-                final String indexName = rs.getString(6);
-                if (indexTable.equalsIgnoreCase(indexName)) {
-                    return true;
-                }
-            }
-        } finally {
-            if (rs != null) {
-                rs.close();
-            }
-        }
-        return false;
-    }
-
     public static void main(final String[] args) throws Exception {
         int result = ToolRunner.run(new IndexScrutinyTool(), args);
         System.exit(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index f5e4001..ee2ae0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -339,7 +340,7 @@ public class IndexTool extends Configured implements Tool {
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
             IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
             PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
-            if (tenantId != null) {
+            if (!Strings.isNullOrEmpty(tenantId)) {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
             }
 
@@ -802,7 +803,7 @@ public class IndexTool extends Configured implements Tool {
      * @return
      * @throws SQLException
      */
-    private boolean isValidIndexTable(final Connection connection, final String masterTable,
+    public static boolean isValidIndexTable(final Connection connection, final String masterTable,
             final String indexTable, final String tenantId) throws SQLException {
         final DatabaseMetaData dbMetaData = connection.getMetaData();
         final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
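
Because isValidIndexTable is now public static and takes a tenantId, callers outside IndexTool (such as IndexScrutinyTool above) can run the same pre-flight check. A minimal sketch, assuming a placeholder JDBC URL and table/tenant names:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class IndexCheckSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // True only if MY_VIEW_INDEX is an index of the given table for this tenant
                boolean valid = IndexTool.isValidIndexTable(
                        conn, "MY_SCHEMA.MY_TENANT_VIEW", "MY_VIEW_INDEX", "tenant1");
                System.out.println("valid index: " + valid);
            }
        }
    }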
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 6f2959f..df8c7ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -61,6 +61,9 @@ public class IndexColumnNames {
         if (pindexTable.getViewIndexId() != null) {
             offset++;
         }
+        if (pindexTable.isMultiTenant()) {
+            offset++;
+        }
         if (offset > 0) {
             pindexCols = pindexCols.subList(offset, pindexCols.size());
             pkColumns = pkColumns.subList(offset, pkColumns.size());
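
The offset computed above skips the implementation-managed leading PK columns of the index before the source and target column lists are lined up; with this change a tenant view index skips both the view index id and the tenant id. A standalone sketch of that arithmetic, with the flags and column names hard-coded for illustration (in IndexColumnNames they come from the PTable):

    import java.util.Arrays;
    import java.util.List;

    public class PkOffsetSketch {
        public static void main(String[] args) {
            boolean hasViewIndexId = true; // stand-in for pindexTable.getViewIndexId() != null
            boolean isMultiTenant = true;  // stand-in for pindexTable.isMultiTenant()
            // Illustrative index PK columns; the first two are implementation-managed
            List<String> indexPkCols = Arrays.asList("_INDEX_ID", "_TENANT_ID", ":ID", "0:NAME");
            int offset = 0;
            if (hasViewIndexId) offset++;
            if (isMultiTenant) offset++;
            // Only user-visible columns remain after the skip
            System.out.println(indexPkCols.subList(offset, indexPkCols.size())); // [:ID, 0:NAME]
        }
    }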
