An update: I got it to work in a pretty hacky way with this:

    /**
     * Code copied from SnappyCodec.decompress
     */
    public ByteBuffer decompress(ByteBuffer in) throws IOException {
        CRC32 crc32 = new CRC32();
        // The last 4 bytes of the input are a CRC32 of the uncompressed data,
        // so they are excluded from the payload handed to Snappy.
        ByteBuffer out = ByteBuffer.allocate(
            Snappy.uncompressedLength(in.array(), in.position(), in.remaining() - 4));
        int size = Snappy.uncompress(in.array(), in.position(), in.remaining() - 4,
            out.array(), 0);
        out.limit(size);
        // Verify the checksum stored in the trailing 4 bytes
        crc32.reset();
        crc32.update(out.array(), 0, size);
        if (in.getInt(in.limit() - 4) != (int) crc32.getValue())
            throw new IOException("Checksum failure");
        return out;
    }

    public void forEachRecordInBlock(
        ByteString rawBlockBytes, Schema schema,
        Consumer<GenericRecord> consumer) throws IOException {
        GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<>(schema);
        ByteBuffer buffer = ByteBuffer.wrap(rawBlockBytes.toByteArray());
        ByteBuffer decompressed = null;

        // Hack: skip bytes one at a time until the remaining data decompresses
        for (int skip = 1; skip < 20; skip++) {
            // Skip a byte
            buffer.get();
            try {
                decompressed = decompress(buffer);
                System.out.println("XXX skip: " + skip);
                break; // stop at the first offset that decompresses cleanly
            } catch (Exception e) {
                // Not the right offset yet; keep skipping
            }
        }

        if (decompressed == null) {
            throw new RuntimeException("Failed reading block data");
        } else {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
                decompressed.array(), 0, decompressed.remaining(), null);
            // Loop through all records inside the block
            while (true) {
                try {
                    GenericRecord record = datumReader.read(null, decoder);
                    consumer.accept(record);
                } catch (EOFException e) {
                    // Finished reading all records in the block
                    return;
                }
            }
        }
    }
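
For reference, this is roughly how I call it (a simplified sketch; the schema file name and the raw block bytes are placeholders for my real inputs):

    // Placeholder usage: "user.avsc" and blockBytes stand in for my actual inputs.
    Schema schema = new Schema.Parser().parse(new File("user.avsc"));
    ByteString rawBlock = ByteString.copyFrom(blockBytes); // blockBytes is a byte[]
    forEachRecordInBlock(rawBlock, schema, record -> System.out.println(record));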


I have a couple of questions:

(1) Notice the hacky byte skipping. From my testing so far, I find that I
need to skip either 3 or 5 bytes before landing on the beginning of the
compressed data. My understanding was that each block starts with two
longs: one for the number of records in the block and one for the block's
size in bytes. So I would have expected to call buffer.getLong() twice
(16 bytes) instead, as sketched below. Could you explain why I have to
skip fewer than 16 bytes, why it's sometimes 3 and sometimes 5, and what
those bytes actually contain?
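
To make this concrete, here is the kind of header read I expected to need
(just a sketch of my assumption of two fixed-width longs, which evidently
doesn't match what's actually in the stream):

    // What I expected: two fixed-width 8-byte longs at the start of the block.
    long recordCount = buffer.getLong(); // 8 bytes: records in the block?
    long blockSize = buffer.getLong();   // 8 bytes: serialized size in bytes?
    // ...after which `buffer` would sit at the start of the compressed payload.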

(2) I had to copy the code from SnappyCodec.decompress(), since that
method doesn't appear to be publicly accessible. Is that intentional, or
is there maybe a way to access it? (A possible reflection-based workaround
is sketched below.)
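
The only workaround I can think of, short of copying the code, is
reflection. This is an untested sketch; it assumes the protected
CodecFactory.createInstance() returns a Codec whose public decompress()
does the same thing as the copied code above:

    // Untested sketch: obtain the package-private SnappyCodec through the
    // protected CodecFactory#createInstance, then call Codec#decompress.
    // CodecFactory and Codec are in org.apache.avro.file;
    // Method is java.lang.reflect.Method.
    CodecFactory factory = CodecFactory.fromString("snappy");
    Method createInstance = CodecFactory.class.getDeclaredMethod("createInstance");
    createInstance.setAccessible(true);
    Codec codec = (Codec) createInstance.invoke(factory);
    ByteBuffer decompressed = codec.decompress(compressedBuffer);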

Thank you!

Julien

On Fri, Aug 14, 2020 at 11:25 AM Julien Phalip <[email protected]> wrote:

> Hi,
>
> My app receives Avro blocks in raw byte form, and I'm having trouble
> reading them.
>
> My function's input is a ByteString that contains all the bytes for a
> given Avro block, i.e all the bytes contained between 2 sync points of an
> Avro file.
>
> This is what I've got so far:
>
>     public void forEachRecordInBlock(
>         ByteString rawBlockBytes, Schema schema,
>         Consumer<GenericRecord> consumer) throws IOException {
>         GenericDatumReader<GenericRecord> datumReader =
>             new GenericDatumReader<>(schema);
>         BinaryDecoder decoder = DecoderFactory.get()
>             .binaryDecoder(rawBlockBytes.toByteArray(), null);
>         while(true) {
>             try {
>                 GenericRecord record = datumReader.read(null, decoder);
>                 consumer.accept(record);
>             } catch (EOFException e) {
>                 // Finished reading all records in the block
>                 return;
>             }
>         }
>     }
>
> However, it doesn't work as I'm getting this error:
>
> org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -39
>
> Do you know what I might be missing? How could I make this work?
>
> Thanks!
>
> Julien
>
