You certainly have other nodes running somewhere, and they get picked up
when you run examples, yielding incorrect results.
To check, add this right after Ignition.Start:

var nodeCount = ignite.GetCluster().GetNodes().Count;
if (nodeCount > 1)
{
    throw new Exception("Unexpected node count: " + nodeCount);
}
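
If stray nodes do turn up, one way to keep the test node isolated is to pin
discovery to a single local endpoint. A minimal sketch, assuming port 48500
is free on your machine (the port number is an arbitrary choice for
illustration, not something taken from your setup):

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Static;

// Assumption: 48500 is an arbitrary port picked for this sketch,
// chosen to sit outside the default discovery port range (47500 and up).
var cfg = new IgniteConfiguration
{
    DiscoverySpi = new TcpDiscoverySpi
    {
        // Bind discovery to a non-default port.
        LocalPort = 48500,
        // Look for other nodes only at this one local address.
        IpFinder = new TcpDiscoveryStaticIpFinder
        {
            Endpoints = new[] { "127.0.0.1:48500" }
        }
    }
};

using (var ignite = Ignition.Start(cfg))
{
    // Same check as above: report how many nodes are in the topology.
    Console.WriteLine("Nodes: " + ignite.GetCluster().GetNodes().Count);
}
```

Nodes started with default discovery settings use other ports, so they
should not join this topology.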


On Tue, May 31, 2022 at 5:30 PM Charlin S wrote:

> Hi,
> Thanks for the reply,
> I tried the same code and got this error:
> org.apache.ignite.IgniteCheckedException: Failed to finish operation
> (too many remaps): 32
> I tried Ignite 2.10.0 and 2.13.0; both have the same problem.
>
> Regards,
> Charlin
>
>
>
> On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn wrote:
>
>> If you run multiple nodes in the cluster, the receiver may be invoked on
>> another node, so the breakpoint is not reached.
>> I've simplified your code a bit and it works as expected:
>> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>>
>> On Mon, May 30, 2022 at 11:22 AM Charlin S wrote:
>>
>>> Hi,
>>> Thanks for the reply,
>>> The first option works for me: I create a cache instance with an expiry
>>> policy just before the data streamer.
>>> I am still curious about the data streamer with a receiver, though.
>>>
>>> With the new changes there is no build error, but the application does
>>> not work as expected. I added a breakpoint in MyStreamReceiver, but it
>>> is never reached.
>>>
>>> using (var cacheDataStreamer = DynamicIgniteInstance.Instance.InstanceObject
>>>     .GetDataStreamer<string, T>(cacheName))
>>> {
>>>     cacheDataStreamer.AllowOverwrite = true;
>>>     cacheDataStreamer.Receiver = new MyStreamReceiver<T>();
>>>     foreach (var item in data)
>>>     {
>>>         string cacheKey = item.Key;
>>>         int index = cacheKey.IndexOf("Model:");
>>>         if (index > 0)
>>>             cacheKey = cacheKey.Insert(index + "Model:".Length, CacheKeyDefault);
>>>         else
>>>             cacheKey = CacheKeyDefault + cacheKey;
>>>         cacheDataStreamer.AddData(cacheName + ":" + cacheKey, item.Value);
>>>     }
>>>     cacheDataStreamer.Flush();
>>> }
>>>
>>> public class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>> {
>>>     public void Receive(ICache<string, T> cache,
>>>         ICollection<ICacheEntry<string, T>> entries)
>>>     {
>>>         foreach (var entry in entries)
>>>         {
>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>                 .Put(entry.Key, entry.Value);
>>>         }
>>>     }
>>> }
>>>
>>> Regards,
>>> Charlin
>>>
>>>
>>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn wrote:
>>>
>>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>>> inserted with DataStreamer are also affected,
>>>> see
>>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>>
>>>> 2. Compiler error says it all. Generic arguments don't match.
>>>> Try changing
>>>> MyStreamReceiver : IStreamReceiver<string, object>
>>>> to
>>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>>
>>>> On Thu, May 26, 2022 at 5:24 PM Charlin S wrote:
>>>>
>>>>> We have a requirement to expire data after some time.
>>>>> I called WithExpiryPolicy on the cache instance, but the data added
>>>>> through GetDataStreamer does not expire, because it returns a new
>>>>> instance with default policies.
>>>>> So I am trying to use IStreamReceiver, but I am not able to build the
>>>>> solution.
>>>>>
>>>>> IStreamReceiver code:
>>>>> public class MyStreamReceiver : IStreamReceiver<string, object>
>>>>> {
>>>>>     public void Receive(ICache<string, object> cache,
>>>>>         ICollection<ICacheEntry<string, object>> entries)
>>>>>     {
>>>>>         foreach (var entry in entries)
>>>>>         {
>>>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>>>                 .Put(entry.Key, entry.Value);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Datastreamer code error
>>>>> [image: image.png]
>>>>>
>>>>> How should I implement IStreamReceiver? Please help me with this.
>>>>> Regards,
>>>>> Charlin
>>>>>
>>>>>
>>>>>
>>>>>
