alex-plekhanov commented on code in PR #10140:
URL: https://github.com/apache/ignite/pull/10140#discussion_r921265351
##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java:
##########
@@ -46,7 +46,7 @@ public void testResourcesReleasedAfterClientClosed() throws Exception {
assertFalse(channels[0].isClosed());
assertFalse(channels[1].isClosed());
- assertEquals(1, threadsCount(THREAD_PREFIX));
+ assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 1, 1_000L));
Review Comment:
Why do we need to wait here? It looks like checking the thread count is
enough; the thread should already be started at this point.
##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java:
##########
@@ -73,6 +76,28 @@ public void testPartitionedCustomAffinityCache() {
testNotApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
}
+ /**
+ * Test that partition awareness is applicable for partitioned cache with custom affinity function
+ * and key to partition mapping function is set on the client side.
+ */
+ @Test
+ public void testPartitionedCustomAffinityCacheWithMapper() throws Exception {
+ client.close();
Review Comment:
It looks weird to initialize the client in `beforeTest` and then close it
right after initialization. Let's create a new test class instead.
##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientIgniteSetImpl.java:
##########
@@ -83,6 +83,8 @@ public ClientIgniteSetImpl(
this.name = name;
this.colocated = colocated;
this.cacheId = cacheId;
+
+ this.ch.registerKeyPartitionMapperFactory(cacheId, this.name);
Review Comment:
`this.name` is not a cache name, so it can't be used here.
##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java:
##########
@@ -143,8 +147,23 @@ public static ClientCacheAffinityMapping merge(ClientCacheAffinityMapping... map
*
* @param ch Output channel.
* @param cacheIds Cache IDs.
+ * @param mappers Function that produces key mapping functions.
*/
- public static void writeRequest(PayloadOutputChannel ch, Collection<Integer> cacheIds) {
+ public static void writeRequest(
+ PayloadOutputChannel ch,
+ Collection<Integer> cacheIds,
+ Function<Integer, Function<Integer, ToIntFunction<Object>>> mappers
+ ) {
+ ProtocolContext ctx = ch.clientChannel().protocolCtx();
+
+ boolean hasFactory = cacheIds.stream()
Review Comment:
Can we send a flag with the request when mappings for non-default affinity
are required? Currently they will always be sent, but if there are no
factories this is useless.
##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientAtomicLongImpl.java:
##########
@@ -55,6 +55,8 @@ public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableCha
String groupNameInternal = groupName == null ? DataStructuresProcessor.DEFAULT_DS_GROUP_NAME : groupName;
cacheId = ClientUtils.cacheId(DataStructuresProcessor.ATOMICS_CACHE_NAME + "@" + groupNameInternal);
+
+ this.ch.registerKeyPartitionMapperFactory(cacheId, this.name);
Review Comment:
`this.name` is not a cache name, so it can't be used here.
##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java:
##########
@@ -219,6 +230,14 @@ private ClientCacheAffinityMapping currentMapping() {
return mapping;
}
+ /**
+ * @param cacheId Cache id.
+ * @param factory Key mapper factory.
+ */
+ public void addKeyMapperFactory(int cacheId, Function<Integer, ToIntFunction<Object>> factory) {
+ cacheKeyMapperFactoryMap.putIfAbsent(cacheId, factory);
Review Comment:
There is a memory leak in this map when caches are continuously created and
destroyed: entries are added but never removed.
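One possible shape of a fix, as a minimal sketch only: remove the factory entry when the cache goes away. `KeyMapperFactoryRegistry`, `removeKeyMapperFactory` and `size` are hypothetical names that do not appear in the PR; only `addKeyMapperFactory` and the map's value type mirror the diff above. How the client would learn that a cache was destroyed is left open here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Hypothetical stand-in for the factory map kept by the affinity context. */
class KeyMapperFactoryRegistry {
    /** Cache id -> key mapper factory. */
    private final Map<Integer, Function<Integer, ToIntFunction<Object>>> factories =
        new ConcurrentHashMap<>();

    /** Mirrors addKeyMapperFactory from the diff: first registration wins. */
    void addKeyMapperFactory(int cacheId, Function<Integer, ToIntFunction<Object>> factory) {
        factories.putIfAbsent(cacheId, factory);
    }

    /** Assumed hook: invoked once the cache is known to be destroyed. */
    void removeKeyMapperFactory(int cacheId) {
        factories.remove(cacheId);
    }

    /** Number of registered factories, exposed only to observe growth. */
    int size() {
        return factories.size();
    }
}
```

With a removal hook like this, repeated create/destroy cycles leave the map empty instead of growing without bound.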
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java:
##########
@@ -52,12 +53,14 @@ class ClientCachePartitionsResponse extends ClientResponse {
@Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
encode(ctx, writer, affinityVer);
+ CacheObjectBinaryProcessorImpl proc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
+
affinityVer.write(writer);
writer.writeInt(mappings.size());
for (ClientCachePartitionAwarenessGroup mapping : mappings) {
- mapping.write(writer);
+ mapping.write(proc, writer, ctx.currentProtocolContext());
Review Comment:
Let's also remove the redundant braces.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java:
##########
@@ -77,14 +75,13 @@ public ClientCachePartitionsRequest(BinaryRawReader reader) {
if (cacheDesc == null)
continue;
- ClientCachePartitionAwarenessGroup grp = processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+ ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, cacheDesc);
- // Cache already processed.
if (grp == null)
continue;
- groups.add(grp);
- cacheGroupIds.put(cacheDesc.groupId(), grp);
+ grps.computeIfAbsent(grp, grp0 -> grp::addCache)
Review Comment:
This logic is complex and hard to understand; moreover, `addCache` is called
twice for the first cache. I think it's better to use something like
`Map<ClientCachePartitionAwarenessGroup, ClientCachePartitionAwarenessGroup>
grps` instead and add groups like this:
```
ClientCachePartitionAwarenessGroup oldGrp = grps.putIfAbsent(grp, grp);
if (oldGrp != null)
oldGrp.addCache(cacheDesc);
```
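A self-contained sketch of the same `putIfAbsent` pattern, for illustration. `AwarenessGroup` and `GroupCollector` are hypothetical stand-ins, not classes from the PR, and the sketch assumes a freshly built group already holds its first cache, which is why `addCache` is only called on the previously stored group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ClientCachePartitionAwarenessGroup. */
class AwarenessGroup {
    /** The only state that equals/hashCode look at. */
    private final String mapping;

    /** Caches sharing this mapping. */
    private final List<String> caches = new ArrayList<>();

    /** Assumption: a new group already contains the cache it was built for. */
    AwarenessGroup(String mapping, String firstCache) {
        this.mapping = mapping;
        caches.add(firstCache);
    }

    void addCache(String cacheName) {
        caches.add(cacheName);
    }

    List<String> caches() {
        return caches;
    }

    @Override public boolean equals(Object o) {
        return o instanceof AwarenessGroup && mapping.equals(((AwarenessGroup)o).mapping);
    }

    @Override public int hashCode() {
        return mapping.hashCode();
    }
}

class GroupCollector {
    /** Each input row is {cacheName, mapping}. */
    static Map<AwarenessGroup, AwarenessGroup> collect(String[][] cachesAndMappings) {
        Map<AwarenessGroup, AwarenessGroup> grps = new HashMap<>();

        for (String[] row : cachesAndMappings) {
            AwarenessGroup grp = new AwarenessGroup(row[1], row[0]);

            // Stores grp if no equal group exists, otherwise returns the stored one.
            AwarenessGroup oldGrp = grps.putIfAbsent(grp, grp);

            if (oldGrp != null)
                oldGrp.addCache(row[0]);
        }

        return grps;
    }
}
```

Each cache ends up in exactly one group, and the first cache of a group is never added twice.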
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java:
##########
@@ -95,79 +92,43 @@ public ClientCachePartitionsRequest(BinaryRawReader reader) {
if (!cacheDesc.cacheType().userCache())
continue;
- processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+ ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, cacheDesc);
+
+ if (grp == null)
+ continue;
+
+ if (grps.containsKey(grp))
Review Comment:
Let's use something like:
```
grp0 = grps.get(grp);
if (grp0 != null)
grp.addCache(cacheDesc)
```
`grp.equals` and `grp.hashCode` are expensive operations, so we should at
least minimize their usage.
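To make the cost difference concrete, here is a small sketch that counts `hashCode` calls for `containsKey` followed by `get` versus a single `get`. `CountingKey` and `LookupDemo` are hypothetical names introduced only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical key that counts how often it is hashed. */
class CountingKey {
    /** Global counter, reset by the demo methods. */
    static int hashCalls;

    private final String id;

    CountingKey(String id) {
        this.id = id;
    }

    @Override public int hashCode() {
        hashCalls++;

        return id.hashCode();
    }

    @Override public boolean equals(Object o) {
        return o instanceof CountingKey && id.equals(((CountingKey)o).id);
    }
}

class LookupDemo {
    /** containsKey followed by get: the key is hashed for each call. */
    static int containsThenGet(Map<CountingKey, String> m, CountingKey k) {
        CountingKey.hashCalls = 0;

        if (m.containsKey(k))
            m.get(k);

        return CountingKey.hashCalls;
    }

    /** Single get with a null check: the key is hashed once. */
    static int singleGet(Map<CountingKey, String> m, CountingKey k) {
        CountingKey.hashCalls = 0;

        String val = m.get(k);

        if (val != null)
            val.length(); // Placeholder for using the stored value.

        return CountingKey.hashCalls;
    }

    /** Builds a one-entry map used by both variants. */
    static Map<CountingKey, String> sampleMap() {
        Map<CountingKey, String> m = new HashMap<>();

        m.put(new CountingKey("grp"), "stored");

        return m;
    }
}
```

The single-`get` variant only works when `null` values are never stored in the map, which is assumed here.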
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java:
##########
@@ -77,14 +75,13 @@ public ClientCachePartitionsRequest(BinaryRawReader reader) {
if (cacheDesc == null)
continue;
- ClientCachePartitionAwarenessGroup grp = processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+ ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, cacheDesc);
Review Comment:
One cache group can contain a lot of caches that all share the same mapping.
Comparing the mapping for each cache is too resource-consuming; we should
perform these comparisons per cache group instead.
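A rough sketch of that idea: look up by cache group ID first, and only do the expensive mapping work the first time a group is seen. `GroupedMappingResolver` and its members are hypothetical; the `expensiveComparisons` counter merely stands in for building and comparing a real mapping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical resolver that pays the mapping cost once per cache group. */
class GroupedMappingResolver {
    /** Counts how many times the (simulated) expensive mapping work ran. */
    int expensiveComparisons;

    /** Cache group id -> caches already attached to that group's mapping. */
    private final Map<Integer, List<String>> cachesByGrpId = new HashMap<>();

    /** Attaches a cache to its group, doing the expensive work only for new groups. */
    void process(int cacheGrpId, String cacheName) {
        List<String> caches = cachesByGrpId.get(cacheGrpId);

        if (caches == null) {
            // Placeholder for building the mapping and comparing it with known ones.
            expensiveComparisons++;

            caches = new ArrayList<>();

            cachesByGrpId.put(cacheGrpId, caches);
        }

        // Cheap path: the group is known, so the cache just joins it.
        caches.add(cacheName);
    }

    List<String> caches(int cacheGrpId) {
        return cachesByGrpId.get(cacheGrpId);
    }
}
```

In the request handler the key would presumably come from `cacheDesc.groupId()`, as in the removed `cacheGroupIds.put(...)` line.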