This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 8c8c919220d IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict() 8c8c919220d is described below commit 8c8c919220dd910a6891dfd0795cef63b56974b6 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Mon Oct 30 12:38:13 2023 +0300 IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict() --- .../CacheObjectCompressionConsumptionTest.java | 26 +++++++++++++++++++--- .../platform/client/ClientMessageParser.java | 4 ++-- .../cache/ClientCachePutAllConflictRequest.java | 4 ++-- .../cache/ClientCacheRemoveAllConflictRequest.java | 2 +- .../streamer/ClientDataStreamerAddDataRequest.java | 5 ++--- .../client/streamer/ClientDataStreamerReader.java | 7 +++--- .../streamer/ClientDataStreamerStartRequest.java | 5 ++--- .../processors/platform/utils/PlatformUtils.java | 8 +++++-- 8 files changed, 42 insertions(+), 19 deletions(-) diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java index 38ce530e530..5aabe2baf2b 100644 --- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java @@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.transform; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Function; import org.apache.commons.io.FileUtils; import org.apache.ignite.DataRegionMetrics; @@ -34,8 +36,11 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.client.thin.TcpClientCache; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.metric.LongMetric; @@ -285,7 +290,7 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo Ignite prim = primaryNode(0, CACHE_NAME); - if (mode == ConsumptionTestMode.THIN_CLIENT) { + if (mode == ConsumptionTestMode.THIN_CLIENT || mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API) { String host = prim.configuration().getLocalHost(); int port = prim.configuration().getClientConnectorConfiguration().getPort(); @@ -296,7 +301,19 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo Object key = keyGen.apply(i); Object val = valGen.apply(i); - cache.put(key, val); + if (mode == ConsumptionTestMode.THIN_CLIENT) + cache.put(key, val); + else { + assert mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API; + + Map<Object, T3<Object, GridCacheVersion, Long>> data = new HashMap<>(); + + GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 0); + + data.put(key, new T3<>(val, otherVer, 0L)); + + ((TcpClientCache)cache).putAllConflict(data); + } assertEqualsArraysAware(cache.get(key), val); } @@ -335,7 +352,7 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo clNet += reg.<LongMetric>findMetric(SENT_BYTES_METRIC_NAME).value(); clNet += reg.<LongMetric>findMetric(RECEIVED_BYTES_METRIC_NAME).value(); - if (mode != ConsumptionTestMode.THIN_CLIENT) + if (mode != ConsumptionTestMode.THIN_CLIENT && mode != ConsumptionTestMode.THIN_CLIENT_INTERNAL_API) assertEquals(0, clNet); net += clNet; @@ -406,6 +423,9 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo /** Thin client. */ THIN_CLIENT, + /** Thin client uses internal API. */ + THIN_CLIENT_INTERNAL_API, + /** Node + Persistent. */ PERSISTENT } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index dbd62bc0e28..22cc257e660 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -627,10 +627,10 @@ public class ClientMessageParser implements ClientListenerMessageParser { return new ClientServiceGetDescriptorRequest(reader); case OP_DATA_STREAMER_START: - return new ClientDataStreamerStartRequest(reader); + return new ClientDataStreamerStartRequest(reader, ctx); case OP_DATA_STREAMER_ADD_DATA: - return new ClientDataStreamerAddDataRequest(reader); + return new ClientDataStreamerAddDataRequest(reader, ctx); case OP_ATOMIC_LONG_CREATE: return new ClientAtomicLongCreateRequest(reader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java index ac3182b751b..4701f654e66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java @@ -60,8 +60,8 @@ public class ClientCachePutAllConflictRequest extends ClientCacheDataRequest imp map = new LinkedHashMap<>(cnt); for (int i = 0; i < cnt; i++) { - KeyCacheObject key = readCacheObject(reader, true); - CacheObject val = readCacheObject(reader, false); + KeyCacheObject key = readCacheObject(reader, true, ctx); + CacheObject val = readCacheObject(reader, false, ctx); GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached(); long expireTime = reader.readLong(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java index c6e1a35f1d2..168585bb099 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java @@ -51,7 +51,7 @@ public class ClientCacheRemoveAllConflictRequest extends ClientCacheDataRequest map = new LinkedHashMap<>(cnt); for (int i = 0; i < cnt; i++) { - KeyCacheObject key = readCacheObject(reader, true); + KeyCacheObject key = readCacheObject(reader, true, null); GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached(); map.put(key, ver); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java index 8e17a51a7ea..9eaaec9e2e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.platform.client.streamer; import java.util.Collection; - import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -48,12 +47,12 @@ public class ClientDataStreamerAddDataRequest extends ClientDataStreamerRequest * * @param reader Data reader. */ - public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader) { + public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) { super(reader); streamerId = reader.readLong(); flags = reader.readByte(); - entries = ClientDataStreamerReader.read(reader); + entries = ClientDataStreamerReader.read(reader, ctx); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java index 39a973df3c5..da1ea65bef8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject; @@ -34,7 +35,7 @@ class ClientDataStreamerReader { * @param reader Data reader. * @return Streamer entry. */ - public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) { + public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader, ClientConnectionContext ctx) { int entriesCnt = reader.readInt(); if (entriesCnt == 0) @@ -43,8 +44,8 @@ class ClientDataStreamerReader { Collection<DataStreamerEntry> entries = new ArrayList<>(entriesCnt); for (int i = 0; i < entriesCnt; i++) { - entries.add(new DataStreamerEntry(readCacheObject(reader, true), - readCacheObject(reader, false))); + entries.add(new DataStreamerEntry(readCacheObject(reader, true, ctx), + readCacheObject(reader, false, ctx))); } return entries; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java index b8cd231aeaa..7c96f423e69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.platform.client.streamer; import java.util.Collection; - import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.internal.GridKernalContext; @@ -73,7 +72,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest { * * @param reader Data reader. */ - public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) { + public ClientDataStreamerStartRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) { super(reader); cacheId = reader.readInt(); @@ -82,7 +81,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest { perThreadBufferSize = reader.readInt(); receiverObj = reader.readObjectDetached(); receiverPlatform = receiverObj == null ? 0 : reader.readByte(); - entries = ClientDataStreamerReader.read(reader); + entries = ClientDataStreamerReader.read(reader, ctx); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index 8462a5d24fe..94b99943d89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformExtendedException; import org.apache.ignite.internal.processors.platform.PlatformNativeException; import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; @@ -1373,8 +1374,9 @@ public class PlatformUtils { * * @param reader Reader. * @param isKey {@code True} if object is a key. + * @param ctx Client connection context. */ - public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) { + public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey, ClientConnectionContext ctx) { BinaryInputStream in = reader.in(); int pos0 = in.position(); @@ -1393,7 +1395,9 @@ public class PlatformUtils { byte[] objBytes = in.readByteArray(pos1 - pos0); - return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes); + return isKey ? + (T)new KeyCacheObjectImpl(obj, objBytes, -1) : + (T)new CacheObjectImpl(obj, ctx.kernalContext().transformer() == null ? objBytes : null); } /**