Hello vbm,
I am working on exactly the same problem. Did you find a solution?
I am using the following code in my client application, which listens to
Kafka Connect (Confluent).
I have a one-to-one mapping between Kafka topics and Ignite caches. When a row
is inserted into the DB, the Kafka listener receives the message, I convert the
JSON to an object with the Gson library, and stmr.addData() works fine. But when
a value is updated in the DB, I get a marshaller error. I also tried the
cache.put() method, but it gives me a cachewriteexception.
@KafkaListener(topics = { "kafka-Users" })
public void listenUsers(String message) {
    logger.error(message);
    ObjectMapper mapper = new ObjectMapper();
    JsonNode rootNode;
    try {
        rootNode = mapper.readTree(message);
        Users user = new Users();
        IgniteDataStreamer<Long, Users> stmr =
            ignite.dataStreamer(IgniteProperties.USERS_CACHE.getName());
        // stmr.allowOverwrite(true);
        /*
         * stmr.receiver(new StreamTransformer<Long, Users>() {
         *
         *     @Override
         *     public Object process(MutableEntry<Long, Users> entry, Object... arguments)
         *             throws EntryProcessorException {
         *         return null;
         *     }
         *
         * });
         */
        /*
         * stmr.receiver(StreamTransformer.from((e, arg) -> {
         *     Users val = e.getValue();
         *     System.out.println(val+" user from reciever $$$$$$$$$");
         *     return null;
         * }));
         */
        Gson gson = new GsonBuilder()
            .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
            .create();
        user = gson.fromJson(rootNode.get("payload").toString(), Users.class);
        stmr.addData(rootNode.get("payload").get("UsersKey").asLong(), user);
        stmr.flush();
        // stmr.allowOverwrite(true);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
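A note on the two commented-out allowOverwrite(true) lines: as far as I understand, an Ignite data streamer skips keys that already exist in the cache unless allowOverwrite(true) is set before any data is added. That would affect updates but not inserts. Whether it is related to the marshaller error is not clear to me. Below is a minimal sketch of that insert-versus-update behaviour, using a plain HashMap in place of an Ignite cache. ToyStreamer is a hypothetical stand-in written only for illustration, not an Ignite class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a data streamer; NOT the Ignite API.
// It only models the "skip existing keys unless overwrite is allowed" rule.
class ToyStreamer {
    private final Map<Long, String> cache;
    // Assumption: mirrors the documented Ignite default (overwrite disabled).
    private boolean allowOverwrite = false;

    ToyStreamer(Map<Long, String> cache) {
        this.cache = cache;
    }

    void allowOverwrite(boolean flag) {
        this.allowOverwrite = flag;
    }

    void addData(long key, String value) {
        if (allowOverwrite) {
            cache.put(key, value);          // insert or update
        } else {
            cache.putIfAbsent(key, value);  // insert only; existing keys are skipped
        }
    }

    public static void main(String[] args) {
        Map<Long, String> cache = new HashMap<>();
        ToyStreamer stmr = new ToyStreamer(cache);

        stmr.addData(1L, "inserted");
        stmr.addData(1L, "updated");        // skipped: key 1 already exists
        System.out.println(cache.get(1L));  // prints "inserted"

        stmr.allowOverwrite(true);
        stmr.addData(1L, "updated");        // now the value is replaced
        System.out.println(cache.get(1L));  // prints "updated"
    }
}
```

In the listener above, the equivalent change would be to enable the first stmr.allowOverwrite(true) line, before addData() is called.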
Could you please share your solution?
Thanks,
Om Thacker
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/