An update: I got it to work in a pretty hacky way with this:
/**
 * Code copied from SnappyCodec.decompress()
 */
public ByteBuffer decompress(ByteBuffer in) throws IOException {
  CRC32 crc32 = new CRC32();
  // The last 4 bytes of the input hold a CRC32 of the uncompressed data,
  // so they are excluded from the Snappy payload
  ByteBuffer out = ByteBuffer.allocate(
      Snappy.uncompressedLength(in.array(), in.position(), in.remaining() - 4));
  int size = Snappy.uncompress(
      in.array(), in.position(), in.remaining() - 4, out.array(), 0);
  out.limit(size);
  // Verify the trailing checksum against the uncompressed bytes
  crc32.reset();
  crc32.update(out.array(), 0, size);
  if (in.getInt(in.limit() - 4) != (int) crc32.getValue()) {
    throw new IOException("Checksum failure");
  }
  return out;
}
public void forEachRecordInBlock(
    ByteString rawBlockBytes, Schema schema,
    Consumer<GenericRecord> consumer) throws IOException {
  GenericDatumReader<GenericRecord> datumReader =
      new GenericDatumReader<>(schema);
  ByteBuffer buffer = ByteBuffer.wrap(rawBlockBytes.toByteArray());
  ByteBuffer decompressed = null;
  // Hack: skip bytes one at a time until the remaining data decompresses
  for (int skip = 1; skip < 20; skip++) {
    // Skip a byte
    buffer.get();
    try {
      decompressed = decompress(buffer);
      System.out.println("XXX skip: " + skip);
      break; // Stop skipping once decompression succeeds
    } catch (Exception e) {
      // Not valid Snappy data at this offset; try skipping another byte
    }
  }
  if (decompressed == null) {
    throw new RuntimeException("Failed reading block data");
  } else {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
        decompressed.array(), 0, decompressed.remaining(), null);
    // Loop through all records inside the block
    while (true) {
      try {
        GenericRecord record = datumReader.read(null, decoder);
        consumer.accept(record);
      } catch (EOFException e) {
        // Finished reading all records in the block
        return;
      }
    }
  }
}
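
For context, here is roughly how I invoke it (the schema string and the
field name below are just placeholders from my app):

Schema schema = new Schema.Parser().parse(schemaJson);
forEachRecordInBlock(rawBlockBytes, schema,
    record -> System.out.println(record.get("someField")));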
I have two questions:
(1) Notice the hacky byte skipping. From my testing so far, I find that I
need to skip either 3 or 5 bytes before landing on the beginning of the
compressed data. My understanding was that each block starts with two
longs: one for the record count and one for the block size. So I would have
expected to call buffer.getLong() twice (16 bytes in total) instead. Could
you explain why I have to skip fewer than 16 bytes, why it's sometimes 3
and sometimes 5, and what those bytes actually contain?
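
For reference, this is the kind of header parsing I assumed would work,
based on the fixed-width-longs assumption above (which apparently doesn't
match reality):

// Assumed block layout: two 8-byte longs, then the compressed data
long blockCount = buffer.getLong(); // number of records in the block
long blockSize = buffer.getLong();  // byte size of the serialized records
ByteBuffer decompressed = decompress(buffer);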
(2) I had to copy the code from the SnappyCodec.decompress() method, since
that class doesn't appear to be publicly accessible. Is that intentional,
or is there a way to access it that I'm missing?
Thank you!
Julien
On Fri, Aug 14, 2020 at 11:25 AM Julien Phalip <[email protected]> wrote:
> Hi,
>
> My app receives Avro blocks in raw byte form, and I'm having trouble
> reading them.
>
> My function's input is a ByteString that contains all the bytes for a
> given Avro block, i.e. all the bytes contained between two sync markers
> of an Avro file.
>
> This is what I've got so far:
>
> public void forEachRecordInBlock(
>     ByteString rawBlockBytes, Schema schema,
>     Consumer<GenericRecord> consumer) throws IOException {
>   GenericDatumReader<GenericRecord> datumReader =
>       new GenericDatumReader<>(schema);
>   BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
>       rawBlockBytes.toByteArray(), null);
>   while (true) {
>     try {
>       GenericRecord record = datumReader.read(null, decoder);
>       consumer.accept(record);
>     } catch (EOFException e) {
>       // Finished reading all records in the block
>       return;
>     }
>   }
> }
>
> However, it doesn't work as I'm getting this error:
>
> org.apache.avro.AvroRuntimeException: Malformed data. Length is negative:
> -39
>
> Do you know what I might be missing? How could I make this work?
>
> Thanks!
>
> Julien
>