Well, I got it to work -- I just don't know why it's working. Or, more specifically, why it wasn't working before.
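One possible explanation for the earlier failure, offered as a guess and not confirmed anywhere in this thread: if the consumer was still using a String-based value deserializer (I believe Kafka's StringDeserializer is the camel-kafka default, but check the component doc for your version), then the compressed bytes were decoded into a String before the route ever saw them, and converting that String back to byte[] would not restore the original bytes. The sketch below uses only the JDK to show that such a round trip is lossy for binary data. The class name, method name, and sample payload are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringRoundTripDemo {

    // Mimics decoding raw bytes as a UTF-8 String and then converting
    // the String back to byte[] (hypothetical helper, not a Camel API).
    static byte[] roundTripThroughString(byte[] raw) {
        String decoded = new String(raw, StandardCharsets.UTF_8);
        return decoded.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up payload: a readable ASCII header followed by bytes
        // that are not valid UTF-8, then more readable text.
        byte[] payload = {
            'S', 'N', 'A', 'P', 'P', 'Y',
            0x00, (byte) 0x82, (byte) 0xFF, (byte) 0xC0,
            'h', 'i'
        };

        byte[] after = roundTripThroughString(payload);

        // Malformed sequences are replaced during decoding, so the
        // re-encoded bytes differ from the input.
        System.out.println("original:   " + Arrays.toString(payload));
        System.out.println("round trip: " + Arrays.toString(after));
        System.out.println("identical?  " + Arrays.equals(payload, after));
    }
}
```

If this is what happened, it would also account for the readable parts of the body surviving (plain ASCII passes through unchanged) while Snappy rejected the result.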
The way I got it to work was simply setting the consumer's deserializer to our custom codec class instead of trying to pass the Exchange body as a byte[] to a Processor that called the custom codec.

Basically, as soon as I did the following, everything worked:

in the properties:

    kafka.consumer.deserializer = <our.org>.MyObjectCodec

in the route:

    .process(new Processor() {
        public void process (Exchange e) throws Exception {
            MyObject obj = e.getIn().getBody(MyObject.class);
            [...]

Anyway... Thanks, Camel. It's all quite simple... once I know what I'm doing.

> On 07/22/2021 8:07 PM Ron Cecchini <roncecch...@comcast.net> wrote:
>
> Hi, all.
>
> There is a non-Camel producer that takes a Java object and serializes &
> Snappy-compresses it and writes it directly to a Kafka topic.
>
> I have a Camel-based consumer which is trying to take the raw byte[] off the
> Kafka topic and pass it to a custom method that does both the Snappy
> decompression and deserialization.
>
> The consumer route basically looks like:
>
> from("kafka:<topic>?brokers=<brokers>&groupId=<groupid>&autoOffsetReset=earliest&consumersCount=1")
>   .convertBodyTo(byte[].class)
>   .process(new Processor() {
>     public void process (Exchange e) throws Exception {
>       byte[] kryoSer = (byte[]) e.getIn().getBody(byte[].class);
>       // pass 'kryoSer' to custom decompress/deserialize method
>       [...]
>
> When I try it, I'm getting a "FAILED_TO_UNCOMPRESS" error:
>
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>   at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>   at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>   [...]
>
> When I print out the body of the Exchange before doing the .process(), I get
> a mixture of binary gibberish, but at the top of it is "SNAPPY", followed by
> lots of binary, and then a human-readable string that clearly looks like the
> original object that was written.
>
> It's my understanding that if the string were still truly compressed,
> there is no way there should be any readable text whatsoever. So now I'm
> wondering,
>
> Is Camel:Kafka perhaps automatically doing *some* amount of decompression
> before I get to the .process()?
>
> I'm reading through the component doc but nothing is jumping out at me yet.
> (there's a lot there to process...)
>
> Thanks for any help.
>
> Ron