ptupitsyn commented on code in PR #11107:
URL: https://github.com/apache/ignite/pull/11107#discussion_r1432834699


##########
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Services/ServicesClient.cs:
##########
@@ -202,7 +217,127 @@ public IServicesClient WithServerKeepBinary()
                         : ctx.Reader;
 
                     return reader.ReadObject<object>();
-                });
+                }, 
+                GetServiceTopology(serviceName));
+        }
+
+        /// <summary>
+        /// Provides actual known service topology or empty list if: partition 
awareness is not enabled,
+        /// service topology is not supported or not received yet.
+        /// </summary>
+        private IList<Guid> GetServiceTopology(string serviceName)
+        {
+            if (_servicesTopologies == null || 
!_ignite.Socket.GetSocket().Features.HasFeature(ClientBitmaskFeature.ServiceTopology))
+                return null;
+            
+            return _servicesTopologies.GetOrAdd(serviceName, s => new 
ServiceTopology(serviceName, this)).GetAndUpdate();
+        }
+        
+        /// <summary>
+        /// Keeps and process topology of certain service.
+        /// </summary>
+        private class ServiceTopology
+        {
+            /** Service name. */
+            private readonly string _svcName;
+            
+            /** Ignite services. */
+            private readonly ServicesClient _svcClient;
+
+            /** Flag of topology update progress. */
+            private volatile int _updateInProgress;
+
+            /** Time of the last update. */
+            private long _lastUpdateRequestTime;
+
+            /** Cluster topology version of the last update. */
+            private long _lastAffTop;
+            
+            /** Ids of the nodes with at least one service instance. */
+            private volatile IList<Guid> _nodes = new List<Guid>();
+
+            /// <summary>
+            /// Creates service topology holder.
+            /// </summary>
+            internal ServiceTopology(string name, ServicesClient svcClient)
+            {
+                _svcName = name;
+                _svcClient = svcClient;
+            }
+
+            /// <summary>
+            /// Asynchronously updates the topology.
+            /// </summary>
+            private async Task UpdateTopologyAsync()
+            {
+                if (Interlocked.CompareExchange(ref _updateInProgress, 1, 0) 
== 1)
+                    return;
+
+                var socket = _svcClient._ignite.Socket;
+
+                var topVer = socket.GetTopologyVersion();
+
+                var log = _svcClient._ignite.GetConfiguration().Logger;
+
+                var groupNodes = _svcClient._clusterGroup?.GetNodes();
+                
+                var top = await 
socket.DoOutInOpAsync(ClientOp.ServiceGetTopology,
+                    ctx => ctx.Writer.WriteString(_svcName),
+                    ctx =>
+                    {
+                        var cnt = ctx.Reader.ReadInt();
+
+                        var res = new List<Guid>(cnt);
+                        
+                        for (var i = 0; i < cnt; ++i)
+                            res.Add(BinaryUtils.ReadGuid(ctx.Reader.Stream));
+
+                        return res;
+                    }, (status, err) =>
+                    {
+                        log.Error("Failed to update topology of the service '" 
+ _svcName + "'.", err);
+
+                        return _nodes;
+                    }).ConfigureAwait(false);
+                
+                _nodes = FilterTopology(top, groupNodes?.Select(n => 
n.Id).ToList());
+                
+                Interlocked.Exchange(ref _lastUpdateRequestTime, 
DateTime.Now.Ticks);
+                Interlocked.Exchange(ref _lastAffTop, topVer);
+
+                log.Debug("Topology of service '" + _svcName + "' has been 
updated. The " +

Review Comment:
   ```suggestion
                   if (log.IsEnabled(LogLevel.Debug))
                       log.Debug("Topology of service '" + _svcName + "' has 
been updated. The " +
   ```



##########
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Services/ServicesClient.cs:
##########
@@ -202,7 +217,127 @@ public IServicesClient WithServerKeepBinary()
                         : ctx.Reader;
 
                     return reader.ReadObject<object>();
-                });
+                }, 
+                GetServiceTopology(serviceName));
+        }
+
+        /// <summary>
+        /// Provides actual known service topology or empty list if: partition 
awareness is not enabled,
+        /// service topology is not supported or not received yet.
+        /// </summary>
+        private IList<Guid> GetServiceTopology(string serviceName)
+        {
+            if (_servicesTopologies == null || 
!_ignite.Socket.GetSocket().Features.HasFeature(ClientBitmaskFeature.ServiceTopology))
+                return null;
+            
+            return _servicesTopologies.GetOrAdd(serviceName, s => new 
ServiceTopology(serviceName, this)).GetAndUpdate();
+        }
+        
+        /// <summary>
+        /// Keeps and process topology of certain service.
+        /// </summary>
+        private class ServiceTopology
+        {
+            /** Service name. */
+            private readonly string _svcName;
+            
+            /** Ignite services. */
+            private readonly ServicesClient _svcClient;
+
+            /** Flag of topology update progress. */
+            private volatile int _updateInProgress;
+
+            /** Time of the last update. */
+            private long _lastUpdateRequestTime;
+
+            /** Cluster topology version of the last update. */
+            private long _lastAffTop;
+            
+            /** Ids of the nodes with at least one service instance. */
+            private volatile IList<Guid> _nodes = new List<Guid>();
+
+            /// <summary>
+            /// Creates service topology holder.
+            /// </summary>
+            internal ServiceTopology(string name, ServicesClient svcClient)
+            {
+                _svcName = name;
+                _svcClient = svcClient;
+            }
+
+            /// <summary>
+            /// Asynchronously updates the topology.
+            /// </summary>
+            private async Task UpdateTopologyAsync()
+            {
+                if (Interlocked.CompareExchange(ref _updateInProgress, 1, 0) 
== 1)
+                    return;
+
+                var socket = _svcClient._ignite.Socket;
+
+                var topVer = socket.GetTopologyVersion();
+
+                var log = _svcClient._ignite.GetConfiguration().Logger;
+
+                var groupNodes = _svcClient._clusterGroup?.GetNodes();
+                
+                var top = await 
socket.DoOutInOpAsync(ClientOp.ServiceGetTopology,
+                    ctx => ctx.Writer.WriteString(_svcName),
+                    ctx =>
+                    {
+                        var cnt = ctx.Reader.ReadInt();
+
+                        var res = new List<Guid>(cnt);
+                        
+                        for (var i = 0; i < cnt; ++i)
+                            res.Add(BinaryUtils.ReadGuid(ctx.Reader.Stream));
+
+                        return res;
+                    }, (status, err) =>
+                    {
+                        log.Error("Failed to update topology of the service '" 
+ _svcName + "'.", err);
+
+                        return _nodes;
+                    }).ConfigureAwait(false);
+                
+                _nodes = FilterTopology(top, groupNodes?.Select(n => 
n.Id).ToList());
+                
+                Interlocked.Exchange(ref _lastUpdateRequestTime, 
DateTime.Now.Ticks);
+                Interlocked.Exchange(ref _lastAffTop, topVer);
+
+                log.Debug("Topology of service '" + _svcName + "' has been 
updated. The " +
+                          "service instance nodes: " + string.Join(", ", 
top.Select(gid=>gid.ToString())) +
+                          ". Effective topology with the cluster group is: " + 
+                          string.Join(", ", 
_nodes.Select(gid=>gid.ToString())) + '.');
+
+                _updateInProgress = 0;

Review Comment:
   Mixing `volatile` and `Interlocked` makes the code harder to reason about. 
It is considered a good practice to stick to one synchronization mechanism for 
a given field. Let's remove `volatile` and use `Interlocked.CompareExchange(ref 
_updateInProgress, 0, 0)` to read the value.
   
   See 
https://stackoverflow.com/questions/24808291/reading-an-int-thats-updated-by-interlocked-on-other-threads



##########
modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Services/ServiceAwarenessTest.cs:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Services
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Net;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Apache.Ignite.Core.Client;
+    using Apache.Ignite.Core.Client.Services;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Log;
+    using Apache.Ignite.Core.Services;
+    using Apache.Ignite.Core.Tests.Client.Cache;
+    using Apache.Ignite.Core.Tests.Services;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Service awareness tests.
+    /// </summary>
+    public class ServicesAwarenessTest

Review Comment:
   This class reimplements a lot of logic from `ClientTestBase` - is that 
intentional?



##########
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Services/ServicesClient.cs:
##########
@@ -202,7 +217,127 @@ public IServicesClient WithServerKeepBinary()
                         : ctx.Reader;
 
                     return reader.ReadObject<object>();
-                });
+                }, 
+                GetServiceTopology(serviceName));
+        }
+
+        /// <summary>
+        /// Provides actual known service topology or empty list if: partition 
awareness is not enabled,
+        /// service topology is not supported or not received yet.
+        /// </summary>
+        private IList<Guid> GetServiceTopology(string serviceName)
+        {
+            if (_servicesTopologies == null || 
!_ignite.Socket.GetSocket().Features.HasFeature(ClientBitmaskFeature.ServiceTopology))
+                return null;
+            
+            return _servicesTopologies.GetOrAdd(serviceName, s => new 
ServiceTopology(serviceName, this)).GetAndUpdate();
+        }
+        
+        /// <summary>
+        /// Keeps and process topology of certain service.
+        /// </summary>
+        private class ServiceTopology
+        {
+            /** Service name. */
+            private readonly string _svcName;
+            
+            /** Ignite services. */
+            private readonly ServicesClient _svcClient;
+
+            /** Flag of topology update progress. */
+            private volatile int _updateInProgress;
+
+            /** Time of the last update. */
+            private long _lastUpdateRequestTime;
+
+            /** Cluster topology version of the last update. */
+            private long _lastAffTop;
+            
+            /** Ids of the nodes with at least one service instance. */
+            private volatile IList<Guid> _nodes = new List<Guid>();
+
+            /// <summary>
+            /// Creates service topology holder.
+            /// </summary>
+            internal ServiceTopology(string name, ServicesClient svcClient)
+            {
+                _svcName = name;
+                _svcClient = svcClient;
+            }
+
+            /// <summary>
+            /// Asynchronously updates the topology.
+            /// </summary>
+            private async Task UpdateTopologyAsync()
+            {
+                if (Interlocked.CompareExchange(ref _updateInProgress, 1, 0) 
== 1)
+                    return;
+
+                var socket = _svcClient._ignite.Socket;
+
+                var topVer = socket.GetTopologyVersion();
+
+                var log = _svcClient._ignite.GetConfiguration().Logger;
+
+                var groupNodes = _svcClient._clusterGroup?.GetNodes();
+                
+                var top = await 
socket.DoOutInOpAsync(ClientOp.ServiceGetTopology,
+                    ctx => ctx.Writer.WriteString(_svcName),
+                    ctx =>
+                    {
+                        var cnt = ctx.Reader.ReadInt();
+
+                        var res = new List<Guid>(cnt);
+                        
+                        for (var i = 0; i < cnt; ++i)
+                            res.Add(BinaryUtils.ReadGuid(ctx.Reader.Stream));
+
+                        return res;
+                    }, (status, err) =>
+                    {
+                        log.Error("Failed to update topology of the service '" 
+ _svcName + "'.", err);
+
+                        return _nodes;
+                    }).ConfigureAwait(false);
+                
+                _nodes = FilterTopology(top, groupNodes?.Select(n => 
n.Id).ToList());

Review Comment:
   `ToList` is unnecessary, `Intersect` allows `IEnumerable`.



##########
modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Services/ServicesClient.cs:
##########
@@ -202,7 +217,127 @@ public IServicesClient WithServerKeepBinary()
                         : ctx.Reader;
 
                     return reader.ReadObject<object>();
-                });
+                }, 
+                GetServiceTopology(serviceName));
+        }
+
+        /// <summary>
+        /// Provides actual known service topology or empty list if: partition 
awareness is not enabled,
+        /// service topology is not supported or not received yet.
+        /// </summary>
+        private IList<Guid> GetServiceTopology(string serviceName)
+        {
+            if (_servicesTopologies == null || 
!_ignite.Socket.GetSocket().Features.HasFeature(ClientBitmaskFeature.ServiceTopology))
+                return null;
+            
+            return _servicesTopologies.GetOrAdd(serviceName, s => new 
ServiceTopology(serviceName, this)).GetAndUpdate();
+        }
+        
+        /// <summary>
+        /// Keeps and process topology of certain service.
+        /// </summary>
+        private class ServiceTopology
+        {
+            /** Service name. */
+            private readonly string _svcName;
+            
+            /** Ignite services. */
+            private readonly ServicesClient _svcClient;
+
+            /** Flag of topology update progress. */
+            private volatile int _updateInProgress;
+
+            /** Time of the last update. */
+            private long _lastUpdateRequestTime;
+
+            /** Cluster topology version of the last update. */
+            private long _lastAffTop;
+            
+            /** Ids of the nodes with at least one service instance. */
+            private volatile IList<Guid> _nodes = new List<Guid>();
+
+            /// <summary>
+            /// Creates service topology holder.
+            /// </summary>
+            internal ServiceTopology(string name, ServicesClient svcClient)
+            {
+                _svcName = name;
+                _svcClient = svcClient;
+            }
+
+            /// <summary>
+            /// Asynchronously updates the topology.
+            /// </summary>
+            private async Task UpdateTopologyAsync()
+            {
+                if (Interlocked.CompareExchange(ref _updateInProgress, 1, 0) 
== 1)
+                    return;
+
+                var socket = _svcClient._ignite.Socket;
+
+                var topVer = socket.GetTopologyVersion();
+
+                var log = _svcClient._ignite.GetConfiguration().Logger;
+
+                var groupNodes = _svcClient._clusterGroup?.GetNodes();
+                
+                var top = await 
socket.DoOutInOpAsync(ClientOp.ServiceGetTopology,
+                    ctx => ctx.Writer.WriteString(_svcName),
+                    ctx =>
+                    {
+                        var cnt = ctx.Reader.ReadInt();
+
+                        var res = new List<Guid>(cnt);
+                        
+                        for (var i = 0; i < cnt; ++i)
+                            res.Add(BinaryUtils.ReadGuid(ctx.Reader.Stream));
+
+                        return res;
+                    }, (status, err) =>
+                    {
+                        log.Error("Failed to update topology of the service '" 
+ _svcName + "'.", err);
+
+                        return _nodes;
+                    }).ConfigureAwait(false);
+                
+                _nodes = FilterTopology(top, groupNodes?.Select(n => 
n.Id).ToList());
+                
+                Interlocked.Exchange(ref _lastUpdateRequestTime, 
DateTime.Now.Ticks);
+                Interlocked.Exchange(ref _lastAffTop, topVer);
+
+                log.Debug("Topology of service '" + _svcName + "' has been 
updated. The " +
+                          "service instance nodes: " + string.Join(", ", 
top.Select(gid=>gid.ToString())) +
+                          ". Effective topology with the cluster group is: " + 
+                          string.Join(", ", 
_nodes.Select(gid=>gid.ToString())) + '.');
+
+                _updateInProgress = 0;
+            }
+
+            /// <summary>
+            /// Filters service topology regarding to the cluster group.
+            /// </summary>
+            private static IList<Guid> FilterTopology(IList<Guid> 
serviceTopology, IList<Guid> clusterGroup)
+            {
+                return clusterGroup == null ? serviceTopology : 
serviceTopology.Intersect(clusterGroup).ToList();
+            }
+
+            /// <summary>
+            /// Provides last known service topology and asynchronously 
updates it if required.
+            /// </summary>
+            internal IList<Guid> GetAndUpdate()
+            {
+                if (_updateInProgress != 0)
+                    return _nodes;
+
+                var curAff = _svcClient._ignite.Socket.GetTopologyVersion();
+                var lastKnownAff = Interlocked.Read(ref _lastAffTop);
+                var sinceLastUpdate = DateTime.Now.Ticks - 
Interlocked.Read(ref _lastUpdateRequestTime);
+
+                if (curAff > lastKnownAff || sinceLastUpdate > 
SrvTopUpdatePeriod)
+                    UpdateTopologyAsync().ConfigureAwait(false);

Review Comment:
   ```suggestion
                       _ = UpdateTopologyAsync().ConfigureAwait(false);
   ```
   
   Discard the task explicitly to show that we did not skip `await` by mistake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to