wilfred-s commented on a change in pull request #128:
URL: 
https://github.com/apache/incubator-yunikorn-k8shim/pull/128#discussion_r444965276



##########
File path: pkg/cache/task.go
##########
@@ -415,3 +416,19 @@ func (task *Task) enterState(event *fsm.Event) {
                zap.String("destination", event.Dst),
                zap.String("event", event.Event))
 }
+
+func PublishTaskEvent(taskID, reason, msg string, app interfaces.ManagedApp) 
error {
+       task, err := app.GetTask(taskID)
+       if err != nil {
+               return fmt.Errorf("could not find %s task belonging to %s app", 
taskID, app.GetApplicationID())
+       }
+       pod := task.GetTaskPod()
+       if pod == nil {
+               return fmt.Errorf("could not obtain %s task's pod", taskID)
+       }
+
+       log.Logger.Debug("publishing task event", zap.String("podName", 
pod.ObjectMeta.Name), zap.String("reason", reason), zap.String("message", msg))

Review comment:
       NIT: format the log line so that each `zap` field is on its own line.

##########
File path: pkg/cache/task.go
##########
@@ -23,6 +23,7 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"

Review comment:
       This will not pass the lint checks

##########
File path: pkg/cache/task.go
##########
@@ -415,3 +416,19 @@ func (task *Task) enterState(event *fsm.Event) {
                zap.String("destination", event.Dst),
                zap.String("event", event.Event))
 }
+
+func PublishTaskEvent(taskID, reason, msg string, app interfaces.ManagedApp) 
error {
+       task, err := app.GetTask(taskID)
+       if err != nil {
+               return fmt.Errorf("could not find %s task belonging to %s app", 
taskID, app.GetApplicationID())
+       }
+       pod := task.GetTaskPod()
+       if pod == nil {
+               return fmt.Errorf("could not obtain %s task's pod", taskID)
+       }
+
+       log.Logger.Debug("publishing task event", zap.String("podName", 
pod.ObjectMeta.Name), zap.String("reason", reason), zap.String("message", msg))
+       events.GetRecorder().Event(pod, v1.EventTypeWarning, reason, msg)
+
+       return nil
+}

Review comment:
       missing newline at the end of the file

##########
File path: pkg/callback/scheduler_callback.go
##########
@@ -143,3 +144,35 @@ func (callback *AsyncRMCallback) ReSyncSchedulerCache(args 
*si.ReSyncSchedulerCa
        }
        return nil
 }
+
+// this callback implement scheduler plugin interface EventPlugin.
+func (callback *AsyncRMCallback) SendEvent(eventRecords []*si.EventRecord) 
error {
+       errors := make([]string, 0)
+       if len(eventRecords) > 0 {
+               log.Logger.Debug(fmt.Sprintf("processing %d events", 
len(eventRecords)))
+               for _, record := range eventRecords {
+                       reason := record.Reason
+                       msg := record.Message
+
+                       switch record.Type {
+                       case si.EventRecord_REQUEST:
+                               taskID := record.ObjectID
+                               appID := record.GroupID
+                               app := callback.context.GetApplication(appID)
+
+                               err := cache.PublishTaskEvent(taskID, reason, 
msg, app)
+                               if err != nil {
+                                       errors = append(errors, err.Error())
+                               }
+                               log.Logger.Debug("event emitted")
+                       case si.EventRecord_APP:
+                               // until we don't have app CRD let's expose app 
event to all its pods (asks)
+                               // pending on YUNIKORN-170
+                               errors = append(errors, fmt.Sprintf("processing 
app event is not implemented yet: %s", record))

Review comment:
       We need to be consistent: either do it for all known types that we do 
not support yet or do it for none.

##########
File path: pkg/callback/scheduler_callback.go
##########
@@ -143,3 +144,35 @@ func (callback *AsyncRMCallback) ReSyncSchedulerCache(args 
*si.ReSyncSchedulerCa
        }
        return nil
 }
+
+// this callback implement scheduler plugin interface EventPlugin.
+func (callback *AsyncRMCallback) SendEvent(eventRecords []*si.EventRecord) 
error {

Review comment:
       This is in sync with the core change.
   The name is a bit ambiguous, as it is a send from the core to the shim, but 
anything else would be confusing on the core side.
   Unless there is a big objection, I think we can leave it as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to