Anil,
> I suspect there is a problem when node rejoins the cluster and streamer is > already closed and not recreated. Correct ? Correct, this seems to be linked somehow. I need logs and sourcess to tell more. I had to implement my own kafka streamer because of > https://issues.apache.org/jira/browse/IGNITE-4140 I'd like to propose you to refactor streamer according to this issue and contribute solution. I can help you with tips and review. Sounds good? On Fri, Nov 11, 2016 at 11:41 AM, Anil <anilk...@gmail.com> wrote: > HI Anton, > > Thanks for responding. i will check if i can reproduce with issue with > reproducer. > > I had to implement my own kafka streamer because of > https://issues.apache.org/jira/browse/IGNITE-4140 > > I suspect there is a problem when node rejoins the cluster and streamer is > already closed and not recreated. Correct ? > > In the above case, kafka streamer tries to getStreamer and push the data > but streamer is not available. > > Thanks. > > > > On 11 November 2016 at 14:00, Anton Vinogradov <a...@apache.org> wrote: > >> Anil, >> >> Unfortunately, >> at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac >> heDataStreamer.java:149) >> does not fits on attached sources. >> >> But, >> java.lang.IllegalStateException: Cache has been closed or destroyed: >> PERSON_CACHE >> is a reason of closed datastreamer. >> >> It it possible to write reproducible example or to attach both (full, >> all) logs and sourcess? >> >> >> BTW, we already have Kafka streamer, why you decided to reimplement it? >> >> >> >> On Wed, Nov 9, 2016 at 5:39 PM, Anil <anilk...@gmail.com> wrote: >> >>> Would there be any issues because of size of data ? >>> i loaded around 80 gb on 4 node cluster. each node is of 8 CPU and 32 GB >>> RAM configuration. >>> >>> and cache configuration - >>> >>> CacheConfiguration<String, Person> pConfig = new >>> CacheConfiguration<String, Person>(); >>> pConfig.setName("Person_Cache"); >>> pConfig.setIndexedTypes(String.class, Person.class); >>> pConfig.setBackups(1); >>> pConfig.setCacheMode(CacheMode.PARTITIONED); >>> pConfig.setCopyOnRead(false); >>> pConfig.setSwapEnabled(true); >>> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); >>> pConfig.setSqlOnheapRowCacheSize(100_000); >>> pConfig.setOffHeapMaxMemory(10 * 1024 * 1024 * 1024); >>> pConfig.setStartSize(2000000); >>> pConfig.setStatisticsEnabled(true); >>> >>> Thanks for your help. >>> >>> On 9 November 2016 at 19:56, Anil <anilk...@gmail.com> wrote: >>> >>>> HI, >>>> >>>> Data streamer closed exception is very frequent. I did not see any >>>> explicit errors/exception about data streamer close. the excption i see >>>> only when message is getting added. >>>> >>>> I have 4 node ignite cluster and each node have consumer to connection >>>> and push the message received to streamer. >>>> >>>> What if the node is down and re-joined when message is getting added >>>> cache. >>>> >>>> Following is the exception from logs - >>>> >>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - >>>> Exception while adding to streamer >>>> java.lang.IllegalStateException: Data streamer has been closed. >>>> at org.apache.ignite.internal.processors.datastreamer.DataStrea >>>> merImpl.enterBusy(DataStreamerImpl.java:360) >>>> at org.apache.ignite.internal.processors.datastreamer.DataStrea >>>> merImpl.addData(DataStreamerImpl.java:507) >>>> at org.apache.ignite.internal.processors.datastreamer.DataStrea >>>> merImpl.addData(DataStreamerImpl.java:498) >>>> at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac >>>> heDataStreamer.java:140) >>>> at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDat >>>> aStreamer.java:197) >>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executor >>>> s.java:511) >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >>>> Executor.java:1142) >>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >>>> lExecutor.java:617) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - >>>> Message is ignored due to an error >>>> [msg=MessageAndMetadata(TestTopic,1,Message(magic >>>> = 0, attributes = 0, crc = 2111790081, key = null, payload = >>>> java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializ >>>> er.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)] >>>> java.lang.IllegalStateException: Cache has been closed or destroyed: >>>> PERSON_CACHE >>>> at org.apache.ignite.internal.processors.cache.GridCacheGateway >>>> .enter(GridCacheGateway.java:160) >>>> at org.apache.ignite.internal.processors.cache.IgniteCacheProxy >>>> .onEnter(IgniteCacheProxy.java:2103) >>>> at org.apache.ignite.internal.processors.cache.IgniteCacheProxy >>>> .size(IgniteCacheProxy.java:826) >>>> at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac >>>> heDataStreamer.java:149) >>>> at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDat >>>> aStreamer.java:197) >>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executor >>>> s.java:511) >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >>>> Executor.java:1142) >>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >>>> lExecutor.java:617) >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> I have attached the KafkaCacheDataStreamer class and let me know if you >>>> need any additional details. thanks. >>>> >>>> >>> >> >