This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/oom
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 26a32c2c3ca1aa94fb16fb7a987cba7f7308b00f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Sep 21 08:27:28 2025 +0800

    Fix memory leaks and OOM issues in streaming processing
    
    Fix these issues by implementing deduplication logic in priority queues
    and improving sliding window memory management.
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                |   1 +
 banyand/measure/write_liaison.go          |   8 +-
 banyand/queue/pub/pub.go                  |  11 +-
 banyand/stream/write_liaison.go           |   8 +-
 pkg/flow/dedup_priority_queue.go          |  54 ++++-
 pkg/flow/dedup_priority_queue_test.go     | 370 ++++++++++++++++++++++++++++++
 pkg/flow/streaming/sliding_window.go      | 110 ++++-----
 pkg/flow/streaming/sliding_window_test.go | 137 +++++++++++
 pkg/flow/types.go                         |   2 +-
 9 files changed, 634 insertions(+), 67 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d788fc11..c48ab2c0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@ Release Notes.
 - Fix topN parsing panic when the criteria is set.
 - Remove the indexed_only field in TagSpec.
 - Fix returning empty result when using IN operator on the array type tags.
+- Fix memory leaks and OOM issues in streaming processing by implementing deduplication logic in priority queues and improving sliding window memory management.
 
 ### Document
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 4b360c78..7b287ff5 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -131,9 +131,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                                // Send to all nodes for this shard
                                for _, node := range nodes {
                                        message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
-                                       _, publishErr := w.tire2Client.Publish(ctx, topic, message)
+                                       future, publishErr := w.tire2Client.Publish(ctx, topic, message)
                                        if publishErr != nil {
                                                w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
+                                               continue
+                                       }
+                                       _, err := future.Get()
+                                       if err != nil {
+                                               w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
+                                               continue
                                        }
                                }
                        }
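
The hunk above turns a fire-and-forget publish into publish-then-wait:
the future returned by Publish must be drained, or the per-call stream
resources are never released and memory grows with every write. A
minimal sketch of the pattern, with simplified stand-ins for the bus
types (Future, Client, and publishAndWait are illustrative, not the
actual banyand API):

    package main

    import (
            "context"
            "log"
    )

    // Future and Client mirror the shapes used in the diff above; they
    // are assumptions for this sketch, not the real banyand/bus types.
    type Future interface {
            Get() (interface{}, error) // blocks until the remote side replies
    }

    type Client interface {
            Publish(ctx context.Context, topic string, msg interface{}) (Future, error)
    }

    // publishAndWait shows the fixed pattern: never drop the future,
    // because draining it is what releases the underlying send stream.
    func publishAndWait(ctx context.Context, c Client, topic string, msg interface{}) {
            f, err := c.Publish(ctx, topic, msg)
            if err != nil {
                    log.Printf("failed to publish: %v", err)
                    return
            }
            if _, err := f.Get(); err != nil {
                    log.Printf("failed to get response from publish: %v", err)
            }
    }

    func main() {}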
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 74fa2432..e8a397ed 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -229,7 +229,9 @@ type publishResult struct {
 
 func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
        var err error
-       f := &future{}
+       f := &future{
+               log: p.log,
+       }
        handleMessage := func(m bus.Message, err error) error {
                r, errSend := messageToRequest(topic, m)
                if errSend != nil {
@@ -362,6 +364,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
 }
 
 type future struct {
+       log      *logger.Logger
        clients  []clusterv1.Service_SendClient
        cancelFn []func()
        topics   []bus.Topic
@@ -372,10 +375,16 @@ func (l *future) Get() (bus.Message, error) {
        if len(l.clients) < 1 {
                return bus.Message{}, io.EOF
        }
+
        c := l.clients[0]
        t := l.topics[0]
        n := l.nodes[0]
+
        defer func() {
+               if err := c.CloseSend(); err != nil {
+                       l.log.Error().Err(err).Msg("failed to close send stream")
+               }
+
                l.clients = l.clients[1:]
                l.topics = l.topics[1:]
                l.cancelFn[0]()
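
The CloseSend added to Get's deferred cleanup is the stream-side
counterpart of the liaison change: each drained future now closes its
send stream before dropping the client from the slice. A toy
reproduction of the same defer pattern (sendStream and fakeStream are
invented for this sketch; the real type is the generated clusterv1
Service_SendClient):

    package main

    import (
            "fmt"
            "io"
    )

    // sendStream stands in for a gRPC client stream with CloseSend.
    type sendStream interface {
            CloseSend() error
    }

    type fakeStream struct{ name string }

    func (f *fakeStream) CloseSend() error {
            fmt.Println("closed", f.name)
            return nil
    }

    type future struct {
            streams []sendStream
    }

    // get pops one stream and guarantees CloseSend runs even on early
    // return, mirroring the defer added to (*future).Get above.
    func (f *future) get() error {
            if len(f.streams) == 0 {
                    return io.EOF
            }
            s := f.streams[0]
            defer func() {
                    if err := s.CloseSend(); err != nil {
                            fmt.Println("failed to close send stream:", err)
                    }
                    f.streams = f.streams[1:]
            }()
            return nil // receive/decode logic elided
    }

    func main() {
            f := &future{streams: []sendStream{&fakeStream{"a"}, &fakeStream{"b"}}}
            for f.get() == nil {
            }
    }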
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index d8478f1b..c78d1a37 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -204,9 +204,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                                        // Send to all nodes for this shard
                                        for _, node := range nodes {
                                                message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
-                                               _, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
+                                               future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
                                                if publishErr != nil {
                                                        w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
+                                                       continue
+                                               }
+                                               _, err := future.Get()
+                                               if err != nil {
+                                                       w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
+                                                       continue
                                                }
                                        }
                                }
diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index 527ab6f5..becbd55d 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -31,11 +31,21 @@ type Element interface {
        SetIndex(int)
 }
 
+// HashableElement represents an element that can be hashed and compared for equality.
+type HashableElement interface {
+       Element
+       // Hash returns a hash value for this element
+       Hash() uint64
+       // Equal compares this element with another for content equality
+       Equal(HashableElement) bool
+}
+
 // DedupPriorityQueue implements heap.Interface.
 // DedupPriorityQueue is not thread-safe.
 type DedupPriorityQueue struct {
        comparator      utils.Comparator
        cache           map[Element]struct{}
+       hashCache       map[uint64][]HashableElement // For content-based deduplication
        Items           []Element
        allowDuplicates bool
 }
@@ -46,6 +56,7 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
                comparator:      comparator,
                Items:           make([]Element, 0),
                cache:           make(map[Element]struct{}),
+               hashCache:       make(map[uint64][]HashableElement),
                allowDuplicates: allowDuplicates,
        }
 }
@@ -60,6 +71,9 @@ func (pq *DedupPriorityQueue) Less(i, j int) bool {
 
 // Swap exchanges indexes of the items.
 func (pq *DedupPriorityQueue) Swap(i, j int) {
+       if i < 0 || i >= len(pq.Items) || j < 0 || j >= len(pq.Items) {
+               panic("index out of range in DedupPriorityQueue.Swap")
+       }
        pq.Items[i], pq.Items[j] = pq.Items[j], pq.Items[i]
        pq.Items[i].SetIndex(i)
        pq.Items[j].SetIndex(j)
@@ -71,11 +85,29 @@ func (pq *DedupPriorityQueue) Push(x interface{}) {
        item := x.(Element)
        // if duplicates is not allowed
        if !pq.allowDuplicates {
-               // use mutex to protect cache and items
-               // check existence
+               // Check for reference-based duplicates first
                if _, ok := pq.cache[item]; ok {
                        return
                }
+
+               // Check for content-based duplicates if the item implements HashableElement
+               if hashableItem, ok := item.(HashableElement); ok {
+                       hash := hashableItem.Hash()
+                       if existingItems, exists := pq.hashCache[hash]; exists {
+                               // Check if any existing item has the same content
+                               for _, existing := range existingItems {
+                                       if hashableItem.Equal(existing) {
+                                               return // Duplicate found, don't add
+                                       }
+                               }
+                               // No duplicate found, add to hash cache
+                               pq.hashCache[hash] = append(pq.hashCache[hash], hashableItem)
+                       } else {
+                               // First item with this hash
+                               pq.hashCache[hash] = []HashableElement{hashableItem}
+                       }
+               }
+               }
+
                pq.cache[item] = struct{}{}
        }
        n := len(pq.Items)
@@ -90,6 +122,24 @@ func (pq *DedupPriorityQueue) Pop() interface{} {
        item := pq.Items[n-1]
        item.SetIndex(-1) // for safety
        delete(pq.cache, item)
+
+       // Clean up hash cache if item implements HashableElement
+       if hashableItem, ok := item.(HashableElement); ok {
+               hash := hashableItem.Hash()
+               if existingItems, exists := pq.hashCache[hash]; exists {
+                       // Remove the specific item from the hash cache
+                       for i, existing := range existingItems {
+                               if hashableItem.Equal(existing) {
+                                       pq.hashCache[hash] = append(existingItems[:i], existingItems[i+1:]...)
+                                       if len(pq.hashCache[hash]) == 0 {
+                                               delete(pq.hashCache, hash)
+                                       }
+                                       break
+                               }
+                       }
+               }
+       }
+
        pq.Items = pq.Items[0 : n-1]
        return item
 }
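
With this change the queue deduplicates on two levels: identity (the
pointer-keyed cache) and content (Hash buckets disambiguated by Equal,
so hash collisions stay correct, just slower). A minimal sketch of a
HashableElement built on FNV-1a; the task type is invented for
illustration, and the module path is assumed to be the repository's
(only Element, HashableElement, and NewPriorityQueue come from the
package):

    package main

    import (
            "container/heap"
            "fmt"
            "hash/fnv"

            "github.com/apache/skywalking-banyandb/pkg/flow"
    )

    // task is a hypothetical element keyed by name; two tasks with the
    // same name count as duplicates even when they are distinct pointers.
    type task struct {
            name     string
            priority int
            index    int
    }

    func (t *task) GetIndex() int    { return t.index }
    func (t *task) SetIndex(idx int) { t.index = idx }

    // Hash derives the bucket key from the dedup identity (the name).
    func (t *task) Hash() uint64 {
            h := fnv.New64a()
            _, _ = h.Write([]byte(t.name))
            return h.Sum64()
    }

    // Equal is the final arbiter within a hash bucket.
    func (t *task) Equal(other flow.HashableElement) bool {
            o, ok := other.(*task)
            return ok && t.name == o.name
    }

    func main() {
            pq := flow.NewPriorityQueue(func(a, b interface{}) int {
                    return a.(*task).priority - b.(*task).priority
            }, false) // false: duplicates are rejected

            heap.Push(pq, &task{name: "flush", priority: 2})
            heap.Push(pq, &task{name: "flush", priority: 9}) // same name: dropped
            heap.Push(pq, &task{name: "merge", priority: 1})

            fmt.Println(pq.Len()) // 2
    }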
diff --git a/pkg/flow/dedup_priority_queue_test.go b/pkg/flow/dedup_priority_queue_test.go
new file mode 100644
index 00000000..d79b0187
--- /dev/null
+++ b/pkg/flow/dedup_priority_queue_test.go
@@ -0,0 +1,370 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flow
+
+import (
+       "container/heap"
+       "fmt"
+       "testing"
+)
+
+type testHashableElement struct {
+       id    string
+       val   int
+       index int
+}
+
+func (t *testHashableElement) GetIndex() int {
+       return t.index
+}
+
+func (t *testHashableElement) SetIndex(idx int) {
+       t.index = idx
+}
+
+func (t *testHashableElement) Hash() uint64 {
+       // Simple hash based on id
+       hash := uint64(0)
+       for _, b := range []byte(t.id) {
+               hash = hash*31 + uint64(b)
+       }
+       return hash
+}
+
+func (t *testHashableElement) Equal(other HashableElement) bool {
+       if otherElem, ok := other.(*testHashableElement); ok {
+               return t.id == otherElem.id
+       }
+       return false
+}
+
+type testElement struct {
+       val   int
+       index int
+}
+
+func (t *testElement) GetIndex() int {
+       return t.index
+}
+
+func (t *testElement) SetIndex(idx int) {
+       t.index = idx
+}
+
+func TestHashableElementHash(t *testing.T) {
+       elem1 := &testHashableElement{id: "test"}
+       elem2 := &testHashableElement{id: "test"}
+       elem3 := &testHashableElement{id: "different"}
+
+       if elem1.Hash() != elem2.Hash() {
+               t.Errorf("Expected same hash for elements with same id, got %d and %d", elem1.Hash(), elem2.Hash())
+       }
+
+       if elem1.Hash() == elem3.Hash() {
+               t.Errorf("Expected different hash for elements with different id, got same hash %d", elem1.Hash())
+       }
+}
+
+func TestHashableElementEqual(t *testing.T) {
+       elem1 := &testHashableElement{id: "test", val: 10}
+       elem2 := &testHashableElement{id: "test", val: 20}      // Same id, different val
+       elem3 := &testHashableElement{id: "different", val: 10} // Different id, same val
+
+       // Same id should be equal regardless of other fields
+       if !elem1.Equal(elem2) {
+               t.Error("Expected elements with same id to be equal")
+       }
+
+       // Different id should not be equal
+       if elem1.Equal(elem3) {
+               t.Error("Expected elements with different id to not be equal")
+       }
+}
+
+func TestDedupPriorityQueue_ContentBasedDeduplication(t *testing.T) {
+       // Create a priority queue with deduplication enabled
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Create two elements with the same id (should be deduplicated)
+       elem1 := &testHashableElement{id: "test", val: 10}
+       elem2 := &testHashableElement{id: "test", val: 10}
+
+       // Push both elements
+       heap.Push(pq, elem1)
+       heap.Push(pq, elem2)
+
+       // Should only have one element in the heap
+       if pq.Len() != 1 {
+               t.Errorf("Expected 1 element in heap after deduplication, got %d", pq.Len())
+       }
+
+       // Create elements with different ids (should not be deduplicated)
+       elem3 := &testHashableElement{id: "different", val: 20}
+       heap.Push(pq, elem3)
+
+       // Should now have two elements
+       if pq.Len() != 2 {
+               t.Errorf("Expected 2 elements in heap after adding different element, got %d", pq.Len())
+       }
+}
+
+func TestDedupPriorityQueue_AllowDuplicates(t *testing.T) {
+       // Create a priority queue with duplicates allowed
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, true)
+
+       // Create two elements with the same id
+       elem1 := &testHashableElement{id: "test", val: 10}
+       elem2 := &testHashableElement{id: "test", val: 10}
+
+       // Push both elements
+       heap.Push(pq, elem1)
+       heap.Push(pq, elem2)
+
+       // Should have two elements in the heap (duplicates allowed)
+       if pq.Len() != 2 {
+               t.Errorf("Expected 2 elements in heap when duplicates are allowed, got %d", pq.Len())
+       }
+
+       // Even pushing the same reference should be allowed
+       heap.Push(pq, elem1)
+       if pq.Len() != 3 {
+               t.Errorf("Expected 3 elements in heap after pushing same reference when duplicates allowed, got %d", pq.Len())
+       }
+}
+
+func TestDedupPriorityQueue_HashCollisionHandling(t *testing.T) {
+       // Create a priority queue with deduplication enabled
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Create elements that might have hash collisions
+       elem1 := &testHashableElement{id: "ab", val: 10} // Hash might collide with "ba"
+       elem2 := &testHashableElement{id: "ba", val: 10} // Different id, might have same hash
+       elem3 := &testHashableElement{id: "ab", val: 20} // Same id as elem1, should be deduplicated
+
+       heap.Push(pq, elem1)
+       heap.Push(pq, elem2)
+       heap.Push(pq, elem3)
+
+       // Should have 2 elements: elem1 (or elem3, they're equivalent) and elem2
+       if pq.Len() != 2 {
+               t.Errorf("Expected 2 elements after hash collision test, got %d", pq.Len())
+       }
+}
+
+func TestDedupPriorityQueue_PopCleanup(t *testing.T) {
+       // Create a priority queue with deduplication enabled
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Add elements
+       elem1 := &testHashableElement{id: "test1", val: 10}
+       elem2 := &testHashableElement{id: "test2", val: 20}
+       elem3 := &testHashableElement{id: "test1", val: 30} // Same id as elem1
+
+       heap.Push(pq, elem1)
+       heap.Push(pq, elem2)
+       heap.Push(pq, elem3) // Should be deduplicated
+
+       // Should have 2 elements (elem1 and elem2)
+       if pq.Len() != 2 {
+               t.Errorf("Expected 2 elements after deduplication, got %d", pq.Len())
+       }
+
+       // Pop an element
+       popped := heap.Pop(pq).(*testHashableElement)
+
+       // Should have 1 element left
+               t.Errorf("Expected 1 element after pop, got %d", pq.Len())
+       }
+
+       // Try to add the same element again - should be allowed since it was popped
+       heap.Push(pq, popped)
+
+       // Should now have 2 elements again
+       if pq.Len() != 2 {
+               t.Errorf("Expected 2 elements after re-adding popped element, got %d", pq.Len())
+       }
+}
+
+func TestDedupPriorityQueue_MixedElementTypes(t *testing.T) {
+       // Create a priority queue that can handle both Element and HashableElement
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               switch va := a.(type) {
+               case *testHashableElement:
+                       switch vb := b.(type) {
+                       case *testHashableElement:
+                               return va.val - vb.val
+                       case *testElement:
+                               return va.val - vb.val
+                       }
+               case *testElement:
+                       switch vb := b.(type) {
+                       case *testHashableElement:
+                               return va.val - vb.val
+                       case *testElement:
+                               return va.val - vb.val
+                       }
+               }
+               return 0
+       }, false)
+
+       // Add mixed element types
+       hashElem1 := &testHashableElement{id: "test", val: 10}
+       hashElem2 := &testHashableElement{id: "test", val: 20} // Same id, should be deduplicated
+       regularElem := &testElement{val: 15}
+
+       heap.Push(pq, hashElem1)
+       heap.Push(pq, regularElem)
+       heap.Push(pq, hashElem2) // Should be deduplicated with hashElem1
+
+       // Should have 2 elements: hashElem1 and regularElem
+       if pq.Len() != 2 {
+               t.Errorf("Expected 2 elements with mixed types, got %d", pq.Len())
+       }
+}
+
+func TestDedupPriorityQueue_EmptyHeapOperations(t *testing.T) {
+       // Create an empty priority queue
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Test operations on empty heap
+       if pq.Len() != 0 {
+               t.Errorf("Expected empty heap to have length 0, got %d", pq.Len())
+       }
+
+       if pq.Peek() != nil {
+               t.Error("Expected Peek() on empty heap to return nil")
+       }
+
+       // Test that Pop panics on empty heap
+       defer func() {
+               if r := recover(); r == nil {
+                       t.Error("Expected Pop() on empty heap to panic")
+               }
+       }()
+       heap.Pop(pq)
+}
+
+func TestDedupPriorityQueue_PriorityOrdering(t *testing.T) {
+       // Test that deduplication doesn't affect priority ordering
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Add elements in random order
+       elem1 := &testHashableElement{id: "high", val: 30}
+       elem2 := &testHashableElement{id: "low", val: 10}
+       elem3 := &testHashableElement{id: "high", val: 40} // Same id as elem1, should be deduplicated
+       elem4 := &testHashableElement{id: "medium", val: 20}
+
+       heap.Push(pq, elem1)
+       heap.Push(pq, elem2)
+       heap.Push(pq, elem3) // Should be deduplicated
+       heap.Push(pq, elem4)
+
+       // Should have 3 elements (elem3 deduplicated)
+       if pq.Len() != 3 {
+               t.Errorf("Expected 3 elements after deduplication, got %d", pq.Len())
+       }
+
+       // Pop elements and verify they come out in priority order
+       expectedOrder := []int{10, 20, 30} // elem2, elem4, elem1
+       for i, expectedVal := range expectedOrder {
+               if pq.Len() == 0 {
+                       t.Errorf("Expected more elements, but heap is empty at position %d", i)
+                       break
+               }
+               popped := heap.Pop(pq).(*testHashableElement)
+               if popped.val != expectedVal {
+                       t.Errorf("Expected value %d at position %d, got %d", expectedVal, i, popped.val)
+               }
+       }
+}
+
+func TestDedupPriorityQueue_LargeScaleDeduplication(t *testing.T) {
+       // Test deduplication with many elements
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Add 100 elements, but only 10 unique ids
+       for i := 0; i < 100; i++ {
+               id := "id" + string(rune(i%10+'0')) // id0, id1, ..., id9, id0, id1, ...
+               elem := &testHashableElement{id: id, val: i}
+               heap.Push(pq, elem)
+       }
+
+       // Should have only 10 unique elements
+       if pq.Len() != 10 {
+               t.Errorf("Expected 10 unique elements, got %d", pq.Len())
+       }
+
+       // Verify all remaining elements have unique ids
+       seen := make(map[string]bool)
+       for pq.Len() > 0 {
+               elem := heap.Pop(pq).(*testHashableElement)
+               if seen[elem.id] {
+                       t.Errorf("Found duplicate id %s in final heap", elem.id)
+               }
+               seen[elem.id] = true
+       }
+
+       // Should have seen exactly 10 unique ids
+       if len(seen) != 10 {
+               t.Errorf("Expected 10 unique ids, got %d", len(seen))
+       }
+}
+
+func BenchmarkDedupPriorityQueue_Push(b *testing.B) {
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               elem := &testHashableElement{id: "benchmark", val: i}
+               heap.Push(pq, elem)
+       }
+}
+
+func BenchmarkDedupPriorityQueue_Pop(b *testing.B) {
+       pq := NewPriorityQueue(func(a, b interface{}) int {
+               return a.(*testHashableElement).val - b.(*testHashableElement).val
+       }, false)
+
+       // Pre-populate with unique elements
+       for i := 0; i < b.N; i++ {
+               elem := &testHashableElement{id: fmt.Sprintf("unique_%d", i), val: i}
+               heap.Push(pq, elem)
+       }
+
+       b.ResetTimer()
+       for i := 0; i < b.N && pq.Len() > 0; i++ {
+               heap.Pop(pq)
+       }
+}
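
One caveat in the benchmarks above: every element in
BenchmarkDedupPriorityQueue_Push shares the id "benchmark" and
duplicates are disallowed, so only the first push lands in the heap and
the loop mostly measures the rejection path. A hypothetical companion
benchmark that exercises accepted pushes (not part of this commit):

    // BenchmarkDedupPriorityQueue_PushUnique gives every element a
    // distinct id, so no push is rejected as a duplicate.
    func BenchmarkDedupPriorityQueue_PushUnique(b *testing.B) {
            pq := NewPriorityQueue(func(a, b interface{}) int {
                    return a.(*testHashableElement).val - b.(*testHashableElement).val
            }, false)

            b.ResetTimer()
            for i := 0; i < b.N; i++ {
                    elem := &testHashableElement{id: fmt.Sprintf("u%d", i), val: i}
                    heap.Push(pq, elem)
            }
    }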
diff --git a/pkg/flow/streaming/sliding_window.go b/pkg/flow/streaming/sliding_window.go
index ed012084..b049dd5d 100644
--- a/pkg/flow/streaming/sliding_window.go
+++ b/pkg/flow/streaming/sliding_window.go
@@ -142,7 +142,7 @@ func (s *tumblingTimeWindows) flushDueWindows() {
        defer s.timerMu.Unlock()
        for {
                if lookAhead, ok := s.timerHeap.Peek().(*internalTimer); ok {
-                       if lookAhead.triggerTimeMillis <= s.currentWatermark {
+                       if lookAhead.w.MaxTimestamp() <= s.currentWatermark {
                                oldestTimer := heap.Pop(s.timerHeap).(*internalTimer)
                                s.flushWindow(oldestTimer.w)
                                continue
@@ -162,37 +162,32 @@ func (s *tumblingTimeWindows) receive() {
        defer s.Done()
 
        for elem := range s.in {
-               assignedWindows, err := s.AssignWindows(elem.TimestampMillis())
+               assignedWindow, err := s.AssignWindows(elem.TimestampMillis())
                if err != nil {
                        s.errorHandler(err)
                        continue
                }
-               ctx := triggerContext{
-                       delegation: s,
+               // drop if the window is late
+               if s.isWindowLate(assignedWindow) {
+                       continue
                }
-               for _, w := range assignedWindows {
-                       // drop if the window is late
-                       if s.isWindowLate(w) {
-                               continue
-                       }
-                       tw := w.(timeWindow)
-                       ctx.window = tw
-                       // add elem to the bucket
-                       if oldAggr, ok := s.snapshots.Get(tw); ok {
-                               oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem})
-                       } else {
-                               newAggr := s.aggregationFactory()
-                               newAggr.Add([]flow.StreamRecord{elem})
-                               s.snapshots.Add(tw, newAggr)
-                               if e := s.l.Debug(); e.Enabled() {
-                                       e.Stringer("window", tw).Msg("create new window")
-                               }
+               tw := assignedWindow.(timeWindow)
+               // add elem to the bucket
+               if oldAggr, ok := s.snapshots.Get(tw); ok {
+                       oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem})
+               } else {
+                       newAggr := s.aggregationFactory()
+                       newAggr.Add([]flow.StreamRecord{elem})
+                       s.snapshots.Add(tw, newAggr)
+                       if e := s.l.Debug(); e.Enabled() {
+                               e.Stringer("window", tw).Msg("create new window")
                        }
+               }
 
-                       result := ctx.OnElement(elem)
-                       if result == fire {
-                               s.flushWindow(tw)
-                       }
+               result := s.eventTimeTriggerOnElement(tw)
+
+               if result == fire {
+                       s.flushWindow(tw)
                }
 
                // even if the incoming elements do not follow strict order,
@@ -260,7 +255,7 @@ func NewTumblingTimeWindows(size time.Duration, maxFlushInterval time.Duration)
        return &tumblingTimeWindows{
                windowSize: ws,
                timerHeap: flow.NewPriorityQueue(func(a, b interface{}) int {
-                       return int(a.(*internalTimer).triggerTimeMillis - b.(*internalTimer).triggerTimeMillis)
+                       return int(a.(*internalTimer).w.MaxTimestamp() - b.(*internalTimer).w.MaxTimestamp())
                }, false),
                in:               make(chan flow.StreamRecord),
                out:              make(chan flow.StreamRecord),
@@ -285,14 +280,12 @@ func (t timeWindow) String() string {
 }
 
 // AssignWindows assigns windows according to the given timestamp.
-func (s *tumblingTimeWindows) AssignWindows(timestamp int64) ([]flow.Window, error) {
+func (s *tumblingTimeWindows) AssignWindows(timestamp int64) (flow.Window, error) {
        if timestamp > math.MinInt64 {
                start := getWindowStart(timestamp, s.windowSize)
-               return []flow.Window{
-                       timeWindow{
-                               start: start,
-                               end:   start + s.windowSize,
-                       },
+               return timeWindow{
+                       start: start,
+                       end:   start + s.windowSize,
                }, nil
        }
        return nil, errors.New("invalid timestamp from the element")
@@ -305,43 +298,27 @@ func getWindowStart(timestamp, windowSize int64) int64 {
 }
 
 // eventTimeTriggerOnElement processes element(s) with EventTimeTrigger.
-func eventTimeTriggerOnElement(window timeWindow, ctx *triggerContext) triggerResult {
-       if window.MaxTimestamp() <= ctx.GetCurrentWatermark() {
+func (s *tumblingTimeWindows) eventTimeTriggerOnElement(window timeWindow) triggerResult {
+       if window.MaxTimestamp() <= s.currentWatermark {
+       if window.MaxTimestamp() <= s.currentWatermark {
                // if watermark is already past the window fire immediately
                return fire
        }
-       ctx.RegisterEventTimeTimer(window.MaxTimestamp())
-       return cont
-}
-
-type triggerContext struct {
-       delegation *tumblingTimeWindows
-       window     timeWindow
-}
-
-func (ctx *triggerContext) GetCurrentWatermark() int64 {
-       return ctx.delegation.currentWatermark
-}
-
-func (ctx *triggerContext) RegisterEventTimeTimer(triggerTime int64) {
-       ctx.delegation.timerMu.Lock()
-       defer ctx.delegation.timerMu.Unlock()
-       heap.Push(ctx.delegation.timerHeap, &internalTimer{
-               triggerTimeMillis: triggerTime,
-               w:                 ctx.window,
+       s.timerMu.Lock()
+       defer s.timerMu.Unlock()
+       heap.Push(s.timerHeap, &internalTimer{
+               w: window,
        })
+       return cont
 }
 
-func (ctx *triggerContext) OnElement(_ flow.StreamRecord) triggerResult {
-       return eventTimeTriggerOnElement(ctx.window, ctx)
-}
-
-var _ flow.Element = (*internalTimer)(nil)
+var (
+       _ flow.Element         = (*internalTimer)(nil)
+       _ flow.HashableElement = (*internalTimer)(nil)
+)
 
 type internalTimer struct {
-       w                 timeWindow
-       triggerTimeMillis int64
-       index             int
+       w     timeWindow
+       index int
 }
 
 func (t *internalTimer) GetIndex() int {
@@ -351,3 +328,14 @@ func (t *internalTimer) GetIndex() int {
 func (t *internalTimer) SetIndex(idx int) {
        t.index = idx
 }
+
+func (t *internalTimer) Equal(other flow.HashableElement) bool {
+       if otherTimer, ok := other.(*internalTimer); ok {
+               return t.w.start == otherTimer.w.start && t.w.end == otherTimer.w.end
+       }
+       return false
+}
+
+func (t *internalTimer) Hash() uint64 {
+       return uint64(t.w.start)<<32 | uint64(t.w.end)
+}
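
A timer's identity is now purely its window, and Hash packs start and
end into a single word. Both fields are millisecond timestamps wider
than 32 bits, so the packing is lossy and distinct windows can in
principle share a hash; that is safe because Equal is the final check
inside each bucket. A small self-contained illustration (values are
arbitrary):

    package main

    import "fmt"

    type timeWindow struct{ start, end int64 }

    // hash mirrors internalTimer.Hash above: start in the high 32 bits,
    // end in the low 32. Epoch milliseconds overflow 32 bits, so this
    // is lossy; the queue resolves collisions with Equal.
    func hash(w timeWindow) uint64 {
            return uint64(w.start)<<32 | uint64(w.end)
    }

    func main() {
            a := timeWindow{start: 1_700_000_000_000, end: 1_700_000_060_000}
            b := timeWindow{start: 1_700_000_000_000, end: 1_700_000_060_000}
            c := timeWindow{start: 1_700_000_060_000, end: 1_700_000_120_000}

            fmt.Println(hash(a) == hash(b)) // true: same window content
            fmt.Println(hash(a) == hash(c)) // false for these values, but
            // equality must still be confirmed with Equal in general
    }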
diff --git a/pkg/flow/streaming/sliding_window_test.go b/pkg/flow/streaming/sliding_window_test.go
index cee7973d..28077bc8 100644
--- a/pkg/flow/streaming/sliding_window_test.go
+++ b/pkg/flow/streaming/sliding_window_test.go
@@ -18,6 +18,7 @@
 package streaming
 
 import (
+       "container/heap"
        "context"
        "time"
 
@@ -136,4 +137,140 @@ var _ = g.Describe("Sliding Window", func() {
                        }).WithTimeout(flags.EventuallyTimeout).Should(gomega.Succeed())
                })
        })
+
+       g.Describe("Timer Heap Deduplication", func() {
+               var timerHeap *flow.DedupPriorityQueue
+
+               g.BeforeEach(func() {
+                       timerHeap = flow.NewPriorityQueue(func(a, b interface{}) int {
+                               return int(a.(*internalTimer).w.MaxTimestamp() - b.(*internalTimer).w.MaxTimestamp())
+                       }, false)
+               })
+
+               g.It("Should deduplicate same reference internalTimer objects", func() {
+                       timer1 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000},
+                       }
+
+                       // Push the same reference twice
+                       heap.Push(timerHeap, timer1)
+                       heap.Push(timerHeap, timer1)
+
+                       // Should only have one item due to reference-based deduplication
+                       gomega.Expect(timerHeap.Len()).Should(gomega.Equal(1))
+               })
+
+               g.It("Should deduplicate different internalTimer objects with same window content", func() {
+                       timer1 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000},
+                       }
+                       timer2 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000}, // Same window content
+                       }
+
+                       // Push different objects with same content
+                       heap.Push(timerHeap, timer1)
+                       heap.Push(timerHeap, timer2)
+
+                       // Should only have one item due to content-based deduplication
+                       gomega.Expect(timerHeap.Len()).Should(gomega.Equal(1))
+               })
+
+               g.It("Should not deduplicate internalTimer objects with different windows", func() {
+                       timer1 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000},
+                       }
+                       timer2 := &internalTimer{
+                               w: timeWindow{start: 2000, end: 3000}, // Different window
+                       }
+
+                       // Push different objects with different content
+                       heap.Push(timerHeap, timer1)
+                       heap.Push(timerHeap, timer2)
+
+                       // Should have two items as they have different content
+                       gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+               })
+
+               g.It("Should maintain proper ordering after deduplication", func() {
+                       timer1 := &internalTimer{
+                               w: timeWindow{start: 3000, end: 4000}, // Later timestamp
+                       }
+                       timer2 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000}, // Earlier timestamp
+                       }
+                       timer3 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000}, // Duplicate of timer2
+                       }
+
+                       // Push in order: later, earlier, duplicate
+                       heap.Push(timerHeap, timer1)
+                       heap.Push(timerHeap, timer2)
+                       heap.Push(timerHeap, timer3) // Should be deduplicated
+
+                       // Should only have 2 items
+                       gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+
+                       // Peek should return the earliest timer (timer2)
+                       earliest := timerHeap.Peek().(*internalTimer)
+                       gomega.Expect(earliest.w.start).Should(gomega.Equal(int64(1000)))
+                       gomega.Expect(earliest.w.end).Should(gomega.Equal(int64(2000)))
+               })
+
+               g.It("Should verify Hash and Equal methods work correctly", func() {
+                       timer1 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000},
+                       }
+                       timer2 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000}, // Same content
+                       }
+                       timer3 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 3000}, // Different end
+                       }
+
+                       // Test Hash method
+                       gomega.Expect(timer1.Hash()).Should(gomega.Equal(timer2.Hash()))
+                       gomega.Expect(timer1.Hash()).ShouldNot(gomega.Equal(timer3.Hash()))
+
+                       // Test Equal method
+                       gomega.Expect(timer1.Equal(timer2)).Should(gomega.BeTrue())
+                       gomega.Expect(timer1.Equal(timer3)).Should(gomega.BeFalse())
+               })
+
+               g.It("Should work with heap operations", func() {
+                       timer1 := &internalTimer{
+                               w: timeWindow{start: 3000, end: 4000}, // Later
+                       }
+                       timer2 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000}, // Earlier
+                       }
+                       timer3 := &internalTimer{
+                               w: timeWindow{start: 1000, end: 2000}, // Duplicate of timer2
+                       }
+
+                       // Initialize heap
+                       heap.Init(timerHeap)
+
+                       // Push timers
+                       heap.Push(timerHeap, timer1)
+                       heap.Push(timerHeap, timer2)
+                       heap.Push(timerHeap, timer3) // Should be deduplicated
+
+                       // Should only have 2 items
+                       gomega.Expect(timerHeap.Len()).Should(gomega.Equal(2))
+
+                       // Pop should return earliest first
+                       earliest := heap.Pop(timerHeap).(*internalTimer)
+                       gomega.Expect(earliest.w.start).Should(gomega.Equal(int64(1000)))
+                       gomega.Expect(earliest.w.end).Should(gomega.Equal(int64(2000)))
+
+                       // Next should be the later timer
+                       later := heap.Pop(timerHeap).(*internalTimer)
+                       gomega.Expect(later.w.start).Should(gomega.Equal(int64(3000)))
+                       gomega.Expect(later.w.end).Should(gomega.Equal(int64(4000)))
+
+                       // Heap should be empty now
+                       gomega.Expect(timerHeap.Len()).Should(gomega.Equal(0))
+               })
+       })
 })
diff --git a/pkg/flow/types.go b/pkg/flow/types.go
index 6e8576a4..20bf4c50 100644
--- a/pkg/flow/types.go
+++ b/pkg/flow/types.go
@@ -73,7 +73,7 @@ type Window interface {
 type WindowAssigner interface {
        // AssignWindows assigns a slice of Window according to the given timestamp, e.g. eventTime.
        // The unit of the timestamp here is MilliSecond.
-       AssignWindows(timestamp int64) ([]Window, error)
+       AssignWindows(timestamp int64) (Window, error)
 }
 
 // AggregationOp defines the stateful operation for aggregation.
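
With tumbling windows each timestamp belongs to exactly one window, so
the assigner contract changes from a slice to a single Window. A
minimal conforming implementation under the new signature (Window,
WindowAssigner, and tumbling are trimmed stand-ins, and the start
computation is simplified relative to getWindowStart):

    package main

    import (
            "errors"
            "fmt"
            "math"
    )

    // Window and WindowAssigner are reduced versions of the pkg/flow types.
    type Window interface{ MaxTimestamp() int64 }

    type WindowAssigner interface {
            AssignWindows(timestamp int64) (Window, error)
    }

    type timeWindow struct{ start, end int64 }

    func (w timeWindow) MaxTimestamp() int64 { return w.end - 1 }

    type tumbling struct{ sizeMillis int64 }

    // AssignWindows maps a timestamp to the one tumbling window that
    // contains it, matching the new one-window-per-element contract.
    func (t tumbling) AssignWindows(ts int64) (Window, error) {
            if ts == math.MinInt64 {
                    return nil, errors.New("invalid timestamp from the element")
            }
            start := ts - (ts % t.sizeMillis) // simplified; negative inputs need care
            return timeWindow{start: start, end: start + t.sizeMillis}, nil
    }

    func main() {
            var a WindowAssigner = tumbling{sizeMillis: 60_000}
            w, _ := a.AssignWindows(1_700_000_030_000)
            fmt.Println(w) // minute-aligned window containing the timestamp
    }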
