Hi Julien,

To me it looks like `rawBlockBytes` doesn't contain what you think it contains. It is not an Avro record (datum), nor is it a compressed Avro record, and it is not a block between sync markers.

If the original data is in the Avro object container file format (as I guess it is, since you are talking about data between sync markers), you should use `DataFileReader` to read it.
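For example, if you can get your hands on the complete file bytes (header included), something along these lines should work. This is only an untested sketch, and readContainerFile is just a name I made up:

import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Untested sketch: iterate over every record in a complete Avro
// object container file (header + data blocks) held in memory.
public static void readContainerFile(byte[] fileBytes) throws IOException {
    // No schema needed here: DataFileReader takes it from the file header.
    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    try (DataFileReader<GenericRecord> reader =
            new DataFileReader<>(new SeekableByteArrayInput(fileBytes), datumReader)) {
        // The reader handles the sync markers, the block framing and the
        // decompression (snappy, deflate, ...) for you.
        while (reader.hasNext()) {
            GenericRecord record = reader.next();
            System.out.println(record);
        }
    }
}

If you only have an InputStream rather than random access, DataFileStream works the same way.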
-Mika

On Aug 15 2020, at 5:25 am, Julien Phalip <[email protected]> wrote:

> An update: I got it to work in a pretty hacky way with this:
>
> /**
>  * Code copied from SnappyCodec.decompress
>  */
> public ByteBuffer decompress(ByteBuffer in) throws IOException {
>     CRC32 crc32 = new CRC32();
>     ByteBuffer out = ByteBuffer.allocate(
>         Snappy.uncompressedLength(in.array(), in.position(), in.remaining() - 4));
>     int size = Snappy.uncompress(in.array(), in.position(), in.remaining() - 4,
>         out.array(), 0);
>     out.limit(size);
>     crc32.reset();
>     crc32.update(out.array(), 0, size);
>     if (in.getInt(in.limit() - 4) != (int) crc32.getValue())
>         throw new IOException("Checksum failure");
>     return out;
> }
>
> public void forEachRecordInBlock(
>         ByteString rawBlockBytes, Schema schema,
>         Consumer<GenericRecord> consumer) throws IOException {
>     GenericDatumReader<GenericRecord> datumReader =
>         new GenericDatumReader<>(schema);
>     ByteBuffer buffer = ByteBuffer.wrap(rawBlockBytes.toByteArray());
>     ByteBuffer decompressed = null;
>
>     // Hack: Skip some bytes until we can decompress the data
>     for (int skip = 1; skip < 20; skip++) {
>         // Skip a byte
>         buffer.get();
>         try {
>             decompressed = decompress(buffer);
>             System.out.println("XXX skip: " + skip);
>         } catch (Exception e) {
>             // Ignore
>         }
>     }
>
>     if (decompressed == null) {
>         throw new RuntimeException("Failed reading block data");
>     } else {
>         BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
>             decompressed.array(), 0, decompressed.remaining(), null);
>         // Loop through all records inside the block
>         while (true) {
>             try {
>                 GenericRecord record = datumReader.read(null, decoder);
>                 consumer.accept(record);
>             } catch (EOFException e) {
>                 // Finished reading all records in the block
>                 return;
>             }
>         }
>     }
> }
>
> I had a few questions:
>
> (1) Notice the hacky byte skipping. From my testing so far, I find that it
> is required to skip 3 or 5 bytes before landing on the beginning of the
> compressed data. My understanding was that each block would start with two
> longs: one for the block count and one for the block size. So I would have
> expected to instead do 2 x buffer.getLong(). Could you explain why I have
> to skip fewer bytes than 16, why it's sometimes 3 and sometimes 5, and
> what those bytes actually contain?
>
> (2) I had to copy the code from the SnappyCodec.decompress() method, as
> that doesn't appear to be publicly accessible. Is that intentional, or is
> there maybe a way to access it?
>
> Thank you!
>
> Julien
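Regarding your question (1), in case those bytes do turn out to be a real data block after all: the two longs at the head of a block are not fixed 8-byte values. Avro writes longs as zig-zag varints, so the record count and the compressed byte size each take between 1 and 10 bytes, typically 1 byte for a small count and 2 to 4 bytes for the size. That matches the 3 or 5 bytes you end up skipping, and it is why 2 x buffer.getLong() cannot work. Instead of guessing, you can parse the framing with a BinaryDecoder, roughly like this (untested):

byte[] bytes = rawBlockBytes.toByteArray();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
long recordCount = decoder.readLong();          // zig-zag varint, often 1 byte
int compressedSize = (int) decoder.readLong();  // zig-zag varint, often 2-4 bytes
byte[] compressed = new byte[compressedSize];
decoder.readFixed(compressed);                  // snappy data + 4-byte CRC32

There is a fuller sketch at the very bottom of this mail.

Regarding (2): as far as I can tell the codec classes are simply not part of the public API, so copying the logic or calling snappy-java directly (as you already do) is a reasonable workaround.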
>> On Fri, Aug 14, 2020 at 11:25 AM Julien Phalip <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> My app receives Avro blocks in raw byte form, and I'm having trouble
>>> reading them.
>>>
>>> My function's input is a ByteString that contains all the bytes for a
>>> given Avro block, i.e. all the bytes contained between 2 sync points
>>> of an Avro file.
>>>
>>> This is what I've got so far:
>>>
>>> public void forEachRecordInBlock(
>>>         ByteString rawBlockBytes, Schema schema,
>>>         Consumer<GenericRecord> consumer) throws IOException {
>>>     GenericDatumReader<GenericRecord> datumReader =
>>>         new GenericDatumReader<>(schema);
>>>     BinaryDecoder decoder =
>>>         DecoderFactory.get().binaryDecoder(rawBlockBytes.toByteArray(), null);
>>>     while (true) {
>>>         try {
>>>             GenericRecord record = datumReader.read(null, decoder);
>>>             consumer.accept(record);
>>>         } catch (EOFException e) {
>>>             // Finished reading all records in the block
>>>             return;
>>>         }
>>>     }
>>> }
>>>
>>> However, it doesn't work as I'm getting this error:
>>>
>>> org.apache.avro.AvroRuntimeException: Malformed data. Length is
>>> negative: -39
>>>
>>> Do you know what I might be missing? How could I make this work?
>>>
>>> Thanks!
>>>
>>> Julien
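P.S. Putting the pieces together, forEachRecordInBlock could look roughly like this without the byte skipping. Again an untested sketch: it assumes snappy compression and that rawBlockBytes really does start at the block's count varint:

public void forEachRecordInBlock(
        ByteString rawBlockBytes, Schema schema,
        Consumer<GenericRecord> consumer) throws IOException {
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<>(schema);
    byte[] bytes = rawBlockBytes.toByteArray();
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);

    // Block framing: two zig-zag varint longs, then the compressed payload.
    long recordCount = decoder.readLong();
    int compressedSize = (int) decoder.readLong();
    byte[] compressed = new byte[compressedSize];
    decoder.readFixed(compressed);

    // The snappy payload is the compressed data followed by a 4-byte CRC32
    // of the uncompressed data (this is what SnappyCodec writes).
    int dataLen = compressedSize - 4;
    byte[] uncompressed =
        new byte[Snappy.uncompressedLength(compressed, 0, dataLen)];
    int size = Snappy.uncompress(compressed, 0, dataLen, uncompressed, 0);
    CRC32 crc32 = new CRC32();
    crc32.update(uncompressed, 0, size);
    if (ByteBuffer.wrap(compressed, dataLen, 4).getInt() != (int) crc32.getValue())
        throw new IOException("Checksum failure");

    // The count varint says exactly how many records to decode, so there
    // is no need to catch EOFException.
    BinaryDecoder recordDecoder =
        DecoderFactory.get().binaryDecoder(uncompressed, 0, size, null);
    for (long i = 0; i < recordCount; i++) {
        consumer.accept(datumReader.read(null, recordDecoder));
    }
}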
