Hi

I have a Kafka streamer that loads data into a cache. While loading, I want
to upsert the data so that more than one stream can update a value that
already exists in the cache.

For example:

KafkaStreamer<AffinityKey<Long>, UWTransactions> kafkaStreamer =
    new KafkaStreamer<>();
IgniteDataStreamer<AffinityKey<Long>, UWTransactions> stmr =
    Ignition.ignite().dataStreamer(KAFKA_CACHE);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);

kafkaStreamer.setMultipleTupleExtractor(
    record -> {
        Map<Long, UWTransactions> entries = new HashMap<>();
        try {
            ObjectMapper mapper = new ObjectMapper();
            Person person =
                mapper.readValue(record.value().toString(), Person.class);
            entries.put(person.userId, person);
        }
        catch (Exception ex) {
            fail("Unexpected error." + ex);
        }
        return entries;
    });

Now say the Person class has the following attributes:
1) name
2) salary
3) age
4) address
5) userId

There is also a second stream, with its own KafkaStreamer, that carries only
the userId and salary information, i.e. a Salary class with the attributes:
1) salary
2) userId

For each record from the second stream, I want to check whether the userId
key already exists in the cache and, if it does, update only the salary
field, keeping name, age and address as they were.
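
To make the requirement concrete, here is a minimal sketch of the merge I
have in mind. It is plain Java with a ConcurrentHashMap standing in for the
Ignite cache, so it shows only the intended semantics, not an Ignite
solution. The field types, the withSalary copy method and the applySalary
helper are illustrative assumptions of mine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of the desired merge; the map is a stand-in for the cache.
final class SalaryMergeSketch {

    // Full record written by the first stream (field types are assumed).
    static final class Person {
        final long userId;
        final String name;
        final double salary;
        final int age;
        final String address;

        Person(long userId, String name, double salary, int age, String address) {
            this.userId = userId;
            this.name = name;
            this.salary = salary;
            this.age = age;
            this.address = address;
        }

        // Returns a copy in which only the salary is replaced.
        Person withSalary(double newSalary) {
            return new Person(userId, name, newSalary, age, address);
        }
    }

    // Partial record written by the second stream.
    static final class Salary {
        final long userId;
        final double salary;

        Salary(long userId, double salary) {
            this.userId = userId;
            this.salary = salary;
        }
    }

    // Hypothetical helper: updates the salary only if the userId key exists.
    // Returns true when an entry was updated, false when the key was absent.
    static boolean applySalary(Map<Long, Person> cache, Salary update) {
        return cache.computeIfPresent(update.userId,
            (id, existing) -> existing.withSalary(update.salary)) != null;
    }

    public static void main(String[] args) {
        Map<Long, Person> cache = new ConcurrentHashMap<>();
        cache.put(1L, new Person(1L, "Alice", 1000.0, 30, "Main St"));

        applySalary(cache, new Salary(1L, 1500.0)); // key exists: salary changes
        applySalary(cache, new Salary(2L, 900.0));  // key absent: nothing inserted

        Person p = cache.get(1L);
        System.out.println(p.name + " " + p.salary + " " + cache.containsKey(2L));
    }
}
```

In Ignite I assume this logic would have to run where the entry lives, for
example in a StreamReceiver set on the second data streamer or in an
EntryProcessor passed to IgniteCache.invoke, but I am not sure which of
these is appropriate here.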

How can I achieve this, and what is the best way to do it?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
