Anil,

Could you provide the getStreamer() code and the full logs? It is possible that the Ignite node was disconnected, which caused the DataStreamer to be closed.
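
In the meantime, here is a minimal sketch of how the addData call could be guarded so that the consumer thread stops cleanly and logs once when the streamer reports that it is closed, instead of throwing on every message. GuardedAdder and its Consumer-based sink are hypothetical names of mine, not part of Ignite or of your code; the sink only stands in for whatever getStreamer().addData(...) does in your class.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not an Ignite class): forwards entries to a sink and
 * stops forwarding after the sink throws IllegalStateException once.
 */
public class GuardedAdder<K, V> {
    // Stands in for the streamer's addData call; an assumption for this sketch.
    private final Consumer<Map.Entry<K, V>> sink;

    // Set to true after the first IllegalStateException from the sink.
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public GuardedAdder(Consumer<Map.Entry<K, V>> sink) {
        this.sink = sink;
    }

    /**
     * @return true if the entry was handed to the sink, false if the entry
     *         was null or the sink is (or has just become) closed.
     */
    public boolean add(Map.Entry<K, V> e) {
        if (e == null || closed.get())
            return false;

        try {
            sink.accept(e);
            return true;
        } catch (IllegalStateException ex) {
            // Log only the first failure so the cause is easy to find in the logs.
            if (closed.compareAndSet(false, true))
                System.err.println("Sink reported closed, stopping: " + ex.getMessage());
            return false;
        }
    }

    public boolean isClosed() {
        return closed.get();
    }
}
```

In your addMessage you would, under these assumptions, wrap the call as new GuardedAdder<K, V>(e -> getStreamer().addData(e)) and let the polling loop exit once isClosed() returns true. This does not fix the cause of the closure; it only makes the point at which it happens easier to see in the logs.
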
On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:

> Hi,
>
> I have created a custom Kafka data streamer for my use case and I see the
> following exception.
>
> java.lang.IllegalStateException: Data streamer has been closed.
>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>     at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>     at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> The addMessage method is:
>
>     @Override
>     protected void addMessage(T msg) {
>         if (getMultipleTupleExtractor() == null) {
>             Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>
>             if (e != null)
>                 getStreamer().addData(e);
>         } else {
>             Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>
>             if (m != null && !m.isEmpty()) {
>                 getStreamer().addData(m);
>             }
>         }
>     }
>
> Do you see any issue? Please let me know if you need any additional
> information.
>
> Thanks.
