HI Anton, Sounds perfect !
#1 - i will reproduce and share you the logs. i have project commitment for coming Monday #2 - I will work on this from coming Tuesday. Thanks On 11 November 2016 at 14:25, Anton Vinogradov <[email protected]> wrote: > Anil, > > >> I suspect there is a problem when node rejoins the cluster and streamer >> is already closed and not recreated. Correct ? > > > Correct, this seems to be linked somehow. I need logs and sourcess to tell > more. > > I had to implement my own kafka streamer because of >> https://issues.apache.org/jira/browse/IGNITE-4140 > > > I'd like to propose you to refactor streamer according to this issue and > contribute solution. I can help you with tips and review. > Sounds good? > > On Fri, Nov 11, 2016 at 11:41 AM, Anil <[email protected]> wrote: > >> HI Anton, >> >> Thanks for responding. i will check if i can reproduce with issue with >> reproducer. >> >> I had to implement my own kafka streamer because of >> https://issues.apache.org/jira/browse/IGNITE-4140 >> >> I suspect there is a problem when node rejoins the cluster and streamer >> is already closed and not recreated. Correct ? >> >> In the above case, kafka streamer tries to getStreamer and push the data >> but streamer is not available. >> >> Thanks. >> >> >> >> On 11 November 2016 at 14:00, Anton Vinogradov <[email protected]> wrote: >> >>> Anil, >>> >>> Unfortunately, >>> at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac >>> heDataStreamer.java:149) >>> does not fits on attached sources. >>> >>> But, >>> java.lang.IllegalStateException: Cache has been closed or destroyed: >>> PERSON_CACHE >>> is a reason of closed datastreamer. >>> >>> It it possible to write reproducible example or to attach both (full, >>> all) logs and sourcess? >>> >>> >>> BTW, we already have Kafka streamer, why you decided to reimplement it? >>> >>> >>> >>> On Wed, Nov 9, 2016 at 5:39 PM, Anil <[email protected]> wrote: >>> >>>> Would there be any issues because of size of data ? >>>> i loaded around 80 gb on 4 node cluster. each node is of 8 CPU and 32 >>>> GB RAM configuration. >>>> >>>> and cache configuration - >>>> >>>> CacheConfiguration<String, Person> pConfig = new >>>> CacheConfiguration<String, Person>(); >>>> pConfig.setName("Person_Cache"); >>>> pConfig.setIndexedTypes(String.class, Person.class); >>>> pConfig.setBackups(1); >>>> pConfig.setCacheMode(CacheMode.PARTITIONED); >>>> pConfig.setCopyOnRead(false); >>>> pConfig.setSwapEnabled(true); >>>> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); >>>> pConfig.setSqlOnheapRowCacheSize(100_000); >>>> pConfig.setOffHeapMaxMemory(10 * 1024 * 1024 * 1024); >>>> pConfig.setStartSize(2000000); >>>> pConfig.setStatisticsEnabled(true); >>>> >>>> Thanks for your help. >>>> >>>> On 9 November 2016 at 19:56, Anil <[email protected]> wrote: >>>> >>>>> HI, >>>>> >>>>> Data streamer closed exception is very frequent. I did not see any >>>>> explicit errors/exception about data streamer close. the excption i see >>>>> only when message is getting added. >>>>> >>>>> I have 4 node ignite cluster and each node have consumer to connection >>>>> and push the message received to streamer. >>>>> >>>>> What if the node is down and re-joined when message is getting added >>>>> cache. >>>>> >>>>> Following is the exception from logs - >>>>> >>>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - >>>>> Exception while adding to streamer >>>>> java.lang.IllegalStateException: Data streamer has been closed. >>>>> at org.apache.ignite.internal.pro >>>>> cessors.datastreamer.DataStreamerImpl.enterBusy(DataStreamer >>>>> Impl.java:360) >>>>> at org.apache.ignite.internal.pro >>>>> cessors.datastreamer.DataStreamerImpl.addData(DataStreamerIm >>>>> pl.java:507) >>>>> at org.apache.ignite.internal.pro >>>>> cessors.datastreamer.DataStreamerImpl.addData(DataStreamerIm >>>>> pl.java:498) >>>>> at com.test.cs.cache.KafkaCacheDa >>>>> taStreamer.addMessage(KafkaCacheDataStreamer.java:140) >>>>> at com.test.cs.cache.KafkaCacheDa >>>>> taStreamer$1.run(KafkaCacheDataStreamer.java:197) >>>>> at java.util.concurrent.Executors >>>>> $RunnableAdapter.call(Executors.java:511) >>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>>> at java.util.concurrent.ThreadPoo >>>>> lExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>> at java.util.concurrent.ThreadPoo >>>>> lExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - >>>>> Message is ignored due to an error >>>>> [msg=MessageAndMetadata(TestTopic,1,Message(magic >>>>> = 0, attributes = 0, crc = 2111790081, key = null, payload = >>>>> java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializ >>>>> er.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)] >>>>> java.lang.IllegalStateException: Cache has been closed or destroyed: >>>>> PERSON_CACHE >>>>> at org.apache.ignite.internal.pro >>>>> cessors.cache.GridCacheGateway.enter(GridCacheGateway.java:160) >>>>> at org.apache.ignite.internal.pro >>>>> cessors.cache.IgniteCacheProxy.onEnter(IgniteCacheProxy.java:2103) >>>>> at org.apache.ignite.internal.pro >>>>> cessors.cache.IgniteCacheProxy.size(IgniteCacheProxy.java:826) >>>>> at com.test.cs.cache.KafkaCacheDa >>>>> taStreamer.addMessage(KafkaCacheDataStreamer.java:149) >>>>> at com.test.cs.cache.KafkaCacheDa >>>>> taStreamer$1.run(KafkaCacheDataStreamer.java:197) >>>>> at java.util.concurrent.Executors >>>>> $RunnableAdapter.call(Executors.java:511) >>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>>> at java.util.concurrent.ThreadPoo >>>>> lExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>> at java.util.concurrent.ThreadPoo >>>>> lExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> >>>>> I have attached the KafkaCacheDataStreamer class and let me know if >>>>> you need any additional details. thanks. >>>>> >>>>> >>>> >>> >> >
