HI,
I have created custom kafka data streamer for my use case and i see
following exception.
java.lang.IllegalStateException: Data streamer has been closed.
at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
at
net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
at
net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
addMessage method is
@Override
protected void addMessage(T msg) {
if (getMultipleTupleExtractor() == null){
Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
if (e != null)
getStreamer().addData(e);
} else {
Map<K, V> m = getMultipleTupleExtractor().extract(msg);
if (m != null && !m.isEmpty()){
getStreamer().addData(m);
}
}
}
Do you see any issue ? Please let me know if you need any additional
information. thanks.
Thanks.