yangwwei opened a new pull request #303: URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/303
### What is this PR for?
We found that the scheduler can occasionally crash with a concurrent map read/write error. The root cause: while evaluating predicates, we pass the `nodesInfo` map by reference to the Kubernetes predicate code. If a node is added to or removed from the cluster at the same time, the map is written while the Kubernetes code is still reading it, which triggers the concurrent read/write failure. This PR fixes that by passing a copy of the nodes map instead.

### What type of PR is it?
* [x] - Bug Fix
* [ ] - Improvement
* [ ] - Feature
* [ ] - Documentation
* [ ] - Hot Fix
* [ ] - Refactoring

### Todos
* [ ] - Task

### What is the Jira issue?
https://issues.apache.org/jira/browse/YUNIKORN-848

### How should this be tested?
I reproduced the crash locally with the following code (add it to `predicator_test.go`):

```
func TestConcurrentAccess(t *testing.T) {
	conf.GetSchedulerConf().SetTestMode(true)
	events.SetRecorderForTest(events.NewMockedRecorder())
	predictor := newPredictorInternal(&factory.PluginFactoryArgs{}, schedulerapi.Policy{
		Predicates: []schedulerapi.PredicatePolicy{
			{Name: predicates.MatchInterPodAffinityPred},
		}})
	pod1 := v1.Pod{
		Spec: v1.PodSpec{
			NodeName: "machine1",
			Affinity: &v1.Affinity{
				PodAffinity: &v1.PodAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
						{
							LabelSelector: metav1.SetAsLabelSelector(labels.Set{
								"app": "xyz",
							}),
						},
					},
				},
			},
		},
	}
	node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
	podsOnNode := []*v1.Pod{
		&pod1,
	}
	nodeInfo := deschedulernode.NewNodeInfo(podsOnNode...)
	_ = nodeInfo.SetNode(&node1)
	nodeInfoMap := map[string]*deschedulernode.NodeInfo{
		node1.Name: nodeInfo,
	}
	cache := &nodesCacheForTest{
		nodesMap: nodeInfoMap,
	}

	// Stop both goroutines before the test returns, so neither keeps running
	// (or calls t.Logf) after the test has completed.
	stop := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)

	// Reader: evaluates predicates against the shared nodes map in a tight loop.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stop:
				return
			default:
			}
			// getNodesMap() hands out the shared map without a copy, which
			// reproduces the crash; cache.getNodesMapCopy() avoids it.
			meta := predictor.GetPredicateMeta(&pod1, cache.getNodesMap())
			// The result is ignored; only the map reads matter here.
			_ = predictor.Predicates(&pod1, meta, nodeInfo, true)
		}
	}()

	// Writer: adds a new node to the shared map every millisecond.
	i := 0
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stop:
				return
			default:
			}
			time.Sleep(1 * time.Millisecond)
			nodeName := fmt.Sprintf("machine%d", i)
			t.Logf("adding node %s", nodeName)
			newNodeInfo := deschedulernode.NewNodeInfo()
			nodeN := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
			_ = newNodeInfo.SetNode(&nodeN)
			cache.addNode(nodeN.Name, newNodeInfo)
			i++
		}
	}()

	time.Sleep(10 * time.Second)
	close(stop)
	wg.Wait()
	// Both goroutines have exited, so reading the map here is safe.
	fmt.Println(len(nodeInfoMap))
}

// nodesCacheForTest is a minimal node cache guarded by an RWMutex.
type nodesCacheForTest struct {
	nodesMap map[string]*deschedulernode.NodeInfo
	sync.RWMutex
}

// getNodesMap returns the shared map itself, with no lock and no copy.
// Callers then read it while addNode may be writing to it: this is the bug.
func (c *nodesCacheForTest) getNodesMap() map[string]*deschedulernode.NodeInfo {
	return c.nodesMap
}

// getNodesMapCopy builds a new map under the read lock, cloning each NodeInfo,
// so callers can read it freely while the original map keeps changing.
func (c *nodesCacheForTest) getNodesMapCopy() map[string]*deschedulernode.NodeInfo {
	c.RLock()
	defer c.RUnlock()
	newMap := make(map[string]*deschedulernode.NodeInfo, len(c.nodesMap))
	for k, v := range c.nodesMap {
		newMap[k] = v.Clone()
	}
	return newMap
}

// addNode inserts or replaces a node under the write lock.
func (c *nodesCacheForTest) addNode(name string, nodeInfo *deschedulernode.NodeInfo) {
	c.Lock()
	defer c.Unlock()
	c.nodesMap[name] = nodeInfo
}
```

Note: I set the writer to update the map every 1 ms so that the race triggers reliably in unit-test code.
I got the stack trace:

```
fatal error: concurrent map read and map write

goroutine 1248 [running]:
runtime.throw(0x23ca940, 0x21)
	/Users/wyang/go1.15/src/runtime/panic.go:1116 +0x72 fp=0xc000368c90 sp=0xc000368c60 pc=0x1038ed2
runtime.mapaccess1_faststr(0x21bf6a0, 0xc0005a22a0, 0x23aa919, 0x8, 0xc00030bb70)
	/Users/wyang/go1.15/src/runtime/map_faststr.go:21 +0x465 fp=0xc000368d00 sp=0xc000368c90 pc=0x1015fc5
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates.getTPMapMatchingIncomingAffinityAntiAffinity.func2(0x0)
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/pkg/scheduler/algorithm/predicates/metadata.go:715 +0xc5 fp=0xc000368f50 sp=0xc000368d00 pc=0x201ab45
k8s.io/client-go/util/workqueue.ParallelizeUntil.func1(0xc0007a2a60, 0xc0007cbb80, 0xc000311c10, 0xc0007e8750)
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/util/workqueue/parallelizer.go:57 +0x8d fp=0xc000368fc0 sp=0xc000368f50 pc=0x1eacccd
runtime.goexit()
	/Users/wyang/go1.15/src/runtime/asm_amd64.s:1374 +0x1 fp=0xc000368fc8 sp=0xc000368fc0 pc=0x1070c21
created by k8s.io/client-go/util/workqueue.ParallelizeUntil
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/util/workqueue/parallelizer.go:49 +0x14a

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000502900, 0x23b90c8, 0x14, 0x2452260, 0x1096401)
	/Users/wyang/go1.15/src/testing/testing.go:1160 +0x3ad
testing.runTests.func1(0xc000582d80)
	/Users/wyang/go1.15/src/testing/testing.go:1430 +0x78
testing.tRunner(0xc000582d80, 0xc000755de0)
	/Users/wyang/go1.15/src/testing/testing.go:1108 +0xef
testing.runTests(0xc0000dc4a0, 0x2f499a0, 0xa, 0xa, 0x0, 0x0, 0x0, 0x100fc48)
	/Users/wyang/go1.15/src/testing/testing.go:1428 +0x2e8
testing.(*M).Run(0xc00050e400, 0x0)
	/Users/wyang/go1.15/src/testing/testing.go:1338 +0x245
main.main()
	_testmain.go:61 +0x138

goroutine 19 [chan receive]:
k8s.io/klog.(*loggingT).flushDaemon(0x2f5f480)
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/klog.go:1010 +0x8b
created by k8s.io/klog.init.0
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/klog.go:411 +0xd8

goroutine 8 [chan receive]:
k8s.io/klog/v2.(*loggingT).flushDaemon(0x2f5f560)
	/Users/wyang/gopath/pkg/mod/k8s.io/klog/[email protected]/klog.go:1107 +0x8b
created by k8s.io/klog/v2.init.0
	/Users/wyang/gopath/pkg/mod/k8s.io/klog/[email protected]/klog.go:416 +0xd8

goroutine 54 [sleep]:
time.Sleep(0x2540be400)
	/Users/wyang/go1.15/src/runtime/time.go:188 +0xbf
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess(0xc000502900)
	/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2203 +0x54d
testing.tRunner(0xc000502900, 0x2452260)
	/Users/wyang/go1.15/src/testing/testing.go:1108 +0xef
created by testing.(*T).Run
	/Users/wyang/go1.15/src/testing/testing.go:1159 +0x386

goroutine 55 [semacquire]:
sync.runtime_Semacquire(0xc0007a2a68)
	/Users/wyang/go1.15/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc0007a2a60)
	/Users/wyang/go1.15/src/sync/waitgroup.go:130 +0x65
k8s.io/client-go/util/workqueue.ParallelizeUntil(0x258a060, 0xc0007e2fc0, 0x4, 0x4, 0xc0007e8750)
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/util/workqueue/parallelizer.go:62 +0x16d
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates.getTPMapMatchingIncomingAffinityAntiAffinity(0xc0001b0000, 0xc0005a22a0, 0xc0007e6670, 0x0, 0x0, 0xc0007e6650)
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/pkg/scheduler/algorithm/predicates/metadata.go:754 +0x6c5
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates.(*PredicateMetadataFactory).GetMetadata(0xc000262370, 0xc0001b0000, 0xc0005a22a0, 0x2550320, 0xc0007e6650)
	/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/pkg/scheduler/algorithm/predicates/metadata.go:222 +0x1ee
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.(*Predictor).GetPredicateMeta(...)
	/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predictor.go:245
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess.func1(0xc0002c44d0, 0xc0001b0000, 0xc00000e100, 0xc00030ba00)
	/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2182 +0x43
created by github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess
	/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2180 +0x4ee

goroutine 56 [sleep]:
time.Sleep(0xf4240)
	/Users/wyang/go1.15/src/runtime/time.go:188 +0xbf
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess.func2(0xc00051a088, 0xc000502900, 0xc00000e100)
	/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2192 +0x86
created by github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess
	/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2190 +0x53a

Process finished with the exit code 1
```

To fix this, the simplest and most straightforward solution is to make a copy of the map (see `getNodesMapCopy()` in the test code above). With the copy in place, the error no longer occurs. The copy does cost some performance. Simulating copies of 1,000 and 10,000 nodes, the overhead is roughly:

- 1,000 nodes: 600+ µs
- 10,000 nodes: 6+ ms

That is, the cost grows roughly linearly with the number of nodes, at about 0.6 µs per node.

### Screenshots (if appropriate)

### Questions:
* [ ] - The license files need updating.
* [ ] - There are breaking changes for older versions.
* [ ] - It needs documentation.
