Anil,

Is this the first and only exception in the logs?
Is it possible to debug this? You can set a breakpoint at the first line of
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean, org.apache.ignite.IgniteCheckedException).
This will show you who is stopping the data streamer.

On Thu, Nov 3, 2016 at 1:41 PM, Anil <anilk...@gmail.com> wrote:

> Hi Anton,
>
> No, the Ignite nodes look good.
>
> I have attached my KafkaCacheDataStreamer class, and the following is the
> code that listens to the Kafka topic. The IgniteCache is created using Java
> configuration.
>
> I see that the cache size is zero even after adding entries to the cache
> from KafkaCacheDataStreamer. I am not sure how to log whether the entries
> were added to the cache or not.
>
> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
>     new KafkaCacheDataStreamer<String, String, Person>();
>
> Properties props = new Properties(); // kafka properties
> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
> try {
>     IgniteDataStreamer<String, Person> stmr =
>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>     // allow overwriting cache data
>     stmr.allowOverwrite(true);
>
>     kafkaStreamer.setIgnite(ignite);
>     kafkaStreamer.setStreamer(stmr);
>
>     // set the topic
>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>
>     // set the number of threads to process Kafka streams
>     kafkaStreamer.setThreads(1);
>
>     // set Kafka consumer configurations
>     kafkaStreamer.setConsumerConfig(consumerConfig);
>
>     // set decoders
>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>     kafkaStreamer.setMultipleTupleExtractor(
>         new StreamMultipleTupleExtractor<String, String, Person>() {
>             @Override
>             public Map<String, Person> extract(String msg) {
>                 Map<String, Person> entries = new HashMap<>();
>                 try {
>                     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>                     IgniteCache<String, Person> cache = CacheManager.getCache();
>
>                     if (CollectionUtils.isNotEmpty(request.getPersons())) {
>                         String id = null;
>                         for (Person ib : request.getPersons()) {
>                             if (StringUtils.isNotBlank(ib.getId())) {
>                                 id = ib.getId();
>                                 if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())) {
>                                     cache.remove(id);
>                                 } else {
>                                     // no need to store the id. so setting null.
>                                     ib.setId(null);
>                                     entries.put(id, ib);
>                                 }
>                             }
>                         }
>                     } else {
>
>                     }
>                 } catch (Exception ex) {
>                     logger.error("Error while updating the cache - {} {} ", msg, ex);
>                 }
>
>                 return entries;
>             }
>         });
>
>     kafkaStreamer.start();
> } catch (Exception ex) {
>     logger.error("Error in kafka data streamer ", ex);
> }
>
> Please let me know if you see any issues. Thanks.
>
> On 3 November 2016 at 15:59, Anton Vinogradov <avinogra...@gridgain.com>
> wrote:
>
>> Anil,
>>
>> Could you provide the getStreamer() code and full logs?
>> Possibly the Ignite node was disconnected and this caused the DataStreamer
>> to close.
>>
>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <anilk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have created a custom Kafka data streamer for my use case and I see the
>>> following exception.
>>>
>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> The addMessage method is:
>>>
>>> @Override
>>> protected void addMessage(T msg) {
>>>     if (getMultipleTupleExtractor() == null) {
>>>         Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>
>>>         if (e != null)
>>>             getStreamer().addData(e);
>>>
>>>     } else {
>>>         Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>         if (m != null && !m.isEmpty()) {
>>>             getStreamer().addData(m);
>>>         }
>>>     }
>>> }
>>>
>>> Do you see any issue? Please let me know if you need any additional
>>> information.
>>>
>>> Thanks.
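P.S. If attaching a debugger for the breakpoint suggested at the top of this message is awkward, a rough alternative is to record the call site yourself whenever your own code closes the streamer. The sketch below is only an illustration under assumptions: `CloseTracker` is a hypothetical helper, not part of Ignite, and it can only see closes that go through it. If it stays silent while the exception still appears, the close is most likely happening inside Ignite (for example on node stop or disconnect), and the `closeEx` breakpoint remains the way to find it.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not an Ignite class): wraps any AutoCloseable and
 * remembers the stack trace of the first caller that closed it.
 */
final class CloseTracker implements AutoCloseable {
    private final AutoCloseable delegate;
    // Holds a Throwable created at the first close() call; null while open.
    private final AtomicReference<Throwable> closedAt = new AtomicReference<>();

    CloseTracker(AutoCloseable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() {
        // A Throwable captures the current stack trace when it is constructed,
        // so this records who called close(). Only the first close is kept.
        closedAt.compareAndSet(null,
            new Throwable("closed by thread " + Thread.currentThread().getName()));
        try {
            delegate.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    boolean isClosed() {
        return closedAt.get() != null;
    }

    /** Throws if already closed, with the recorded close site as the cause. */
    void ensureOpen() {
        Throwable site = closedAt.get();
        if (site != null)
            throw new IllegalStateException("Resource has been closed.", site);
    }
}
```

One way to use it, assuming all application code closes the streamer through the wrapper: create `new CloseTracker(stmr)` next to the `ignite.dataStreamer(...)` call, call `ensureOpen()` at the start of `addMessage`, and log the exception if it is thrown. Its cause will then show the stack trace of whichever application call closed the streamer.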