Hi Anton,
No, the Ignite nodes look good.

I have attached my KafkaCacheDataStreamer class, and the code that listens
to the Kafka topic is below. The IgniteCache is created using Java
configuration.

The cache size is still zero even after KafkaCacheDataStreamer adds entries
to the cache. I am not sure how to log whether the entries were actually
added to the cache.

KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
    new KafkaCacheDataStreamer<String, String, Person>();

Properties props = new Properties(); // kafka properties
ConsumerConfig consumerConfig = new ConsumerConfig(props);

try {
    IgniteDataStreamer<String, Person> stmr =
        ignite.dataStreamer(CacheManager.PERSON_CACHE);
    // allow overwriting cache data
    stmr.allowOverwrite(true);

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);

    // set the topic
    kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));

    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);

    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(consumerConfig);

    // set decoders
    kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
    kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
    kafkaStreamer.setMultipleTupleExtractor(
        new StreamMultipleTupleExtractor<String, String, Person>() {
            @Override
            public Map<String, Person> extract(String msg) {
                Map<String, Person> entries = new HashMap<>();
                try {
                    KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
                    IgniteCache<String, Person> cache = CacheManager.getCache();

                    if (CollectionUtils.isNotEmpty(request.getPersons())) {
                        for (Person ib : request.getPersons()) {
                            if (StringUtils.isNotBlank(ib.getId())) {
                                String id = ib.getId();
                                if (Boolean.TRUE.equals(ib.isDeleted())) {
                                    // deleted persons are removed from the cache directly
                                    cache.remove(id);
                                } else {
                                    // no need to store the id. so setting null.
                                    ib.setId(null);
                                    entries.put(id, ib);
                                }
                            }
                        }
                    }
                } catch (Exception ex) {
                    logger.error("Error while updating the cache - {} {} ", msg, ex);
                }

                return entries;
            }
        });

    kafkaStreamer.start();
} catch (Exception ex) {
    logger.error("Error in kafka data streamer ", ex);
}
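
To check whether the extractor produces any entries at all, independent of
Ignite and Kafka, I could run the same branching logic in isolation. The
following is only a rough sketch under assumptions: Person and Result here
are hypothetical stand-ins (not my real classes), split() is a made-up
helper name, and trim().isEmpty() only approximates StringUtils.isNotBlank.
It collects removals into a list instead of calling cache.remove(), purely
so the outcome can be printed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExtractorSketch {
    /** Hypothetical stand-in for the real Person class. */
    static final class Person {
        String id;
        final Boolean deleted;

        Person(String id, Boolean deleted) {
            this.id = id;
            this.deleted = deleted;
        }
    }

    /** Hypothetical holder: entries to stream and keys to remove. */
    static final class Result {
        final Map<String, Person> upserts = new LinkedHashMap<>();
        final List<String> removals = new ArrayList<>();
    }

    /** Mirrors the extractor's branching: skip blank ids, split deletes from upserts. */
    static Result split(List<Person> persons) {
        Result r = new Result();
        if (persons == null) {
            return r;
        }
        for (Person p : persons) {
            // approximation of StringUtils.isNotBlank(p.id)
            if (p.id == null || p.id.trim().isEmpty()) {
                continue;
            }
            String id = p.id;
            if (Boolean.TRUE.equals(p.deleted)) {
                r.removals.add(id);
            } else {
                // the id becomes the key, so it is cleared on the value
                p.id = null;
                r.upserts.put(id, p);
            }
        }
        return r;
    }

    public static void main(String[] args) {
        List<Person> batch = new ArrayList<>();
        batch.add(new Person("a", null));          // kept as an upsert
        batch.add(new Person("b", Boolean.TRUE));  // marked deleted
        batch.add(new Person(" ", Boolean.FALSE)); // blank id, skipped
        Result r = split(batch);
        System.out.println("upserts=" + r.upserts.keySet()
            + " removals=" + r.removals);
    }
}
```

If the upserts map comes back empty for a real message, the problem would be
in the decoding or the id/deleted checks rather than in the data streamer.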


Please let me know if you see any issues. Thanks.

On 3 November 2016 at 15:59, Anton Vinogradov <[email protected]>
wrote:

> Anil,
>
> Could you provide getStreamer() code and full logs?
> Possibly the Ignite node was disconnected, and this caused the DataStreamer
> closure.
>
> On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:
>
>> Hi,
>>
>> I have created a custom Kafka data streamer for my use case, and I see
>> the following exception.
>>
>> java.lang.IllegalStateException: Data streamer has been closed.
>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>         at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>         at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> addMessage method is
>>
>>     @Override
>>     protected void addMessage(T msg) {
>>         if (getMultipleTupleExtractor() == null) {
>>             Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>
>>             if (e != null)
>>                 getStreamer().addData(e);
>>
>>         } else {
>>             Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>             if (m != null && !m.isEmpty()) {
>>                 getStreamer().addData(m);
>>             }
>>         }
>>     }
>>
>>
>> Do you see any issue? Please let me know if you need any additional
>> information. thanks.
>>
>> Thanks.
>>
>
>

Attachment: KafkaDataSteamer.java
Description: Binary data
