I need a bit more help. I really appreciate the help already provided by Kevin
and Eric.
We've been testing Accumulo 1.4.0 on additional hardware platforms and have hit
an unexpected issue: the compaction auto test (test/system/auto) fails.
Interestingly, it fails every time on one machine and intermittently on another,
which makes me suspect some kind of race condition. At this point I can
reproduce the problem easily, and what I observe is that when the failure
occurs, it is always in the same block of code but not always on the same file.
To be clear, when I run the following test:
./run.py -t compact -d
I get this exception in the tserver log:
02 08:41:15,944 [tabletserver.TabletServer] WARN : exception while scanning tablet 1&lt;&lt;
java.io.IOException: invalid distance too far back
        at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221)
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:63)
        at java.io.DataInputStream.readInt(DataInputStream.java:370)
        at org.apache.accumulo.core.data.Value.readFields(Value.java:161)
        at org.apache.accumulo.core.file.map.MySequenceFile$Reader.getCurrentValue(MySequenceFile.java:1773)
        at org.apache.accumulo.core.file.map.MySequenceFile$Reader.next(MySequenceFile.java:1893)
        at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:678)
        at org.apache.accumulo.core.file.map.MyMapFile$Reader.next(MyMapFile.java:799)
        at org.apache.accumulo.core.file.map.MapFileOperations$RangeIterator.next(MapFileOperations.java:111)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.SkippingIterator.next(SkippingIterator.java:29)
        at org.apache.accumulo.server.problems.ProblemReportingIterator.next(ProblemReportingIterator.java:77)
        at org.apache.accumulo.core.iterators.system.HeapIterator.next(HeapIterator.java:88)
        at org.apache.accumulo.core.iterators.system.DeletingIterator.next(DeletingIterator.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.Filter.next(Filter.java:58)
        at org.apache.accumulo.core.iterators.WrappingIterator.next(WrappingIterator.java:87)
        at org.apache.accumulo.core.iterators.user.VersioningIterator.skipRowColumn(VersioningIterator.java:103)
        at org.apache.accumulo.core.iterators.user.VersioningIterator.next(VersioningIterator.java:53)
        at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.readNext(SourceSwitchingIterator.java:120)
        at org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.next(SourceSwitchingIterator.java:105)
        at org.apache.accumulo.server.tabletserver.Tablet.nextBatch(Tablet.java:1766)
        at org.apache.accumulo.server.tabletserver.Tablet.access$3200(Tablet.java:143)
        at org.apache.accumulo.server.tabletserver.Tablet$Scanner.read(Tablet.java:1883)
        at org.apache.accumulo.server.tabletserver.TabletServer$ThriftClientHandler$NextBatchTask.run(TabletServer.java:905)
        at org.apache.accumulo.cloudtrace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
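Before the instrumentation, a note on the error itself. As I understand zlib,
"invalid distance too far back" means inflate hit an LZ77 back-reference that
points before the start of the output it has produced so far; in other words,
the compressed bytes and the decompressor's internal state disagree about how
much data has already been decompressed. That's consistent with corrupt data,
but it's also what you'd expect from a decompressor that starts mid-stream or
carries state from a different stream. As a sanity check that the message
really does come straight from zlib (plain java.util.zip here, not Hadoop's
ZlibDecompressor, though both wrap the same native library), this toy program
corrupts a deflate stream and prints whatever zlib complains about:

import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class CorruptStreamDemo {
  public static void main(String[] args) throws Exception {
    // Repetitive input so the deflate stream is full of back-references.
    byte[] input = new byte[65536];
    for (int i = 0; i < input.length; i++)
      input[i] = (byte) (i % 17);

    Deflater deflater = new Deflater();
    deflater.setInput(input);
    deflater.finish();
    byte[] compressed = new byte[input.length];
    int clen = deflater.deflate(compressed);
    deflater.end();

    // Flip bits in the middle of the compressed stream.
    compressed[clen / 2] ^= 0x55;

    Inflater inflater = new Inflater();
    inflater.setInput(compressed, 0, clen);
    byte[] out = new byte[input.length];
    try {
      int n, total = 0;
      while (!inflater.finished() && (n = inflater.inflate(out)) > 0)
        total += n;
      System.out.println("inflated " + total + " bytes, no zlib error this time");
    } catch (DataFormatException e) {
      // The message comes straight from zlib: depending on where the
      // corruption lands you get "invalid distance too far back",
      // "invalid block type", "incorrect data check", etc.
      System.out.println("zlib error: " + e.getMessage());
    } finally {
      inflater.end();
    }
  }
}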
Don't worry about the line numbers in that stack being slightly off from the
official source; that's an artifact of the comments and trace code I've added
while trying to figure out what is happening. The debug statements I added to
Value.java and MySequenceFile.java are as follows.
Value.java:

  public void readFields(final DataInput in) throws IOException {
    System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
        + ": object id " + in.hashCode() + ", readInt()...");
    int len;
    try {
      len = in.readInt();
    } catch (IOException e) {
      System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
          + ": readInt() resulted in FAILURE");
      throw e;
    }
    System.err.println(new java.util.Date() + "KDB: " + java.lang.Thread.currentThread().getId()
        + ": readInt() resulted in SUCCESS");
    this.value = new byte[len];
    System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
        + ": readFully() value of this many bytes: " + len);
    System.err.flush();
    in.readFully(this.value, 0, this.value.length);
  }
MySequenceFile.java:

  /** Read a compressed buffer */
  private synchronized void readBuffer(DataInputBuffer buffer,
      CompressionInputStream filter) throws IOException {
    // Read data into a temporary buffer
    DataOutputBuffer dataBuffer = new DataOutputBuffer();
    try {
      int dataBufferLength = WritableUtils.readVInt(in);
      dataBuffer.write(in, dataBufferLength);
      // Set up 'buffer' connected to the input-stream:
      buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
      System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
          + ": dataBuffer.getLength() " + dataBuffer.getLength()
          + ", buffer.getLength() = " + buffer.getLength()
          + ", buffer.getPosition() = " + buffer.getPosition());
    } finally {
      dataBuffer.close();
    }
    // Reset the codec
    System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": resetState()");
    filter.resetState();
  }
  private synchronized void seekToCurrentValue() throws IOException {
    if (!blockCompressed) {
      if (decompress) {
        valInFilter.resetState();
      }
      valBuffer.reset();
    } else {
      // Check if this is the first value in the 'block' to be read
      if (lazyDecompress && !valuesDecompressed) {
        System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
            + ": seekToCurrentValue calling readBuffer ");
        // Read the value lengths and values
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
        // KDB hack
        //valIn.reset();
      }
      // Calculate the no. of bytes to skip
      // Note: 'current' key has already been read!
      int skipValBytes = 0;
      int currentKey = noBufferedKeys + 1;
      if (noBufferedValues <= noBufferedKeys) {
        throw new IOException("Cannot seek to current value twice");
      }
      for (int i = noBufferedValues; i > currentKey; --i) {
        skipValBytes += WritableUtils.readVInt(valLenIn);
        --noBufferedValues;
      }
      System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
          + ": seekToCurrentValue skipValBytes = " + skipValBytes
          + ", valBuffer.getPosition() = " + valBuffer.getPosition());
      // Skip to the 'val' corresponding to 'current' key
      if (skipValBytes > 0) {
        if (valIn.skipBytes(skipValBytes) != skipValBytes) {
          throw new IOException("Failed to seek to " + currentKey + "(th) value!");
        }
      }
    }
    System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
        + ": seekToCurrentValue valIn.available() = " + valIn.available()
        + ", valBuffer.getPosition() = " + valBuffer.getPosition()
        + ", getLength() = " + valBuffer.getLength());
  }
  public synchronized void getCurrentValue(Writable val) throws IOException {
    if (val instanceof Configurable) {
      ((Configurable) val).setConf(this.conf);
    }
    // Position stream to 'current' value
    seekToCurrentValue();
    if (!blockCompressed) {
      val.readFields(valIn);
      if (valIn.read() > 0) {
        LOG.info("available bytes: " + valIn.available());
        throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
            + " bytes, should read " + (valBuffer.getLength() - keyLength));
      }
    } else {
      // Get the value
      int valLength = WritableUtils.readVInt(valLenIn);
      String str = "KDB: " + java.lang.Thread.currentThread().getId()
          + ": Attempt readFields(). valLength = " + valLength
          + ", valIn.available() is " + valIn.available();
      System.err.println(str);
      System.err.println("KDB: " + java.lang.Thread.currentThread().getId() + ": " + file);
      System.err.println("KDB: " + java.lang.Thread.currentThread().getId()
          + " valBuffer.getPosition() = " + valBuffer.getPosition()
          + ", getLength() = " + valBuffer.getLength());
      val.readFields(new DataInputStream(valInFilter));
      //val.readFields(valIn);
      // Read another compressed 'value'
      --noBufferedValues;
      // Sanity check
      if (valLength < 0) {
        LOG.debug(val + " is a zero-length value");
      }
    }
  }
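As an aside, to keep the moving parts straight in the trace that follows: my
mental model of the block-compressed value path is that readBuffer() loads
still-compressed bytes into valBuffer, and valIn decompresses lazily as
readFields() pulls data through valInFilter; within the values block, each
Value is serialized as a 4-byte length plus payload, which is exactly what
Value.readFields() reads above. A rough pure-JDK analogue of that layering
(java.util.zip in place of the Hadoop codec classes; the valBuffer/valIn names
are borrowed purely for illustration) would be:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

public class LazyValueBlockSketch {
  public static void main(String[] args) throws Exception {
    // Writer side: two length-prefixed values, then deflate the whole block.
    ByteArrayOutputStream raw = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(raw);
    for (String v : new String[] {"value-one", "value-two"}) {
      byte[] b = v.getBytes("UTF-8");
      dos.writeInt(b.length);   // stands in for Value's int length prefix
      dos.write(b);
    }
    dos.flush();
    ByteArrayOutputStream compressedBlock = new ByteArrayOutputStream();
    DeflaterOutputStream defl = new DeflaterOutputStream(compressedBlock);
    raw.writeTo(defl);
    defl.finish();

    // Reader side: compressed bytes sit in a buffer (valBuffer's role) and a
    // decompressing stream (valInFilter/valIn's role) reads through it lazily.
    Inflater codec = new Inflater();   // reused across blocks in the real code
    codec.reset();                     // analogous to filter.resetState()
    ByteArrayInputStream valBuffer =
        new ByteArrayInputStream(compressedBlock.toByteArray());
    DataInputStream valIn =
        new DataInputStream(new InflaterInputStream(valBuffer, codec));

    // val.readFields() boils down to: readInt() the length, then readFully().
    for (int i = 0; i < 2; i++) {
      int len = valIn.readInt();
      byte[] val = new byte[len];
      valIn.readFully(val);
      System.out.println("read value: " + new String(val, "UTF-8"));
    }
    codec.end();
  }
}

If that model is right, then getting "invalid distance too far back" on the
very first readInt() of a block means the inflater's state and valBuffer's
contents are already out of step before a single value has been read.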
When I run the instrumented code, this is the sequence that yields a failure;
the thread and file change every time. Note that thread 195's readInt() on
object id 1041146387 begins early in this trace and only fails at the very
end, after thread 174 has completed several reads of a different file.
KDB: 195: dataBuffer.getLength() 18864, buffer.getLength() = 18864, buffer.getPosition() = 0
KDB: 195: resetState()
KDB: 195: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 0
KDB: 195: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 0, getLength() = 18864
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 152878657, readInt()...
KDB: 195: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 195: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I000001j.map/data
KDB: 195 valBuffer.getPosition() = 0, getLength() = 18864
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
Wed May 02 08:41:15 PDT 2012KDB: 195: object id 1041146387, readInt()...
KDB: 174: call next(key)
KDB: 174: readVInt
KDB: 174: readFields KeyIn
KDB: 174: done with next(key), more = true key=row_0000387268 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 850570553, readInt()...
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
KDB: 174: call next(key)
KDB: 174: readVInt
KDB: 174: readFields KeyIn
KDB: 174: done with next(key), more = true key=row_0000387269 colf:col_00000 [L1&L2&G1&GROUP2] 1 false
KDB: 174: seekToCurrentValue skipValBytes = 0, valBuffer.getPosition() = 40960
KDB: 174: seekToCurrentValue valIn.available() = 1, valBuffer.getPosition() = 40960, getLength() = 42744
KDB: 174: Attempt readFields(). valLength = 54, valIn.available() is 1
KDB: 174: /user/mapr/accumulo-SE-test-04-19165/tables/1/b-0000000/I0000017.map/data
KDB: 174 valBuffer.getPosition() = 40960, getLength() = 42744
Wed May 02 08:41:15 PDT 2012KDB: 174: object id 1888129839, readInt()...
Wed May 02 08:41:15 PDT 2012KDB: 174: readInt() resulted in SUCCESS
KDB: 174: readFully() value of this many bytes: 50
KDB: 174: call next(key)
Wed May 02 08:41:15 PDT 2012KDB: 195: readInt() resulted in FAILURE
I'm trying to rule out compression as a factor, but I haven't been able to. I
attempted to disable compression by editing TestUtils.py, and I can see this in
the tserver log:

02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize = 100K
02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.blocksize.index = 128K
02 08:40:48,103 [server.Accumulo] INFO : table.file.compress.type = none

But it's pretty clear the files are still compressed (perhaps the setting only
affects newly written files, or doesn't reach the map-file writer?). If you
think running uncompressed would give us a clearer picture of the issue, is
there a reliable way to disable compression for the whole test?
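In the meantime, one way to settle whether a given file was written compressed,
independent of the server config: if I'm reading the SequenceFile header layout
correctly (the magic bytes 'SEQ', a version byte, key and value class names,
then booleans for record and block compression), a small dumper run against a
local copy of a .map/data file should show the flags directly. This is a sketch
under that assumption; I haven't verified that MySequenceFile kept Hadoop's
exact on-disk header:

import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.hadoop.io.Text;

// Hypothetical helper: dump the compression flags from a SequenceFile-format
// header. Assumes MySequenceFile kept Hadoop's on-disk layout (unverified).
public class DumpSeqFileHeader {
  public static void main(String[] args) throws Exception {
    DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
    try {
      byte[] magic = new byte[3];
      in.readFully(magic);                        // should be "SEQ"
      byte version = in.readByte();
      String keyClass = Text.readString(in);      // vint length + UTF-8 bytes
      String valClass = Text.readString(in);
      boolean compressed = in.readBoolean();      // record compression
      boolean blockCompressed = in.readBoolean(); // block compression
      System.out.println(new String(magic) + " version " + version);
      System.out.println("key = " + keyClass + ", value = " + valClass);
      System.out.println("compressed = " + compressed
          + ", blockCompressed = " + blockCompressed);
    } finally {
      in.close();
    }
  }
}

If blockCompressed comes back true even with table.file.compress.type = none,
that would confirm the setting isn't reaching these files.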
I've tried a number of things that have led to dead ends, but basically I
think this comes down to the following (see the sketch after this list for how
I imagine this going wrong):
- next() is being used to get the next chunk of data
- we successfully obtain the keys
- we successfully read the value data from the input stream as raw bytes
(still compressed) into the internal buffer (valBuffer), which is indirectly
referenced by the valIn stream
- as soon as we try to read the first integer from the valIn stream, we get
the exception above
- I can tell from my trace that valBuffer seems to have plenty of bytes, so I
don't think this is something as simple as reading past the end of the buffer,
but I could be wrong
- the failure occurs "randomly" in the sense that the thread, buffer size, and
even the file being read when it fails all vary
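Given that two threads (174 and 195 above) are interleaved right up to the
failure, the theory I keep coming back to is that two readers are somehow
sharing a single decompressor or its native state. I have no evidence that
Hadoop's codec pooling is actually doing that here; it's purely an assumption.
But as an illustration of how that kind of sharing yields exactly this
intermittent behavior, here's a toy program (plain java.util.zip again) in
which two threads each inflate their own perfectly valid stream through one
shared Inflater; runs fail unpredictably with zlib errors, short reads, or
silently wrong bytes:

import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class SharedDecompressorRace {
  // One decompressor shared by both reader threads -- the suspected bad pattern.
  static final Inflater shared = new Inflater();

  public static void main(String[] args) throws Exception {
    Thread t1 = new Thread(new Worker((byte) 'A'), "reader-1");
    Thread t2 = new Thread(new Worker((byte) 'B'), "reader-2");
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }

  static class Worker implements Runnable {
    private final byte fill;
    Worker(byte fill) { this.fill = fill; }

    public void run() {
      // Each thread has its own, perfectly valid compressed stream.
      byte[] original = new byte[8192];
      Arrays.fill(original, fill);
      Deflater d = new Deflater();
      d.setInput(original);
      d.finish();
      byte[] compressed = new byte[8192];
      int clen = d.deflate(compressed);
      d.end();

      byte[] out = new byte[original.length];
      for (int i = 0; i < 100000; i++) {
        try {
          shared.reset();                      // each thread assumes sole ownership
          shared.setInput(compressed, 0, clen);
          int n, total = 0;
          while (!shared.finished() && (n = shared.inflate(out)) > 0)
            total += n;
          if (total != original.length || out[0] != fill) {
            System.out.println(Thread.currentThread().getName()
                + ": short/garbled result after " + i + " iterations");
            return;
          }
        } catch (DataFormatException e) {
          // The message comes straight from zlib, like the tserver exception.
          System.out.println(Thread.currentThread().getName()
              + ": zlib error '" + e.getMessage() + "' after " + i + " iterations");
          return;
        }
      }
      System.out.println(Thread.currentThread().getName() + ": no failure this run");
    }
  }
}

The point being: nothing in either thread's data is corrupt, yet the symptoms
look exactly like corruption, and which thread fails, and when, changes from
run to run.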
It's not clear to me whether the problem is that the file is already corrupted
on disk or that the reading of the file is hitting a timing condition.
Any ideas are much appreciated. I can of course provide any tracing you'd like
or make code changes.
Thank you,
Keys
________________________________
Keys Botzum
Senior Principal Technologist
WW Systems Engineering
[email protected]
443-718-0098
MapR Technologies
http://www.mapr.com