Hello!

You can supply a StreamReceiver to the DataStreamer; it will be used to
resolve upsert conflicts.

However, I have no idea about KafkaStreamer.
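
To illustrate the StreamReceiver idea, here is a minimal sketch of the merge
step only, in plain Java with no Ignite dependency. A java.util.Map stands in
for the cache, and the Person class, its field types, and the
applySalaryUpdates helper are hypothetical names made up for this example. In
Ignite, logic of this kind would go inside an implementation of
StreamReceiver.receive(IgniteCache, Collection<Map.Entry>) registered with
stmr.receiver(...). The sketch also assumes that updates for unknown userIds
should be skipped, which may not be what you want.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SalaryMergeSketch {
    /** Hypothetical value class mirroring the Person attributes from the question. */
    static final class Person {
        final long userId;
        final String name;
        final int age;
        final String address;
        final double salary;

        Person(long userId, String name, int age, String address, double salary) {
            this.userId = userId;
            this.name = name;
            this.age = age;
            this.address = address;
            this.salary = salary;
        }

        /** Returns a copy with only the salary replaced; all other fields are kept. */
        Person withSalary(double newSalary) {
            return new Person(userId, name, age, address, newSalary);
        }
    }

    /**
     * Applies (userId, salary) updates to the stand-in cache.
     * Existing entries get a new salary; unknown userIds are skipped (an assumption).
     */
    static void applySalaryUpdates(Map<Long, Person> cache,
                                   Collection<Map.Entry<Long, Double>> updates) {
        for (Map.Entry<Long, Double> e : updates) {
            cache.computeIfPresent(e.getKey(), (id, p) -> p.withSalary(e.getValue()));
        }
    }

    public static void main(String[] args) {
        Map<Long, Person> cache = new HashMap<>();
        cache.put(1L, new Person(1L, "Ann", 30, "1 Main St", 50000.0));

        // One update for an existing user, one for a user that is not in the cache.
        applySalaryUpdates(cache, List.of(Map.entry(1L, 75000.0), Map.entry(2L, 10.0)));

        Person p = cache.get(1L);
        System.out.println(p.name + " " + p.salary + " " + cache.containsKey(2L));
    }
}
```

Returning a modified copy rather than mutating the stored object keeps the
read-modify-write step in one place; with a real IgniteCache the same step
would need to be applied per key on the node that receives the batch.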

Regards,
-- 
Ilya Kasnacheev


Tue, Jan 21, 2020 at 12:14, v-shaal <[email protected]>:

> Hi
>
> I have a KafkaStreamer that loads data into a cache. While loading, I want to
> upsert the data in such a way that more than one stream can update the
> value already in the cache.
>
> for example
>
> KafkaStreamer<AffinityKey<Long>, UWTransactions> kafkaStreamer =
>     new KafkaStreamer<>();
> IgniteDataStreamer<AffinityKey<Long>, UWTransactions> stmr =
>     Ignition.ignite().dataStreamer(KAFKA_CACHE);
>
> kafkaStreamer.setIgnite(ignite);
> kafkaStreamer.setStreamer(stmr);
>
> kafkaStreamer.setMultipleTupleExtractor(
>             record -> {
>               Map<Long, UWTransactions> entries = new HashMap<>();
>               try {
>                 ObjectMapper mapper = new ObjectMapper();
>
>                 Person person = mapper.readValue(record.value().toString(),
>                     Person.class);
>                 entries.put(person.userId, person);
>               }
>               catch (Exception ex) {
>                 fail("Unexpected error." + ex);
>               }
>               return entries;
>             });
>
> Now say the Person class has the following attributes:
> 1) name
> 2) salary
> 3) age
> 4) address
> 5) userId
>
> There is also another stream, with another KafkaStreamer, that carries only
> userId and salary info, i.e. a Salary class with the attributes:
> 1) salary
> 2) userId
>
> I want to check whether the userId key exists and, if so, update only the
> salary component, keeping name, age, and address as they were.
>
> How can I achieve this, and what is the best way?
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>
