NSAmelchev commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487003898



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
     private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
 
     /**
-     * @param part Partition number.
+     * @param currPartCntr Current partition counter.
      * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
-        this.part = part;
+    CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr, 
IgniteLogger log) {
+        this.currPartCntr = currPartCntr;
         this.log = log;
     }
 
     /**
-     * @param part Partition number.
+     * @param log Continuous query category logger.
      */
-    CacheContinuousQueryEventBuffer(int part) {
-        this(part, null);
+    CacheContinuousQueryEventBuffer(IgniteLogger log) {
+        this((backup) -> 0, log);
     }
 
     /**
      * @param updateCntr Acknowledged counter.
      */
-    void cleanupBackupQueue(Long updateCntr) {
-        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
-        while (it.hasNext()) {
-            CacheContinuousQueryEntry backupEntry = it.next();
-
-            if (backupEntry.updateCounter() <= updateCntr)
-                it.remove();
-        }
-
+    void cleanupOnAck(long updateCntr) {
+        backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <= 
updateCntr);

Review comment:
       Add line break, please

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
                     if (!entry.isFiltered())
                         entries.add(new CacheContinuousQueryEvent<K, V>(cache, 
cctx, entry));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Partition was lost [lastFiredEvt=" + 
lastFiredEvt +
+                    if (log.isInfoEnabled())

Review comment:
       I suggest to revert this change. It's not related to the issue.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to