*Code:*
public static void main(String[] args) throws InterruptedException {
//Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("ignite.xml")) {
System.out.println();
System.out.println(">>> Cache query example started.");
CacheConfiguration<String, AllEventsAttributes> kafkaCache = new
CacheConfiguration<>(UA_Cache);
kafkaCache.setCacheMode(CacheMode.PARTITIONED);
kafkaCache.setIndexedTypes(AffinityKey.class,AllEventsAttributes.class);
KafkaStreamer<String, AllEventsAttributes> kafkaStreamer = new
KafkaStreamer<>();
ignite.getOrCreateCache(kafkaCache);
IgniteDataStreamer<String, AllEventsAttributes> stmr
=Ignition.ignite().dataStreamer(UA_Cache);
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
List<String> topics= new ArrayList<String>();
topics.add("allEvents");
// set the topic
kafkaStreamer.setTopic(topics);
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(20);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "allEvents");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
kafkaStreamer.setConsumerConfig(props);
final CountDownLatch latch = new CountDownLatch(40);
kafkaStreamer.setMultipleTupleExtractor(
record ->{
Map<String, AllEventsAttributes> entries = new HashMap<>();
try{
ObjectMapper mapper = new ObjectMapper();
AllEvents allEvents =
mapper.readValue(record.value().toString(), AllEvents.class);
if(!(allEvents.UserId.equals("0")) &&
!(allEvents.UserId.equals("")) && !allEvents.UserId.isEmpty()){
AllEventsAttributes allEventsAttributes = new
AllEventsAttributes(allEvents.UserId,
allEvents.RecUpdatedAt,allEvents.RecUpdatedAt);
entries.put(allEventsAttributes.UserId,allEventsAttributes);
}
/* String val = record.value().toString();
}catch (Exception ex) {
System.out.println("Unexpected error." + ex);
}
return entries;
}
);
kafkaStreamer.start();
System.out.println("Kafka streamer started!");
latch.await();
}
}
}
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/