sanpwc commented on code in PR #4143:
URL: https://github.com/apache/ignite-3/pull/4143#discussion_r1701543333


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -368,56 +360,51 @@ void onSnapshotLoad() {
                         result = 
MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build();
                     }
 
-                    idempotentCommandCache.put(commandId, new 
IdempotentCommandCachedResult(result, now));
+                    idempotentCommandCache.put(commandId, result);
                 }
             }
         }
     }
 
     /**
      * Removes obsolete entries from both volatile and persistent idempotent 
command cache.
+     *
+     * @param evictionTimestamp Cached entries older than given timestamp will 
be evicted.
+     * @param operationTimestamp Command operation timestamp.
      */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta 
storage compaction.
-    void evictIdempotentCommandsCache() {
-        HybridTimestamp cleanupTimestamp = clusterTime.now();
-        LOG.info("Idempotent command cache cleanup started 
[cleanupTimestamp={}].", cleanupTimestamp);
-
-        maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
-            List<CommandId> commandIdsToRemove = 
idempotentCommandCache.entrySet().stream()
-                    .filter(entry -> 
entry.getValue().commandStartTime.getPhysical()
-                            <= cleanupTimestamp.getPhysical() - 
(idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong()))
-                    .map(Map.Entry::getKey)
+    void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, 
HybridTimestamp operationTimestamp) {
+        LOG.info("Idempotent command cache cleanup started 
[evictionTimestamp={}].", evictionTimestamp);
+
+        long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp);
+
+        if (obsoleteRevision != -1) {
+            byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+            byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+            List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo, 
obsoleteRevision).stream()
+                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
+                    .filter(entry -> !entry.tombstone() && !entry.empty())
+                    .map(Entry::key)
                     .collect(toList());
 
-            if (!commandIdsToRemove.isEmpty()) {
-                List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
-                        .map(commandId -> ArrayUtils.concat(new byte[]{}, 
ByteUtils.toBytes(commandId)))
-                        .collect(toList());
+            // TODO https://issues.apache.org/jira/browse/IGNITE-22828
+            evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
+                byte[] commandIdBytes = copyOfRange(evictionCandidateKeyBytes, 
IDEMPOTENT_COMMAND_PREFIX_BYTES.length,

Review Comment:
   That means that we will either will discard the prefix (which is not safe I 
guess) or put commands into volatile cache with prefix. Latter means that we 
will need to prepend the prefix on each read while invoke/multiInvoke 
processing. Any other options? BTW we may continue the discussion in 
https://issues.apache.org/jira/browse/IGNITE-22828.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to