I'm trying to implement an on-disk queue, which contains avro records,
SpecificRecord

my queue implementation basically contains a
SpecificDatumWriter, and a SpecificDatumReader  pointing to the same file .

the problem is, that when the reader reaches the EOF, I can no longer
use it again,
even after I append more records to the file,  if I call the same
SpecificDatumReader.read() again,
it gave me exceptions:


-------------------------------------------------------------------------------
Test set: blah.MyTest
-------------------------------------------------------------------------------
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.257
sec <<< FAILURE!
testBasic(blah.MyTest)  Time elapsed: 0.24 sec  <<< ERROR!
java.lang.ArrayIndexOutOfBoundsException
    at java.lang.System.arraycopy(Native Method)
    at 
org.apache.avro.io.BinaryDecoder$ByteSource.compactAndFill(BinaryDecoder.java:670)
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:453)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:120)
    at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:405)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:229)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
    at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:166)
    at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:138)
    at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:129)
    at blah.DiskEventsQueue.dequeue2(MyTest.java:55)
    at blah.MyTest.testBasic(MyTest.java:85)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)




Thanks
Yang
package blah;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


import java.io.*;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.Schema;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecord;
import org.junit.*;
import static org.junit.Assert.*;

class DiskEventsQueue {
    SpecificDatumWriter<Basket> writer ;
    Encoder enc ;
    SpecificDatumReader<Basket> reader ;
    Decoder dec ;

    public DiskEventsQueue() throws Exception {
    writer = new SpecificDatumWriter<Basket>(new Basket().getSchema());
    enc = EncoderFactory.get().binaryEncoder(new FileOutputStream("/tmp/blah"),null);
    reader = new SpecificDatumReader<Basket>(new Basket().getSchema());
    dec = DecoderFactory.get().binaryDecoder(new FileInputStream("/tmp/blah"),null);
    }

    public synchronized void enqueue(Basket ev) throws Exception {
        writer.write(ev, enc);
        enc.flush();
        this.notifyAll();
    }

    public synchronized Basket dequeue() throws Exception {
        try {
        reader.read(null,dec);
        } catch ( EOFException e ) {
            System.err.println("ddid reach end of file ");
            e.printStackTrace();
        }
        this.wait();
       return reader.read(null , dec);
    }

    public synchronized Basket dequeue2() throws Exception {
        try {
        return reader.read(null,dec);
        } catch ( EOFException e ) {
            System.err.println("ddid reach end of file ");
            e.printStackTrace();
        }
        return null;
    }


}

public class MyTest {

    @Test
    public void testBasic() throws Exception {
        final DiskEventsQueue q = new DiskEventsQueue();
        final Basket ev = new Basket();
        Orange o = new Orange();
        ev.single_field = o;
        o.color = "yellow";
        

        q.enqueue(ev);
        q.dequeue2();

        System.err.println("OKOK");

        q.dequeue2();
        q.enqueue(ev);
        System.err.println("before last dequeue");
        q.dequeue2();
        

        
    }
}

Reply via email to