Copilot commented on code in PR #7599:
URL: https://github.com/apache/ignite-3/pull/7599#discussion_r2811944146
##########
modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs:
##########
@@ -212,5 +218,32 @@ public IgniteClientConfiguration(IgniteClientConfiguration
other)
/// See <see cref="BasicAuthenticator"/>.
/// </summary>
public IAuthenticator? Authenticator { get; set; }
+
+ /// <summary>
+ /// Gets or sets the size of cache to store partition awareness
metadata of SQL queries, in number of entries.
+ /// Default is <see
cref="DefaultSqlPartitionAwarenessMetadataCacheSize"/>.
+ /// <para />
+ /// Set to zero to disable SQL partition awareness.
+ /// </summary>
+ /// <remarks>
+ /// SQL partition awareness feature improves query performance by
directing queries to the specific server nodes that hold the
+ /// relevant data, minimizing network overhead. Ignite client builds
the metadata cache during the initial query execution and leverages
+ /// this cache to speed up subsequent queries.
+ /// <para />
+ /// In general, metadata is available for queries
+ /// which have equality predicate over all colocation columns, or
which insert the whole tuple. For example:
+ /// <code>
+ /// // Create reservations table colocated by floor_no.
+ /// CREATE TABLE RoomsReservations (room_no INT, floor_no INT,
PRIMARY_KEY (room_no, floor_no)) COLOCATE BY (floor_no);
+ ///
+ /// // Select reserved rooms by floor_no - allows computing a
partition and routing.
+ /// SELECT room_no FROM RoomsReservations WHERE floor_no = ?;
+ ///
+ /// // INSERT: parametrized by floor_no - allows computing a partition
and routing.
+ /// INSERT INTO RoomsReservations(room_no, floor_no) VALUES(?, ?);
+ /// </code>
+ /// </remarks>
+ /// <value>Cache size, in number of entries.</value>
+ public int SqlPartitionAwarenessMetadataCacheSize { get; set; } =
DefaultSqlPartitionAwarenessMetadataCacheSize;
Review Comment:
The SqlPartitionAwarenessMetadataCacheSize configuration property lacks
validation to ensure it's not negative. While the Sql class correctly handles
zero and positive values, a negative value would be passed to the
ConcurrentCache constructor which doesn't validate the capacity parameter.
Consider adding validation to ensure the value is >= 0, either in the property
setter or in the ConcurrentCache constructor.
```suggestion
private int _sqlPartitionAwarenessMetadataCacheSize =
DefaultSqlPartitionAwarenessMetadataCacheSize;
/// <summary>
/// Gets or sets the size of cache to store partition awareness
metadata of SQL queries, in number of entries.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when a negative value is assigned.
/// </exception>
public int SqlPartitionAwarenessMetadataCacheSize
{
get => _sqlPartitionAwarenessMetadataCacheSize;
set
{
if (value < 0)
{
throw new ArgumentOutOfRangeException(
nameof(value),
"SqlPartitionAwarenessMetadataCacheSize must be
greater than or equal to 0.");
}
_sqlPartitionAwarenessMetadataCacheSize = value;
}
}
```
##########
modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs:
##########
@@ -328,6 +341,47 @@ private static ResultSetMetadata ReadMeta(ref
MsgPackReader reader)
return new ResultSetMetadata(columns);
}
+ private static SqlPartitionAwarenessMetadata?
ReadPartitionAwarenessMetadata(
+ ConnectionContext ctx, ref MsgPackReader reader, bool
partitionMetadataExpected)
+ {
+ if
(!ctx.ServerHasFeature(ProtocolBitmaskFeature.SqlPartitionAwareness) ||
!partitionMetadataExpected)
+ {
+ return null;
+ }
+
+ if (reader.TryReadNil())
+ {
+ return null;
+ }
+
+ var tableId = reader.ReadInt32();
+
+ var tableName =
ctx.ServerHasFeature(ProtocolBitmaskFeature.SqlPartitionAwarenessTableName)
+ ? QualifiedName.Of(reader.ReadStringNullable(),
reader.ReadString())
+ : null;
+
+ var indexes = ReadIntArray(ref reader);
+ var hash = ReadIntArray(ref reader);
+
+ // Table name is required for caching. Return null if not
available.
+ return tableName == null
+ ? null
+ : new SqlPartitionAwarenessMetadata(tableId, tableName,
indexes, hash);
Review Comment:
Missing protocol handling: The .NET client reads partition awareness
metadata but doesn't consume the directTxMode byte that the server sends when
the SQL_DIRECT_TX_MAPPING feature is supported. After reading the hash array
(line 364), the client should check if the server supports
SQL_DIRECT_TX_MAPPING and skip/read the directTxMode byte to maintain protocol
synchronization. Without this, subsequent reads from the stream will be offset
by one byte.
##########
modules/platforms/dotnet/Apache.Ignite.Tests/ConcurrentCacheTest.cs:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests;
+
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Common;
+using Internal.Common;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="ConcurrentCache{TKey,TValue}"/>.
+/// </summary>
+public class ConcurrentCacheTest
+{
+ [Test]
+ public void TestTryAdd()
+ {
+ var cache = new ConcurrentCache<int, string>(10);
+
+ Assert.IsTrue(cache.TryAdd(1, "one"));
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ }
+
+ [Test]
+ public void TestTryAddExistingKey()
+ {
+ var cache = new ConcurrentCache<int, string>(10);
+
+ Assert.IsTrue(cache.TryAdd(1, "one"));
+ Assert.IsFalse(cache.TryAdd(1, "uno"));
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ }
+
+ [Test]
+ public void TestGetValueOrDefaultMissingKey()
+ {
+ var cache = new ConcurrentCache<int, string>(10);
+
+ Assert.IsNull(cache.GetValueOrDefault(1));
+ }
+
+ [Test]
+ public void TestEviction()
+ {
+ var cache = new ConcurrentCache<int, string>(2);
+
+ cache.TryAdd(1, "one");
+ cache.TryAdd(2, "two");
+ cache.GetValueOrDefault(1); // Mark entry 1 as visited
+ cache.TryAdd(3, "four");
+
+ Assert.IsNull(cache.GetValueOrDefault(2), "Unused entry 2 should be
evicted");
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ Assert.AreEqual("four", cache.GetValueOrDefault(3));
Review Comment:
Inconsistent test data: The value "four" is used for key 3, but should be
"three" to maintain consistency with the naming pattern (key 1 -> "one", key 2
-> "two", key 3 -> "three").
```suggestion
cache.TryAdd(3, "three");
Assert.IsNull(cache.GetValueOrDefault(2), "Unused entry 2 should be
evicted");
Assert.AreEqual("one", cache.GetValueOrDefault(1));
Assert.AreEqual("three", cache.GetValueOrDefault(3));
```
##########
modules/platforms/dotnet/Apache.Ignite/Internal/Common/ConcurrentCache.cs:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Common;
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
+/// <summary>
+/// Concurrent cache with eviction based on capacity.
+/// </summary>
+/// <typeparam name="TKey">Key type.</typeparam>
+/// <typeparam name="TValue">Value type.</typeparam>
+internal sealed class ConcurrentCache<TKey, TValue>
+ where TKey : notnull
+{
+ private readonly int _capacity;
+ private readonly ConcurrentDictionary<TKey, Entry> _map;
+ private readonly IEnumerator<KeyValuePair<TKey, Entry>> _hand;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ConcurrentCache{TKey,
TValue}"/> class.
+ /// </summary>
+ /// <param name="capacity">Maximum capacity.</param>
+ public ConcurrentCache(int capacity)
+ {
+ _capacity = capacity;
+ _map = new ConcurrentDictionary<TKey, Entry>();
+ _hand = _map.GetEnumerator();
+ }
+
+ /// <summary>
+ /// Adds the specified key and value to the cache. Evicts old entries if
capacity is exceeded.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <param name="value">Value.</param>
+ /// <returns>True if the entry was added, false if an entry with the same
key already exists.</returns>
+ public bool TryAdd(TKey key, TValue value)
+ {
+ var added = _map.TryAdd(key, new Entry(value));
+
+ if (added)
+ {
+ EvictIfNeeded();
+ }
+
+ return added;
+ }
+
+ /// <summary>
+ /// Gets the value associated with the specified key.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <returns>Value, or default(TValue) if not exists.</returns>
+ public TValue? GetValueOrDefault(TKey key)
+ {
+ if (!_map.TryGetValue(key, out var entry))
+ {
+ return default;
+ }
+
+ entry.Visited = true;
+ return entry.Value;
Review Comment:
The Visited flag is being set on the Entry record without synchronization.
Since records are reference types and the Visited property is mutable,
concurrent reads and writes from multiple threads could lead to race
conditions. While this might be acceptable for an approximate LRU/SIEVE
algorithm, it should be documented, or the property's backing field should be
marked as volatile (C# allows the volatile modifier only on fields, not on
properties) to ensure visibility across threads.
##########
modules/platforms/dotnet/Apache.Ignite/Internal/Common/ConcurrentCache.cs:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Common;
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
+/// <summary>
+/// Concurrent cache with eviction based on capacity.
+/// </summary>
+/// <typeparam name="TKey">Key type.</typeparam>
+/// <typeparam name="TValue">Value type.</typeparam>
+internal sealed class ConcurrentCache<TKey, TValue>
+ where TKey : notnull
+{
+ private readonly int _capacity;
+ private readonly ConcurrentDictionary<TKey, Entry> _map;
+ private readonly IEnumerator<KeyValuePair<TKey, Entry>> _hand;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ConcurrentCache{TKey,
TValue}"/> class.
+ /// </summary>
+ /// <param name="capacity">Maximum capacity.</param>
+ public ConcurrentCache(int capacity)
+ {
+ _capacity = capacity;
+ _map = new ConcurrentDictionary<TKey, Entry>();
+ _hand = _map.GetEnumerator();
+ }
+
+ /// <summary>
+ /// Adds the specified key and value to the cache. Evicts old entries if
capacity is exceeded.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <param name="value">Value.</param>
+ /// <returns>True if the entry was added, false if an entry with the same
key already exists.</returns>
+ public bool TryAdd(TKey key, TValue value)
+ {
+ var added = _map.TryAdd(key, new Entry(value));
+
+ if (added)
+ {
+ EvictIfNeeded();
+ }
+
+ return added;
+ }
+
+ /// <summary>
+ /// Gets the value associated with the specified key.
+ /// </summary>
+ /// <param name="key">Key.</param>
+ /// <returns>Value, or default(TValue) if not exists.</returns>
+ public TValue? GetValueOrDefault(TKey key)
+ {
+ if (!_map.TryGetValue(key, out var entry))
+ {
+ return default;
+ }
+
+ entry.Visited = true;
+ return entry.Value;
+ }
+
+ private void EvictIfNeeded()
+ {
+ if (_map.Count <= _capacity)
+ {
+ return;
+ }
+
+ lock (_hand)
+ {
+ // Avoid infinite loop, evict any entry after a full cycle.
+ int retries = _capacity;
+
+ while (_map.Count > _capacity)
+ {
+ // SIEVE-like eviction.
+ if (!_hand.MoveNext())
+ {
+ _hand.Reset();
+ continue;
+ }
Review Comment:
The enumerator behavior after Reset() on a ConcurrentDictionary may not work
as expected. After calling Reset(), the next MoveNext() should start from the
beginning, but if the dictionary was modified after the enumerator was created,
it may throw an InvalidOperationException. Consider obtaining a fresh
enumerator (calling GetEnumerator() again) instead of calling Reset(), or use
a different iteration strategy that doesn't rely on Reset().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]