Yes,. that is only exception i see in logs. i will try the debug option.
thanks.
though data streamer is not returning exception all the time,
IgniteCache#size() remains empty all the time. It weird.
1.
for (Map.Entry<K, V> entry : m.entrySet()){
getStreamer().addData(entry.getKey(), entry.getValue());
}
2.
for (Map.Entry<K, V> entry : m.entrySet()){
cache.put((String)entry.getKey(), (Person)
entry.getValue());
}
3.
for (Map.Entry<K, V> entry : m.entrySet()){
cache.replace((String)entry.getKey(), (Person)
entry.getValue());
}
cache size with #1 & #3 is 0
cache size with #2 is 1 as expected.
Have you see similar issue before ?
Thanks
On 3 November 2016 at 16:33, Anton Vinogradov <[email protected]>
wrote:
> Anil,
>
> Is it first and only exception at logs?
>
> Is it possible to debud this?
> You can set breakpoint at first line of org.apache.ignite.internal.
> processors.datastreamer.DataStreamerImpl#closeEx(boolean,
> org.apache.ignite.IgniteCheckedException)
> This will give you information who stopping the datastreamer.
>
> On Thu, Nov 3, 2016 at 1:41 PM, Anil <[email protected]> wrote:
>
>> Hi Anton,
>> No. ignite nodes looks good.
>>
>> I have attached my KafkaCacheDataStreamer class and following is the code
>> to listen to the kafka topic. IgniteCache is created using java
>> configuration.
>>
>> I see cache size is zero after adding the entries to cache as well from
>> KafkaCacheDataStreamer. Not sure how to log whether the entries added to
>> cache or not.
>>
>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer = new
>> KafkaCacheDataStreamer<String, String, Person>();
>>
>> Properites pros = new Properites() // kafka properties
>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>
>> try {
>> IgniteDataStreamer<String, Person> stmr =
>> ignite.dataStreamer(CacheManager.PERSON_CACHE);
>> // allow overwriting cache data
>> stmr.allowOverwrite(true);
>>
>> kafkaStreamer.setIgnite(ignite);
>> kafkaStreamer.setStreamer(stmr);
>>
>> // set the topic
>> kafkaStreamer.setTopic(kafkaConfig.getString("topic",
>> "TestTopic"));
>>
>> // set the number of threads to process Kafka streams
>> kafkaStreamer.setThreads(1);
>>
>> // set Kafka consumer configurations
>> kafkaStreamer.setConsumerConfig(consumerConfig);
>>
>> // set decoders
>> kafkaStreamer.setKeyDecoder(new StringDecoder(new
>> VerifiableProperties()));
>> kafkaStreamer.setValueDecoder(new StringDecoder(new
>> VerifiableProperties()));
>> kafkaStreamer.setMultipleTupleExtractor(new
>> StreamMultipleTupleExtractor<String, String, Person>() {
>> @Override
>> public Map<String, Person> extract(String msg) {
>> Map<String, Person> entries = new HashMap<>();
>> try {
>> KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>> IgniteCache<String, Person> cache = CacheManager.getCache();
>>
>> if (CollectionUtils.isNotEmpty(request.getPersons())){
>> String id = null;
>> for (Person ib : request.getPersons()){
>> if (StringUtils.isNotBlank(ib.getId())){
>> id = ib.getId();
>> if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())){
>> cache.remove(id);
>> }else {
>> // no need to store the id. so setting null.
>> ib.setId(null);
>> entries.put(id, ib);
>> }
>> }
>> }
>> }else {
>>
>> }
>> }catch (Exception ex){
>> logger.error("Error while updating the cache - {} {} " ,msg, ex);
>> }
>>
>> return entries;
>> }
>> });
>>
>> kafkaStreamer.start();
>> }catch (Exception ex){
>> logger.error("Error in kafka data streamer ", ex);
>> }
>>
>>
>> Please let me know if you see any issues. thanks.
>>
>> On 3 November 2016 at 15:59, Anton Vinogradov <[email protected]>
>> wrote:
>>
>>> Anil,
>>>
>>> Could you provide getStreamer() code and full logs?
>>> Possible, ignite node was disconnected and this cause DataStreamer
>>> closure.
>>>
>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <[email protected]> wrote:
>>>
>>>> HI,
>>>>
>>>> I have created custom kafka data streamer for my use case and i see
>>>> following exception.
>>>>
>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>> at org.apache.ignite.internal.processors.datastreamer.DataStrea
>>>> merImpl.enterBusy(DataStreamerImpl.java:360)
>>>> at org.apache.ignite.internal.processors.datastreamer.DataStrea
>>>> merImpl.addData(DataStreamerImpl.java:507)
>>>> at org.apache.ignite.internal.processors.datastreamer.DataStrea
>>>> merImpl.addData(DataStreamerImpl.java:498)
>>>> at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(Kafka
>>>> CacheDataStreamer.java:128)
>>>> at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCache
>>>> DataStreamer.java:176)
>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Executor
>>>> s.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>>> Executor.java:1142)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>>> lExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>>
>>>> addMessage method is
>>>>
>>>> @Override
>>>> protected void addMessage(T msg) {
>>>> if (getMultipleTupleExtractor() == null){
>>>> Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>
>>>> if (e != null)
>>>> getStreamer().addData(e);
>>>>
>>>> } else {
>>>> Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>> if (m != null && !m.isEmpty()){
>>>> getStreamer().addData(m);
>>>> }
>>>> }
>>>> }
>>>>
>>>>
>>>> Do you see any issue ? Please let me know if you need any additional
>>>> information. thanks.
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>
>