Got into a problem when testing a vnode setup.
I'm using a byteordered partitioner, linux, code version 2.0.4, replication 
factor 1, 4 machine
All goes ok until I run cleanup, and gets worse when adding / decommissioning 
nodes.

In my opinion the problem can be found in the SSTableScanner:: 
KeyScanningIterator::computeNext routine at the lines

                        currentRange = rangeIterator.next();
                        seekToCurrentRangeStart();
                        if (ifile.isEOF())return endOfData();

To see what is wrong, think of having 3 ranges in the list, and both the first 
and second range will not produce a valid currentKey. The first time in the 
loop we get the first range, and then call seekToCurrentRangeStart(). That 
routine doesn't do anything in that case, so then the first key is read from 
the sstable. But this first key does not match the first range, so we loop 
again. We get the second range and call seekToCurrentRangeStart() again. Again 
this does not do anything, leaving all file pointers. So then a new currentKey 
is read from the sstable BUT that should not be the case. We should, in that 
case, continue to test with the 'old' currentKey. So in that case we are 
SKIPPING (possible) VALID RECORDS !!!

To make things worse, in my test case, I only had one key. So when I get into 
the second loop, the test isEOF() was true, so the routine stopped immediately 
having 100 ranges still to test.

Anyway, attached a new version of the SSTableScanner.java file. Seems to work 
for me, but I'm sure a more experienced eye should have a look at this problem 
(and/or possible other scanners and/or situations like scrub, range queries 
...?).

Well, I hope I'm wrong about this....

Regards,

Ignace Desimpel




/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.io.sstable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;

public class SSTableScanner implements ICompactionScanner
{
    protected final RandomAccessReader dfile;
    protected final RandomAccessReader ifile;
    public final SSTableReader sstable;

    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
    private AbstractBounds<RowPosition> currentRange;

    private final DataRange dataRange;

    protected Iterator<OnDiskAtomIterator> iterator;

    /**
     * @param sstable SSTable to scan; must not be null
     * @param dataRange a single range to scan; must not be null
     * @param limiter background i/o RateLimiter; may be null
     */
    SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter 
limiter)
    {
        assert sstable != null;

        this.dfile = limiter == null ? sstable.openDataReader() : 
sstable.openDataReader(limiter);
        this.ifile = sstable.openIndexReader();
        this.sstable = sstable;
        this.dataRange = dataRange;

        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
        if (dataRange.isWrapAround() && 
!dataRange.stopKey().isMinimum(sstable.partitioner))
        {
            // split the wrapping range into two parts: 1) the part that starts 
at the beginning of the sstable, and
            // 2) the part that comes before the wrap-around
            boundsList.add(new 
Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), 
dataRange.stopKey(), sstable.partitioner));
            boundsList.add(new Bounds<>(dataRange.startKey(), 
sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner));
        }
        else
        {
            boundsList.add(new Bounds<>(dataRange.startKey(), 
dataRange.stopKey(), sstable.partitioner));
        }
        this.rangeIterator = boundsList.iterator();
    }

    /**
     * @param sstable SSTable to scan; must not be null
     * @param tokenRanges A set of token ranges to scan
     * @param limiter background i/o RateLimiter; may be null
     */
    SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, 
RateLimiter limiter)
    {
        assert sstable != null;

        this.dfile = limiter == null ? sstable.openDataReader() : 
sstable.openDataReader(limiter);
        this.ifile = sstable.openIndexReader();
        this.sstable = sstable;
        this.dataRange = null;

        List<Range<Token>> normalized = Range.normalize(tokenRanges);
        List<AbstractBounds<RowPosition>> boundsList = new 
ArrayList<>(normalized.size());
        for (Range<Token> range : normalized)
            boundsList.add(new 
Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner),
                                                  
range.right.maxKeyBound(sstable.partitioner),
                                                  sstable.partitioner));

        this.rangeIterator = boundsList.iterator();
    }

    private boolean seekToCurrentRangeStart()
    {
        if (currentRange.left.isMinimum(sstable.partitioner))
            return false;

        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
        // -1 means the key is before everything in the sstable. So just start 
from the beginning.
        if (indexPosition == -1)
            return false;
        
        ifile.seek(indexPosition);
        try
        {

            while (!ifile.isEOF())
            {
                indexPosition = ifile.getFilePointer();
                DecoratedKey indexDecoratedKey = 
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
                int comparison = indexDecoratedKey.compareTo(currentRange.left);
                // because our range start may be inclusive or exclusive, we 
need to also contains()
                // instead of just checking (comparison >= 0)
                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
                {
                    // Found, just read the dataPosition and seek into index 
and data files
                    long dataPosition = ifile.readLong();
                    ifile.seek(indexPosition);
                    dfile.seek(dataPosition);
                    return true;
                }
                else
                {
                    RowIndexEntry.serializer.skip(ifile);
                }
            }
            return true;
        }
        catch (IOException e)
        {
            sstable.markSuspect();
            throw new CorruptSSTableException(e, sstable.getFilename());
        }        
    }

    public void close() throws IOException
    {
        FileUtils.close(dfile, ifile);
    }

    public long getLengthInBytes()
    {
        return dfile.length();
    }

    public long getCurrentPosition()
    {
        return dfile.getFilePointer();
    }

    public String getBackingFiles()
    {
        return sstable.toString();
    }

    public boolean hasNext()
    {
        if (iterator == null)
            iterator = createIterator();
        return iterator.hasNext();
    }

    public OnDiskAtomIterator next()
    {
        if (iterator == null)
            iterator = createIterator();
        return iterator.next();
    }

    public void remove()
    {
        throw new UnsupportedOperationException();
    }

    private Iterator<OnDiskAtomIterator> createIterator()
    {
        return new KeyScanningIterator();
    }
    
    protected class KeyScanningIterator extends 
AbstractIterator<OnDiskAtomIterator>
    {
        private DecoratedKey nextKey;
        private RowIndexEntry nextEntry;
        private DecoratedKey currentKey;
        private RowIndexEntry currentEntry;
        protected OnDiskAtomIterator computeNext()
        {
            try
            {
                if (nextEntry == null)
                {
                        currentKey = null;
                        currentEntry = null;
                    do
                    {
                        // we're starting the first range or we just passed the 
end of the previous range
                        if (!rangeIterator.hasNext())
                            return endOfData();

                        currentRange = rangeIterator.next();
                        
                        if( currentKey != null && 
currentRange.contains(currentKey) ) {
                                break;
                        } else {                                
                                //Did we actually do a seek ?
                                if ( seekToCurrentRangeStart() || currentKey == 
null ){
                                        if (ifile.isEOF())
                                                return endOfData();
                                        currentKey = 
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
                                    currentEntry = 
RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
                                    nextKey = null;
                                    nextEntry = null;
                                }
                        
                        }

                    } while (!currentRange.contains(currentKey));
                }
                else
                {
                    // we're in the middle of a range
                    currentKey = nextKey;
                    currentEntry = nextEntry;
                    nextKey = null; 
                    nextEntry = null;
                }

                long readEnd;
                if ( nextKey == null ) {
                        if (ifile.isEOF())
                    {
                        readEnd = dfile.length();
                    } else {
                                nextKey = 
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
                            nextEntry = 
RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);        
                    
                            readEnd = nextEntry.position;
                            if (!currentRange.contains(nextKey))
                            {
                                nextKey = null;
                                nextEntry = null;
                            }
                    }
                } else {
                        readEnd = nextEntry.position;
                }

                if (dataRange == null || 
dataRange.selectsFullRowFor(currentKey.key))
                {
                    dfile.seek(currentEntry.position);
                    ByteBufferUtil.readWithShortLength(dfile); // key
                    if (sstable.descriptor.version.hasRowSizeAndColumnCount)
                        dfile.readLong();
                    long dataSize = readEnd - dfile.getFilePointer();
                    return new SSTableIdentityIterator(sstable, dfile, 
currentKey, dataSize);
                }

                return new LazyColumnIterator(currentKey, new 
IColumnIteratorFactory()
                {
                    public OnDiskAtomIterator create()
                    {
                        return 
dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, 
currentKey, currentEntry);
                    }
                });

            }
            catch (IOException e)
            {
                sstable.markSuspect();
                throw new CorruptSSTableException(e, sstable.getFilename());
            }
        }
    }

    @Override
    public String toString()
    {
        return getClass().getSimpleName() + "(" +
               "dfile=" + dfile +
               " ifile=" + ifile +
               " sstable=" + sstable +
               ")";
    }
}

Reply via email to