I ran into a problem while testing a vnode setup: ByteOrderedPartitioner, Linux, Cassandra 2.0.4, replication factor 1, 4 machines. Everything works until I run cleanup, and it gets worse when adding or decommissioning nodes.
In my opinion the problem lies in `SSTableScanner.KeyScanningIterator.computeNext()`, at the lines `currentRange = rangeIterator.next(); seekToCurrentRangeStart(); if (ifile.isEOF()) return endOfData();`.

To see what is wrong, suppose there are 3 ranges in the list, and neither the first nor the second range produces a valid `currentKey`.

On the first pass through the loop we take the first range and call `seekToCurrentRangeStart()`. In this case that routine does nothing, because the range start lies before the first key in the sstable and so no seek is performed. The first key is then read from the sstable. That key does not fall in the first range, so we loop again.

We take the second range and call `seekToCurrentRangeStart()` again. Again it does nothing, leaving both file pointers where they are. A new `currentKey` is then read from the sstable, BUT that should not happen. In this case we should keep testing the 'old' `currentKey` against the new range. As a result we are SKIPPING (possibly) VALID RECORDS!

To make things worse, my test case had only one key. So on the second pass the `isEOF()` test was already true, and the routine stopped immediately with 100 ranges still left to test.

Anyway, I have attached a new version of the SSTableScanner.java file. It seems to work for me, but a more experienced eye should look at this problem (and possibly at other scanners and/or situations such as scrub and range queries). Well, I hope I'm wrong about this...

Regards, Ignace Desimpel
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.cassandra.io.sstable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import com.google.common.collect.AbstractIterator; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; import org.apache.cassandra.db.columniterator.LazyColumnIterator; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.compaction.ICompactionScanner; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; public class SSTableScanner implements ICompactionScanner { protected final RandomAccessReader dfile; protected final RandomAccessReader ifile; public final SSTableReader 
sstable; private final Iterator<AbstractBounds<RowPosition>> rangeIterator; private AbstractBounds<RowPosition> currentRange; private final DataRange dataRange; protected Iterator<OnDiskAtomIterator> iterator; /** * @param sstable SSTable to scan; must not be null * @param dataRange a single range to scan; must not be null * @param limiter background i/o RateLimiter; may be null */ SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) { assert sstable != null; this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); this.sstable = sstable; this.dataRange = dataRange; List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2); if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner)) { // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and // 2) the part that comes before the wrap-around boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner)); boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner)); } else { boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner)); } this.rangeIterator = boundsList.iterator(); } /** * @param sstable SSTable to scan; must not be null * @param tokenRanges A set of token ranges to scan * @param limiter background i/o RateLimiter; may be null */ SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) { assert sstable != null; this.dfile = limiter == null ? 
sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); this.sstable = sstable; this.dataRange = null; List<Range<Token>> normalized = Range.normalize(tokenRanges); List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size()); for (Range<Token> range : normalized) boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner), range.right.maxKeyBound(sstable.partitioner), sstable.partitioner)); this.rangeIterator = boundsList.iterator(); } private boolean seekToCurrentRangeStart() { if (currentRange.left.isMinimum(sstable.partitioner)) return false; long indexPosition = sstable.getIndexScanPosition(currentRange.left); // -1 means the key is before everything in the sstable. So just start from the beginning. if (indexPosition == -1) return false; ifile.seek(indexPosition); try { while (!ifile.isEOF()) { indexPosition = ifile.getFilePointer(); DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); int comparison = indexDecoratedKey.compareTo(currentRange.left); // because our range start may be inclusive or exclusive, we need to also contains() // instead of just checking (comparison >= 0) if (comparison > 0 || currentRange.contains(indexDecoratedKey)) { // Found, just read the dataPosition and seek into index and data files long dataPosition = ifile.readLong(); ifile.seek(indexPosition); dfile.seek(dataPosition); return true; } else { RowIndexEntry.serializer.skip(ifile); } } return true; } catch (IOException e) { sstable.markSuspect(); throw new CorruptSSTableException(e, sstable.getFilename()); } } public void close() throws IOException { FileUtils.close(dfile, ifile); } public long getLengthInBytes() { return dfile.length(); } public long getCurrentPosition() { return dfile.getFilePointer(); } public String getBackingFiles() { return sstable.toString(); } public boolean hasNext() { if (iterator == null) iterator = 
createIterator(); return iterator.hasNext(); } public OnDiskAtomIterator next() { if (iterator == null) iterator = createIterator(); return iterator.next(); } public void remove() { throw new UnsupportedOperationException(); } private Iterator<OnDiskAtomIterator> createIterator() { return new KeyScanningIterator(); } protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator> { private DecoratedKey nextKey; private RowIndexEntry nextEntry; private DecoratedKey currentKey; private RowIndexEntry currentEntry; protected OnDiskAtomIterator computeNext() { try { if (nextEntry == null) { currentKey = null; currentEntry = null; do { // we're starting the first range or we just passed the end of the previous range if (!rangeIterator.hasNext()) return endOfData(); currentRange = rangeIterator.next(); if( currentKey != null && currentRange.contains(currentKey) ) { break; } else { //Did we actually do a seek ? if ( seekToCurrentRangeStart() || currentKey == null ){ if (ifile.isEOF()) return endOfData(); currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version); nextKey = null; nextEntry = null; } } } while (!currentRange.contains(currentKey)); } else { // we're in the middle of a range currentKey = nextKey; currentEntry = nextEntry; nextKey = null; nextEntry = null; } long readEnd; if ( nextKey == null ) { if (ifile.isEOF()) { readEnd = dfile.length(); } else { nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); nextEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version); readEnd = nextEntry.position; if (!currentRange.contains(nextKey)) { nextKey = null; nextEntry = null; } } } else { readEnd = nextEntry.position; } if (dataRange == null || dataRange.selectsFullRowFor(currentKey.key)) { dfile.seek(currentEntry.position); ByteBufferUtil.readWithShortLength(dfile); // key if 
(sstable.descriptor.version.hasRowSizeAndColumnCount) dfile.readLong(); long dataSize = readEnd - dfile.getFilePointer(); return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize); } return new LazyColumnIterator(currentKey, new IColumnIteratorFactory() { public OnDiskAtomIterator create() { return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); } }); } catch (IOException e) { sstable.markSuspect(); throw new CorruptSSTableException(e, sstable.getFilename()); } } } @Override public String toString() { return getClass().getSimpleName() + "(" + "dfile=" + dfile + " ifile=" + ifile + " sstable=" + sstable + ")"; } }