Hi,

Sorry for the late reply. Here is the command:
kafka-console-consumer.cmd --consumer.config my-config.properties --bootstrap-server kafka-host:9443 --topic my-topic --value-deserializer my.BasicDeserializer --group my-console-group --property print.key=true

my-config.properties only contains SSL-related config.

Jorg

On 2019/11/12 15:06:02, "M. Manna" <manme...@gmail.com> wrote:
> Hi,
>
> On Tue, 12 Nov 2019 at 14:37, Jorg Heymans <jorg.heym...@gmail.com> wrote:
>
> > Thanks for helping debug this. You can reproduce the issue using the
> > deserializer below, and invoking kafka-console-consumer with
> > --value-deserializer=my.BasicDeserializer . As you will see, when the
> > consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed
> > to the console.
> >
> > Thanks,
> > Jorg
>
> In the above, what exact command did you run from the command prompt? Can
> you share it with us?
>
> Thanks,
>
> > public class BasicDeserializer implements Deserializer<String> {
> >
> >     @Override
> >     public void configure(Map<String, ?> configs, boolean isKey) {
> >         System.out.println("CONFIGURE");
> >     }
> >
> >     @Override
> >     public String deserialize(String topic, byte[] data) {
> >         System.out.println("SERDE WITHOUT HEADERS");
> >         return new String(data);
> >     }
> >
> >     @Override
> >     public String deserialize(String topic, Headers headers, byte[] data) {
> >         System.out.println("SERDE WITH HEADERS");
> >         return new String(data);
> >     }
> >
> >     @Override
> >     public void close() {
> >         System.out.println("CLOSE");
> >     }
> > }
> >
> > On 2019/11/12 12:57:21, "M. Manna" <manme...@gmail.com> wrote:
> > > Hi again,
> > >
> > > On Tue, 12 Nov 2019 at 12:31, Jorg Heymans <jorg.heym...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > The issue is not that I cannot get a custom deserializer working, it's
> > > > that the custom deserializer I provide implements the default method
> > > > from the Deserializer interface
> > > > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> > > > that gives access to record Headers.
> > > >
> > > > The kafka console consumer never calls this method, it will only call
> > > > the variant without Headers
> > > > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
> > > >
> > > > I'm using kafka 2.3.0 btw.
> > > >
> > > > Jorg
> > >
> > > Record fetching (the deserialization call) happens in Fetcher, and Fetcher
> > > calls Deserializer.deserialize() with headers. The default implementation
> > > of that method delegates to deserialize() without headers. If you
> > > override deserialize() (both the header and non-header variants), your
> > > override will be called.
> > >
> > > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
> > >
> > > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
> > >
> > > Console consumer simply puts a consumer wrapper around KafkaConsumer.
> > > There is no change in behaviour otherwise. I take it that you've debugged
> > > and confirmed that it's not calling your overridden deserialize() with
> > > headers? If so, can you link it here for everyone's benefit?
> > >
> > > Thanks,
> > >
> > > > On 2019/11/12 11:58:26, "M. Manna" <manme...@gmail.com> wrote:
> > > > >
> > > > > I think you can try the following to get your implementation working
> > > > >
> > > > > 1) Provide the SerDe classes into classpath
> > > > > 2) Provide your consumer config file
> > > > > 3) Provide key/value Deserializer props via --consumer-property arg.
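
For anyone following the thread, the difference between the two call paths can be sketched without Kafka on the classpath. This is only an illustration under stated assumptions, not Kafka code: MiniDeserializer, TracingDeserializer and HeaderDispatchDemo are made-up names, and a plain Map stands in for Kafka's Headers type. The one thing it mirrors from the real Deserializer interface is that the default header-aware method delegates to the variant without headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer,
// reduced to the two deserialize() variants discussed in this thread.
interface MiniDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Like the real default method: without an override it delegates
    // to the variant that takes no headers.
    default T deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// Overrides both variants and records which one was invoked.
class TracingDeserializer implements MiniDeserializer<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public String deserialize(String topic, byte[] data) {
        calls.add("WITHOUT HEADERS");
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
        calls.add("WITH HEADERS");
        return new String(data, StandardCharsets.UTF_8);
    }
}

class HeaderDispatchDemo {
    private static final byte[] VALUE = "v".getBytes(StandardCharsets.UTF_8);

    // A caller that passes headers reaches the header-aware override.
    static List<String> callWithHeaders() {
        TracingDeserializer d = new TracingDeserializer();
        d.deserialize("my-topic", Collections.<String, byte[]>emptyMap(), VALUE);
        return d.calls;
    }

    // A caller that uses the two-argument variant never reaches it.
    static List<String> callWithoutHeaders() {
        TracingDeserializer d = new TracingDeserializer();
        d.deserialize("my-topic", VALUE);
        return d.calls;
    }

    // A deserializer that implements only the two-argument variant still
    // works when headers are passed, through the default method.
    static String defaultDelegates() {
        MiniDeserializer<String> plain =
                (topic, data) -> "plain:" + new String(data, StandardCharsets.UTF_8);
        return plain.deserialize("my-topic", Collections.<String, byte[]>emptyMap(), VALUE);
    }

    public static void main(String[] args) {
        System.out.println(callWithHeaders());
        System.out.println(callWithoutHeaders());
        System.out.println(defaultDelegates());
    }
}
```

If only "SERDE WITHOUT HEADERS" ever appears, as reported above, that would be consistent with some caller in the console consumer path using the two-argument variant directly, which is the second case in this sketch.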