Regarding the data streamer: I will reproduce the issue and share the complete code.
Regarding cache size(): I understand it completely now. I set the auto flush interval and it looks good now.
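A minimal sketch of why size() can read 0 while entries are still sitting in a streamer's buffer, which is the behaviour Vladislav describes below. This is a toy model in plain Java, not Ignite code: BufferedLoader, addData, flush and cacheSize are hypothetical names chosen for illustration, and the real streamer batches per node and flushes on its own schedule.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a cache fed through a buffering streamer.
// NOT Ignite code; every name here is hypothetical.
class BufferedLoader<K, V> {
    // What a size() call on the cache would see.
    private final Map<K, V> cache = new HashMap<>();
    // Entries accepted by addData but not yet written to the cache.
    private final List<Map.Entry<K, V>> buffer = new ArrayList<>();

    // Queue an entry; nothing reaches the cache yet.
    void addData(K key, V value) {
        buffer.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    // Write everything buffered so far into the cache.
    void flush() {
        for (Map.Entry<K, V> e : buffer)
            cache.put(e.getKey(), e.getValue());
        buffer.clear();
    }

    int cacheSize() {
        return cache.size();
    }
}

class BufferedLoaderDemo {
    public static void main(String[] args) {
        BufferedLoader<String, String> loader = new BufferedLoader<>();
        loader.addData("1", "Alice");
        System.out.println(loader.cacheSize()); // 0: the entry is still buffered
        loader.flush();
        System.out.println(loader.cacheSize()); // 1: visible after the flush
    }
}
```

In this model a periodic flush plays the role that the auto flush interval plays in the message above: without it, the size only changes when something flushes or closes the loader.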
Thanks to both of you.

On 3 November 2016 at 17:41, Vladislav Pyatkov <[email protected]> wrote:

> Hi Anil,
>
> Please attach the whole example.
> I cannot find where the DataStreamer was closed in your case (where was
> the method o.a.i.IgniteDataStreamer#close() invoked?).
>
> As for the cache size, it does not mean anything, because the streamer
> buffers entries and sends them as a batch.
> And yes, checking each future, as Anton said, is a good point.
>
> On Thu, Nov 3, 2016 at 3:05 PM, Anton Vinogradov <[email protected]>
> wrote:
>
>> Anil,
>>
>> getStreamer().addData() returns an IgniteFuture. You can check its
>> result with fut.get().
>> get() returns null if the data was streamed and stored, or throws an
>> exception.
>>
>> On Thu, Nov 3, 2016 at 2:32 PM, Anil <[email protected]> wrote:
>>
>>> Yes, that is the only exception I see in the logs. I will try the
>>> debug option. Thanks.
>>>
>>> Though the data streamer does not throw the exception all the time,
>>> IgniteCache#size() stays at 0 all the time. It is weird.
>>>
>>> 1.
>>> for (Map.Entry<K, V> entry : m.entrySet()){
>>>     getStreamer().addData(entry.getKey(), entry.getValue());
>>> }
>>>
>>> 2.
>>> for (Map.Entry<K, V> entry : m.entrySet()){
>>>     cache.put((String)entry.getKey(), (Person) entry.getValue());
>>> }
>>>
>>> 3.
>>> for (Map.Entry<K, V> entry : m.entrySet()){
>>>     cache.replace((String)entry.getKey(), (Person) entry.getValue());
>>> }
>>>
>>> The cache size with #1 and #3 is 0.
>>> The cache size with #2 is 1, as expected.
>>>
>>> Have you seen a similar issue before?
>>>
>>> Thanks
>>>
>>> On 3 November 2016 at 16:33, Anton Vinogradov <[email protected]>
>>> wrote:
>>>
>>>> Anil,
>>>>
>>>> Is it the first and only exception in the logs?
>>>>
>>>> Is it possible to debug this?
>>>> You can set a breakpoint at the first line of
>>>> org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean,
>>>> org.apache.ignite.IgniteCheckedException)
>>>> This will show you what is stopping the data streamer.
>>>>
>>>> On Thu, Nov 3, 2016 at 1:41 PM, Anil <[email protected]> wrote:
>>>>
>>>>> Hi Anton,
>>>>>
>>>>> No. The Ignite nodes look good.
>>>>>
>>>>> I have attached my KafkaCacheDataStreamer class, and the following is
>>>>> the code that listens to the Kafka topic. The IgniteCache is created
>>>>> using Java configuration.
>>>>>
>>>>> I see that the cache size is zero even after adding the entries to the
>>>>> cache from KafkaCacheDataStreamer. I am not sure how to log whether
>>>>> the entries were added to the cache or not.
>>>>>
>>>>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
>>>>>     new KafkaCacheDataStreamer<String, String, Person>();
>>>>>
>>>>> Properties props = new Properties(); // Kafka properties
>>>>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>>>>
>>>>> try {
>>>>>     IgniteDataStreamer<String, Person> stmr =
>>>>>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>>>>>     // allow overwriting cache data
>>>>>     stmr.allowOverwrite(true);
>>>>>
>>>>>     kafkaStreamer.setIgnite(ignite);
>>>>>     kafkaStreamer.setStreamer(stmr);
>>>>>
>>>>>     // set the topic
>>>>>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>>>>>
>>>>>     // set the number of threads to process Kafka streams
>>>>>     kafkaStreamer.setThreads(1);
>>>>>
>>>>>     // set Kafka consumer configurations
>>>>>     kafkaStreamer.setConsumerConfig(consumerConfig);
>>>>>
>>>>>     // set decoders
>>>>>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>>>>>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>>>>>     kafkaStreamer.setMultipleTupleExtractor(
>>>>>         new StreamMultipleTupleExtractor<String, String, Person>() {
>>>>>             @Override
>>>>>             public Map<String, Person> extract(String msg) {
>>>>>                 Map<String, Person> entries = new HashMap<>();
>>>>>                 try {
>>>>>                     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>>>>>                     IgniteCache<String, Person> cache = CacheManager.getCache();
>>>>>
>>>>>                     if (CollectionUtils.isNotEmpty(request.getPersons())) {
>>>>>                         String id = null;
>>>>>                         for (Person ib : request.getPersons()) {
>>>>>                             if (StringUtils.isNotBlank(ib.getId())) {
>>>>>                                 id = ib.getId();
>>>>>                                 if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())) {
>>>>>                                     cache.remove(id);
>>>>>                                 } else {
>>>>>                                     // no need to store the id. so setting null.
>>>>>                                     ib.setId(null);
>>>>>                                     entries.put(id, ib);
>>>>>                                 }
>>>>>                             }
>>>>>                         }
>>>>>                     } else {
>>>>>
>>>>>                     }
>>>>>                 } catch (Exception ex) {
>>>>>                     logger.error("Error while updating the cache - {} {} ", msg, ex);
>>>>>                 }
>>>>>
>>>>>                 return entries;
>>>>>             }
>>>>>         });
>>>>>
>>>>>     kafkaStreamer.start();
>>>>> } catch (Exception ex) {
>>>>>     logger.error("Error in kafka data streamer ", ex);
>>>>> }
>>>>>
>>>>> Please let me know if you see any issues. Thanks.
>>>>>
>>>>> On 3 November 2016 at 15:59, Anton Vinogradov <[email protected]> wrote:
>>>>>
>>>>>> Anil,
>>>>>>
>>>>>> Could you provide the getStreamer() code and full logs?
>>>>>> Possibly the Ignite node was disconnected, and this caused the
>>>>>> DataStreamer to close.
>>>>>>
>>>>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have created a custom Kafka data streamer for my use case, and I see
>>>>>>> the following exception.
>>>>>>>
>>>>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>>>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> addMessage method is
>>>>>>>
>>>>>>> @Override
>>>>>>> protected void addMessage(T msg) {
>>>>>>>     if (getMultipleTupleExtractor() == null) {
>>>>>>>         Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>>>>
>>>>>>>         if (e != null)
>>>>>>>             getStreamer().addData(e);
>>>>>>>     } else {
>>>>>>>         Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>>>>         if (m != null && !m.isEmpty()) {
>>>>>>>             getStreamer().addData(m);
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Do you see any issue? Please let me know if you need any additional
>>>>>>> information. Thanks.
>>>>>>>
>>>>>>> Thanks.
>
> --
> Vladislav Pyatkov
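Two points from this thread can be pictured with a small toy model: adding data after a close fails with an IllegalStateException, and the future returned by addData yields null once the entry has been stored. The sketch below is plain Java and is not Ignite's DataStreamerImpl. ClosableLoader and its methods are hypothetical names, and completing every pending future on close() is an assumption of the model, not a statement about Ignite's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model only; NOT Ignite's DataStreamerImpl. All names are hypothetical.
class ClosableLoader {
    // Futures for entries that were accepted but not yet flushed.
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private boolean closed;

    // Accepts an entry and returns a future that completes with null
    // once the entry has been flushed. Fails if the loader is closed.
    synchronized CompletableFuture<Void> addData(String key, String value) {
        if (closed)
            throw new IllegalStateException("Data streamer has been closed.");
        CompletableFuture<Void> fut = new CompletableFuture<>();
        pending.add(fut);
        return fut;
    }

    // Completes everything pending (model assumption), then rejects
    // all later addData calls.
    synchronized void close() {
        for (CompletableFuture<Void> fut : pending)
            fut.complete(null);
        pending.clear();
        closed = true;
    }
}

class ClosableLoaderDemo {
    public static void main(String[] args) throws Exception {
        ClosableLoader loader = new ClosableLoader();
        CompletableFuture<Void> fut = loader.addData("1", "Alice");
        loader.close();
        System.out.println(fut.get()); // null: the entry was flushed on close

        try {
            loader.addData("2", "Bob"); // the loader is already closed
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the exception only says that some earlier code path called close(); finding that caller is what the breakpoint suggestion above is for.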
