This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 557125539c Allow ScanServers to scan offline tables (#3082)
557125539c is described below

commit 557125539cf35362c69002ad72b00af5dd69e672
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Nov 17 11:50:22 2022 -0500

    Allow ScanServers to scan offline tables (#3082)
    
    Moved the offline table check from ClientContext to the point where the
    iterator() method is called on ScannerImpl and TabletServerBatchReader.
---
 .../apache/accumulo/core/clientImpl/ClientContext.java  |  7 +++----
 .../apache/accumulo/core/clientImpl/ScannerImpl.java    | 11 +++++++++++
 .../core/clientImpl/TabletServerBatchReader.java        |  4 ++++
 .../gc/src/main/java/org/apache/accumulo/gc/GCRun.java  |  4 +++-
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java       |  4 +++-
 .../java/org/apache/accumulo/test/ScanServerIT.java     | 17 ++++++++---------
 6 files changed, 32 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 67d24ab448..eec80a569d 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -704,8 +704,8 @@ public class ClientContext implements AccumuloClient {
       int numQueryThreads) throws TableNotFoundException {
     ensureOpen();
     checkArgument(authorizations != null, "authorizations is null");
-    return new TabletServerBatchReader(this, 
requireNotOffline(getTableId(tableName), tableName),
-        tableName, authorizations, numQueryThreads);
+    return new TabletServerBatchReader(this, getTableId(tableName), tableName, 
authorizations,
+        numQueryThreads);
   }
 
   @Override
@@ -792,8 +792,7 @@ public class ClientContext implements AccumuloClient {
       throws TableNotFoundException {
     ensureOpen();
     checkArgument(authorizations != null, "authorizations is null");
-    Scanner scanner =
-        new ScannerImpl(this, requireNotOffline(getTableId(tableName), 
tableName), authorizations);
+    Scanner scanner = new ScannerImpl(this, getTableId(tableName), 
authorizations);
     Integer batchSize = 
ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
     if (batchSize != null) {
       scanner.setBatchSize(batchSize);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
index c48585e935..2d9bcdb4c8 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
@@ -150,6 +151,16 @@ public class ScannerImpl extends ScannerOptions implements 
Scanner {
   @Override
   public synchronized Iterator<Entry<Key,Value>> iterator() {
     ensureOpen();
+
+    if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
+      try {
+        String tableName = context.getTableName(tableId);
+        context.requireNotOffline(tableId, tableName);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException("Table not found", e);
+      }
+    }
+
     ScannerIterator iter = new ScannerIterator(context, tableId, 
authorizations, range, size,
         getTimeout(SECONDS), this, isolated, readaheadThreshold, new 
Reporter());
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index c0de759516..9508f06aa6 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -115,6 +115,10 @@ public class TabletServerBatchReader extends 
ScannerOptions implements BatchScan
       throw new IllegalStateException("batch reader closed");
     }
 
+    if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
+      context.requireNotOffline(tableId, tableName);
+    }
+
     return new TabletServerBatchReaderIterator(context, tableId, tableName, 
authorizations, ranges,
         numThreads, queryThreadPool, this, timeOut);
   }
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 7de534398a..ab7d800df3 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
@@ -383,7 +384,8 @@ public class GCRun implements GarbageCollectionEnvironment {
         }
         return Maps.immutableEntry(file, stat);
       });
-    } catch 
(org.apache.accumulo.core.replication.ReplicationTableOfflineException e) {
+    } catch 
(org.apache.accumulo.core.replication.ReplicationTableOfflineException
+        | TableOfflineException e) {
       // No elements that we need to preclude
       return Collections.emptyIterator();
     }
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index bedfb54569..f6efa8e03a 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -364,7 +365,8 @@ public class GarbageCollectWriteAheadLogs {
           candidates.remove(id);
           log.info("Ignore closed log " + id + " because it is being replicated");
         }
-      } catch 
(org.apache.accumulo.core.replication.ReplicationTableOfflineException ex) {
+      } catch 
(org.apache.accumulo.core.replication.ReplicationTableOfflineException
+          | TableOfflineException ex) {
         return candidates.size();
       }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java 
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 2080e3196a..4f16882aa9 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
@@ -150,19 +149,19 @@ public class ScanServerIT extends SharedMiniClusterBase {
 
   @Test
   public void testScanOfflineTable() throws Exception {
+
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       String tableName = getUniqueNames(1)[0];
 
-      createTableAndIngest(client, tableName, null, 10, 10, "colf");
+      final int ingestedEntryCount = createTableAndIngest(client, tableName, 
null, 10, 10, "colf");
       client.tableOperations().offline(tableName, true);
 
-      assertThrows(TableOfflineException.class, () -> {
-        try (Scanner scanner = client.createScanner(tableName, 
Authorizations.EMPTY)) {
-          scanner.setRange(new Range());
-          scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
-          assertEquals(100, Iterables.size(scanner));
-        } // when the scanner is closed, all open sessions should be closed
-      });
+      try (Scanner scanner = client.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed entries");
+      } // when the scanner is closed, all open sessions should be closed
     }
   }
 

Reply via email to