keith-turner commented on code in PR #3418:
URL: https://github.com/apache/accumulo/pull/3418#discussion_r1214827264


##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1553,5 +1558,213 @@ private void setInterruptFlagInternal(AtomicBoolean 
flag) {
     public void setCacheProvider(CacheProvider cacheProvider) {
       reader.setCacheProvider(cacheProvider);
     }
+
+    @Override
+    public void reset() {
+      clear();
+    }
+  }
+
+  public interface RFileSKVIterator extends FileSKVIterator {
+    FileSKVIterator getIndex() throws IOException;
+
+    void reset();
+  }
+
+  static abstract class FencedFileSKVIterator implements FileSKVIterator {
+
+    private final FileSKVIterator reader;
+    protected final Range fence;
+
+    public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
+      this.reader = Objects.requireNonNull(reader);
+      this.fence = fence;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasTop() {
+      return reader.hasTop();
+    }
+
+    @Override
+    public void next() throws IOException {
+      reader.next();
+    }
+
+    @Override
+    public Key getTopKey() {
+      return reader.getTopKey();
+    }
+
+    @Override
+    public Value getTopValue() {
+      return reader.getTopValue();
+    }
+
+    @Override
+    public Key getFirstKey() throws IOException {

Review Comment:
   I ran some of the unit tests w/ coverage and it seems like this method and 
getLastKey() were not covered by the unit tests.



##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java:
##########
@@ -76,6 +78,38 @@ public interface InputArguments {
      * @return this
      */
     ScannerFSOptions from(String... files);
+
+    /**
+     * Specify FencedRfiles to read from. When multiple are specified the 
{@link Scanner}
+     * constructed will present a merged view.
+     *
+     * @param files one or more FencedRfiles to read.
+     * @return this
+     *
+     * @since 3.1.0
+     */
+    ScannerFSOptions from(FencedRfile... files);
+
+    /**
+     * @since 3.1.0
+     */
+    class FencedRfile {

Review Comment:
   Could change this to match the camel case of the other code.
   
   ```suggestion
       class FencedRFile {
   ```
   
   Since this class is an inner class of RFile, including RFile in the name may 
be redundant.  Could name it something like
   
   ```suggestion
       class FencedPath {
   ```
   



##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java:
##########
@@ -29,10 +32,16 @@
 public class RFileSource {
   private final InputStream in;
   private final long len;
+  private final Range range;
 
   public RFileSource(InputStream in, long len) {
-    this.in = in;
+    this(in, len, new Range());
+  }
+
+  public RFileSource(InputStream in, long len, Range range) {

Review Comment:
   ```suggestion
     /**
      * @since 3.1.0
      */
     public RFileSource(InputStream in, long len, Range range) {
   ```



##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1553,5 +1558,213 @@ private void setInterruptFlagInternal(AtomicBoolean 
flag) {
     public void setCacheProvider(CacheProvider cacheProvider) {
       reader.setCacheProvider(cacheProvider);
     }
+
+    @Override
+    public void reset() {
+      clear();
+    }
+  }
+
+  public interface RFileSKVIterator extends FileSKVIterator {
+    FileSKVIterator getIndex() throws IOException;
+
+    void reset();
+  }
+
+  static abstract class FencedFileSKVIterator implements FileSKVIterator {
+
+    private final FileSKVIterator reader;
+    protected final Range fence;
+
+    public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
+      this.reader = Objects.requireNonNull(reader);
+      this.fence = fence;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasTop() {
+      return reader.hasTop();
+    }
+
+    @Override
+    public void next() throws IOException {
+      reader.next();
+    }
+
+    @Override
+    public Key getTopKey() {
+      return reader.getTopKey();
+    }
+
+    @Override
+    public Value getTopValue() {
+      return reader.getTopValue();
+    }
+
+    @Override
+    public Key getFirstKey() throws IOException {
+      var rfk = reader.getFirstKey();
+      if (fence.beforeStartKey(rfk)) {
+        return fence.getStartKey();
+      } else {
+        return rfk;
+      }
+    }
+
+    @Override
+    public Key getLastKey() throws IOException {
+      var rlk = reader.getLastKey();
+      if (fence.afterEndKey(rlk)) {
+        return fence.getEndKey();
+      } else {
+        return rlk;
+      }
+    }
+
+    @Override
+    public boolean isRunningLowOnMemory() {
+      return reader.isRunningLowOnMemory();
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      reader.setInterruptFlag(flag);
+    }
+
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException {
+      return reader.getMetaStore(name);
+    }
+
+    @Override
+    public void closeDeepCopies() throws IOException {
+      reader.closeDeepCopies();
+    }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      reader.setCacheProvider(cacheProvider);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+
+  static class FencedIndex extends FencedFileSKVIterator {
+    private final FileSKVIterator source;
+
+    public FencedIndex(FileSKVIterator source, Range seekFence) {
+      super(source, seekFence);
+      this.source = source;
+    }
+
+    @Override
+    public boolean hasTop() {
+      // this code filters out data because the rfile index iterators do not 
support seek
+
+      // If startKey is set then discard everything until we reach the start
+      // of the range
+      if (fence.getStartKey() != null) {
+
+        while (source.hasTop() && fence.beforeStartKey(source.getTopKey())) {
+          try {
+            source.next();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+      }
+
+      // If endKey is set then ensure that the current key is not passed the 
end of the range
+      return source.hasTop() && !fence.afterEndKey(source.getTopKey());
+    }
+
+    @Override
+    public void next() throws IOException {
+      super.next();
+    }

Review Comment:
   Can this be removed?
   
   ```suggestion
   ```



##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1553,5 +1558,213 @@ private void setInterruptFlagInternal(AtomicBoolean 
flag) {
     public void setCacheProvider(CacheProvider cacheProvider) {
       reader.setCacheProvider(cacheProvider);
     }
+
+    @Override
+    public void reset() {
+      clear();
+    }
+  }
+
+  public interface RFileSKVIterator extends FileSKVIterator {
+    FileSKVIterator getIndex() throws IOException;
+
+    void reset();
+  }
+
+  static abstract class FencedFileSKVIterator implements FileSKVIterator {
+
+    private final FileSKVIterator reader;
+    protected final Range fence;
+
+    public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
+      this.reader = Objects.requireNonNull(reader);
+      this.fence = fence;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasTop() {
+      return reader.hasTop();
+    }
+
+    @Override
+    public void next() throws IOException {
+      reader.next();
+    }
+
+    @Override
+    public Key getTopKey() {
+      return reader.getTopKey();
+    }
+
+    @Override
+    public Value getTopValue() {
+      return reader.getTopValue();
+    }
+
+    @Override
+    public Key getFirstKey() throws IOException {
+      var rfk = reader.getFirstKey();
+      if (fence.beforeStartKey(rfk)) {
+        return fence.getStartKey();
+      } else {
+        return rfk;
+      }
+    }
+
+    @Override
+    public Key getLastKey() throws IOException {
+      var rlk = reader.getLastKey();
+      if (fence.afterEndKey(rlk)) {
+        return fence.getEndKey();
+      } else {
+        return rlk;
+      }
+    }
+
+    @Override
+    public boolean isRunningLowOnMemory() {
+      return reader.isRunningLowOnMemory();
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      reader.setInterruptFlag(flag);
+    }
+
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException {
+      return reader.getMetaStore(name);
+    }
+
+    @Override
+    public void closeDeepCopies() throws IOException {
+      reader.closeDeepCopies();
+    }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      reader.setCacheProvider(cacheProvider);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+
+  static class FencedIndex extends FencedFileSKVIterator {
+    private final FileSKVIterator source;
+
+    public FencedIndex(FileSKVIterator source, Range seekFence) {
+      super(source, seekFence);
+      this.source = source;
+    }
+
+    @Override
+    public boolean hasTop() {
+      // this code filters out data because the rfile index iterators do not 
support seek
+
+      // If startKey is set then discard everything until we reach the start
+      // of the range
+      if (fence.getStartKey() != null) {
+
+        while (source.hasTop() && fence.beforeStartKey(source.getTopKey())) {
+          try {
+            source.next();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+      }
+
+      // If endKey is set then ensure that the current key is not passed the 
end of the range
+      return source.hasTop() && !fence.afterEndKey(source.getTopKey());
+    }
+
+    @Override
+    public void next() throws IOException {
+      super.next();
+    }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) 
{
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class FencedReader extends FencedFileSKVIterator implements 
RFileSKVIterator {
+
+    private final Reader reader;
+
+    public FencedReader(Reader reader, Range seekFence) {
+      super(reader, seekFence);
+      this.reader = reader;
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+        throws IOException {
+      reader.reset();
+
+      if (fence != null) {
+        range = fence.clip(range, true);
+        if (range == null) {
+          return;
+        }
+      }
+
+      reader.seek(range, columnFamilies, inclusive);
+    }
+
+    @Override
+    public FencedReader deepCopy(IteratorEnvironment env) {
+      return new FencedReader(reader.deepCopy(env), fence);
+    }
+
+    @Override
+    public FileSKVIterator getIndex() throws IOException {
+      return new FencedIndex(reader.getIndex(), fence);
+    }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      final Reader sample = reader.getSample(sampleConfig);

Review Comment:
   Didn't see test coverage on this either



##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1553,5 +1558,213 @@ private void setInterruptFlagInternal(AtomicBoolean 
flag) {
     public void setCacheProvider(CacheProvider cacheProvider) {
       reader.setCacheProvider(cacheProvider);
     }
+
+    @Override
+    public void reset() {
+      clear();
+    }
+  }
+
+  public interface RFileSKVIterator extends FileSKVIterator {
+    FileSKVIterator getIndex() throws IOException;
+
+    void reset();
+  }
+
+  static abstract class FencedFileSKVIterator implements FileSKVIterator {
+
+    private final FileSKVIterator reader;
+    protected final Range fence;
+
+    public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
+      this.reader = Objects.requireNonNull(reader);
+      this.fence = fence;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasTop() {
+      return reader.hasTop();
+    }
+
+    @Override
+    public void next() throws IOException {
+      reader.next();
+    }
+
+    @Override
+    public Key getTopKey() {
+      return reader.getTopKey();
+    }
+
+    @Override
+    public Value getTopValue() {
+      return reader.getTopValue();
+    }
+
+    @Override
+    public Key getFirstKey() throws IOException {
+      var rfk = reader.getFirstKey();
+      if (fence.beforeStartKey(rfk)) {
+        return fence.getStartKey();
+      } else {
+        return rfk;
+      }
+    }
+
+    @Override
+    public Key getLastKey() throws IOException {
+      var rlk = reader.getLastKey();
+      if (fence.afterEndKey(rlk)) {
+        return fence.getEndKey();
+      } else {
+        return rlk;
+      }
+    }
+
+    @Override
+    public boolean isRunningLowOnMemory() {
+      return reader.isRunningLowOnMemory();
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      reader.setInterruptFlag(flag);
+    }
+
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException {
+      return reader.getMetaStore(name);
+    }
+
+    @Override
+    public void closeDeepCopies() throws IOException {
+      reader.closeDeepCopies();
+    }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      reader.setCacheProvider(cacheProvider);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+
+  static class FencedIndex extends FencedFileSKVIterator {
+    private final FileSKVIterator source;
+
+    public FencedIndex(FileSKVIterator source, Range seekFence) {
+      super(source, seekFence);
+      this.source = source;
+    }
+
+    @Override
+    public boolean hasTop() {
+      // this code filters out data because the rfile index iterators do not 
support seek
+
+      // If startKey is set then discard everything until we reach the start
+      // of the range
+      if (fence.getStartKey() != null) {
+
+        while (source.hasTop() && fence.beforeStartKey(source.getTopKey())) {
+          try {
+            source.next();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+      }
+
+      // If endKey is set then ensure that the current key is not passed the 
end of the range
+      return source.hasTop() && !fence.afterEndKey(source.getTopKey());
+    }
+
+    @Override
+    public void next() throws IOException {
+      super.next();
+    }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) 
{
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class FencedReader extends FencedFileSKVIterator implements 
RFileSKVIterator {
+
+    private final Reader reader;
+
+    public FencedReader(Reader reader, Range seekFence) {
+      super(reader, seekFence);
+      this.reader = reader;
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+        throws IOException {
+      reader.reset();
+
+      if (fence != null) {
+        range = fence.clip(range, true);
+        if (range == null) {
+          return;
+        }
+      }
+
+      reader.seek(range, columnFamilies, inclusive);
+    }
+
+    @Override
+    public FencedReader deepCopy(IteratorEnvironment env) {
+      return new FencedReader(reader.deepCopy(env), fence);

Review Comment:
   Did not see this covered in the unit tests.  Would be nice to cover this and 
ensure the deep copy is independent (seeking it should not affect the original 
and vice versa) and that the fence is properly passed.



##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1553,5 +1558,213 @@ private void setInterruptFlagInternal(AtomicBoolean 
flag) {
     public void setCacheProvider(CacheProvider cacheProvider) {
       reader.setCacheProvider(cacheProvider);
     }
+
+    @Override
+    public void reset() {
+      clear();
+    }
+  }
+
+  public interface RFileSKVIterator extends FileSKVIterator {
+    FileSKVIterator getIndex() throws IOException;
+
+    void reset();
+  }
+
+  static abstract class FencedFileSKVIterator implements FileSKVIterator {
+
+    private final FileSKVIterator reader;
+    protected final Range fence;
+
+    public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
+      this.reader = Objects.requireNonNull(reader);
+      this.fence = fence;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasTop() {
+      return reader.hasTop();
+    }
+
+    @Override
+    public void next() throws IOException {
+      reader.next();
+    }
+
+    @Override
+    public Key getTopKey() {
+      return reader.getTopKey();
+    }
+
+    @Override
+    public Value getTopValue() {
+      return reader.getTopValue();
+    }
+
+    @Override
+    public Key getFirstKey() throws IOException {
+      var rfk = reader.getFirstKey();
+      if (fence.beforeStartKey(rfk)) {
+        return fence.getStartKey();
+      } else {
+        return rfk;
+      }
+    }
+
+    @Override
+    public Key getLastKey() throws IOException {
+      var rlk = reader.getLastKey();
+      if (fence.afterEndKey(rlk)) {
+        return fence.getEndKey();
+      } else {
+        return rlk;
+      }
+    }
+
+    @Override
+    public boolean isRunningLowOnMemory() {
+      return reader.isRunningLowOnMemory();
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      reader.setInterruptFlag(flag);
+    }
+
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException {
+      return reader.getMetaStore(name);
+    }
+
+    @Override
+    public void closeDeepCopies() throws IOException {
+      reader.closeDeepCopies();
+    }
+
+    @Override
+    public void setCacheProvider(CacheProvider cacheProvider) {
+      reader.setCacheProvider(cacheProvider);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+
+  static class FencedIndex extends FencedFileSKVIterator {
+    private final FileSKVIterator source;
+
+    public FencedIndex(FileSKVIterator source, Range seekFence) {
+      super(source, seekFence);
+      this.source = source;
+    }
+
+    @Override
+    public boolean hasTop() {
+      // this code filters out data because the rfile index iterators do not 
support seek
+
+      // If startKey is set then discard everything until we reach the start
+      // of the range
+      if (fence.getStartKey() != null) {
+
+        while (source.hasTop() && fence.beforeStartKey(source.getTopKey())) {
+          try {
+            source.next();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+      }
+
+      // If endKey is set then ensure that the current key is not passed the 
end of the range
+      return source.hasTop() && !fence.afterEndKey(source.getTopKey());
+    }
+
+    @Override
+    public void next() throws IOException {
+      super.next();
+    }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) 
{
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class FencedReader extends FencedFileSKVIterator implements 
RFileSKVIterator {
+
+    private final Reader reader;
+
+    public FencedReader(Reader reader, Range seekFence) {
+      super(reader, seekFence);
+      this.reader = reader;
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+        throws IOException {
+      reader.reset();
+
+      if (fence != null) {
+        range = fence.clip(range, true);
+        if (range == null) {
+          return;
+        }
+      }
+
+      reader.seek(range, columnFamilies, inclusive);
+    }
+
+    @Override
+    public FencedReader deepCopy(IteratorEnvironment env) {
+      return new FencedReader(reader.deepCopy(env), fence);
+    }
+
+    @Override
+    public FileSKVIterator getIndex() throws IOException {
+      return new FencedIndex(reader.getIndex(), fence);
+    }
+
+    @Override
+    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+      final Reader sample = reader.getSample(sampleConfig);
+      return sample != null ? new FencedReader(sample, fence) : null;
+    }
+
+    @Override
+    public void reset() {
+      reader.reset();
+    }
+  }
+
+  public static RFileSKVIterator getReader(final CachableBuilder cb,
+      final AbstractTabletFile<?> dataFile) throws IOException {
+    final RFile.Reader reader = new RFile.Reader(Objects.requireNonNull(cb));
+    return dataFile.hasRange() ? new FencedReader(reader, dataFile.getRange()) 
: reader;
+  }
+
+  public static RFileSKVIterator getReader(final CachableBuilder cb, Range 
range)

Review Comment:
   If possible it would be nice if the unit test called this.  If it's not 
workable don't worry about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to