Anil,

Is it the first and only exception in the logs?

Is it possible to debug this?
You can set a breakpoint at the first line of
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean,
org.apache.ignite.IgniteCheckedException)
This will show you who is stopping the data streamer.
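
If attaching a debugger to the node is not practical, a rough alternative is
to record the close site in code. The sketch below is only an illustration:
CloseTracker is a hypothetical helper, not an Ignite class, and it relies only
on IgniteDataStreamer being an AutoCloseable. It sees close() calls that go
through the wrapper (application code, try-with-resources); a close triggered
inside Ignite itself would still need the breakpoint approach.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of Ignite): wraps any AutoCloseable and
 * remembers the stack trace of the first caller that closes it.
 */
final class CloseTracker<T extends AutoCloseable> implements AutoCloseable {
    private final T delegate;

    // Holds a Throwable created at the first close() call; null while open.
    private final AtomicReference<Throwable> closeSite = new AtomicReference<>();

    CloseTracker(T delegate) {
        this.delegate = delegate;
    }

    /** The wrapped resource, e.g. the streamer handed to setStreamer(...). */
    T get() {
        return delegate;
    }

    /** Stack trace of the first close() call, or null if still open. */
    Throwable closedAt() {
        return closeSite.get();
    }

    @Override
    public void close() {
        // Record only the first close site; later calls keep the original.
        closeSite.compareAndSet(null, new Throwable("closed here"));
        try {
            delegate.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close delegate", e);
        }
    }

    public static void main(String[] args) {
        // A no-op AutoCloseable stands in for the real data streamer.
        CloseTracker<AutoCloseable> tracker = new CloseTracker<>(() -> { });
        tracker.close();
        tracker.closedAt().printStackTrace(); // shows who called close()
    }
}
```

To use it, wrap the streamer returned by ignite.dataStreamer(...), route every
close in your code through the tracker, and log tracker.closedAt() when the
IllegalStateException shows up.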

On Thu, Nov 3, 2016 at 1:41 PM, Anil <anilk...@gmail.com> wrote:

> Hi Anton,
> No. The Ignite nodes look good.
>
> I have attached my KafkaCacheDataStreamer class, and the code that listens
> to the Kafka topic follows. The IgniteCache is created using Java
> configuration.
>
> I see that the cache size is zero even after KafkaCacheDataStreamer adds
> entries to the cache. I am not sure how to log whether the entries were
> added to the cache or not.
>
> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer = new
> KafkaCacheDataStreamer<String, String, Person>();
>
>  Properties props = new Properties(); // Kafka properties
>  ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
> try {
>     IgniteDataStreamer<String, Person> stmr =
>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>
>     // allow overwriting cache data
>     stmr.allowOverwrite(true);
>
>     kafkaStreamer.setIgnite(ignite);
>     kafkaStreamer.setStreamer(stmr);
>
>     // set the topic
>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>
>     // set the number of threads to process Kafka streams
>     kafkaStreamer.setThreads(1);
>
>     // set Kafka consumer configurations
>     kafkaStreamer.setConsumerConfig(consumerConfig);
>
>     // set decoders
>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>     kafkaStreamer.setMultipleTupleExtractor(
>         new StreamMultipleTupleExtractor<String, String, Person>() {
>             @Override
>             public Map<String, Person> extract(String msg) {
>                 Map<String, Person> entries = new HashMap<>();
>                 try {
>                     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>                     IgniteCache<String, Person> cache = CacheManager.getCache();
>
>                     if (CollectionUtils.isNotEmpty(request.getPersons())) {
>                         String id = null;
>                         for (Person ib : request.getPersons()) {
>                             if (StringUtils.isNotBlank(ib.getId())) {
>                                 id = ib.getId();
>                                 if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())) {
>                                     cache.remove(id);
>                                 } else {
>                                     // no need to store the id. so setting null.
>                                     ib.setId(null);
>                                     entries.put(id, ib);
>                                 }
>                             }
>                         }
>                     } else {
>
>                     }
>                 } catch (Exception ex) {
>                     logger.error("Error while updating the cache - {} {} ", msg, ex);
>                 }
>
>                 return entries;
>             }
>         });
>
>     kafkaStreamer.start();
> } catch (Exception ex) {
>     logger.error("Error in kafka data streamer ", ex);
> }
>
>
> Please let me know if you see any issues. Thanks.
>
> On 3 November 2016 at 15:59, Anton Vinogradov <avinogra...@gridgain.com>
> wrote:
>
>> Anil,
>>
>> Could you provide the getStreamer() code and full logs?
>> Possibly the Ignite node was disconnected, and this caused the
>> DataStreamer to close.
>>
>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <anilk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have created a custom Kafka data streamer for my use case and I see
>>> the following exception.
>>>
>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> The addMessage method is:
>>>
>>>  @Override
>>>     protected void addMessage(T msg) {
>>>     if (getMultipleTupleExtractor() == null){
>>>             Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>
>>>             if (e != null)
>>>                 getStreamer().addData(e);
>>>
>>>         } else {
>>>             Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>             if (m != null && !m.isEmpty()){
>>>                 getStreamer().addData(m);
>>>             }
>>>         }
>>>     }
>>>
>>>
>>> Do you see any issue? Please let me know if you need any additional
>>> information.
>>>
>>> Thanks.
>>>
>>
>>
>
