wilfred-s commented on a change in pull request #128: URL: https://github.com/apache/incubator-yunikorn-k8shim/pull/128#discussion_r444965276
########## File path: pkg/cache/task.go ########## @@ -415,3 +416,19 @@ func (task *Task) enterState(event *fsm.Event) { zap.String("destination", event.Dst), zap.String("event", event.Event)) } + +func PublishTaskEvent(taskID, reason, msg string, app interfaces.ManagedApp) error { + task, err := app.GetTask(taskID) + if err != nil { + return fmt.Errorf("could not find %s task belonging to %s app", taskID, app.GetApplicationID()) + } + pod := task.GetTaskPod() + if pod == nil { + return fmt.Errorf("could not obtain %s task's pod", taskID) + } + + log.Logger.Debug("publishing task event", zap.String("podName", pod.ObjectMeta.Name), zap.String("reason", reason), zap.String("message", msg)) Review comment: NIT: format the log lines, each `zap` on its own line ########## File path: pkg/cache/task.go ########## @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces" Review comment: This will not pass the lint checks ########## File path: pkg/cache/task.go ########## @@ -415,3 +416,19 @@ func (task *Task) enterState(event *fsm.Event) { zap.String("destination", event.Dst), zap.String("event", event.Event)) } + +func PublishTaskEvent(taskID, reason, msg string, app interfaces.ManagedApp) error { + task, err := app.GetTask(taskID) + if err != nil { + return fmt.Errorf("could not find %s task belonging to %s app", taskID, app.GetApplicationID()) + } + pod := task.GetTaskPod() + if pod == nil { + return fmt.Errorf("could not obtain %s task's pod", taskID) + } + + log.Logger.Debug("publishing task event", zap.String("podName", pod.ObjectMeta.Name), zap.String("reason", reason), zap.String("message", msg)) + events.GetRecorder().Event(pod, v1.EventTypeWarning, reason, msg) + + return nil +} Review comment: missing newline at the end of the file ########## File path: pkg/callback/scheduler_callback.go ########## @@ -143,3 +144,35 @@ func (callback *AsyncRMCallback) ReSyncSchedulerCache(args *si.ReSyncSchedulerCa } return nil } + +// this callback implement scheduler plugin interface EventPlugin. +func (callback *AsyncRMCallback) SendEvent(eventRecords []*si.EventRecord) error { + errors := make([]string, 0) + if len(eventRecords) > 0 { + log.Logger.Debug(fmt.Sprintf("processing %d events", len(eventRecords))) + for _, record := range eventRecords { + reason := record.Reason + msg := record.Message + + switch record.Type { + case si.EventRecord_REQUEST: + taskID := record.ObjectID + appID := record.GroupID + app := callback.context.GetApplication(appID) + + err := cache.PublishTaskEvent(taskID, reason, msg, app) + if err != nil { + errors = append(errors, err.Error()) + } + log.Logger.Debug("event emitted") + case si.EventRecord_APP: + // until we don't have app CRD let's expose app event to all its pods (asks) + // pending on YUNIKORN-170 + errors = append(errors, fmt.Sprintf("processing app event is not implemented yet: %s", record)) Review comment: We need to be consistent: either do it for all known types that we do not support yet or do it for none. ########## File path: pkg/callback/scheduler_callback.go ########## @@ -143,3 +144,35 @@ func (callback *AsyncRMCallback) ReSyncSchedulerCache(args *si.ReSyncSchedulerCa } return nil } + +// this callback implement scheduler plugin interface EventPlugin. +func (callback *AsyncRMCallback) SendEvent(eventRecords []*si.EventRecord) error { Review comment: This is in sync with the core change. The name is a bit ambiguous as it is a send from the core to the shim but anything else would be confusing on the core side. Unless there is a big objection I think we can leave it as is. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org