You certainly have other nodes running somewhere, and they get picked up
when you run examples, yielding incorrect results.
Try adding this right after Ignition.Start:
    var nodeCount = ignite.GetCluster().GetNodes().Count;
    if (nodeCount > 1)
    {
        throw new Exception("Unexpected node count: " + nodeCount);
    }
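
If stray nodes do turn out to be the cause, one way to keep them out while debugging is to restrict discovery to a single local port. This is only a sketch under assumptions: port 48500 is an arbitrary choice that I assume is free on your machine, and the class name is made up for illustration.

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Static;

// Hypothetical example class; not part of the original code.
public static class IsolatedNodeExample
{
    public static void Main()
    {
        var cfg = new IgniteConfiguration
        {
            DiscoverySpi = new TcpDiscoverySpi
            {
                // Assumption: 48500 is free and no other Ignite node uses it.
                LocalPort = 48500,
                IpFinder = new TcpDiscoveryStaticIpFinder
                {
                    // Look for peers only on this machine and this port.
                    Endpoints = new[] { "127.0.0.1:48500" }
                }
            }
        };

        using (var ignite = Ignition.Start(cfg))
        {
            // Same check as above: fail fast if another node joined anyway.
            var nodeCount = ignite.GetCluster().GetNodes().Count;
            if (nodeCount > 1)
            {
                throw new Exception("Unexpected node count: " + nodeCount);
            }

            Console.WriteLine("Running as a single isolated node.");
        }
    }
}
```

With a setup like this, nodes started elsewhere with the default discovery settings should not join the topology, so the receiver runs in the same process and the breakpoint can be hit.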
On Tue, May 31, 2022 at 5:30 PM Charlin S <[email protected]> wrote:
> Hi,
> Thanks for the reply,
> I tried the same code and got this error:
> org.apache.ignite.IgniteCheckedException: Failed to finish
> operation (too many remaps): 32
> I tried Ignite 2.10.0 and 2.13.0; both have the same problem.
>
> Regards,
> Charlin
>
>
>
> On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <[email protected]> wrote:
>
>> If you run multiple nodes in the cluster, the receiver may be invoked on
>> another node, so the breakpoint is not reached.
>> I've simplified your code a bit and it works as expected:
>> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>>
>> On Mon, May 30, 2022 at 11:22 AM Charlin S <[email protected]>
>> wrote:
>>
>>> Hi,
>>> Thanks for the reply,
>>> The first option works for me: I create the cache instance with an expiry
>>> policy just before creating the data streamer.
>>> I am still curious about using the data streamer with a receiver, though.
>>>
>>> There are no build errors with the new changes, but the application does not
>>> work as expected: I added a breakpoint in MyStreamReceiver, but it is never reached.
>>>
>>> using (var cacheDataStreamer =
>>>     DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string, T>(cacheName))
>>> {
>>>     cacheDataStreamer.AllowOverwrite = true;
>>>     cacheDataStreamer.Receiver = new MyStreamReceiver<T>();
>>>     foreach (var item in data)
>>>     {
>>>         string cacheKey = item.Key;
>>>         int index = cacheKey.IndexOf("Model:");
>>>         if (index > 0)
>>>             cacheKey = cacheKey.Insert(index + "Model:".Length, CacheKeyDefault);
>>>         else
>>>             cacheKey = CacheKeyDefault + cacheKey;
>>>         cacheDataStreamer.AddData(cacheName + ":" + cacheKey, item.Value);
>>>     }
>>>     cacheDataStreamer.Flush();
>>> }
>>>
>>> public class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>> {
>>>     public void Receive(ICache<string, T> cache,
>>>         ICollection<ICacheEntry<string, T>> entries)
>>>     {
>>>         foreach (var entry in entries)
>>>         {
>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>                 .Put(entry.Key, entry.Value);
>>>         }
>>>     }
>>> }
>>>
>>> Regards,
>>> Charlin
>>>
>>>
>>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <[email protected]>
>>> wrote:
>>>
>>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>>> inserted with DataStreamer are also affected,
>>>> see
>>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>>
>>>> 2. Compiler error says it all. Generic arguments don't match.
>>>> Try changing
>>>> MyStreamReceiver : IStreamReceiver<string, object>
>>>> to
>>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
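>>>>
>>>> For option 1, a minimal sketch of what the cache configuration might
>>>> look like. The factory class name, the cache name "myCache" and the
>>>> object value type are placeholders for illustration, not taken from
>>>> your code; adjust them to your setup.
>>>>
>>>> ```csharp
>>>> using System;
>>>> using Apache.Ignite.Core;
>>>> using Apache.Ignite.Core.Cache.Configuration;
>>>> using Apache.Ignite.Core.Cache.Expiry;
>>>> using Apache.Ignite.Core.Common;
>>>>
>>>> // Hypothetical factory: entries expire 600 seconds after creation.
>>>> public class FixedExpiryPolicyFactory : IFactory<IExpiryPolicy>
>>>> {
>>>>     public IExpiryPolicy CreateInstance()
>>>>     {
>>>>         return new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null);
>>>>     }
>>>> }
>>>>
>>>> public static class ExpiryConfigExample
>>>> {
>>>>     public static void Main()
>>>>     {
>>>>         using (var ignite = Ignition.Start())
>>>>         {
>>>>             // The policy is part of the cache configuration, so it also
>>>>             // applies to entries loaded through the data streamer.
>>>>             var cacheCfg = new CacheConfiguration("myCache")
>>>>             {
>>>>                 ExpiryPolicyFactory = new FixedExpiryPolicyFactory()
>>>>             };
>>>>             ignite.GetOrCreateCache<string, object>(cacheCfg);
>>>>
>>>>             using (var streamer = ignite.GetDataStreamer<string, object>("myCache"))
>>>>             {
>>>>                 streamer.AddData("key1", "value1");
>>>>             } // Dispose flushes the remaining buffered entries.
>>>>         }
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> Note that the configuration only takes effect when the cache is first
>>>> created; an already existing cache keeps the policy it was created with.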
>>>>
>>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <[email protected]>
>>>> wrote:
>>>>
>>>>> We have a requirement to make data expire after some time.
>>>>> I call WithExpiryPolicy on the cache instance, but the data added through
>>>>> GetDataStreamer does not expire, because WithExpiryPolicy returns a new
>>>>> instance and the streamer still uses the default policies.
>>>>> So I am trying to use IStreamReceiver, but I am not able to build the
>>>>> solution.
>>>>>
>>>>> IStreamReceiver Code:
>>>>> public class MyStreamReceiver : IStreamReceiver<string, object>
>>>>> {
>>>>>     public void Receive(ICache<string, object> cache,
>>>>>         ICollection<ICacheEntry<string, object>> entries)
>>>>>     {
>>>>>         foreach (var entry in entries)
>>>>>         {
>>>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>>>                 .Put(entry.Key, entry.Value);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Data streamer code error:
>>>>> [image: image.png]
>>>>>
>>>>> How should I implement IStreamReceiver? Please help me with this.
>>>>> Regards,
>>>>> Charlin
>>>>>
>>>>>
>>>>>
>>>>>