Regarding the data streamer - I will reproduce and share the complete code.
Regarding cache size() - I understand it completely now. I set the auto-flush
interval and it looks good now.

Thanks to both of you.
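For reference, the auto-flush setting mentioned above is configured through IgniteDataStreamer#autoFlushFrequency. Below is a minimal sketch (the cache name "personCache" and the key/value types are illustrative, and it assumes the Ignite dependency is on the classpath):

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class AutoFlushSketch {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            // make sure the target cache exists
            ignite.getOrCreateCache("personCache");

            try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("personCache")) {
                // flush buffered entries every second instead of waiting
                // for the per-node buffer to fill up or for close()
                stmr.autoFlushFrequency(1000);

                stmr.addData("key", "value");
            } // close() flushes any remaining buffered entries
        }
    }
}
```

Without auto-flush (and before flush()/close()), entries sit in the streamer's buffer, which is why IgniteCache#size() can read 0 right after addData().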

On 3 November 2016 at 17:41, Vladislav Pyatkov <[email protected]> wrote:

> Hi Anil,
>
> Please attach whole example.
> I cannot find where the DataStreamer was closed in your case (where was
> the method o.a.i.IgniteDataStreamer#close() invoked?).
>
> About the cache size: it does not mean anything on its own, because the
> streamer buffers entries and sends them in batches.
> And yes, checking each future, as Anton said, is a good point.
>
> On Thu, Nov 3, 2016 at 3:05 PM, Anton Vinogradov <[email protected]
> > wrote:
>
>> Anil,
>>
>> getStreamer().addData() will return an IgniteFuture. You can check its
>> result with fut.get().
>> get() will return null if the data was streamed and stored, or throw an
>> exception otherwise.
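A sketch of the pattern Anton describes (assuming an already-opened IgniteDataStreamer named stmr; the key, value, and error handling are illustrative):

```java
// addData() returns an IgniteFuture that completes once the batch
// containing this entry has been flushed, so flush() is called here
// to force completion before checking the result.
IgniteFuture<?> fut = stmr.addData("key", "value");

stmr.flush();

try {
    fut.get(); // returns null if the entry was streamed and stored
}
catch (IgniteException e) {
    // the batch containing this entry failed to be stored
    System.err.println("Streaming failed: " + e.getMessage());
}
```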
>>
>> On Thu, Nov 3, 2016 at 2:32 PM, Anil <[email protected]> wrote:
>>
>>>
>>> Yes, that is the only exception I see in the logs. I will try the debug
>>> option. Thanks.
>>>
>>> Though the data streamer does not throw an exception every time,
>>> IgniteCache#size() remains 0 all the time. It is weird.
>>>
>>> 1.
>>> for (Map.Entry<K, V> entry : m.entrySet()) {
>>>     getStreamer().addData(entry.getKey(), entry.getValue());
>>> }
>>>
>>> 2.
>>> for (Map.Entry<K, V> entry : m.entrySet()) {
>>>     cache.put((String) entry.getKey(), (Person) entry.getValue());
>>> }
>>>
>>> 3.
>>> for (Map.Entry<K, V> entry : m.entrySet()) {
>>>     cache.replace((String) entry.getKey(), (Person) entry.getValue());
>>> }
>>>
>>>
>>> The cache size with #1 and #3 is 0.
>>> The cache size with #2 is 1, as expected.
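The #3 result is consistent with replace() semantics: IgniteCache#replace, like java.util.concurrent.ConcurrentMap#replace, only stores the value when the key is already present, so on an empty cache it is a no-op. A self-contained sketch of the same semantics using a plain ConcurrentHashMap (a stand-in for the cache, not Ignite itself):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ReplaceVsPut {
    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

        // replace() on an absent key is a no-op: nothing is stored
        map.replace("p1", "Person 1");
        System.out.println(map.size()); // prints 0

        // put() stores the entry unconditionally
        map.put("p1", "Person 1");
        System.out.println(map.size()); // prints 1
    }
}
```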
>>>
>>> Have you seen a similar issue before?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On 3 November 2016 at 16:33, Anton Vinogradov <[email protected]>
>>> wrote:
>>>
>>>> Anil,
>>>>
>>>> Is it the first and only exception in the logs?
>>>>
>>>> Is it possible to debug this?
>>>> You can set a breakpoint at the first line of
>>>> org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean,
>>>> org.apache.ignite.IgniteCheckedException).
>>>> This will tell you who is stopping the data streamer.
>>>>
>>>> On Thu, Nov 3, 2016 at 1:41 PM, Anil <[email protected]> wrote:
>>>>
>>>>> Hi Anton,
>>>>> No, the Ignite nodes look good.
>>>>>
>>>>> I have attached my KafkaCacheDataStreamer class, and the following is
>>>>> the code that listens to the Kafka topic. The IgniteCache is created
>>>>> using Java configuration.
>>>>>
>>>>> I see that the cache size is zero even after adding entries to the
>>>>> cache from KafkaCacheDataStreamer. I am not sure how to log whether
>>>>> the entries were added to the cache or not.
>>>>>
>>>>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
>>>>>     new KafkaCacheDataStreamer<String, String, Person>();
>>>>>
>>>>> Properties props = new Properties(); // Kafka properties
>>>>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>>>>
>>>>> try {
>>>>>     IgniteDataStreamer<String, Person> stmr =
>>>>>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>>>>>
>>>>>     // allow overwriting cache data
>>>>>     stmr.allowOverwrite(true);
>>>>>
>>>>>     kafkaStreamer.setIgnite(ignite);
>>>>>     kafkaStreamer.setStreamer(stmr);
>>>>>
>>>>>     // set the topic
>>>>>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>>>>>
>>>>>     // set the number of threads to process Kafka streams
>>>>>     kafkaStreamer.setThreads(1);
>>>>>
>>>>>     // set Kafka consumer configurations
>>>>>     kafkaStreamer.setConsumerConfig(consumerConfig);
>>>>>
>>>>>     // set decoders
>>>>>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>>>>>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>>>>>
>>>>>     kafkaStreamer.setMultipleTupleExtractor(
>>>>>         new StreamMultipleTupleExtractor<String, String, Person>() {
>>>>>             @Override
>>>>>             public Map<String, Person> extract(String msg) {
>>>>>                 Map<String, Person> entries = new HashMap<>();
>>>>>
>>>>>                 try {
>>>>>                     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>>>>>                     IgniteCache<String, Person> cache = CacheManager.getCache();
>>>>>
>>>>>                     if (CollectionUtils.isNotEmpty(request.getPersons())) {
>>>>>                         for (Person ib : request.getPersons()) {
>>>>>                             if (StringUtils.isNotBlank(ib.getId())) {
>>>>>                                 String id = ib.getId();
>>>>>
>>>>>                                 if (Boolean.TRUE.equals(ib.isDeleted())) {
>>>>>                                     cache.remove(id);
>>>>>                                 } else {
>>>>>                                     // no need to store the id, so setting it to null
>>>>>                                     ib.setId(null);
>>>>>                                     entries.put(id, ib);
>>>>>                                 }
>>>>>                             }
>>>>>                         }
>>>>>                     }
>>>>>                 } catch (Exception ex) {
>>>>>                     logger.error("Error while updating the cache - {} {}", msg, ex);
>>>>>                 }
>>>>>
>>>>>                 return entries;
>>>>>             }
>>>>>         });
>>>>>
>>>>>     kafkaStreamer.start();
>>>>> } catch (Exception ex) {
>>>>>     logger.error("Error in kafka data streamer ", ex);
>>>>> }
>>>>>
>>>>>
>>>>> Please let me know if you see any issues. Thanks.
>>>>>
>>>>> On 3 November 2016 at 15:59, Anton Vinogradov <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Anil,
>>>>>>
>>>>>> Could you provide the getStreamer() code and full logs?
>>>>>> Possibly the Ignite node was disconnected, and this caused the
>>>>>> DataStreamer to close.
>>>>>>
>>>>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have created a custom Kafka data streamer for my use case, and I
>>>>>>> see the following exception:
>>>>>>>
>>>>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>>>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The addMessage method is:
>>>>>>>
>>>>>>> @Override
>>>>>>> protected void addMessage(T msg) {
>>>>>>>     if (getMultipleTupleExtractor() == null) {
>>>>>>>         Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>>>>
>>>>>>>         if (e != null)
>>>>>>>             getStreamer().addData(e);
>>>>>>>     } else {
>>>>>>>         Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>>>>
>>>>>>>         if (m != null && !m.isEmpty())
>>>>>>>             getStreamer().addData(m);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Do you see any issue? Please let me know if you need any additional
>>>>>>> information.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Vladislav Pyatkov
>
