yangwwei opened a new pull request #303:
URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/303


   ### What is this PR for?
   Occasionally the scheduler crashes with a concurrent map read/write error. 
The root cause is that, while evaluating the predicates, we pass the nodesInfo 
map by reference to the K8s side. If a node is added to or removed from the 
cluster at the same time, the K8s code can read the map while the shim writes 
to it. This PR fixes that by passing a copy of the nodes info instead.
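
The idea can be sketched without any Kubernetes dependencies. The following is a minimal, self-contained illustration and not the code in this PR: `nodeInfo`, `nodeCache`, `addNode` and `snapshot` are hypothetical stand-ins for the shim's node cache, and the only assumption is that readers receive a cloned snapshot taken under a read lock rather than the live map.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeInfo is a hypothetical stand-in for the scheduler's per-node data.
type nodeInfo struct {
	name string
	pods []string
}

// clone returns a deep copy so callers never share slices with the cache.
func (n *nodeInfo) clone() *nodeInfo {
	return &nodeInfo{name: n.name, pods: append([]string(nil), n.pods...)}
}

// nodeCache guards its map with an RWMutex; the live map never leaves the struct.
type nodeCache struct {
	mu    sync.RWMutex
	nodes map[string]*nodeInfo
}

func newNodeCache() *nodeCache {
	return &nodeCache{nodes: make(map[string]*nodeInfo)}
}

// addNode writes to the live map under the write lock.
func (c *nodeCache) addNode(n *nodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[n.name] = n
}

// snapshot copies the map under the read lock and clones every value.
func (c *nodeCache) snapshot() map[string]*nodeInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make(map[string]*nodeInfo, len(c.nodes))
	for k, v := range c.nodes {
		out[k] = v.clone()
	}
	return out
}

func main() {
	cache := newNodeCache()
	cache.addNode(&nodeInfo{name: "machine0"})

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer: keeps adding nodes
		defer wg.Done()
		for i := 1; i <= 1000; i++ {
			cache.addNode(&nodeInfo{name: fmt.Sprintf("machine%d", i)})
		}
	}()
	go func() { // reader: iterates over snapshots only
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			for name := range cache.snapshot() {
				_ = name
			}
		}
	}()
	wg.Wait()
	fmt.Println(len(cache.snapshot())) // prints 1001
}
```

Running this sketch with `go run -race` should report no data race. Returning `c.nodes` directly from `snapshot` would reintroduce the unsynchronized read that this PR removes.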
   
   ### What type of PR is it?
   * [x] - Bug Fix
   * [ ] - Improvement
   * [ ] - Feature
   * [ ] - Documentation
   * [ ] - Hot Fix
   * [ ] - Refactoring
   
   ### Todos
   * [ ] - Task
   
   ### What is the Jira issue?
   https://issues.apache.org/jira/browse/YUNIKORN-848
   
   ### How should this be tested?
   I've reproduced this locally with the following code (apply this to 
`predicator_test.go`):
   
   ```
   func TestConcurrentAccess(t *testing.T) {
        conf.GetSchedulerConf().SetTestMode(true)
        events.SetRecorderForTest(events.NewMockedRecorder())
        predictor := newPredictorInternal(&factory.PluginFactoryArgs{}, schedulerapi.Policy{
                Predicates: []schedulerapi.PredicatePolicy{
                        {Name: predicates.MatchInterPodAffinityPred},
                }})
   
        pod1 := v1.Pod{
                Spec: v1.PodSpec{
                        NodeName: "machine1",
                        Affinity: &v1.Affinity{
                                PodAffinity: &v1.PodAffinity{
                                        RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
                                                {
                                                        LabelSelector: metav1.SetAsLabelSelector(labels.Set{
                                                                "app": "xyz",
                                                        }),
                                                },
                                        },
                                },
                        },
                },
        }
   
        node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
        podsOnNode := []*v1.Pod{
                &pod1,
        }
        nodeInfo := deschedulernode.NewNodeInfo(podsOnNode...)
        _ = nodeInfo.SetNode(&node1)
        nodeInfoMap := map[string]*deschedulernode.NodeInfo{
                node1.Name: nodeInfo,
        }
   
        cache := &nodesCacheForTest{
                nodesMap: nodeInfoMap,
        }
   
   
        go func() {
                for {
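                        // getNodesMap() hands out the live map, which is what reproduces the crash.
                        meta := predictor.GetPredicateMeta(&pod1, cache.getNodesMap())
                        if err := predictor.Predicates(&pod1, meta, nodeInfo, true); err != nil {
                                // Predicate failures are irrelevant here; only the map access matters.
                                //fmt.Println(err.Error())
                        }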
                }
        }()
   
        i := 0
        go func() {
                for {
                        time.Sleep(1 * time.Millisecond)
                        nodeName := fmt.Sprintf("machine%d", i)
                        t.Logf("adding node %s", nodeName)
                        newNodeInfo := deschedulernode.NewNodeInfo()
                        nodeN := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
                        _ = newNodeInfo.SetNode(&nodeN)
                        cache.addNode(nodeN.Name, newNodeInfo)
                        i++
                }
        }()
   
        time.Sleep(10 * time.Second)
        fmt.Println(len(nodeInfoMap))
   }
   
   type nodesCacheForTest struct {
        nodesMap map[string]*deschedulernode.NodeInfo
        sync.RWMutex
   }
   
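   // getNodesMap returns the live map without taking the lock; passing this to
   // the predicates is what triggers the concurrent map read/write crash.
   func (c *nodesCacheForTest) getNodesMap() map[string]*deschedulernode.NodeInfo {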
        return c.nodesMap
   }
   
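   // getNodesMapCopy returns a snapshot taken under the read lock, with every
   // NodeInfo cloned, so callers never touch the live map.
   func (c *nodesCacheForTest) getNodesMapCopy() map[string]*deschedulernode.NodeInfo {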
        c.RLock()
        defer c.RUnlock()
        newMap := make(map[string]*deschedulernode.NodeInfo, len(c.nodesMap))
        for k, v := range c.nodesMap {
                newMap[k] = v.Clone()
        }
        return newMap
   }
   
   func (c *nodesCacheForTest) addNode(name string, nodeInfo *deschedulernode.NodeInfo) {
        c.Lock()
        defer c.Unlock()
        c.nodesMap[name] = nodeInfo
   }
   ```
   
   Note that the writer goroutine updates the map every 1 ms so that the race 
triggers reliably in unit-test code. Running the test produced this stack trace:
   
   ```
   fatal error: concurrent map read and map write
   
   goroutine 1248 [running]:
   runtime.throw(0x23ca940, 0x21)
        /Users/wyang/go1.15/src/runtime/panic.go:1116 +0x72 fp=0xc000368c90 
sp=0xc000368c60 pc=0x1038ed2
   runtime.mapaccess1_faststr(0x21bf6a0, 0xc0005a22a0, 0x23aa919, 0x8, 
0xc00030bb70)
        /Users/wyang/go1.15/src/runtime/map_faststr.go:21 +0x465 
fp=0xc000368d00 sp=0xc000368c90 pc=0x1015fc5
   
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates.getTPMapMatchingIncomingAffinityAntiAffinity.func2(0x0)
        
/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/pkg/scheduler/algorithm/predicates/metadata.go:715
 +0xc5 fp=0xc000368f50 sp=0xc000368d00 pc=0x201ab45
   k8s.io/client-go/util/workqueue.ParallelizeUntil.func1(0xc0007a2a60, 
0xc0007cbb80, 0xc000311c10, 0xc0007e8750)
        
/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/util/workqueue/parallelizer.go:57
 +0x8d fp=0xc000368fc0 sp=0xc000368f50 pc=0x1eacccd
   runtime.goexit()
        /Users/wyang/go1.15/src/runtime/asm_amd64.s:1374 +0x1 fp=0xc000368fc8 
sp=0xc000368fc0 pc=0x1070c21
   created by k8s.io/client-go/util/workqueue.ParallelizeUntil
        
/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/util/workqueue/parallelizer.go:49
 +0x14a
   
   goroutine 1 [chan receive]:
   testing.(*T).Run(0xc000502900, 0x23b90c8, 0x14, 0x2452260, 0x1096401)
        /Users/wyang/go1.15/src/testing/testing.go:1160 +0x3ad
   testing.runTests.func1(0xc000582d80)
        /Users/wyang/go1.15/src/testing/testing.go:1430 +0x78
   testing.tRunner(0xc000582d80, 0xc000755de0)
        /Users/wyang/go1.15/src/testing/testing.go:1108 +0xef
   testing.runTests(0xc0000dc4a0, 0x2f499a0, 0xa, 0xa, 0x0, 0x0, 0x0, 0x100fc48)
        /Users/wyang/go1.15/src/testing/testing.go:1428 +0x2e8
   testing.(*M).Run(0xc00050e400, 0x0)
        /Users/wyang/go1.15/src/testing/testing.go:1338 +0x245
   main.main()
        _testmain.go:61 +0x138
   
   goroutine 19 [chan receive]:
   k8s.io/klog.(*loggingT).flushDaemon(0x2f5f480)
        /Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/klog.go:1010 +0x8b
   created by k8s.io/klog.init.0
        /Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/klog.go:411 +0xd8
   
   goroutine 8 [chan receive]:
   k8s.io/klog/v2.(*loggingT).flushDaemon(0x2f5f560)
        /Users/wyang/gopath/pkg/mod/k8s.io/klog/[email protected]/klog.go:1107 +0x8b
   created by k8s.io/klog/v2.init.0
        /Users/wyang/gopath/pkg/mod/k8s.io/klog/[email protected]/klog.go:416 +0xd8
   
   goroutine 54 [sleep]:
   time.Sleep(0x2540be400)
        /Users/wyang/go1.15/src/runtime/time.go:188 +0xbf
   
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess(0xc000502900)
        
/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2203
 +0x54d
   testing.tRunner(0xc000502900, 0x2452260)
        /Users/wyang/go1.15/src/testing/testing.go:1108 +0xef
   created by testing.(*T).Run
        /Users/wyang/go1.15/src/testing/testing.go:1159 +0x386
   
   goroutine 55 [semacquire]:
   sync.runtime_Semacquire(0xc0007a2a68)
        /Users/wyang/go1.15/src/runtime/sema.go:56 +0x45
   sync.(*WaitGroup).Wait(0xc0007a2a60)
        /Users/wyang/go1.15/src/sync/waitgroup.go:130 +0x65
   k8s.io/client-go/util/workqueue.ParallelizeUntil(0x258a060, 0xc0007e2fc0, 
0x4, 0x4, 0xc0007e8750)
        
/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/util/workqueue/parallelizer.go:62
 +0x16d
   
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates.getTPMapMatchingIncomingAffinityAntiAffinity(0xc0001b0000,
 0xc0005a22a0, 0xc0007e6670, 0x0, 0x0, 0xc0007e6650)
        
/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/pkg/scheduler/algorithm/predicates/metadata.go:754
 +0x6c5
   
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates.(*PredicateMetadataFactory).GetMetadata(0xc000262370,
 0xc0001b0000, 0xc0005a22a0, 0x2550320, 0xc0007e6650)
        
/Users/wyang/gopath/pkg/mod/k8s.io/[email protected]/pkg/scheduler/algorithm/predicates/metadata.go:222
 +0x1ee
   
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.(*Predictor).GetPredicateMeta(...)
        
/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predictor.go:245
   
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess.func1(0xc0002c44d0,
 0xc0001b0000, 0xc00000e100, 0xc00030ba00)
        
/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2182
 +0x43
   created by 
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess
        
/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2180
 +0x4ee
   
   goroutine 56 [sleep]:
   time.Sleep(0xf4240)
        /Users/wyang/go1.15/src/runtime/time.go:188 +0xbf
   
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess.func2(0xc00051a088,
 0xc000502900, 0xc00000e100)
        
/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2192
 +0x86
   created by 
github.com/apache/incubator-yunikorn-k8shim/pkg/plugin/predicates.TestConcurrentAccess
        
/Users/wyang/workspace/apache/yunikorn/incubator-yunikorn-k8shim/pkg/plugin/predicates/predicator_test.go:2190
 +0x53a
   
   Process finished with the exit code 1
   ```
   To fix this, the simplest and most straightforward solution is to pass a 
copy of the map; see `getNodesMapCopy()`. With this method the error no longer 
occurs. The copy does have a performance cost. Simulating copies of 1000 and 
10000 nodes, the added time per copy is roughly:
   
   - 1000: 600+ µs
   - 10000: 6+ ms
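
A rough way to get a feel for how the copy cost scales is sketched below. This is an illustration under stated assumptions, not the measurement used for the numbers above: `nodeInfo`, `buildNodes` and `copyNodes` are hypothetical helpers, and the stand-in `clone` is far cheaper than cloning a real `NodeInfo`, so the absolute timings will be lower than those reported here.

```go
package main

import (
	"fmt"
	"time"
)

// nodeInfo is a hypothetical, lightweight stand-in for the real NodeInfo.
type nodeInfo struct{ name string }

// clone returns a shallow copy of the struct behind a new pointer.
func (n *nodeInfo) clone() *nodeInfo {
	c := *n
	return &c
}

// buildNodes creates a map with n simulated nodes named machine0..machine(n-1).
func buildNodes(n int) map[string]*nodeInfo {
	m := make(map[string]*nodeInfo, n)
	for i := 0; i < n; i++ {
		name := fmt.Sprintf("machine%d", i)
		m[name] = &nodeInfo{name: name}
	}
	return m
}

// copyNodes mirrors the copy-and-clone loop of getNodesMapCopy, minus the lock.
func copyNodes(src map[string]*nodeInfo) map[string]*nodeInfo {
	dst := make(map[string]*nodeInfo, len(src))
	for k, v := range src {
		dst[k] = v.clone()
	}
	return dst
}

func main() {
	const rounds = 100
	for _, n := range []int{1000, 10000} {
		src := buildNodes(n)
		start := time.Now()
		for i := 0; i < rounds; i++ {
			_ = copyNodes(src)
		}
		// Average wall-clock time of one full copy; machine dependent.
		fmt.Printf("%d nodes: %v per copy\n", n, time.Since(start)/rounds)
	}
}
```

The copy visits every entry, so the cost grows linearly with the number of nodes, which is consistent with the roughly tenfold difference between the two figures above.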
   
   ### Screenshots (if appropriate)
   
   ### Questions:
   * [ ] - The license files need to be updated.
   * [ ] - There are breaking changes for older versions.
   * [ ] - It needs documentation.
   



