Hi, I am playing with the Kafka streamer for my use case and noticed that the Kafka message itself has to be the value of the Ignite cache entry:
    getStreamer().addData(msg.key(), msg.message());

(see https://github.com/apache/ignite/blob/master/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java )

I tried using a stream receiver to convert each incoming Kafka message into a number of cache entries, but that did not help. It seems the stream receiver is not a pre-processing step for cache entries. Is that correct?

To let clients plug in their own processing, the Kafka streamer would need to provide a way to transform a Kafka message into cache entries. What do you think? The code would look something like this (just pseudocode):

    kafkaStreamer.registerTransformer(<some transformer>)

    if (null != transformer) {
        getStreamer().addData(transformer.transform(msg.message()));
    } else {
        getStreamer().addData(msg.key(), msg.message());
    }

Thanks for your help.
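
P.S. To make the proposal a bit more concrete, here is a minimal, self-contained sketch of the idea. It does not use Ignite at all: MessageTransformer, handle and PAIRS are hypothetical names I made up for illustration, the "a=1;b=2" message format is just a toy, and a plain Map stands in for the data streamer. In a real change the entries would be passed to getStreamer() instead, assuming the streamer can accept a map of entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TransformerSketch {
    /** Hypothetical hook: turns one Kafka message into any number of cache entries. */
    public interface MessageTransformer<K, V> {
        Map<K, V> transform(String kafkaKey, String kafkaMessage);
    }

    /** Toy transformer (assumption): splits "a=1;b=2" into the entries a -> 1 and b -> 2. */
    public static final MessageTransformer<String, Integer> PAIRS = (key, msg) -> {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (String pair : msg.split(";")) {
            String[] kv = pair.split("=", 2);
            out.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
        }
        return out;
    };

    /**
     * Stand-in for the per-message logic inside the streamer.
     * The sink map plays the role of the data streamer here.
     */
    public static void handle(String key, String msg,
                              MessageTransformer<?, ?> transformer,
                              Map<Object, Object> sink) {
        if (transformer != null)
            sink.putAll(transformer.transform(key, msg)); // one message, many entries
        else
            sink.put(key, msg);                           // current behaviour: message is the value
    }

    public static void main(String[] args) {
        Map<Object, Object> withTransformer = new LinkedHashMap<>();
        handle("k1", "a=1;b=2", PAIRS, withTransformer);
        System.out.println("with transformer:    " + withTransformer);

        Map<Object, Object> withoutTransformer = new LinkedHashMap<>();
        handle("k1", "a=1;b=2", null, withoutTransformer);
        System.out.println("without transformer: " + withoutTransformer);
    }
}
```

With a transformer registered, a single Kafka message can fan out into several cache entries; without one, the behaviour stays exactly as it is today, so existing users would not be affected.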
