Anil,

Could you provide the getStreamer() code and the full logs?
Possibly the Ignite node was disconnected, and this caused the DataStreamer to be closed.
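
If the streamer really is being closed underneath the consumer thread, one option is to make the caller tolerant of it by re-creating the streamer on failure. The following is only a minimal sketch under that assumption: Sink and ReopeningSink are hypothetical names, not Ignite API, and the factory stands in for whatever creates your streamer (for example a call such as ignite.dataStreamer(cacheName)). Whether re-creating the streamer is safe after a disconnect still needs to be checked against the full logs.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the streamer; this is NOT the Ignite API. */
interface Sink<K, V> {
    /** Assumed to throw IllegalStateException once the sink has been closed. */
    void addData(K key, V val);
}

/** Wraps a sink and re-creates it once when it reports that it has been closed. */
final class ReopeningSink<K, V> implements Sink<K, V> {
    private final Supplier<Sink<K, V>> factory; // hypothetical: creates a fresh streamer
    private Sink<K, V> delegate;
    private int reopenCount;

    ReopeningSink(Supplier<Sink<K, V>> factory) {
        this.factory = factory;
        this.delegate = factory.get();
    }

    @Override
    public synchronized void addData(K key, V val) {
        try {
            delegate.addData(key, val);
        } catch (IllegalStateException closed) {
            // The current sink is unusable: obtain a new one and retry once.
            delegate = factory.get();
            reopenCount++;
            delegate.addData(key, val); // a second failure propagates to the caller
        }
    }

    /** Number of times the underlying sink had to be re-created. */
    synchronized int reopenCount() {
        return reopenCount;
    }
}
```

In addMessage, the calls to getStreamer().addData(...) would then go through such a wrapper instead of the raw streamer. The retry is deliberately limited to one attempt so that a permanently broken node still surfaces as an exception.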

On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:

> Hi,
>
> I have created a custom Kafka data streamer for my use case, and I see
> the following exception.
>
> java.lang.IllegalStateException: Data streamer has been closed.
>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>         at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>         at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
> The addMessage method is:
>
>     @Override
>     protected void addMessage(T msg) {
>         if (getMultipleTupleExtractor() == null) {
>             Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>
>             if (e != null)
>                 getStreamer().addData(e);
>         } else {
>             Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>
>             if (m != null && !m.isEmpty())
>                 getStreamer().addData(m);
>         }
>     }
>
>
> Do you see any issue? Please let me know if you need any additional
> information.
>
> Thanks.
>
