Mmuzaf commented on a change in pull request #8028:
URL: https://github.com/apache/ignite/pull/8028#discussion_r487793262
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -76,80 +78,58 @@
private final GridAtomicLong ackedUpdCntr = new GridAtomicLong(0);
/**
- * @param part Partition number.
+ * @param currPartCntr Current partition counter.
* @param log Continuous query category logger.
*/
- CacheContinuousQueryEventBuffer(int part, IgniteLogger log) {
- this.part = part;
+ CacheContinuousQueryEventBuffer(LongUnaryOperator currPartCntr,
IgniteLogger log) {
+ this.currPartCntr = currPartCntr;
this.log = log;
}
/**
- * @param part Partition number.
+ * @param log Continuous query category logger.
*/
- CacheContinuousQueryEventBuffer(int part) {
- this(part, null);
+ CacheContinuousQueryEventBuffer(IgniteLogger log) {
+ this((backup) -> 0, log);
}
/**
* @param updateCntr Acknowledged counter.
*/
- void cleanupBackupQueue(Long updateCntr) {
- Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
-
- while (it.hasNext()) {
- CacheContinuousQueryEntry backupEntry = it.next();
-
- if (backupEntry.updateCounter() <= updateCntr)
- it.remove();
- }
-
+ void cleanupOnAck(long updateCntr) {
+ backupQ.removeIf(backupEntry -> backupEntry.updateCounter() <=
updateCntr);
Review comment:
Fixed
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
##########
@@ -155,8 +155,8 @@ void resetTopologyCache() {
if (!entry.isFiltered())
entries.add(new CacheContinuousQueryEvent<K, V>(cache,
cctx, entry));
- if (log.isDebugEnabled())
- log.debug("Partition was lost [lastFiredEvt=" +
lastFiredEvt +
+ if (log.isInfoEnabled())
Review comment:
Reverted.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
##########
@@ -376,34 +338,32 @@ private Batch initBatch(AffinityTopologyVersion topVer,
boolean backup) {
/** */
private CacheContinuousQueryEntry[] entries;
- /** */
- private final AffinityTopologyVersion topVer;
-
/**
* @param filtered Number of filtered events before this batch.
* @param entries Entries array.
- * @param topVer Current event topology version.
* @param startCntr Start counter.
*/
- Batch(long startCntr, long filtered, CacheContinuousQueryEntry[]
entries, AffinityTopologyVersion topVer) {
+ Batch(long startCntr, long filtered, CacheContinuousQueryEntry[]
entries) {
assert startCntr >= 0;
assert filtered >= 0;
this.startCntr = startCntr;
this.filtered = filtered;
this.entries = entries;
- this.topVer = topVer;
endCntr = startCntr + BUF_SIZE - 1;
}
/**
- * @param res Current entries.
- * @return Entries to send as part of backup queue.
+ * @param filteredFactory Factory which produces filtered entries.
+ * @return Map of collected entries.
*/
- @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry>
flushCurrentEntries(
- @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
- if (entries == null)
+ synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+ BiFunction<Long, Long, CacheContinuousQueryEntry> filteredFactory
+ ) {
+ Map<Long, CacheContinuousQueryEntry> res = new HashMap<>();
Review comment:
Agreed, fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]