Anil,

> I suspect there is a problem when a node rejoins the cluster and the
> streamer is already closed and not recreated. Correct?


Correct, the two seem to be linked somehow. I need the logs and sources to
tell more.
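
To make the suspected failure mode concrete, here is a minimal sketch (not taken from your sources) of a wrapper that recreates its underlying streamer once when an add fails because the streamer was closed. The names Sink, ReopeningSink and ListSink are hypothetical stand-ins so the example runs without Ignite; in real code the factory would presumably call ignite.dataStreamer(cacheName). It also assumes that IllegalStateException always means "closed", which may not hold in general.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the small part of a data streamer used here. */
interface Sink<K, V> extends AutoCloseable {
    /** Expected to throw IllegalStateException once the sink is closed. */
    void add(K key, V val);

    @Override
    void close();
}

/** Recreates the underlying sink once when an add fails because it was closed. */
final class ReopeningSink<K, V> {
    private final Supplier<Sink<K, V>> factory;
    private Sink<K, V> current;
    private int reopenCount;

    ReopeningSink(Supplier<Sink<K, V>> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    synchronized void add(K key, V val) {
        try {
            current.add(key, val);
        } catch (IllegalStateException closed) {
            // Assumption: the exception means the sink was closed, as in the log.
            current = factory.get();
            reopenCount++;
            // A second failure is not retried; it propagates to the caller.
            current.add(key, val);
        }
    }

    synchronized int reopenCount() {
        return reopenCount;
    }
}

/** In-memory sink used only to exercise the wrapper. */
final class ListSink implements Sink<String, String> {
    private final List<String> out;
    private boolean closed;

    ListSink(List<String> out) {
        this.out = out;
    }

    @Override
    public void add(String key, String val) {
        if (closed)
            throw new IllegalStateException("Data streamer has been closed.");
        out.add(key + "=" + val);
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Recreating the streamer only helps if the cache itself is available again; if the cache has been closed or destroyed, the second add is expected to fail as well.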

> I had to implement my own kafka streamer because of
> https://issues.apache.org/jira/browse/IGNITE-4140


I'd suggest that you refactor the streamer according to this issue and
contribute the solution. I can help with tips and review.
Sounds good?

On Fri, Nov 11, 2016 at 11:41 AM, Anil <anilk...@gmail.com> wrote:

> Hi Anton,
>
> Thanks for responding. I will check whether I can reproduce the issue with
> a reproducer.
>
> I had to implement my own kafka streamer because of
> https://issues.apache.org/jira/browse/IGNITE-4140
>
> I suspect there is a problem when a node rejoins the cluster and the
> streamer is already closed and not recreated. Correct?
>
> In that case, the Kafka streamer calls getStreamer and tries to push the
> data, but the streamer is not available.
>
> Thanks.
>
>
>
> On 11 November 2016 at 14:00, Anton Vinogradov <a...@apache.org> wrote:
>
>> Anil,
>>
>> Unfortunately,
>>   at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
>> does not match the attached sources.
>>
>> But
>> java.lang.IllegalStateException: Cache has been closed or destroyed:
>> PERSON_CACHE
>> is the reason the data streamer was closed.
>>
>> Is it possible to write a reproducible example, or to attach both the
>> full logs and the sources?
>>
>>
>> BTW, we already have a Kafka streamer. Why did you decide to reimplement it?
>>
>>
>>
>> On Wed, Nov 9, 2016 at 5:39 PM, Anil <anilk...@gmail.com> wrote:
>>
>>> Could the size of the data cause any issues?
>>> I loaded around 80 GB on a 4-node cluster. Each node has 8 CPUs and 32 GB
>>> of RAM.
>>>
>>> The cache configuration:
>>>
>>> CacheConfiguration<String, Person> pConfig = new CacheConfiguration<>();
>>> pConfig.setName("Person_Cache");
>>> pConfig.setIndexedTypes(String.class, Person.class);
>>> pConfig.setBackups(1);
>>> pConfig.setCacheMode(CacheMode.PARTITIONED);
>>> pConfig.setCopyOnRead(false);
>>> pConfig.setSwapEnabled(true);
>>> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
>>> pConfig.setSqlOnheapRowCacheSize(100_000);
>>> // 10 GB; the long literal keeps the multiplication from overflowing int.
>>> pConfig.setOffHeapMaxMemory(10L * 1024 * 1024 * 1024);
>>> pConfig.setStartSize(2000000);
>>> pConfig.setStatisticsEnabled(true);
>>>
>>> Thanks for your help.
>>>
>>> On 9 November 2016 at 19:56, Anil <anilk...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> The "data streamer closed" exception is very frequent. I did not see any
>>>> explicit error or exception about the data streamer being closed; the
>>>> exception appears only when a message is being added.
>>>>
>>>> I have a 4-node Ignite cluster, and each node has a consumer that connects
>>>> and pushes the received messages to the streamer.
>>>>
>>>> What happens if a node goes down and rejoins while a message is being
>>>> added to the cache?
>>>>
>>>> Following is the exception from the logs:
>>>>
>>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - Exception while adding to streamer
>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:140)
>>>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - Message is ignored due to an error [msg=MessageAndMetadata(TestTopic,1,Message(magic = 0, attributes = 0, crc = 2111790081, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializer.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)]
>>>> java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE
>>>>         at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:160)
>>>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.onEnter(IgniteCacheProxy.java:2103)
>>>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.size(IgniteCacheProxy.java:826)
>>>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
>>>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I have attached the KafkaCacheDataStreamer class. Let me know if you
>>>> need any additional details. Thanks.
>>>>
>>>>
>>>
>>
>
