I'm trying to implement an on-disk queue, which contains avro records,
SpecificRecord
my queue implementation basically contains a
SpecificDatumWriter, and a SpecificDatumReader pointing to the same file .
the problem is, that when the reader reaches the EOF, I can no longer
use it again,
even after I append more records to the file, if I call the same
SpecificDatumReader.read() again,
it gave me exceptions:
-------------------------------------------------------------------------------
Test set: blah.MyTest
-------------------------------------------------------------------------------
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.257
sec <<< FAILURE!
testBasic(blah.MyTest) Time elapsed: 0.24 sec <<< ERROR!
java.lang.ArrayIndexOutOfBoundsException
at java.lang.System.arraycopy(Native Method)
at
org.apache.avro.io.BinaryDecoder$ByteSource.compactAndFill(BinaryDecoder.java:670)
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:453)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:120)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:405)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:229)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:166)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:138)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:129)
at blah.DiskEventsQueue.dequeue2(MyTest.java:55)
at blah.MyTest.testBasic(MyTest.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
Thanks
Yang
package blah;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.io.*;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.Schema;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecord;
import org.junit.*;
import static org.junit.Assert.*;
class DiskEventsQueue {
SpecificDatumWriter<Basket> writer ;
Encoder enc ;
SpecificDatumReader<Basket> reader ;
Decoder dec ;
public DiskEventsQueue() throws Exception {
writer = new SpecificDatumWriter<Basket>(new Basket().getSchema());
enc = EncoderFactory.get().binaryEncoder(new FileOutputStream("/tmp/blah"),null);
reader = new SpecificDatumReader<Basket>(new Basket().getSchema());
dec = DecoderFactory.get().binaryDecoder(new FileInputStream("/tmp/blah"),null);
}
public synchronized void enqueue(Basket ev) throws Exception {
writer.write(ev, enc);
enc.flush();
this.notifyAll();
}
public synchronized Basket dequeue() throws Exception {
try {
reader.read(null,dec);
} catch ( EOFException e ) {
System.err.println("ddid reach end of file ");
e.printStackTrace();
}
this.wait();
return reader.read(null , dec);
}
public synchronized Basket dequeue2() throws Exception {
try {
return reader.read(null,dec);
} catch ( EOFException e ) {
System.err.println("ddid reach end of file ");
e.printStackTrace();
}
return null;
}
}
public class MyTest {
@Test
public void testBasic() throws Exception {
final DiskEventsQueue q = new DiskEventsQueue();
final Basket ev = new Basket();
Orange o = new Orange();
ev.single_field = o;
o.color = "yellow";
q.enqueue(ev);
q.dequeue2();
System.err.println("OKOK");
q.dequeue2();
q.enqueue(ev);
System.err.println("before last dequeue");
q.dequeue2();
}
}