Hi Anton,

Sounds perfect!

#1 - I will reproduce the issue and share the logs with you; I have a project
commitment through this coming Monday.
#2 - I will work on this starting this coming Tuesday.

Thanks

On 11 November 2016 at 14:25, Anton Vinogradov <[email protected]>
wrote:

> Anil,
>
>
>> I suspect there is a problem when the node rejoins the cluster and the
>> streamer is already closed and not recreated. Correct?
>
>
> Correct, this seems to be linked somehow. I need the logs and sources to
> tell more.
>
> I had to implement my own kafka streamer because of
>> https://issues.apache.org/jira/browse/IGNITE-4140
>
>
> I'd like to propose that you refactor the streamer according to this issue
> and contribute the solution. I can help you with tips and review.
> Sounds good?
>
> On Fri, Nov 11, 2016 at 11:41 AM, Anil <[email protected]> wrote:
>
>> Hi Anton,
>>
>> Thanks for responding. I will check whether I can reproduce the issue
>> with the reproducer.
>>
>> I had to implement my own kafka streamer because of
>> https://issues.apache.org/jira/browse/IGNITE-4140
>>
>> I suspect there is a problem when the node rejoins the cluster and the
>> streamer is already closed and not recreated. Correct?
>>
>> In that case, the Kafka streamer tries to get the streamer and push the
>> data, but the streamer is no longer available.
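The recreate-on-rejoin idea described above can be sketched with a stdlib-only stand-in. This is a hedged sketch, not the actual Ignite API: `ReconnectingStreamer` and `withStreamer` are illustrative names, and in a real fix the factory would call `ignite.dataStreamer(cacheName)` again to obtain a fresh streamer.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hedged sketch, not Ignite API: recreate a streamer-like resource when it
// reports itself closed (e.g. after the owning node left and rejoined).
public class ReconnectingStreamer<T> {
    private final Supplier<T> factory;                    // builds a fresh streamer
    private final AtomicReference<T> current = new AtomicReference<>();

    public ReconnectingStreamer(Supplier<T> factory) {
        this.factory = factory;
    }

    // Run the action against the current streamer; if it throws
    // IllegalStateException ("Data streamer has been closed."), build a
    // replacement, remember it, and retry the action exactly once.
    public void withStreamer(Consumer<T> action) {
        T s = current.updateAndGet(v -> v != null ? v : factory.get());
        try {
            action.accept(s);
        } catch (IllegalStateException closed) {
            T fresh = factory.get();
            current.set(fresh);
            action.accept(fresh);
        }
    }
}
```

With a wrapper like this, a message that happens to hit a closed streamer is retried against a freshly created one instead of being dropped with an error.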
>>
>> Thanks.
>>
>>
>>
>> On 11 November 2016 at 14:00, Anton Vinogradov <[email protected]> wrote:
>>
>>> Anil,
>>>
>>> Unfortunately,
>>>   at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
>>> does not fit the attached sources.
>>>
>>> But,
>>> java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE
>>> is the reason the data streamer was closed.
>>>
>>> Is it possible to write a reproducible example, or to attach both the
>>> full logs and sources?
>>>
>>>
>>> BTW, we already have a Kafka streamer; why did you decide to reimplement it?
>>>
>>>
>>>
>>> On Wed, Nov 9, 2016 at 5:39 PM, Anil <[email protected]> wrote:
>>>
>>>> Would there be any issues because of the size of the data?
>>>> I loaded around 80 GB on a 4-node cluster; each node has 8 CPUs and 32
>>>> GB of RAM.
>>>>
>>>> and the cache configuration -
>>>>
>>>> CacheConfiguration<String, Person> pConfig = new CacheConfiguration<String, Person>();
>>>> pConfig.setName("Person_Cache");
>>>> pConfig.setIndexedTypes(String.class, Person.class);
>>>> pConfig.setBackups(1);
>>>> pConfig.setCacheMode(CacheMode.PARTITIONED);
>>>> pConfig.setCopyOnRead(false);
>>>> pConfig.setSwapEnabled(true);
>>>> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
>>>> pConfig.setSqlOnheapRowCacheSize(100_000);
>>>> pConfig.setOffHeapMaxMemory(10L * 1024 * 1024 * 1024); // note the L: plain int arithmetic overflows here
>>>> pConfig.setStartSize(2_000_000);
>>>> pConfig.setStatisticsEnabled(true);
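One thing worth flagging in the configuration above: in Java, `10 * 1024 * 1024 * 1024` is evaluated entirely in int arithmetic before the result is widened to the long that `setOffHeapMaxMemory` accepts, so the intended 10 GB silently wraps to a negative number. A minimal demonstration (the class name is illustrative):

```java
// Demonstrates the int-overflow pitfall in the off-heap size expression above.
public class OffHeapSize {
    // All four operands are ints, so the multiply wraps around before the
    // result is ever widened to long.
    static long overflowing() {
        return 10 * 1024 * 1024 * 1024;   // wraps to Integer.MIN_VALUE
    }

    // Making the first operand long forces 64-bit arithmetic throughout.
    static long correct() {
        return 10L * 1024 * 1024 * 1024;  // 10737418240 (10 GB)
    }

    public static void main(String[] args) {
        System.out.println(overflowing()); // -2147483648
        System.out.println(correct());     // 10737418240
    }
}
```

Writing `10L * 1024 * 1024 * 1024` (or `10_737_418_240L`) avoids passing a negative limit to the off-heap memory setting.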
>>>>
>>>> Thanks for your help.
>>>>
>>>> On 9 November 2016 at 19:56, Anil <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The "data streamer closed" exception is very frequent. I did not see
>>>>> any explicit errors or exceptions about the data streamer being
>>>>> closed; the exception appears only when a message is being added.
>>>>>
>>>>> I have a 4-node Ignite cluster, and each node has a consumer that
>>>>> connects and pushes the messages it receives to the streamer.
>>>>>
>>>>> What if a node goes down and rejoins while a message is being added to
>>>>> the cache?
>>>>>
>>>>> Following is the exception from logs -
>>>>>
>>>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - Exception while adding to streamer
>>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:140)
>>>>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - Message is ignored due to an error [msg=MessageAndMetadata(TestTopic,1,Message(magic = 0, attributes = 0, crc = 2111790081, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializer.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)]
>>>>> java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE
>>>>>         at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:160)
>>>>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.onEnter(IgniteCacheProxy.java:2103)
>>>>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.size(IgniteCacheProxy.java:826)
>>>>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
>>>>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I have attached the KafkaCacheDataStreamer class; let me know if you
>>>>> need any additional details. Thanks.
>>>>>
>>>>>
>>>>
>>>
>>
>
