Hi Anton,

Thanks for responding. I will check if I can reproduce the issue with a
reproducer.

I had to implement my own Kafka streamer because of
https://issues.apache.org/jira/browse/IGNITE-4140

I suspect there is a problem when a node rejoins the cluster and the
streamer is already closed and not recreated. Correct?

In that case, my Kafka streamer calls getStreamer and tries to push the
data, but the streamer is no longer available.
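
To illustrate, this is the kind of guard I am thinking of (a hypothetical
wrapper, not my actual class; it assumes the streamer can simply be
reopened against PERSON_CACHE):

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;

// Lazily (re)creates the data streamer after it has been closed,
// e.g. when the owning node left and rejoined the cluster.
public class ResilientPersonStreamer {
    private final Ignite ignite;
    private IgniteDataStreamer<String, Person> streamer;

    public ResilientPersonStreamer(Ignite ignite) {
        this.ignite = ignite;
    }

    private synchronized IgniteDataStreamer<String, Person> streamer() {
        if (streamer == null)
            streamer = ignite.dataStreamer("PERSON_CACHE");

        return streamer;
    }

    public void addMessage(String key, Person val) {
        try {
            streamer().addData(key, val);
        }
        catch (IllegalStateException e) {
            // "Data streamer has been closed": drop the stale instance so
            // the next call recreates it; the failed message still needs
            // to be retried.
            synchronized (this) {
                streamer = null;
            }
        }
    }
}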

Thanks.



On 11 November 2016 at 14:00, Anton Vinogradov <a...@apache.org> wrote:

> Anil,
>
> Unfortunately,
>   at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac
> heDataStreamer.java:149)
> does not fit the attached sources.
>
> But,
> java.lang.IllegalStateException: Cache has been closed or destroyed:
> PERSON_CACHE
> is the reason the data streamer is closed.
>
> Is it possible to write a reproducible example, or to attach both the full
> logs and sources?
>
>
> BTW, we already have a Kafka streamer; why did you decide to reimplement it?
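>
> For reference, wiring it up looks roughly like the sketch below. This is
> from memory of the 1.x ignite-kafka example, so please verify the method
> names against your version's javadoc; the cache name, topic, and consumer
> properties are placeholders.
>
> import java.util.Properties;
>
> import kafka.consumer.ConsumerConfig;
> import kafka.serializer.StringDecoder;
> import kafka.utils.VerifiableProperties;
>
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteDataStreamer;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.stream.kafka.KafkaStreamer;
>
> Ignite ignite = Ignition.ignite();
>
> // Streamer feeding the target cache (placeholder name).
> IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("STRING_CACHE");
>
> Properties props = new Properties();
> props.put("zookeeper.connect", "localhost:2181"); // placeholder
> props.put("group.id", "ignite-loader");           // placeholder
>
> KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
>
> kafkaStreamer.setIgnite(ignite);
> kafkaStreamer.setStreamer(stmr);
> kafkaStreamer.setTopic("TestTopic");
> kafkaStreamer.setThreads(4);
> kafkaStreamer.setConsumerConfig(new ConsumerConfig(props));
> kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
> kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>
> kafkaStreamer.start();
>
> // On shutdown: kafkaStreamer.stop(); stmr.close();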
>
>
>
> On Wed, Nov 9, 2016 at 5:39 PM, Anil <anilk...@gmail.com> wrote:
>
>> Would there be any issues because of the size of the data?
>> I loaded around 80 GB on a 4-node cluster; each node has 8 CPUs and 32 GB
>> RAM.
>>
>> and cache configuration -
>>
>> CacheConfiguration<String, Person> pConfig =
>>     new CacheConfiguration<String, Person>();
>> pConfig.setName("Person_Cache");
>> pConfig.setIndexedTypes(String.class, Person.class);
>> pConfig.setBackups(1);
>> pConfig.setCacheMode(CacheMode.PARTITIONED);
>> pConfig.setCopyOnRead(false);
>> pConfig.setSwapEnabled(true);
>> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
>> pConfig.setSqlOnheapRowCacheSize(100_000);
>> pConfig.setOffHeapMaxMemory(10L * 1024 * 1024 * 1024); // long literal: the int expression overflows
>> pConfig.setStartSize(2000000);
>> pConfig.setStatisticsEnabled(true);
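>>
>> The cache and streamer are then created along these lines (a sketch;
>> `ignite` is the local Ignite instance):
>>
>> IgniteCache<String, Person> cache = ignite.getOrCreateCache(pConfig);
>>
>> // Streamer for the same cache; the name passed to dataStreamer() must
>> // match the configured one exactly (cache names are case-sensitive).
>> IgniteDataStreamer<String, Person> streamer =
>>     ignite.dataStreamer(pConfig.getName());
>>
>> streamer.allowOverwrite(true); // assumption: repeated keys should update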
>>
>> Thanks for your help.
>>
>> On 9 November 2016 at 19:56, Anil <anilk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The "data streamer has been closed" exception is very frequent. I did not
>>> see any explicit errors or exceptions about the data streamer being
>>> closed; I see the exception only when a message is being added.
>>>
>>> I have a 4-node Ignite cluster, and each node has a consumer that
>>> connects and pushes the received messages to the streamer.
>>>
>>> What if a node goes down and rejoins while a message is being added to
>>> the cache?
>>>
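>>> Each node's loop is roughly the sketch below (a simplified sketch, not
>>> the actual code; pollNext() is a hypothetical stand-in for the Kafka
>>> consumer iterator):
>>>
>>> import java.util.concurrent.ExecutorService;
>>> import java.util.concurrent.Executors;
>>>
>>> import kafka.message.MessageAndMetadata;
>>>
>>> ExecutorService pool = Executors.newFixedThreadPool(1);
>>>
>>> pool.submit(() -> {
>>>     while (!Thread.currentThread().isInterrupted()) {
>>>         MessageAndMetadata<String, String> msg = pollNext();
>>>
>>>         try {
>>>             addMessage(msg.key(), msg.message()); // pushes to the streamer
>>>         }
>>>         catch (IllegalStateException e) {
>>>             // This is where "Data streamer has been closed" surfaces
>>>             // after a node has left and rejoined.
>>>         }
>>>     }
>>> });
>>>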
>>> Following is the exception from the logs:
>>>
>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 -
>>> Exception while adding to streamer
>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>         at org.apache.ignite.internal.processors.datastreamer.DataStrea
>>> merImpl.enterBusy(DataStreamerImpl.java:360)
>>>         at org.apache.ignite.internal.processors.datastreamer.DataStrea
>>> merImpl.addData(DataStreamerImpl.java:507)
>>>         at org.apache.ignite.internal.processors.datastreamer.DataStrea
>>> merImpl.addData(DataStreamerImpl.java:498)
>>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac
>>> heDataStreamer.java:140)
>>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDat
>>> aStreamer.java:197)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executor
>>> s.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 -
>>> Message is ignored due to an error 
>>> [msg=MessageAndMetadata(TestTopic,1,Message(magic
>>> = 0, attributes = 0, crc = 2111790081, key = null, payload =
>>> java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializ
>>> er.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)]
>>> java.lang.IllegalStateException: Cache has been closed or destroyed:
>>> PERSON_CACHE
>>>         at org.apache.ignite.internal.processors.cache.GridCacheGateway
>>> .enter(GridCacheGateway.java:160)
>>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy
>>> .onEnter(IgniteCacheProxy.java:2103)
>>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy
>>> .size(IgniteCacheProxy.java:826)
>>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac
>>> heDataStreamer.java:149)
>>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDat
>>> aStreamer.java:197)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executor
>>> s.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> I have attached the KafkaCacheDataStreamer class; let me know if you
>>> need any additional details. Thanks.
>>>
>>>
>>
>
