keith-turner commented on code in PR #5957:
URL: https://github.com/apache/accumulo/pull/5957#discussion_r2700128027


##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java:
##########
@@ -314,32 +339,73 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, 
Range range, ScanParamet
       iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), 
true);
     }
 
+    skipReturnedDuplicates(iter, duplicatesToSkip, range);
+
+    Key rangeStartKey = range.getStartKey();
+    Key currentKey = null;
+    boolean resumingOnSameKey =
+        iter.hasTop() && rangeStartKey != null && 
rangeStartKey.equals(iter.getTopKey());
+    int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
+    int duplicatesReturnedForCurrentKey = 0;
+    Key cutKey = null;
+    boolean cutPending = false;
+
     while (iter.hasTop()) {
       if (yield.hasYielded()) {
         throw new IOException(
             "Coding error: hasTop returned true but has yielded at " + 
yield.getPositionAndReset());
       }
       value = iter.getTopValue();
       key = iter.getTopKey();
+      if (cutPending && !key.equals(cutKey)) {
+        continueKey = copyResumeKey(cutKey);
+        resumeOnSameKey = true;
+        skipContinueKey = false;
+        break;
+      }
+      if (!key.equals(currentKey)) {
+        currentKey = copyResumeKey(key);
+        if (resumingOnSameKey && key.equals(rangeStartKey)) {
+          duplicatesReturnedForCurrentKey = previousDuplicates;
+        } else {
+          duplicatesReturnedForCurrentKey = 0;
+          resumingOnSameKey = false;
+        }
+      }
 
       KVEntry kvEntry = new KVEntry(key, value); // copies key and value
       results.add(kvEntry);
       resultSize += kvEntry.estimateMemoryUsed();
       resultBytes += kvEntry.numBytes();
 
+      duplicatesReturnedForCurrentKey++;
+
+      if (cutPending && (resultSize >= maxResultsSizeWithDuplicates
+          || results.size() >= maxEntriesWithDuplicates)) {
+        throw new IllegalStateException("Duplicate key run exceeded scan batch 
growth limit for "
+            + cutKey + ". Increase " + 
Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+            + " or reduce duplicates for this key.");
+      }
+
       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) 
>= timeToRun;
 
       if (resultSize >= maxResultsSize || results.size() >= 
scanParams.getMaxEntries() || timesUp) {
-        continueKey = new Key(key);
-        skipContinueKey = true;
-        break;
+        if (!cutPending) {
+          cutPending = true;
+          cutKey = currentKey;
+        } else if (timesUp) {
+          throw new IllegalStateException("Duplicate key run exceeded scan 
batch timeout for "
+              + cutKey + ". Increase " + 
Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+              + " or batch timeout, or reduce duplicates for this key.");
+        }

Review Comment:
   This if stmt used to have a break in it, seems like that will still be 
needed.  When in here we have finished  batch and need to break out of the loop.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java:
##########
@@ -314,32 +339,73 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, 
Range range, ScanParamet
       iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), 
true);
     }
 
+    skipReturnedDuplicates(iter, duplicatesToSkip, range);
+
+    Key rangeStartKey = range.getStartKey();
+    Key currentKey = null;
+    boolean resumingOnSameKey =
+        iter.hasTop() && rangeStartKey != null && 
rangeStartKey.equals(iter.getTopKey());
+    int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
+    int duplicatesReturnedForCurrentKey = 0;
+    Key cutKey = null;
+    boolean cutPending = false;
+
     while (iter.hasTop()) {
       if (yield.hasYielded()) {
         throw new IOException(
             "Coding error: hasTop returned true but has yielded at " + 
yield.getPositionAndReset());
       }
       value = iter.getTopValue();
       key = iter.getTopKey();
+      if (cutPending && !key.equals(cutKey)) {
+        continueKey = copyResumeKey(cutKey);
+        resumeOnSameKey = true;
+        skipContinueKey = false;
+        break;
+      }
+      if (!key.equals(currentKey)) {
+        currentKey = copyResumeKey(key);
+        if (resumingOnSameKey && key.equals(rangeStartKey)) {
+          duplicatesReturnedForCurrentKey = previousDuplicates;
+        } else {
+          duplicatesReturnedForCurrentKey = 0;
+          resumingOnSameKey = false;
+        }
+      }
 
       KVEntry kvEntry = new KVEntry(key, value); // copies key and value
       results.add(kvEntry);
       resultSize += kvEntry.estimateMemoryUsed();
       resultBytes += kvEntry.numBytes();
 
+      duplicatesReturnedForCurrentKey++;
+
+      if (cutPending && (resultSize >= maxResultsSizeWithDuplicates
+          || results.size() >= maxEntriesWithDuplicates)) {
+        throw new IllegalStateException("Duplicate key run exceeded scan batch 
growth limit for "
+            + cutKey + ". Increase " + 
Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+            + " or reduce duplicates for this key.");
+      }
+
       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) 
>= timeToRun;
 
       if (resultSize >= maxResultsSize || results.size() >= 
scanParams.getMaxEntries() || timesUp) {
-        continueKey = new Key(key);
-        skipContinueKey = true;
-        break;
+        if (!cutPending) {

Review Comment:
   These changes are adding a lot of new work per key/value read.  Normally 
that work will be wasted effort and never needed.  Instead of doing that, could 
instead do a check when closing off batch to see if we are in this situation 
where the keys are the same.  Maybe something like the following, not sure how 
this would change the rest of the code.  But hopefully this strategy could lead 
to not adding any new work for each key/value.
   
   ```java
   if (resultSize >= maxResultsSize || results.size() >= 
scanParams.getMaxEntries() || timesUp) {
      // closing off a batch, need to see if the last key in the batch and the 
next unread key are the same
      if(iter.hasTop()) {
             iter.next();
             if(iter.hasTop() && 
results.get(results.size()-1).getKey().equals(iter.getTopKey())) {
               // the last key in the batch and the next key match, so can not 
terminate batch yet...
   
               // not sure what this should do... it could start looping here 
or set a boolean that
               // modifies the outer loop behavior... could also read the 
config here to determine what to
               // do... in any case do not need to do extra work until now
             }
           }
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to