I need a bit more help. I really appreciate the assistance Kevin and Eric 
have already provided.

We've been testing Accumulo 1.4.0 on additional hardware platforms and have hit 
an unexpected issue. The compaction auto test (test/system/auto) fails. 
Interestingly, it fails every time on one machine and intermittently on another, 
which makes me suspect some kind of race condition. At this point I can 
easily reproduce the problem, and what I observe is that when the failure 
occurs, it is always in the same block of code but not always on the same file. 

To be clear, when I run the following test:

        ./run.py -t compact -d

I get this exception in the tserver log:

02 08:41:15,944 [tabletserver.TabletServer] WARN : exception while scanning tablet 1<<
java.io.IOException: invalid distance too far back
        at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:63)
        at java.io.DataInputStream.readInt(DataInputStream.java:370)
        at org.apache.accumulo.core.data.Value.readFields(Value.java:161)
        at org.apache.accumulo.core.file.map.MySequenceFile$Reader.getCurrentValue(MySequenceFile.java:1773)
        at org.apache.accumulo.core.file.map.MySequenceFile$Reader.next(MySequenceFile.java:1893)
        at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:678)
        at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:799)
        at org.apache.accumulo.core.file.map.MapFileOperations$RangeIterator.next(MapFileOperations.java:111)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.SkippingIterator.next(SkippingIterator.java:29)
        at org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:77)
        at org.apache.accumulo.core.iterators.system.HeapIterator.next(HeapIterator.java:88)
        at org.apache.accumulo.core.iterators.system.DeletingIterator.next(DeletingIterator.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103)
        at org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:53)
        at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.readNext(SourceSwitchingIterator.java:120)
        at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.next(SourceSwitchingIterator.java:105)
        at org.apache.accumulo.server.tabletserver.Tablet.nextBatch(Tablet.java:1766)
        at org.apache.accumulo.server.tabletserver.Tablet.access$3200(Tablet.java:143)
        at org.apache.accumulo.server.tabletserver.Tablet$Scanner.read(Tablet.java:1883)
        at org.apache.accumulo.server.tabletserver.TabletServer$ThriftClientHandler$NextBatchTask.run(TabletServer.java:905)
        at org.apache.accumulo.cloudtrace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

Don't worry about the line numbers being slightly off from the official source; 
that's an artifact of the comments and trace code I've added while trying to 
figure out what is happening. The debug statements I added to Value.java and 
MySequenceFile.java are as follows.

Value
  public void readFields(final DataInput in) throws IOException {
System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId() + ": object id " + in.hashCode() + ", readInt()...");
    int len;
    try {
      len = in.readInt();
    } catch (IOException e) {
System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId() + ": readInt() resulted in FAILURE");
      throw e;
    }
System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId() + ": readInt() resulted in SUCCESS");
    this.value = new byte[len];
System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": readFully() value of this many bytes: " + len);
System.err.flush();
    in.readFully(this.value, 0, this.value.length);
  }
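
As an aside for anyone following along: readFields() expects a 4-byte length 
(from writeInt()) followed by exactly that many value bytes; as far as I can 
tell, the write side of Value mirrors this. Here's a trivial standalone sketch 
of that framing, just to show what a well-formed stream looks like at this 
point (the class name is mine):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ValueFramingDemo {
  public static void main(String[] args) throws IOException {
    byte[] value = new byte[50]; // a 50-byte value, like the ones in my trace

    // Write side: 4-byte length, then the raw bytes.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(value.length);
    out.write(value, 0, value.length);

    // Read side: the same two steps Value.readFields() performs.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    int len = in.readInt(); // garbage or EOF here means the stream is misaligned
    byte[] read = new byte[len];
    in.readFully(read, 0, read.length);
    System.out.println("read a " + len + "-byte value");
  }
}

So when readInt() fails at this point, the decompressed stream itself must be 
misaligned or corrupt at that offset, not merely short.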

MySequenceFile
    /** Read a compressed buffer */
    private synchronized void readBuffer(DataInputBuffer buffer, CompressionInputStream filter) throws IOException {
      // Read data into a temporary buffer
      DataOutputBuffer dataBuffer = new DataOutputBuffer();

      try {
        int dataBufferLength = WritableUtils.readVInt(in);
        dataBuffer.write(in, dataBufferLength);

        // Set up 'buffer' connected to the input-stream:
        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": dataBuffer.getLength() " + dataBuffer.getLength() + ", buffer.getLength() = " + buffer.getLength() + ", buffer.getPosition() = " + buffer.getPosition());

      } finally {
        dataBuffer.close();
      }

      // Reset the codec
System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": resetState()");
      filter.resetState();
    }

    private synchronized void seekToCurrentValue() throws IOException {
      if (!blockCompressed) {
        if (decompress) {
          valInFilter.resetState();
        }
        valBuffer.reset();
      } else {
        // Check if this is the first value in the 'block' to be read
        if (lazyDecompress && !valuesDecompressed) {
System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": seekToCurrentValue calling readBuffer ");
          // Read the value lengths and values
          readBuffer(valLenBuffer, valLenInFilter);
          readBuffer(valBuffer, valInFilter);
          noBufferedValues = noBufferedRecords;
          valuesDecompressed = true;
// KDB hack
          //valIn.reset();
        }

        // Calculate the no. of bytes to skip
        // Note: 'current' key has already been read!
        int skipValBytes = 0;
        int currentKey = noBufferedKeys + 1;
        if (noBufferedValues <= noBufferedKeys) {
          throw new IOException("Cannot seek to current value twice");
        }

        for (int i = noBufferedValues; i > currentKey; --i) {
          skipValBytes += WritableUtils.readVInt(valLenIn);
          --noBufferedValues;
        }
System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": seekToCurrentValue skipValBytes = " + skipValBytes + ", valBuffer.getPosition() = " + valBuffer.getPosition());

        // Skip to the 'val' corresponding to 'current' key
        if (skipValBytes > 0) {
          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
            throw new IOException("Failed to seek to " + currentKey + "(th) value!");
          }
        }
      }
System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": seekToCurrentValue valIn.available() = " + valIn.available() + ", valBuffer.getPosition() = " + valBuffer.getPosition() + ", getLength() = " + valBuffer.getLength());
    }

    public synchronized void getCurrentValue(Writable val) throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val.readFields(valIn);

        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength) + " bytes, should read " + (valBuffer.getLength() - keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        String str = "KDB: " + java.lang.Thread.currentThread().getId() + ": Attempt readFields(). valLength = " + valLength + ", valIn.available() is " + valIn.available();
        System.err.println(str);
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": " + file);
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + " valBuffer.getPosition() = " + valBuffer.getPosition() + ", getLength() = " + valBuffer.getLength());

        val.readFields(new DataInputStream(valInFilter));

        //val.readFields(valIn);

        // Read another compressed 'value'
        --noBufferedValues;

        // Sanity check
        if (valLength < 0) {
          LOG.debug(val + " is a zero-length value");
        }
      }
    }
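
To help interpret the getPosition()/getLength() numbers in the trace below: 
readBuffer() stages the still-compressed block in a DataOutputBuffer and then 
reset()s a DataInputBuffer onto the same backing array, so position/length 
describe where the reader sits within that staged block. A tiny standalone 
sketch of just that staging (assumes hadoop-common on the classpath; the class 
name is mine):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class BufferStagingDemo {
  public static void main(String[] args) throws Exception {
    // Stage some bytes, the way readBuffer() stages the compressed value block.
    DataOutputBuffer staged = new DataOutputBuffer();
    staged.write(new byte[18864]); // same size as the first block in my trace

    // Point an input buffer at the staged bytes; this is the buffer.reset(...) call above.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(staged.getData(), 0, staged.getLength());

    // These are the numbers my KDB trace prints.
    System.out.println("position = " + in.getPosition() + ", length = " + in.getLength());
  }
}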



When run, this is the sequence that yields a failure. The thread and file 
change every time.

KDB: 195: dataBuffer.getLength() 18864, buffer.getLength() = 18864, buffer.getPosition() = 0
KDB: 195: resetState()
KDB: 195: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 0
KDB: 195: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 0, getLength() = 18864
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 152878657, readInt()...
KDB: 195: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 195: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I000001j.map/data
KDB: 195 valBuffer.getPosition() = 0, getLength() = 18864
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
Wed May 02 08:41:15 PDT 2012KDB: 195: object id 1041146387, readInt()...
KDB: 174: call next(key)
KDB: 174: readVInt
KDB: 174: readFields KeyIn
KDB: 174: done with next(key), more = true key=row_0000387268 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 850570553, readInt()...
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
KDB: 174: call next(key)
KDB: 174: readVInt
KDB: 174: readFields KeyIn
KDB: 174: done with next(key), more = true key=row_0000387269 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 1888129839, readInt()...
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
KDB: 174: call next(key)
Wed May 02 08:41:15 PDT 2012KDB: 195: readInt() resulted in FAILURE

I've been trying to rule out compression as a contributing factor, but I 
haven't been able to. I disabled compression by editing TestUtils.py, and I 
can see this in the tserver log:

02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize = 100K
02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize.index = 128K
02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.type = none

But it's pretty clear the files are still compressed. If you think it would 
give us a clearer picture of the issue, I'd like to truly disable compression. 
Is there a way to do that?
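
One thing I've been meaning to try is setting the property per table through 
the client API rather than through TestUtils.py, on the theory that the 
per-table setting wins over the site config. A sketch of what I mean (the 
instance, user, and table names are placeholders, and I assume this only 
affects files written after the change, not files already on disk):

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;

public class DisableCompression {
  public static void main(String[] args) throws Exception {
    // Placeholder instance/ZooKeeper/user/table names -- substitute real ones.
    ZooKeeperInstance instance = new ZooKeeperInstance("test-instance", "localhost:2181");
    Connector conn = instance.getConnector("root", "secret".getBytes());

    // Per-table override of the site-wide compression setting.
    conn.tableOperations().setProperty("test_ingest", "table.file.compress.type", "none");
  }
}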

I've tried a number of things that have led to dead ends, but basically I 
think this comes down to the following:
- next() is being used to get the next chunk of data
- we successfully obtain the keys
- we successfully read the value data from the input stream as raw bytes 
(still compressed) into the internal buffer (valBuffer), which is indirectly 
referenced by the valIn stream
- as soon as we try to read the first integer from the valIn stream, we get 
the exception above
- I can tell from my trace that valBuffer seems to have plenty of bytes, so I 
don't think this is something as simple as reading past the end of the buffer, 
but I could be wrong
- the failure occurs "randomly" in the sense that the thread, the buffer size, 
and even the file being read vary from one failure to the next

It's not clear to me whether the file is already corrupted on disk or whether 
reading the file is hitting some timing condition.
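
To illustrate the kind of timing condition I have in mind: if two readers ever 
end up sharing one zlib decompressor, interleaved reset/decompress calls can 
produce exactly this family of errors, because deflate back-references point 
into history the decompressor no longer has. A minimal standalone sketch of 
that failure mode using java.util.zip (this is not Accumulo code, just the 
pattern I suspect):

import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class SharedInflaterRace {
  public static void main(String[] args) throws Exception {
    // Repetitive input, so the deflate stream is full of back-references.
    final byte[] raw = new byte[64 * 1024];
    for (int i = 0; i < raw.length; i++)
      raw[i] = (byte) (i % 17);

    Deflater deflater = new Deflater();
    deflater.setInput(raw);
    deflater.finish();
    byte[] buf = new byte[raw.length]; // plenty of room for this input
    final byte[] compressed = Arrays.copyOf(buf, deflater.deflate(buf));

    // One decompressor shared by two threads -- the bug being illustrated.
    final Inflater shared = new Inflater();

    Runnable reader = new Runnable() {
      public void run() {
        byte[] out = new byte[raw.length];
        for (int i = 0; i < 10000; i++) {
          try {
            // reset/setInput/inflate is not atomic as a sequence, so the two
            // threads interleave and corrupt each other's decompressor state.
            shared.reset();
            shared.setInput(compressed);
            while (!shared.finished() && shared.inflate(out) > 0) {
              // keep inflating this stream
            }
          } catch (DataFormatException e) {
            // typically something like "invalid distance too far back"
            System.err.println(Thread.currentThread().getId() + ": " + e.getMessage());
          }
        }
      }
    };

    Thread a = new Thread(reader);
    Thread b = new Thread(reader);
    a.start();
    b.start();
    a.join();
    b.join();
  }
}

If the real cause is anything like this, it would explain why the failing 
thread, file, and buffer sizes vary from run to run.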


Any ideas are much appreciated. I can of course provide any additional tracing 
you'd like or make code changes.

Thank you,
Keys
________________________________
Keys Botzum
Senior Principal Technologist
WW Systems Engineering
[email protected]
443-718-0098
MapR Technologies
http://www.mapr.com
