Hi Ryan,

I think you could achieve the behavior you described more simply by overriding hasTop() and returning `false` once your iterator has seen and emitted N entries. There is no need to re-seek the parent iterator to a singleton range, since that has the same effect as hasTop() returning `false`. The singleton range is also not guaranteed to be valid if the seek range has an exclusive end key.
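Here is a minimal sketch of the hasTop() idea, written against plain Java so it runs without Accumulo on the classpath. `TopIterator`, `ListSource`, `FirstNIterator` and `FirstNDemo` are hypothetical names made up for illustration; `TopIterator` only imitates the hasTop()/getTopKey()/next() part of an Accumulo iterator and is not the real interface. In your own iterator the same check would go into your hasTop() override.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the hasTop()/getTopKey()/next() portion of an
// Accumulo iterator. This is NOT the real SortedKeyValueIterator interface.
interface TopIterator {
    boolean hasTop();
    String getTopKey();
    void next();
}

// Stand-in for the parent (source) iterator: walks an in-memory sorted list.
class ListSource implements TopIterator {
    private final List<String> keys;
    private int pos = 0;

    ListSource(List<String> keys) { this.keys = keys; }

    @Override public boolean hasTop() { return pos < keys.size(); }

    @Override public String getTopKey() {
        if (!hasTop()) throw new NoSuchElementException("no top entry");
        return keys.get(pos);
    }

    @Override public void next() { pos++; }
}

// Emits at most `limit` entries from the source, then reports hasTop() == false.
class FirstNIterator implements TopIterator {
    private final TopIterator source;
    private final int limit;
    private int emitted = 0; // entries the caller has already consumed

    FirstNIterator(TopIterator source, int limit) {
        this.source = source;
        this.limit = limit;
    }

    // The whole trick: once N entries are out, claim there is no top entry.
    @Override public boolean hasTop() { return emitted < limit && source.hasTop(); }

    @Override public String getTopKey() {
        if (!hasTop()) throw new NoSuchElementException("no top entry");
        return source.getTopKey();
    }

    @Override public void next() {
        if (!hasTop()) throw new IllegalStateException("next() called without a top entry");
        source.next();
        emitted++;
    }
}

class FirstNDemo {
    // Collects everything the iterator is willing to return.
    static List<String> drain(TopIterator it) {
        List<String> out = new ArrayList<>();
        while (it.hasTop()) {
            out.add(it.getTopKey());
            it.next();
        }
        return out;
    }

    public static void main(String[] args) {
        TopIterator source = new ListSource(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(drain(new FirstNIterator(source, 3))); // prints [a, b, c]
    }
}
```

Note that the limiter never touches the source once the limit is reached, so no extra seek is involved.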
I couldn't follow the meaning behind the `numEntriesPerRange` and `numScans` variables, but hopefully the above advice helps. I'm also not sure about the IOException.

Keep in mind that Accumulo can take down your iterator and re-create it at any point. When it does so, it re-inits and then re-seeks your iterator to a position immediately after the last key returned. If this happens in the middle of your iterator counting N entries then it will start counting again from 0. See the iterator design section <https://accumulo.apache.org/1.7/accumulo_user_manual#_iterator_design> in the manual for more info on when this happens.

Cheers,
Dylan

On Fri, Aug 12, 2016 at 3:02 PM, Ryan Cunningham <[email protected]> wrote:

> Hello,
>
> I'm trying to write an iterator that gets the top N sorted entries for a
> given range over sharded data. I created a custom iterator that extends
> SkippingIterator and made it so that it will return the first N entries for
> each tablet. After N entries, I have the source iterator seek to the end
> key of the specific range since it shouldn't return any other entries for
> that tablet.
>
> @Override
> public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
>     super.init(source, options, env);
>     String o = options.get(NUM_SCANS_STRING_NAME);
>     numScans = o == null ? 10 : Integer.parseInt(o);
>     String n = options.get(NUM_ENTRIES_STRING_NAME);
>     numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
>     numEntries = 0;
> }
>
> // this is only ever called immediately after getting "next" entry
> @Override
> protected void consume() throws IOException {
>     if (numEntries < numEntriesPerRange) {
>         ++numEntries;
>         return;
>     }
>     int count = 0;
>     while (getSource().hasTop()) {
>         if (count < numScans) {
>             ++count;
>             getSource().next(); // scan
>         } else {
>             // too many scans, just seek to end of range
>             Key lastKey = latestRange.getEndKey() == null ? new Key(new Text(String.valueOf(Character.MAX_VALUE))) : latestRange.getEndKey().followingKey(PartialKey.ROW);
>             getSource().seek(new Range(lastKey, true, lastKey, false), latestColumnFamilies, latestInclusive);
>         }
>     }
> }
>
> @Override
> public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
>     // save parameters for future internal seeks
>     latestRange = range;
>     latestColumnFamilies = columnFamilies;
>     latestInclusive = inclusive;
>
>     super.seek(range, columnFamilies, inclusive);
>
>     if (getSource().hasTop()) {
>         if (range.beforeStartKey(getSource().getTopKey()))
>             consume();
>     }
> }
>
> I did some initial testing and it seems to work as expected, bringing back
> N * number of tablets results. However, when I increase the limit past a
> certain point something seems to be messing up and I get all entries back
> instead of the limited count. I also sometimes see this error but I looked
> online and I'm not sure if it's related:
>
> 16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
> java.io.IOException: The stream is closed
>     at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>     at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>     at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
>     at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
>     at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
>     at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
>     at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
>     at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
>     at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
>     at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
>     at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
>     at java.lang.Thread.run(Thread.java:745)
>
> Does anyone have any idea why the iterator would work for lower values of
> N but not higher ones? Also, I don’t have a lot of experience with
> iterators and am not confident that the seek in consume() is right. What is
> the best way to skip the rest of a range in an iterator? Or is this not
> feasible?
>
> Any help would be greatly appreciated!
>
> Thanks,
> Ryan
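
P.S. To make the re-creation caveat from my reply concrete, here is a toy model in plain Java, not Accumulo code. `RecreationDemo`, `CountingLimiter`, `seekAfter` and `batchSize` are hypothetical names, and rebuilding the iterator after every `batchSize` returned entries is purely an assumption for illustration; in a real deployment Accumulo decides when to tear an iterator down. The sketch only shows that a counter kept in an instance field starts over whenever a fresh instance is re-seeked just past the last returned key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RecreationDemo {
    // Toy limiter: its entry counter lives only in this instance.
    static final class CountingLimiter {
        private final List<String> sortedKeys;
        private final int limit;
        private int pos = 0;
        private int emitted = 0;

        CountingLimiter(List<String> sortedKeys, int limit) {
            this.sortedKeys = sortedKeys;
            this.limit = limit;
        }

        // Simplified seek: position on the first key strictly greater than
        // lastReturned (null means "start of range").
        void seekAfter(String lastReturned) {
            pos = 0;
            while (lastReturned != null && pos < sortedKeys.size()
                    && sortedKeys.get(pos).compareTo(lastReturned) <= 0) {
                pos++;
            }
        }

        boolean hasTop() { return emitted < limit && pos < sortedKeys.size(); }
        String getTopKey() { return sortedKeys.get(pos); }
        void next() { pos++; emitted++; }
    }

    // Scans the keys, replacing the limiter with a fresh instance after every
    // `batchSize` returned entries (batchSize <= 0 means it is never replaced).
    static List<String> scan(List<String> keys, int limit, int batchSize) {
        List<String> out = new ArrayList<>();
        CountingLimiter it = new CountingLimiter(keys, limit);
        it.seekAfter(null);
        int sinceRebuild = 0;
        while (it.hasTop()) {
            String key = it.getTopKey();
            out.add(key);
            it.next();
            if (batchSize > 0 && ++sinceRebuild == batchSize) {
                it = new CountingLimiter(keys, limit); // counter is back to 0
                it.seekAfter(key);                     // resume just past the last key
                sinceRebuild = 0;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(scan(keys, 3, 0)); // prints [a, b, c]
        System.out.println(scan(keys, 3, 2)); // prints [a, b, c, d, e]
    }
}
```

With a limit of 3 and a rebuild every 2 entries, the counter never reaches the limit, so every entry comes back. That is at least consistent with seeing all entries once N grows past some point, though I can't confirm that is what happens in your case.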
