Hello! You can supply a StreamReceiver to the DataStreamer; it will be used to resolve upsert conflicts.
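To illustrate the kind of merge such a receiver might perform, here is a minimal sketch. It does not call Ignite at all: a plain `Map` stands in for the cache, and the `Person` and `Salary` classes, their fields, and the `receive` helper are hypothetical names modeled on the question. In Ignite the corresponding hook would be `StreamReceiver.receive(cache, entries)`, registered through `IgniteDataStreamer.receiver(...)`. How a salary-only record maps onto the cache's value type is left open here, and skipping updates for unknown keys is just one possible policy.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch: a plain Map stands in for the Ignite cache (assumption).
final class SalaryMergeSketch {

    /** Full record, mirroring the attributes listed in the question (hypothetical class). */
    static final class Person {
        final long userId;
        final String name;
        final int age;
        final String address;
        final double salary;

        Person(long userId, String name, int age, String address, double salary) {
            this.userId = userId;
            this.name = name;
            this.age = age;
            this.address = address;
            this.salary = salary;
        }

        /** Returns a copy of this record with only the salary replaced. */
        Person withSalary(double newSalary) {
            return new Person(userId, name, age, address, newSalary);
        }
    }

    /** Partial update from the second stream: userId and salary only (hypothetical class). */
    static final class Salary {
        final long userId;
        final double salary;

        Salary(long userId, double salary) {
            this.userId = userId;
            this.salary = salary;
        }
    }

    /**
     * Applies a batch of salary updates. Existing entries keep name, age and address;
     * updates for unknown keys are skipped (an assumed policy, not the only option).
     * Returns the number of entries that were actually updated.
     */
    static int receive(Map<Long, Person> cache, Collection<Map.Entry<Long, Salary>> entries) {
        int updated = 0;
        for (Map.Entry<Long, Salary> e : entries) {
            Person existing = cache.get(e.getKey());
            if (existing == null)
                continue; // no full record yet, nothing to merge into
            cache.put(e.getKey(), existing.withSalary(e.getValue().salary));
            updated++;
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<Long, Person> cache = new HashMap<>();
        cache.put(1L, new Person(1L, "Alice", 30, "1 Main St", 1000.0));

        List<Map.Entry<Long, Salary>> batch = new ArrayList<>();
        batch.add(new AbstractMap.SimpleEntry<>(1L, new Salary(1L, 1500.0)));
        batch.add(new AbstractMap.SimpleEntry<>(2L, new Salary(2L, 900.0)));

        int updated = receive(cache, batch);
        Person p = cache.get(1L);
        System.out.println(updated + " updated; " + p.name + " now earns " + p.salary);
    }
}
```

The read-then-write in `receive` is not atomic in this sketch. With several streams updating the same key, a real receiver would likely want a per-key atomic update (for example an entry processor) instead of a separate get and put.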
However, I have no idea about KafkaStreamer.

Regards,
-- 
Ilya Kasnacheev


On Tue, 21 Jan 2020 at 12:14, v-shaal <[email protected]> wrote:

> Hi
>
> I have a Kafka streamer to load data into a cache. While loading, I want to
> upsert the data in a way that allows more than one stream to update the
> value existing in the cache.
>
> For example:
>
> KafkaStreamer<AffinityKey<Long>, UWTransactions> kafkaStreamer = new KafkaStreamer<>();
> IgniteDataStreamer<AffinityKey<Long>, UWTransactions> stmr = Ignition.ignite().dataStreamer(KAFKA_CACHE);
>
> kafkaStreamer.setIgnite(ignite);
> kafkaStreamer.setStreamer(stmr);
>
> kafkaStreamer.setMultipleTupleExtractor(
>     record -> {
>         Map<Long, UWTransactions> entries = new HashMap<>();
>         try {
>             ObjectMapper mapper = new ObjectMapper();
>
>             Person person = mapper.readValue(record.value().toString(), Person.class);
>             entries.put(person.userId, person);
>         }
>         catch (Exception ex) {
>             fail("Unexpected error." + ex);
>         }
>         return entries;
>     });
>
> Now say the Person class has the following attributes:
> 1) name
> 2) salary
> 3) age
> 4) address
> 5) userId
>
> and there is another stream, with another kafkaStreamer, which only has
> userId and salary info, i.e. say a Salary class with attributes:
> 1) salary
> 2) userId
>
> and I want to check whether the userId key exists and update only the salary
> component, keeping (name, age, address) as before.
>
> How can I achieve this, and what is the best way?
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
