pbacsko commented on a change in pull request #353:
URL: https://github.com/apache/incubator-yunikorn-core/pull/353#discussion_r780086629



##########
File path: pkg/scheduler/usergroupmanagement/user.go
##########
@@ -0,0 +1,57 @@
+package usergroupmanagement
+
+import (
+       "sync/atomic"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+)
+
+type User struct {
+       name    string // Name of the user
+       maxResources    *resources.Resource // Max Resource configured per user
+       maxApplications int32 // Max Applications configured per user
+       runningApplications     *int32 // Running Applications
+       usedGroup       string
+}
+
+func NewUser(user string) *User {
+       return &User{
+               name:   user,
+       }
+}
+
+func (u *User) GetName() string {
+       return u.name
+}
+
+func (u *User) SetMaxApplications(maxApplications int32) {
+       u.maxApplications = maxApplications
+}
+
+func (u *User) IncRunningApplications() {
+       atomic.AddInt32(u.runningApplications, 1)
+}
+
+func (u *User) DecRunningApplications() {
+       atomic.AddInt32(u.runningApplications, -1)
+}
+
+func (u *User) CanRun() bool {
+       if atomic.LoadInt32(u.runningApplications) < u.maxApplications {

Review comment:
       I was thinking about this and I've become more confident that even my 
first suggestion is wrong. You can't separate the loading from the increment. 
The two operations are atomic, but their composition is not.
   
   Consider the following state with multiple Goroutines running simultaneously:
   
   maxApplications = 30
   runningApplications = 29
   
   **Scenario 1**
   
   The original code is incorrect because of this interleaving:
   Goroutine 1:  loads current value -> checks if 29 < 30 -> returns true -> increments runningApplications to 30
   Goroutine 2:  loads current value -> [execution suspended for a brief period] -> checks if 29 < 30 (current value is already 30) -> returns true -> increments runningApplications to 31
   
   Goroutine 2 should not return true, but it does.
   
   **Scenario 2**
   
   My first suggestion defends against Scenario 1, but not against this:
   Goroutine 1:  loads current value -> checks if 29 < 30 -> returns true -> increments runningApplications to 30 or 31
   Goroutine 2:  loads current value -> checks if 29 < 30 -> returns true -> increments runningApplications to 31 or 30
   
   You can't have `CanRun()` and `IncRunningApplications()` as two separate calls. The check and the increment must behave as a single atomic operation. It would only work if the call site of these methods made sure that they are always called together under a lock:
   ```
   guard  *sync.Mutex
   ...
   if user != nil {
           // hold the lock across both the check and the increment
           guard.Lock()
           if user.CanRun() {
                   user.IncRunningApplications()
           } else {
                   ...
           }
           guard.Unlock()
   }
   ```
   
   But this makes the usage of atomics unnecessary.
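
   If we want to keep the atomics, one option is to fold the check and the increment into a single method built on a compare-and-swap loop. This is only a minimal sketch of the idea, not a proposal for the final code: `TryIncRunningApplications` is a hypothetical name, the `User` type is a trimmed-down stand-in, and the counter is a plain `int32` here rather than the `*int32` used in the PR.

   ```go
   package main

   import (
   	"fmt"
   	"sync"
   	"sync/atomic"
   )

   // User is a trimmed-down stand-in for the type in the PR; only the
   // fields needed for the limit check are kept.
   type User struct {
   	maxApplications     int32
   	runningApplications int32
   }

   // TryIncRunningApplications is a hypothetical method (not in the PR).
   // It checks the limit and claims a slot as one atomic step, and
   // returns true only if the slot was claimed.
   func (u *User) TryIncRunningApplications() bool {
   	for {
   		cur := atomic.LoadInt32(&u.runningApplications)
   		if cur >= u.maxApplications {
   			return false
   		}
   		// The swap succeeds only if nobody changed the counter since
   		// the load; otherwise retry with the fresh value.
   		if atomic.CompareAndSwapInt32(&u.runningApplications, cur, cur+1) {
   			return true
   		}
   	}
   }

   func main() {
   	// Same state as in the scenarios above: one slot left.
   	u := &User{maxApplications: 30, runningApplications: 29}
   	var admitted int32
   	var wg sync.WaitGroup
   	for i := 0; i < 100; i++ {
   		wg.Add(1)
   		go func() {
   			defer wg.Done()
   			if u.TryIncRunningApplications() {
   				atomic.AddInt32(&admitted, 1)
   			}
   		}()
   	}
   	wg.Wait()
   	// Exactly one goroutine is admitted and the counter stops at 30.
   	fmt.Println(admitted, atomic.LoadInt32(&u.runningApplications))
   }
   ```

   With this shape the caller never sees a separate `CanRun()` result that can go stale before the increment. The sketch assumes `maxApplications` is not changed concurrently; if it can be, it needs the same care.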

##########
File path: pkg/scheduler/partition.go
##########
@@ -380,7 +406,33 @@ func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocatio
        // Remove app from queue
        if queue := app.GetQueue(); queue != nil {
                queue.RemoveApplication(app)
+
+               var updateGroupMetrics bool
+               if len(app.GetUser().User) > 0 {
+                       user := queue.GetUserGroupManager().GetUser(app.GetUser().User)
+                       if user != nil {
+                               user.DecRunningApplications()
+                               g := queue.GetUserGroupManager().GetGroup(user.GetUsedGroup())
+                               if g != nil {
+                                       g.DecRunningApplications()
+                                       updateGroupMetrics = true
+                               }
+                       }
+               }
+
+               // Used when limit has been configured only for group, not for individual user
+               if ! updateGroupMetrics && len(app.GetUser().User) > 0 {
+                       for _, group := range app.GetUser().Groups {
+                               // Is there any group (to which user belongs to) config has limit settings?
+                               g := queue.GetUserGroupManager().GetGroup(group)
+                               if g != nil {
+                                       g.DecRunningApplications()
+                                       break

Review comment:
       Ok, this might be a nitpick, but I looked at the existing group cache code.
   
   I think we might have an issue here. Let's say a user "john", who belongs to the "qe" group, submits an application which runs for a while. We have limits set for this group. Then "john" is removed from "qe" and the group cache gets updated. The application finishes, but `DecRunningApplications()` will not be called for "qe".
   
   We need to store this information somewhere else, probably by adding an extra field to `Application` instead of relying on `app.GetUser().Groups`.
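
   To illustrate what I mean, here is a rough sketch under simplifying assumptions: `Group`, `Application` and `Manager` are stand-ins rather than the real types, `chargedGroup` is a hypothetical field name, and locking is left out for brevity. The group whose counter was incremented is recorded on the application at submit time, and removal uses that record instead of the current membership.

   ```go
   package main

   import "fmt"

   // Group is a stand-in for the group type in the PR.
   type Group struct {
   	name                string
   	runningApplications int32
   }

   // Application is a stand-in; chargedGroup is the hypothetical extra field.
   type Application struct {
   	user         string
   	chargedGroup string // group whose counter was incremented at submit time
   }

   // Manager is a stand-in for the user/group manager: it only knows
   // the groups that have limits configured.
   type Manager struct {
   	groups map[string]*Group
   }

   // addApplication charges the first configured group the user belongs to
   // and remembers that choice on the application.
   func (m *Manager) addApplication(app *Application, currentGroups []string) {
   	for _, name := range currentGroups {
   		if g, ok := m.groups[name]; ok {
   			g.runningApplications++
   			app.chargedGroup = name
   			return
   		}
   	}
   }

   // removeApplication releases the group recorded on the application,
   // without consulting the user's current group membership.
   func (m *Manager) removeApplication(app *Application) {
   	if g, ok := m.groups[app.chargedGroup]; ok {
   		g.runningApplications--
   	}
   }

   func main() {
   	m := &Manager{groups: map[string]*Group{"qe": {name: "qe"}}}
   	app := &Application{user: "john"}
   	m.addApplication(app, []string{"qe"})
   	// "john" is removed from "qe" while the app runs; the membership
   	// list is no longer needed to find the group to decrement.
   	m.removeApplication(app)
   	fmt.Println(m.groups["qe"].runningApplications)
   }
   ```

   The counter for "qe" goes back to zero even though the membership changed in between, which is the case the current loop over `app.GetUser().Groups` misses.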

##########
File path: pkg/scheduler/partition.go
##########
@@ -380,7 +406,33 @@ func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocatio
        // Remove app from queue
        if queue := app.GetQueue(); queue != nil {
                queue.RemoveApplication(app)
+
+               var updateGroupMetrics bool
+               if len(app.GetUser().User) > 0 {
+                       user := queue.GetUserGroupManager().GetUser(app.GetUser().User)
+                       if user != nil {
+                               user.DecRunningApplications()
+                               g := queue.GetUserGroupManager().GetGroup(user.GetUsedGroup())
+                               if g != nil {
+                                       g.DecRunningApplications()
+                                       updateGroupMetrics = true
+                               }
+                       }
+               }
+
+               // Used when limit has been configured only for group, not for individual user
+               if ! updateGroupMetrics && len(app.GetUser().User) > 0 {
+                       for _, group := range app.GetUser().Groups {
+                               // Is there any group (to which user belongs to) config has limit settings?
+                               g := queue.GetUserGroupManager().GetGroup(group)
+                               if g != nil {
+                                       g.DecRunningApplications()
+                                       break

Review comment:
       cc @wilfred-s @craigcondit 

##########
File path: pkg/scheduler/usergroupmanagement/user.go
##########
@@ -0,0 +1,57 @@
+package usergroupmanagement
+
+import (
+       "sync/atomic"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+)
+
+type User struct {
+       name    string // Name of the user
+       maxResources    *resources.Resource // Max Resource configured per user
+       maxApplications int32 // Max Applications configured per user
+       runningApplications     *int32 // Running Applications
+       usedGroup       string
+}
+
+func NewUser(user string) *User {
+       return &User{
+               name:   user,
+       }
+}
+
+func (u *User) GetName() string {
+       return u.name
+}
+
+func (u *User) SetMaxApplications(maxApplications int32) {
+       u.maxApplications = maxApplications
+}
+
+func (u *User) IncRunningApplications() {
+       atomic.AddInt32(u.runningApplications, 1)
+}
+
+func (u *User) DecRunningApplications() {
+       atomic.AddInt32(u.runningApplications, -1)
+}
+
+func (u *User) CanRun() bool {
+       if atomic.LoadInt32(u.runningApplications) < u.maxApplications {

Review comment:
       cc @wilfred-s @craigcondit 



