keith-turner commented on code in PR #5957:
URL: https://github.com/apache/accumulo/pull/5957#discussion_r2700128027
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java:
##########
@@ -314,32 +339,73 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter,
Range range, ScanParamet
iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()),
true);
}
+ skipReturnedDuplicates(iter, duplicatesToSkip, range);
+
+ Key rangeStartKey = range.getStartKey();
+ Key currentKey = null;
+ boolean resumingOnSameKey =
+ iter.hasTop() && rangeStartKey != null &&
rangeStartKey.equals(iter.getTopKey());
+ int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
+ int duplicatesReturnedForCurrentKey = 0;
+ Key cutKey = null;
+ boolean cutPending = false;
+
while (iter.hasTop()) {
if (yield.hasYielded()) {
throw new IOException(
"Coding error: hasTop returned true but has yielded at " +
yield.getPositionAndReset());
}
value = iter.getTopValue();
key = iter.getTopKey();
+ if (cutPending && !key.equals(cutKey)) {
+ continueKey = copyResumeKey(cutKey);
+ resumeOnSameKey = true;
+ skipContinueKey = false;
+ break;
+ }
+ if (!key.equals(currentKey)) {
+ currentKey = copyResumeKey(key);
+ if (resumingOnSameKey && key.equals(rangeStartKey)) {
+ duplicatesReturnedForCurrentKey = previousDuplicates;
+ } else {
+ duplicatesReturnedForCurrentKey = 0;
+ resumingOnSameKey = false;
+ }
+ }
KVEntry kvEntry = new KVEntry(key, value); // copies key and value
results.add(kvEntry);
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();
+ duplicatesReturnedForCurrentKey++;
+
+ if (cutPending && (resultSize >= maxResultsSizeWithDuplicates
+ || results.size() >= maxEntriesWithDuplicates)) {
+ throw new IllegalStateException("Duplicate key run exceeded scan batch
growth limit for "
+ + cutKey + ". Increase " +
Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+ + " or reduce duplicates for this key.");
+ }
+
boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos)
>= timeToRun;
if (resultSize >= maxResultsSize || results.size() >=
scanParams.getMaxEntries() || timesUp) {
- continueKey = new Key(key);
- skipContinueKey = true;
- break;
+ if (!cutPending) {
+ cutPending = true;
+ cutKey = currentKey;
+ } else if (timesUp) {
+ throw new IllegalStateException("Duplicate key run exceeded scan
batch timeout for "
+ + cutKey + ". Increase " +
Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+ + " or batch timeout, or reduce duplicates for this key.");
+ }
Review Comment:
This if statement used to contain a `break`, and it seems that will still be
needed: when we get here the batch is finished and we need to break out of the loop.
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java:
##########
@@ -314,32 +339,73 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter,
Range range, ScanParamet
iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()),
true);
}
+ skipReturnedDuplicates(iter, duplicatesToSkip, range);
+
+ Key rangeStartKey = range.getStartKey();
+ Key currentKey = null;
+ boolean resumingOnSameKey =
+ iter.hasTop() && rangeStartKey != null &&
rangeStartKey.equals(iter.getTopKey());
+ int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
+ int duplicatesReturnedForCurrentKey = 0;
+ Key cutKey = null;
+ boolean cutPending = false;
+
while (iter.hasTop()) {
if (yield.hasYielded()) {
throw new IOException(
"Coding error: hasTop returned true but has yielded at " +
yield.getPositionAndReset());
}
value = iter.getTopValue();
key = iter.getTopKey();
+ if (cutPending && !key.equals(cutKey)) {
+ continueKey = copyResumeKey(cutKey);
+ resumeOnSameKey = true;
+ skipContinueKey = false;
+ break;
+ }
+ if (!key.equals(currentKey)) {
+ currentKey = copyResumeKey(key);
+ if (resumingOnSameKey && key.equals(rangeStartKey)) {
+ duplicatesReturnedForCurrentKey = previousDuplicates;
+ } else {
+ duplicatesReturnedForCurrentKey = 0;
+ resumingOnSameKey = false;
+ }
+ }
KVEntry kvEntry = new KVEntry(key, value); // copies key and value
results.add(kvEntry);
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();
+ duplicatesReturnedForCurrentKey++;
+
+ if (cutPending && (resultSize >= maxResultsSizeWithDuplicates
+ || results.size() >= maxEntriesWithDuplicates)) {
+ throw new IllegalStateException("Duplicate key run exceeded scan batch
growth limit for "
+ + cutKey + ". Increase " +
Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+ + " or reduce duplicates for this key.");
+ }
+
boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos)
>= timeToRun;
if (resultSize >= maxResultsSize || results.size() >=
scanParams.getMaxEntries() || timesUp) {
- continueKey = new Key(key);
- skipContinueKey = true;
- break;
+ if (!cutPending) {
Review Comment:
These changes add a lot of new work for every key/value read. Normally that
work will be wasted effort and never needed. Instead, we could do a check when
closing off the batch to see whether we are in the situation where the last key
in the batch and the next unread key are the same. Maybe something like the
following; I'm not sure how this would change the rest of the code, but
hopefully this strategy could avoid adding any new work for each key/value.
```java
if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
  // closing off a batch, need to see if the last key in the batch and the next unread key are the same
  if(iter.hasTop()) {
    iter.next();
    if(iter.hasTop() && results.get(results.size()-1).getKey().equals(iter.getTopKey())) {
      // the last key in the batch and the next key match, so can not terminate batch yet...
      // not sure what this should do... it could start looping here or set a boolean that
      // modifies the outer loop behavior... could also read the config here to determine what to
      // do... in any case do not need to do extra work until now
    }
  }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]