Hi Anton, No. ignite nodes looks good. I have attached my KafkaCacheDataStreamer class and following is the code to listen to the kafka topic. IgniteCache is created using java configuration.
I see cache size is zero after adding the entries to cache as well from
KafkaCacheDataStreamer. Not sure how to log whether the entries added to
cache or not.
KafkaCacheDataStreamer<String, String, Person> kafkaStreamer = new
KafkaCacheDataStreamer<String, String, Person>();
Properites pros = new Properites() // kafka properties
ConsumerConfig consumerConfig = new ConsumerConfig(props);
try {
IgniteDataStreamer<String, Person> stmr =
ignite.dataStreamer(CacheManager.PERSON_CACHE);
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(1);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(consumerConfig);
// set decoders
kafkaStreamer.setKeyDecoder(new StringDecoder(new
VerifiableProperties()));
kafkaStreamer.setValueDecoder(new StringDecoder(new
VerifiableProperties()));
kafkaStreamer.setMultipleTupleExtractor(new
StreamMultipleTupleExtractor<String, String, Person>() {
@Override
public Map<String, Person> extract(String msg) {
Map<String, Person> entries = new HashMap<>();
try {
KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
IgniteCache<String, Person> cache = CacheManager.getCache();
if (CollectionUtils.isNotEmpty(request.getPersons())){
String id = null;
for (Person ib : request.getPersons()){
if (StringUtils.isNotBlank(ib.getId())){
id = ib.getId();
if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())){
cache.remove(id);
}else {
// no need to store the id. so setting null.
ib.setId(null);
entries.put(id, ib);
}
}
}
}else {
}
}catch (Exception ex){
logger.error("Error while updating the cache - {} {} " ,msg, ex);
}
return entries;
}
});
kafkaStreamer.start();
}catch (Exception ex){
logger.error("Error in kafka data streamer ", ex);
}
Please let me know if you see any issues. thanks.
On 3 November 2016 at 15:59, Anton Vinogradov <[email protected]>
wrote:
> Anil,
>
> Could you provide getStreamer() code and full logs?
> Possible, ignite node was disconnected and this cause DataStreamer closure.
>
> On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:
>
>> HI,
>>
>> I have created custom kafka data streamer for my use case and i see
>> following exception.
>>
>> java.lang.IllegalStateException: Data streamer has been closed.
>> at org.apache.ignite.internal.processors.datastreamer.DataStrea
>> merImpl.enterBusy(DataStreamerImpl.java:360)
>> at org.apache.ignite.internal.processors.datastreamer.DataStrea
>> merImpl.addData(DataStreamerImpl.java:507)
>> at org.apache.ignite.internal.processors.datastreamer.DataStrea
>> merImpl.addData(DataStreamerImpl.java:498)
>> at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(Kafka
>> CacheDataStreamer.java:128)
>> at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCache
>> DataStreamer.java:176)
>> at java.util.concurrent.Executors$RunnableAdapter.call(
>> Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> addMessage method is
>>
>> @Override
>> protected void addMessage(T msg) {
>> if (getMultipleTupleExtractor() == null){
>> Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>
>> if (e != null)
>> getStreamer().addData(e);
>>
>> } else {
>> Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>> if (m != null && !m.isEmpty()){
>> getStreamer().addData(m);
>> }
>> }
>> }
>>
>>
>> Do you see any issue ? Please let me know if you need any additional
>> information. thanks.
>>
>> Thanks.
>>
>
>
KafkaDataSteamer.java
Description: Binary data
