Well, I got it to work -- I just don't know why it's working.  Or, more 
specifically, why it wasn't working before.

The way I got it to work was simply setting the consumer's deserializer to our 
custom codec class instead of trying to pass the Exchange body as a byte[] to a 
Processor that called the custom codec.

Basically, as soon as I did the following, everything worked:

in the properties:

         kafka.consumer.deserializer = <our.org>.MyObjectCodec

in the route:

         .process(new Processor() {
             public void process (Exchange e) throws Exception {
                 MyObject obj = e.getIn().getBody(MyObject.class);
         [...]
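
For anyone who lands here with the same FAILED_TO_UNCOMPRESS error, here is one
possible explanation. It is a guess, not something confirmed in this thread.
Snappy has no entropy-coding stage, so literal runs are stored as-is and
readable text inside compressed data is normal. As far as I know, the "SNAPPY"
marker at the top of the payload is the magic header that snappy-java's stream
format (SnappyOutputStream) writes, while Snappy.rawUncompress() expects the raw
block format without that header. The sketch below uses only the JDK to check
for the header. The class name, the method name, and the exact header bytes are
my assumptions for illustration.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Camel, Kafka, or snappy-java.
class SnappyHeaderCheck {

    // Assumed magic header of snappy-java's stream format:
    // 0x82 'S' 'N' 'A' 'P' 'P' 'Y' 0x00
    static final byte[] STREAM_MAGIC = {(byte) 0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0};

    // Returns true when the payload starts with the stream-format header.
    static boolean looksLikeSnappyStream(byte[] payload) {
        if (payload == null || payload.length < STREAM_MAGIC.length) {
            return false;
        }
        byte[] prefix = Arrays.copyOf(payload, STREAM_MAGIC.length);
        return Arrays.equals(prefix, STREAM_MAGIC);
    }

    public static void main(String[] args) {
        byte[] framed = {(byte) 0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 2, 3};
        byte[] other = {1, 2, 3};
        System.out.println(looksLikeSnappyStream(framed)); // prints "true"
        System.out.println(looksLikeSnappyStream(other));  // prints "false"
    }
}
```

Calling looksLikeSnappyStream() on the byte[] inside the Processor, before any
decompression, would show which format is actually on the topic. If it returns
true, a stream-format reader such as SnappyInputStream is probably the right
tool rather than rawUncompress().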

Anyway............

Thanks, Camel.  It's all quite simple... once I know what I'm doing.

> On 07/22/2021 8:07 PM Ron Cecchini <roncecch...@comcast.net> wrote:
> 
>  
> Hi, all.
> 
> There is a non-Camel producer that takes a Java object and serializes & 
> Snappy-compresses it and writes it directly to a Kafka topic.
> 
> I have a Camel-based consumer which is trying to take the raw byte[] off the 
> Kafka topic and pass it to a custom method that does both the Snappy 
> decompression and deserialization.
> 
> The consumer route basically looks like:
> 
>     from("kafka:<topic>?brokers=<brokers>&groupId=<groupid>&autoOffsetReset=earliest&consumersCount=1")
>         .convertBodyTo(byte[].class)
>         .process(new Processor() {
>             public void process (Exchange e) throws Exception {
>                 byte[] kryoSer = e.getIn().getBody(byte[].class);
>                 // pass 'kryoSer' to custom decompress/deserialize method
>         [...]
> 
> When I try it, I'm getting a "FAILED_TO_UNCOMPRESS" error:
> 
>     java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> [...]
> 
> When I print out the body of the Exchange before doing the .process(), I get 
> a mixture of binary gibberish, but at the top of it is "SNAPPY", followed by 
> lots of binary, and then a human-readable string that clearly looks like the 
> original object that was written.
> 
> It's my understanding that if the data were still truly compressed, there 
> should be no readable text whatsoever.  So now I'm wondering,
> 
> Is Camel:Kafka perhaps automatically doing *some* amount of decompression 
> before I get to the .process()?
> 
> I'm reading through the component doc but nothing is jumping out at me yet. 
> (there's a lot there to process...)
> 
> Thanks for any help.
> 
> Ron
