This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new d48a810f Improve round robin selector (#500)
d48a810f is described below
commit d48a810f8cca8b66d7b3b179f36090d78f46e12c
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jul 29 09:11:59 2024 +0800
Improve round robin selector (#500)
---
banyand/internal/storage/segment.go | 8 ++
docs/concept/clustering.md | 28 +++++-
pkg/cmdsetup/liaison.go | 4 +-
pkg/node/interface.go | 12 ++-
pkg/node/maglev.go | 9 +-
pkg/node/round_robin.go | 183 ++++++++++++++++++++----------------
pkg/node/round_robin_test.go | 73 ++++++++------
test/docker/base-compose.yml | 14 +--
test/e2e-v2/script/env | 6 +-
test/stress/classic/env | 6 +-
test/stress/classic/env.dev | 6 +-
test/stress/trace/env | 2 +-
12 files changed, 216 insertions(+), 135 deletions(-)
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 9153ce3e..0d780342 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -52,6 +52,7 @@ type segment[T TSTable, O any] struct {
refCount int32
mustBeDeleted uint32
id segmentID
+ mu sync.Mutex
}
func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime,
endTime time.Time, path, suffix string,
@@ -118,6 +119,8 @@ func (s *segment[T, O]) DecRef() {
if n > 0 {
return
}
+ s.mu.Lock()
+ defer s.mu.Unlock()
deletePath := ""
if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
@@ -146,6 +149,11 @@ func (s *segment[T, O]) delete() {
}
func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) {
+ if s, ok := s.getShard(id); ok {
+ return s.table, nil
+ }
+ s.mu.Lock()
+ defer s.mu.Unlock()
if s, ok := s.getShard(id); ok {
return s.table, nil
}
diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md
index 3f49cda1..2959c646 100644
--- a/docs/concept/clustering.md
+++ b/docs/concept/clustering.md
@@ -97,11 +97,33 @@ Futhermore, the storage system might be cheaper. For
instance, S3 can be more co
### 5.2 Data Sharding
-Data distribution across the cluster is determined based on the `shard_num`
setting for a group and the specified `entity` in each resource, be it a stream
or measure. The resource’s `name` with its `entity` is the sharding key,
guiding data distribution to the appropriate Data Node during write operations.
+Data distribution across the cluster is determined by the `shard_num` setting
for a group and the specified `entity` in each resource, whether it is a stream
or a measure. The combination of the resource’s `name` and its `entity` forms
the sharding key, which guides data distribution to the appropriate Data Node
during write operations.
-Liaison Nodes retrieve shard mapping information from Meta Nodes to achieve
efficient data routing. This information is used to route data to the
appropriate Data Nodes based on the sharding key of the data.
+For example, if two groups have 5 shards in total, the data is distributed
across these shards based on the sharding key:
-This sharding strategy ensures the write load is evenly distributed across the
cluster, enhancing write performance and overall system efficiency. BanyanDB
uses a hash algorithm for sharding. The hash function maps the sharding key
(resource name and entity) to a node in the cluster. Each shard is assigned to
the node returned by the hash function.
+- Group `measure-minute` Shard 0
+- Group `measure-minute` Shard 1
+- Group `stream-log` Shard 0
+- Group `stream-log` Shard 1
+- Group `stream-log` Shard 2
+
+A measure named `service_cpm` belonging to `measure-minute` with an entity of
`service_id` "frontend" will be written to a specific shard based on the hashed
value of the sharding key `service_cpm:frontend`.
+
+Similarly, a stream named `system_log` belonging to `stream-log` with an
entity combination of `service_id` "frontend" and `instance_id` "10.0.0.1" will
be written to a specific shard based on the hashed value of the sharding key
`system_log:frontend|10.0.0.1`.
+
+> Note: If there are ":" or "|" in the entity, they will be prefixed with a
backslash "\".
+
+Liaison Nodes play a crucial role in this process by retrieving the `Group`
list from Meta Nodes. This information is essential for efficient data routing,
as it allows Liaison Nodes to direct data to the appropriate Data Nodes based
on the sharding key.
+
+This sharding strategy ensures that the write load is evenly distributed
across the cluster, thereby enhancing write performance and overall system
efficiency. BanyanDB sorts the shards by the `Group` name and the shard ID,
then assigns the shards to the Data Nodes in a round-robin fashion. This method
guarantees an even distribution of data across the cluster, preventing any
single node from becoming a bottleneck.
+
+For example, consider two groups with 5 shards in total and a cluster with 3
Data Nodes. The shards are distributed as follows:
+
+- Group `measure-minute` Shard 0: Data Node 1
+- Group `measure-minute` Shard 1: Data Node 2
+- Group `stream-log` Shard 0: Data Node 3
+- Group `stream-log` Shard 1: Data Node 1
+- Group `stream-log` Shard 2: Data Node 2
### 5.3 Data Write Path
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 460c8544..66f3da41 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,7 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
- nodeSel := node.NewRoundRobinSelector()
+ nodeSel := node.NewRoundRobinSelector(metaSvc)
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry)
profSvc := observability.NewProfService()
@@ -62,6 +62,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
metaSvc,
localPipeline,
pipeline,
+ nodeSel,
dQuery,
grpcServer,
httpServer,
@@ -77,7 +78,6 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the liaison server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
- defer nodeSel.Close()
node, err := common.GenerateNode(grpcServer.GetPort(),
httpServer.GetPort())
if err != nil {
return err
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 1a4f5882..0f41aab5 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -19,12 +19,14 @@
package node
import (
+ "context"
"sync"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
var (
@@ -39,7 +41,7 @@ type Selector interface {
AddNode(node *databasev1.Node)
RemoveNode(node *databasev1.Node)
Pick(group, name string, shardID uint32) (string, error)
- Close()
+ run.PreRunner
}
// NewPickFirstSelector returns a simple selector that always returns the
first node if exists.
@@ -56,7 +58,13 @@ type pickFirstSelector struct {
mu sync.RWMutex
}
-func (p *pickFirstSelector) Close() {}
+func (p *pickFirstSelector) PreRun(context.Context) error {
+ return nil
+}
+
+func (p *pickFirstSelector) Name() string {
+ return "pick-first"
+}
func (p *pickFirstSelector) AddNode(node *databasev1.Node) {
nodeID := node.GetMetadata().GetName()
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index 4ae21740..34a855f6 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -18,6 +18,7 @@
package node
import (
+ "context"
"sort"
"strconv"
"sync"
@@ -37,7 +38,13 @@ type maglevSelector struct {
mutex sync.RWMutex
}
-func (m *maglevSelector) Close() {}
+func (m *maglevSelector) Name() string {
+ return "maglev-selector"
+}
+
+func (m *maglevSelector) PreRun(context.Context) error {
+ return nil
+}
func (m *maglevSelector) AddNode(node *databasev1.Node) {
m.mutex.Lock()
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
index da85c210..2815b19c 100644
--- a/pkg/node/round_robin.go
+++ b/pkg/node/round_robin.go
@@ -18,49 +18,107 @@
package node
import (
+ "context"
"fmt"
"slices"
"sort"
"strings"
"sync"
- "sync/atomic"
"time"
"github.com/pkg/errors"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
-)
-
-const (
- expiredKeyCleanupInterval = 1 * time.Hour
- keyTTL = 24 * time.Hour
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
type roundRobinSelector struct {
- clock timestamp.Clock
- closeCh chan struct{}
- lookupTable sync.Map
- nodes []string
- mu sync.RWMutex
- once sync.Once
- tMu sync.Mutex
-}
-
-func (r *roundRobinSelector) Close() {
- close(r.closeCh)
+ schemaRegistry metadata.Repo
+ closeCh chan struct{}
+ lookupTable sync.Map
+ nodes []string
+ mu sync.RWMutex
}
// NewRoundRobinSelector creates a new round-robin selector.
-func NewRoundRobinSelector() Selector {
+func NewRoundRobinSelector(schemaRegistry metadata.Repo) Selector {
rrs := &roundRobinSelector{
- nodes: make([]string, 0),
- clock: timestamp.NewClock(),
- closeCh: make(chan struct{}),
+ nodes: make([]string, 0),
+ closeCh: make(chan struct{}),
+ schemaRegistry: schemaRegistry,
}
return rrs
}
+func (r *roundRobinSelector) Name() string {
+ return "round-robin-selector"
+}
+
+func (r *roundRobinSelector) PreRun(context.Context) error {
+ r.schemaRegistry.RegisterHandler("liaison", schema.KindGroup, r)
+ return nil
+}
+
+func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata schema.Metadata) {
+ if schemaMetadata.Kind != schema.KindGroup {
+ return
+ }
+ group, ok := schemaMetadata.Spec.(*commonv1.Group)
+ if !ok || !validateGroup(group) {
+ return
+ }
+ for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
+ k := key{group: group.Metadata.Name, shardID: i}
+ r.lookupTable.Store(k, 0)
+ }
+ r.sortEntries()
+}
+
+func (r *roundRobinSelector) OnDelete(schemaMetadata schema.Metadata) {
+ if schemaMetadata.Kind != schema.KindGroup {
+ return
+ }
+ group := schemaMetadata.Spec.(*commonv1.Group)
+ for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
+ k := key{group: group.Metadata.Name, shardID: i}
+ r.lookupTable.Delete(k)
+ }
+ r.sortEntries()
+}
+
+func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) {
+ if len(kinds) != 1 {
+ return false, nil
+ }
+ if kinds[0] != schema.KindGroup {
+ return false, nil
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ gg, err := r.schemaRegistry.GroupRegistry().ListGroup(ctx)
+ if err != nil {
+ panic(fmt.Sprintf("failed to list group: %v", err))
+ }
+ var revision int64
+ r.lookupTable = sync.Map{}
+ for _, g := range gg {
+ if !validateGroup(g) {
+ continue
+ }
+ if g.Metadata.ModRevision > revision {
+ revision = g.Metadata.ModRevision
+ }
+ for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ {
+ k := key{group: g.Metadata.Name, shardID: i}
+ r.lookupTable.Store(k, 0)
+ }
+ }
+ r.sortEntries()
+ return true, []int64{revision}
+}
+
func (r *roundRobinSelector) AddNode(node *databasev1.Node) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -90,13 +148,11 @@ func (r *roundRobinSelector) Pick(group, _ string, shardID
uint32) (string, erro
if ok {
return r.selectNode(entry), nil
}
- r.tMu.Lock()
- defer r.tMu.Unlock()
- if entry, ok := r.lookupTable.Load(k); ok {
- return r.selectNode(entry), nil
- }
+ return "", fmt.Errorf("%s-%d is a unknown shard", group, shardID)
+}
- keys := []key{k}
+func (r *roundRobinSelector) sortEntries() {
+ var keys []key
r.lookupTable.Range(func(k, _ any) bool {
keys = append(keys, k.(key))
return true
@@ -109,69 +165,30 @@ func (r *roundRobinSelector) Pick(group, _ string,
shardID uint32) (string, erro
return int(a.shardID) - int(b.shardID)
})
for i := range keys {
- if entry, ok := r.lookupTable.Load(keys[i]); ok {
- entry.(*tableEntry).index = i
- } else {
- r.lookupTable.Store(keys[i], r.newTableEntry(i))
- }
- }
- r.once.Do(r.startCleanupTicker)
- if entry, ok := r.lookupTable.Load(k); ok {
- return r.selectNode(entry), nil
+ r.lookupTable.Store(keys[i], i)
}
- panic(fmt.Sprintf("key %v not found", k))
}
-func (r *roundRobinSelector) selectNode(entry any) string {
- e := entry.(*tableEntry)
- now := r.clock.Now()
- e.lastAccess.Store(&now)
- return r.nodes[e.index%len(r.nodes)]
-}
-
-type key struct {
- group string
- shardID uint32
+func (r *roundRobinSelector) Close() {
+ close(r.closeCh)
}
-type tableEntry struct {
- lastAccess *atomic.Pointer[time.Time]
- index int
+func (r *roundRobinSelector) selectNode(entry any) string {
+ index := entry.(int)
+ return r.nodes[index%len(r.nodes)]
}
-func (r *roundRobinSelector) newTableEntry(index int) *tableEntry {
- p := atomic.Pointer[time.Time]{}
- now := r.clock.Now()
- p.Store(&now)
- return &tableEntry{
- index: index,
- lastAccess: &p,
+func validateGroup(group *commonv1.Group) bool {
+ if group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
+ return false
}
+ if group.ResourceOpts == nil {
+ return false
+ }
+ return true
}
-func (r *roundRobinSelector) cleanupExpiredEntries() {
- now := r.clock.Now()
- r.tMu.Lock()
- defer r.tMu.Unlock()
-
- r.lookupTable.Range(func(k, value any) bool {
- e := value.(*tableEntry)
- if now.Sub(*e.lastAccess.Load()) > keyTTL {
- r.lookupTable.Delete(k)
- }
- return true
- })
-}
-
-func (r *roundRobinSelector) startCleanupTicker() {
- ticker := r.clock.Ticker(expiredKeyCleanupInterval)
- go func() {
- select {
- case <-r.closeCh:
- ticker.Stop()
- return
- case <-ticker.C:
- r.cleanupExpiredEntries()
- }
- }()
+type key struct {
+ group string
+ shardID uint32
}
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
index 69c1822a..9e8037c5 100644
--- a/pkg/node/round_robin_test.go
+++ b/pkg/node/round_robin_test.go
@@ -19,23 +19,33 @@ package node
import (
"testing"
- "time"
"github.com/stretchr/testify/assert"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
func TestPickEmptySelector(t *testing.T) {
- selector := NewRoundRobinSelector()
+ selector := NewRoundRobinSelector(nil)
+ setupGroup(selector)
_, err := selector.Pick("group1", "", 0)
assert.Error(t, err)
}
+func TestPickUnknownGroup(t *testing.T) {
+ selector := NewRoundRobinSelector(nil)
+ _, err := selector.Pick("group1", "", 0)
+ assert.Error(t, err)
+ setupGroup(selector)
+ _, err = selector.Pick("group1", "", 100)
+ assert.Error(t, err)
+}
+
func TestPickSingleSelection(t *testing.T) {
- selector := NewRoundRobinSelector()
+ selector := NewRoundRobinSelector(nil)
+ setupGroup(selector)
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
node, err := selector.Pick("group1", "", 0)
assert.NoError(t, err)
@@ -43,13 +53,12 @@ func TestPickSingleSelection(t *testing.T) {
}
func TestPickMultipleSelections(t *testing.T) {
- selector := NewRoundRobinSelector()
+ selector := NewRoundRobinSelector(nil)
+ setupGroup(selector)
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
- // load data
- _, err := selector.Pick("group1", "", 0)
- assert.NoError(t, err)
- _, err = selector.Pick("group1", "", 1)
+
+ _, err := selector.Pick("group1", "", 1)
assert.NoError(t, err)
node1, err := selector.Pick("group1", "", 0)
assert.NoError(t, err)
@@ -59,7 +68,8 @@ func TestPickMultipleSelections(t *testing.T) {
}
func TestPickNodeRemoval(t *testing.T) {
- selector := NewRoundRobinSelector()
+ selector := NewRoundRobinSelector(nil)
+ setupGroup(selector)
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
@@ -69,7 +79,8 @@ func TestPickNodeRemoval(t *testing.T) {
}
func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
- selector := NewRoundRobinSelector()
+ selector := NewRoundRobinSelector(nil)
+ setupGroup(selector)
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node3"}})
@@ -86,27 +97,35 @@ func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
assert.Equal(t, "node3", node)
}
-func TestCleanupExpiredEntries(t *testing.T) {
- mc := timestamp.NewMockClock()
- mc.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+func TestCleanupGroup(t *testing.T) {
selector := &roundRobinSelector{
nodes: make([]string, 0),
- clock: mc,
}
+ setupGroup(selector)
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
_, err := selector.Pick("group1", "", 0)
assert.NoError(t, err)
- _, ok := selector.lookupTable.Load(key{group: "group1", shardID: 0})
- assert.True(t, ok)
- mc.Add(25 * time.Hour)
- _, err = selector.Pick("group1", "", 1)
- assert.NoError(t, err)
- _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
- assert.True(t, ok)
- selector.cleanupExpiredEntries()
- _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 0})
- assert.False(t, ok)
- _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
- assert.True(t, ok)
+ selector.OnDelete(groupSchema)
+ _, err = selector.Pick("group1", "", 0)
+ assert.Error(t, err)
+}
+
+var groupSchema = schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindGroup,
+ },
+ Spec: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: "group1",
+ },
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ },
+ },
+}
+
+func setupGroup(selector Selector) {
+ selector.(*roundRobinSelector).OnAddOrUpdate(groupSchema)
}
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index ac7d0c33..d0aec238 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -22,8 +22,8 @@ services:
command: standalone
healthcheck:
test: ["CMD", "./bydbctl", "health", "--config=-",
"--addr=http://banyandb:17913"]
- interval: 30s
- timeout: 30s
+ interval: 5s
+ timeout: 120s
retries: 120
liaison:
@@ -35,8 +35,8 @@ services:
command: liaison --etcd-endpoints=http://etcd:2379
healthcheck:
test: ["CMD", "./bydbctl", "health", "--addr=http://liaison:17913"]
- interval: 30s
- timeout: 30s
+ interval: 5s
+ timeout: 120s
retries: 120
data:
@@ -48,8 +48,8 @@ services:
command: data --etcd-endpoints=http://etcd:2379
healthcheck:
test: ["CMD", "./bydbctl", "health", "--grpc-addr=data:17912"]
- interval: 30s
- timeout: 30s
+ interval: 5s
+ timeout: 120s
retries: 120
agent:
@@ -73,7 +73,7 @@ services:
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/11800"]
interval: 5s
- timeout: 60s
+ timeout: 120s
retries: 120
ui:
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index e16487d1..971a4327 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -21,9 +21,9 @@ SW_AGENT_GO_COMMIT=774a6d56baba1187eb03bf1861af542c923b3dff
SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
-SW_KUBERNETES_COMMIT_SHA=3eb9fc8235fd3a773650d1f08adfccfd5d0745fc
+SW_KUBERNETES_COMMIT_SHA=1335f15bf821a40a7cd71448fa805f0be265afcc
SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
SW_CTL_COMMIT=d5f3597733aa5217373986d776a3ee5ee8b3c468
-SW_OAP_COMMIT=9c5f4ab6950dbc9ea3477e3301550adb49528af2
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
+SW_OAP_COMMIT=ceaa88f378d40bedf40e40c33169b9804fc96aae
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=5d0bd0569b23bcd40a3c9ea0898e27d6aefd1e84
diff --git a/test/stress/classic/env b/test/stress/classic/env
index e7a51146..cd0c184c 100644
--- a/test/stress/classic/env
+++ b/test/stress/classic/env
@@ -22,11 +22,11 @@ SW_AGENT_GO_COMMIT=774a6d56baba1187eb03bf1861af542c923b3dff
SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
-SW_KUBERNETES_COMMIT_SHA=3eb9fc8235fd3a773650d1f08adfccfd5d0745fc
+SW_KUBERNETES_COMMIT_SHA=1335f15bf821a40a7cd71448fa805f0be265afcc
SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
SW_CTL_COMMIT=d5f3597733aa5217373986d776a3ee5ee8b3c468
-SW_OAP_COMMIT=9c5f4ab6950dbc9ea3477e3301550adb49528af2
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
+SW_OAP_COMMIT=ceaa88f378d40bedf40e40c33169b9804fc96aae
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=5d0bd0569b23bcd40a3c9ea0898e27d6aefd1e84
VUS=10
diff --git a/test/stress/classic/env.dev b/test/stress/classic/env.dev
index 2193d9d0..461d042a 100644
--- a/test/stress/classic/env.dev
+++ b/test/stress/classic/env.dev
@@ -21,11 +21,11 @@ SW_AGENT_GO_COMMIT=774a6d56baba1187eb03bf1861af542c923b3dff
SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
-SW_KUBERNETES_COMMIT_SHA=3eb9fc8235fd3a773650d1f08adfccfd5d0745fc
+SW_KUBERNETES_COMMIT_SHA=1335f15bf821a40a7cd71448fa805f0be265afcc
SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
SW_CTL_COMMIT=d5f3597733aa5217373986d776a3ee5ee8b3c468
-SW_OAP_COMMIT=9c5f4ab6950dbc9ea3477e3301550adb49528af2
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
+SW_OAP_COMMIT=ceaa88f378d40bedf40e40c33169b9804fc96aae
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=5d0bd0569b23bcd40a3c9ea0898e27d6aefd1e84
VUS=1
diff --git a/test/stress/trace/env b/test/stress/trace/env
index 957ddfa2..c3a08b7c 100644
--- a/test/stress/trace/env
+++ b/test/stress/trace/env
@@ -14,4 +14,4 @@
# limitations under the License.
-SW_OAP_COMMIT=9c5f4ab6950dbc9ea3477e3301550adb49528af2
+SW_OAP_COMMIT=ceaa88f378d40bedf40e40c33169b9804fc96aae