Hi Julien,

To me it looks like `rawBlockBytes` doesn't contain what you think it
contains. It is not Avro record (datum) nor it is compressed avro
record. And it is not a block between sync markers.

If the original data is in Avro  object container file format (as I
guess it is since you are talking about data between sync markers) you
should use `DataFileReader` to read it.

-Mika

On Aug 15 2020, at 5:25 am, Julien Phalip <[email protected]> wrote:

> An update: I got it to work in a pretty hacky way with this:
>  
>     /**
>      * Code copied from SnappyCodec.decompress
>      */
>     public ByteBuffer decompress(ByteBuffer in) throws IOException {
>         CRC32 crc32 = new CRC32();
>         ByteBuffer out = ByteBuffer.allocate
>             
> (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
>         int size = 
> Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
>             out.array(), 0);
>         out.limit(size);
>         crc32.reset();
>         crc32.update(out.array(), 0, size);
>         if (in.getInt(in.limit()-4) != (int)crc32.getValue())
>             throw new IOException("Checksum failure");
>         return out;
>     }
>  
>     public void forEachRecordInBlock(
>         ByteString rawBlockBytes, Schema schema,
> Consumer<GenericRecord> consumer) throws IOException {
>         GenericDatumReader<GenericRecord> datumReader = new 
> GenericDatumReader<>(schema);
>         ByteBuffer buffer = ByteBuffer.wrap(rawBlockBytes.toByteArray());
>         ByteBuffer decompressed = null;
>  
>         // Hack: Skip some bytes until we can decompress the data
>         for (int skip = 1; skip < 20; skip++) {
>             // Skip a byte
>             buffer.get();
>             try {
>                 decompressed = decompress(buffer);
>                 System.out.println("XXX skip: " + skip);
>             } catch (Exception e) {
>                 // Ignore
>             }
>         }
>  
>         if (decompressed == null) {
>             throw new RuntimeException("Failed reading block data");
>         }
>         else {
>             BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
>                 decompressed.array(), 0, decompressed.remaining(), null);
>             // Loop through all records inside the block
>             while(true) {
>                 try {
>                     GenericRecord record = datumReader.read(null, decoder);
>                     consumer.accept(record);
>                 } catch (EOFException e) {
>                     // Finished reading all records in the block
>                     return;
>                 }
>             }
>         }
>     }
>  
>  
> I had a few questions:
>  
> (1) Notice the hacky byte skipping. From my testing so far, I find
> that it is required to skip 3 or 5 bytes before landing on the
> beginning of the compressed data. My understanding was that each block
> would start with two longs: one for the block count and one for the
> block size. So I would have expected to instead do 2xbuffer.getLong().
> Could you explain why I have to skip fewer bytes than 16, why it's
> sometimes 3 and sometimes 5, and what those bytes actually contain?
>  
> (2) I had to copy the code from the SnappyCodec.decompress() method,
> as that doesn't appear to be publicly accessible. Is that intentional,
> or is there maybe a way to access it?
>  
> Thank you!
>  
> Julien
>  
>  
>> On Fri, Aug 14, 2020 at 11:25 AM Julien Phalip <[email protected]> wrote:
>>  
>>> Hi,
>>>  
>>> My app receives Avro blocks in raw byte form, and I'm having trouble
>>> reading them.
>>>  
>>> My function's input is a ByteString that contains all the bytes for
>>> a given Avro block, i.e all the bytes contained between 2 sync
>>> points of an Avro file.
>>>  
>>> This is what I've got so far:
>>>  
>>>     public void forEachRecordInBlock(
>>>         ByteString rawBlockBytes, Schema schema,
>>> Consumer<GenericRecord> consumer) throws IOException {
>>>         GenericDatumReader<GenericRecord> datumReader = new 
>>> GenericDatumReader<>(schema);
>>>         BinaryDecoder decoder =
>>> DecoderFactory.get().binaryDecoder(rawBlockBytes.toByteArray(), null);
>>>         while(true) {
>>>             try {
>>>                 GenericRecord record = datumReader.read(null, decoder);
>>>                 consumer.accept(record);
>>>             } catch (EOFException e) {
>>>                 // Finished reading all records in the block
>>>                 return;
>>>             }
>>>         }
>>>     }
>>>  
>>> However, it doesn't work as I'm getting this error:
>>>  
>>> org.apache.avro.AvroRuntimeException: Malformed data. Length is
>>> negative: -39
>>>  
>>> Do you know what I might be missing? How could I make this work?
>>>  
>>> Thanks!
>>>  
>>> Julien

Reply via email to