Hi
I have a KafkaStreamer that loads data into a cache. While loading, I want to
upsert the data so that more than one stream can update a value that already
exists in the cache.
For example:
KafkaStreamer<AffinityKey<Long>, Person> kafkaStreamer = new KafkaStreamer<>();
Ignite ignite = Ignition.ignite();
IgniteDataStreamer<AffinityKey<Long>, Person> stmr = ignite.dataStreamer(KAFKA_CACHE);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
kafkaStreamer.setMultipleTupleExtractor(
    record -> {
        // One Kafka record yields one cache entry keyed by userId.
        Map<AffinityKey<Long>, Person> entries = new HashMap<>();
        try {
            ObjectMapper mapper = new ObjectMapper();
            Person person = mapper.readValue(record.value().toString(), Person.class);
            entries.put(new AffinityKey<>(person.userId), person);
        }
        catch (Exception ex) {
            fail("Unexpected error." + ex);
        }
        return entries;
    });
Now say the Person class has the following attributes:
1) name
2) salary
3) age
4) address
5) userId
There is also a second stream, with its own KafkaStreamer, that carries only
userId and salary, i.e. a Salary class with the attributes:
1) salary
2) userId
I want to check whether the userId key already exists in the cache and, if it
does, update only the salary field, keeping name, age and address as they were.
How can I achieve this, and what is the best way to do it?
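To make the behaviour I am after concrete, here is a minimal sketch in plain Java. It does not use Ignite at all: a ConcurrentHashMap stands in for the cache, and the names SalaryMergeSketch, upsertPerson, applySalary and withSalary are made up for illustration only. The field types are also assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SalaryMergeSketch {
    /** Full record, as delivered by the first stream (field types are assumed). */
    static final class Person {
        final long userId;
        final String name;
        final int age;
        final String address;
        final double salary;

        Person(long userId, String name, int age, String address, double salary) {
            this.userId = userId;
            this.name = name;
            this.age = age;
            this.address = address;
            this.salary = salary;
        }

        /** Returns a copy with only the salary replaced; all other fields are kept. */
        Person withSalary(double newSalary) {
            return new Person(userId, name, age, address, newSalary);
        }
    }

    /** Partial record, as delivered by the second stream. */
    static final class Salary {
        final long userId;
        final double salary;

        Salary(long userId, double salary) {
            this.userId = userId;
            this.salary = salary;
        }
    }

    /** Stand-in for the cache (hypothetical, not an Ignite cache). */
    static final Map<Long, Person> cache = new ConcurrentHashMap<>();

    /** First stream: insert or fully overwrite the entry. */
    static void upsertPerson(Person p) {
        cache.put(p.userId, p);
    }

    /**
     * Second stream: update the salary only if the key already exists.
     * Returns true if an existing entry was updated, false if the key was absent.
     */
    static boolean applySalary(Salary s) {
        return cache.computeIfPresent(s.userId, (id, old) -> old.withSalary(s.salary)) != null;
    }

    public static void main(String[] args) {
        upsertPerson(new Person(1L, "Ann", 30, "Main St", 1000.0));
        applySalary(new Salary(1L, 1500.0)); // existing key: salary changes
        applySalary(new Salary(2L, 900.0));  // unknown key: nothing is inserted
        Person p = cache.get(1L);
        // prints "1500.0 Ann false"
        System.out.println(p.salary + " " + p.name + " " + cache.containsKey(2L));
    }
}
```

My guess is that the Ignite equivalent of applySalary would be a StreamReceiver set on the second IgniteDataStreamer (for example StreamTransformer.from(...) with an entry processor), with allowOverwrite(true) enabled, but I have not verified that this is the recommended approach.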
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/