Yes, that is the only exception I see in the logs. I will try the debug option.
Thanks.

Though the data streamer does not throw that exception every time,
IgniteCache#size() remains 0 all the time. It is weird.

I tried three ways of writing the entries:

1.
for (Map.Entry<K, V> entry : m.entrySet()) {
    getStreamer().addData(entry.getKey(), entry.getValue());
}

2.
for (Map.Entry<K, V> entry : m.entrySet()) {
    cache.put((String) entry.getKey(), (Person) entry.getValue());
}

3.
for (Map.Entry<K, V> entry : m.entrySet()) {
    cache.replace((String) entry.getKey(), (Person) entry.getValue());
}


Cache size with #1 and #3 is 0.
Cache size with #2 is 1, as expected.
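
One possibility I am considering: if IgniteDataStreamer buffers entries per
node and only writes them out when a buffer fills up, when flush()/close() is
called, or when an automatic flush frequency is set, that would explain #1;
and cache.replace() only updates keys that already exist in the cache, which
would explain #3 on an empty cache. A minimal sketch of what I plan to try
(assuming the IgniteDataStreamer returned by getStreamer()):

for (Map.Entry<K, V> entry : m.entrySet())
    getStreamer().addData(entry.getKey(), entry.getValue());

// entries sit in per-node buffers until flushed; without an explicit flush
// (or close(), or an auto-flush frequency) IgniteCache#size() can stay 0
getStreamer().flush();

// alternative: flush buffered data periodically in the background
// getStreamer().autoFlushFrequency(1000); // ms, illustrative value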

Have you seen a similar issue before?

Thanks



On 3 November 2016 at 16:33, Anton Vinogradov <[email protected]>
wrote:

> Anil,
>
> Is it the first and only exception in the logs?
>
> Is it possible to debug this?
> You can set a breakpoint at the first line of
> org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean, org.apache.ignite.IgniteCheckedException)
> This will tell you who is stopping the data streamer.
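>
> If the streamer does turn out to be closed, a quick check at the call site
> can tell you when it happened, while the breakpoint shows who closed it (a
> minimal sketch; 'stmr', 'key', 'person' and 'log' stand in for your own
> references):
>
> try {
>     stmr.addData(key, person); // illustrative key/value
> }
> catch (IllegalStateException e) {
>     // DataStreamerImpl.enterBusy() throws this once the streamer is closed
>     log.error("Streamer was already closed when adding data", e);
> }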
>
> On Thu, Nov 3, 2016 at 1:41 PM, Anil <[email protected]> wrote:
>
>> Hi Anton,
>> No. The Ignite nodes look good.
>>
>> I have attached my KafkaCacheDataStreamer class, and the following is the
>> code that listens to the Kafka topic. The IgniteCache is created using Java
>> configuration.
>>
>> I still see that the cache size is zero after adding entries to the cache
>> from KafkaCacheDataStreamer. I am not sure how to log whether the entries
>> were added to the cache or not.
>>
>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
>>     new KafkaCacheDataStreamer<String, String, Person>();
>>
>> Properties props = new Properties(); // Kafka consumer properties
>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>
>> try {
>>     IgniteDataStreamer<String, Person> stmr =
>>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>>
>>     // allow overwriting cache data
>>     stmr.allowOverwrite(true);
>>
>>     kafkaStreamer.setIgnite(ignite);
>>     kafkaStreamer.setStreamer(stmr);
>>
>>     // set the topic
>>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>>
>>     // set the number of threads to process Kafka streams
>>     kafkaStreamer.setThreads(1);
>>
>>     // set Kafka consumer configurations
>>     kafkaStreamer.setConsumerConfig(consumerConfig);
>>
>>     // set decoders
>>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>>
>>     kafkaStreamer.setMultipleTupleExtractor(
>>         new StreamMultipleTupleExtractor<String, String, Person>() {
>>             @Override
>>             public Map<String, Person> extract(String msg) {
>>                 Map<String, Person> entries = new HashMap<>();
>>
>>                 try {
>>                     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>>                     IgniteCache<String, Person> cache = CacheManager.getCache();
>>
>>                     if (CollectionUtils.isNotEmpty(request.getPersons())) {
>>                         for (Person ib : request.getPersons()) {
>>                             if (StringUtils.isNotBlank(ib.getId())) {
>>                                 String id = ib.getId();
>>
>>                                 if (Boolean.TRUE.equals(ib.isDeleted())) {
>>                                     // deletions go straight to the cache
>>                                     cache.remove(id);
>>                                 }
>>                                 else {
>>                                     // no need to store the id, so set it to null
>>                                     ib.setId(null);
>>                                     entries.put(id, ib);
>>                                 }
>>                             }
>>                         }
>>                     }
>>                 }
>>                 catch (Exception ex) {
>>                     logger.error("Error while updating the cache - {} {}", msg, ex);
>>                 }
>>
>>                 return entries;
>>             }
>>         });
>>
>>     kafkaStreamer.start();
>> }
>> catch (Exception ex) {
>>     logger.error("Error in kafka data streamer ", ex);
>> }
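>>
>> One thing worth noting: IgniteDataStreamer is AutoCloseable, so if 'stmr'
>> were created in a try-with-resources block, it would be closed as soon as
>> the block exits, while the Kafka threads are still consuming, and every
>> later addData() would then throw "Data streamer has been closed". The
>> lifetime I intend is to keep it open until shutdown (the shutdown hook is
>> illustrative):
>>
>> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>     kafkaStreamer.stop();  // stop consuming from Kafka first
>>     stmr.close(false);     // false = flush remaining buffered data, do not cancel
>> }));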
>>
>>
>> Please let me know if you see any issues. Thanks.
>>
>> On 3 November 2016 at 15:59, Anton Vinogradov <[email protected]>
>> wrote:
>>
>>> Anil,
>>>
>>> Could you provide the getStreamer() code and the full logs?
>>> Possibly the Ignite node was disconnected, and this caused the DataStreamer
>>> closure.
>>>
>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have created a custom Kafka data streamer for my use case, and I see the
>>>> following exception:
>>>>
>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>>
>>>> The addMessage method is:
>>>>
>>>> @Override
>>>> protected void addMessage(T msg) {
>>>>     if (getMultipleTupleExtractor() == null) {
>>>>         Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>
>>>>         if (e != null)
>>>>             getStreamer().addData(e);
>>>>     }
>>>>     else {
>>>>         Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>
>>>>         if (m != null && !m.isEmpty())
>>>>             getStreamer().addData(m);
>>>>     }
>>>> }
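>>>>
>>>> I also wonder whether I need to flush the streamer after each batch; a
>>>> variant I may try for the multiple-tuple branch (tryFlush() is
>>>> non-blocking):
>>>>
>>>> Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>
>>>> if (m != null && !m.isEmpty()) {
>>>>     getStreamer().addData(m);
>>>>
>>>>     // ask the streamer to write buffered entries without blocking;
>>>>     // otherwise data stays in per-node buffers until they fill up
>>>>     getStreamer().tryFlush();
>>>> }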
>>>>
>>>>
>>>> Do you see any issue? Please let me know if you need any additional
>>>> information.
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>
>
