[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events
wilfred-s commented on a change in pull request #141: URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444960603 ## File path: pkg/events/events.go ## @@ -0,0 +1,40 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "time" + + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" +) + +func createEventRecord(recordType si.EventRecord_Type, objectID, groupID, reason, message string) (*si.EventRecord, error) { + return { + Type: recordType, + ObjectID: objectID, + GroupID: groupID, + Reason:reason, + Message: message, Review comment: sounds good, if in the future that changes we can adjust what we have. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events
wilfred-s commented on a change in pull request #141: URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444959075 ## File path: pkg/events/event_publisher.go ## @@ -0,0 +1,89 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "go.uber.org/zap" +) + +type EventPublisher interface { + StartService() + Stop() + GetEventStore() EventStore +} + +type shimPublisher struct { + store EventStore + stopped bool + + sync.Mutex +} + +func NewShimPublisher(event EventStore) EventPublisher { + return { + store: event, + stopped: false, + } +} + +func (sp *shimPublisher) isStopped() bool { + sp.Lock() + defer sp.Unlock() + + return sp.stopped +} + +func (sp *shimPublisher) StartService() { + go func () { + for { + if sp.isStopped() { + break + } + if eventPlugin := plugins.GetEventPlugin(); eventPlugin != nil { + messages := sp.store.CollectEvents() + log.Logger().Debug("Sending events", zap.Int("number of messages", len(messages))) Review comment: not at this point in time, we'll work on this when we see more perf numbers This is an automated message from the Apache Git 
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events
wilfred-s commented on a change in pull request #141: URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444922722 ## File path: pkg/events/event_cache.go ## @@ -0,0 +1,117 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-core/pkg/metrics" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" +) + +// need to change for testing +var defaultEventChannelSize = 10 + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channel chan *si.EventRecord // channelling input eventChannel + store EventStore // storing eventChannel + stopchan bool// whether the service is stop + + sync.Mutex +} + +func GetEventCache() *EventCache { + once.Do(func() { + store := newEventStoreImpl() + + cache = createEventCacheInternal(store) + }) + return cache +} + +func createEventCacheInternal(store EventStore) *EventCache { + return { + channel: nil, + store: store, + stop:make(chan bool), + } +} + +func (ec *EventCache) StartService() { + ec.Lock() + defer ec.Unlock() + + ec.channel = make(chan *si.EventRecord, defaultEventChannelSize) + + go ec.processEvent() +} + +func 
(ec *EventCache) IsStarted() bool { + ec.Lock() + defer ec.Unlock() + + return ec.channel != nil +} + +func (ec *EventCache) Stop() { + ec.Lock() + defer ec.Unlock() + + ec.stop <- true + if ec.channel != nil { + close(ec.channel) + ec.channel = nil + } +} + +func (ec *EventCache) AddEvent(event *si.EventRecord) { + metrics.GetEventMetrics().IncEventsCreated() + select { + case ec.channel <- event: + metrics.GetEventMetrics().IncEventsChanneled() + default: + log.Logger().Debug("could not add Event to channel") + metrics.GetEventMetrics().IncEventsNotChanneled() + } +} + +func (ec *EventCache) GetEventStore() EventStore { + return ec.store +} + +func (ec *EventCache) processEvent() { + for { + select { + case <-ec.stop: + return + case event, ok := <-ec.channel: + if !ok { + return + } + if event != nil { + ec.store.Store(event) + metrics.GetEventMetrics().IncEventsProcessed() + } Review comment: We should not allow nil values, and we do not do that on the sending side. If the event construction fails we do not send. It also does not make sense to send a nil event over the channel. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events
wilfred-s commented on a change in pull request #141: URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444627153 ## File path: pkg/events/event_store_test.go ## @@ -0,0 +1,130 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "testing" + + "gotest.tools/assert" + + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" +) + +func TestStoreAndRetrieveAllocationAsk(t *testing.T) { Review comment: All these tests are independent of the type used. Change the name to not include the type info. 
## File path: pkg/scheduler/scheduling_application.go ## @@ -703,3 +714,10 @@ func (sa *SchedulingApplication) finishRecovery() { zap.Error(err)) } } + +func createInsufficientQueueResourcesEvent(ask *si.AllocationAsk, message string) (*si.EventRecord, error) { + if ask == nil { + return nil, fmt.Errorf("could not create Event object from nil AllocationAsk") + } + return events.CreateRequestEventRecord(ask.AllocationKey, ask.ApplicationID, "InsufficientQueueResources", message) Review comment: Should use `events.CreateRequestEventRecord()` directly, not this wrapper ## File path: pkg/events/event_cache_test.go ## @@ -0,0 +1,170 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "testing" + "time" + + "gotest.tools/assert" + + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" +) + +func TestStartStop(t *testing.T) { + cache := GetEventCache() + assert.Equal(t, cache.IsStarted(), false, "EventCache should not be started when constructed") + // adding event to stopped cache does not cause panic + cache.AddEvent(nil) + cache.StartService() + // add an event + cache.AddEvent(nil) + assert.Equal(t, cache.IsStarted(), true, "EventCache should have been started") + cache.Stop() + // adding event to stopped cache does not cause panic + cache.AddEvent(nil) + assert.Equal(t, cache.IsStarted(), false, "EventCache should have been stopped") +} + +func TestSingleEvent(t *testing.T) { + cache := GetEventCache() + store := cache.GetEventStore() + + cache.StartService() + + event := si.EventRecord{ + Type: si.EventRecord_REQUEST, + ObjectID: "alloc1", + GroupID: "app1", + Reason: "reason", + Message: "message", + } + cache.AddEvent() + + // wait for events to be processed + time.Sleep(1 * time.Millisecond) Review comment: Should make this smart with a check of the number of events in the channel, and progress when it drops to 0. Waiting 1 msec could make the test flaky; use a simple `len(cache.channel)` in a `waitFor` construct. ## File path: pkg/events/event_cache.go ## @@ -0,0 +1,117 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless
[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events
wilfred-s commented on a change in pull request #141: URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r437121702 ## File path: pkg/events/event_publisher_test.go ## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "testing" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "gotest.tools/assert" +) + +type eventStoreForTest struct { + events chan Event +} + +func eventToRecord(event Event) *si.EventRecord { + source, ok := event.GetSource().(string) + if !ok { + panic("Expected string in tests") + } + return { + ObjectID: source, + GroupID: event.GetGroup(), + Reason: event.GetReason(), + Message: event.GetMessage(), + } +} + +func (es *eventStoreForTest) Store(e Event) { + es.events <- e +} + +func (es *eventStoreForTest) CollectEvents() []*si.EventRecord { + records := make([]*si.EventRecord, 0) + for { + select { + case event := <-es.events: + records = append(records, eventToRecord(event)) + default: + return records + } + } +} + +type eventPluginForTest struct { + records chan *si.EventRecord +} + +// create and register mocked event plugin +func createEventPluginForTest(t 
*testing.T) eventPluginForTest { + eventPlugin := eventPluginForTest{ + records: make(chan *si.EventRecord, 3), + } + plugins.RegisterSchedulerPlugin() + if plugins.GetEventPlugin() == nil { + t.Fatalf("Event plugin should have been registered!") + } + return eventPlugin +} + +func (ep *eventPluginForTest) SendEvent(events []*si.EventRecord) error { + for _, event := range events { + ep.records <- event + } + return nil +} + +func (ep *eventPluginForTest) getNextEventRecord() *si.EventRecord { + select { + case record := <- ep.records: + return record + default: + return nil + } +} + +func TestServiceStartStopWithoutEventPlugin(t *testing.T) { + store := { + events: make(chan Event, 2), + } + publisher := NewShimPublisher(store) + publisher.StartService() + assert.Equal(t, publisher.GetEventStore(), store) + time.Sleep(100 * time.Millisecond) + publisher.Stop() +} + +func TestServiceStartStopWithEventPlugin(t *testing.T) { + createEventPluginForTest(t) Review comment: How do we test that the plugin is set correctly? This does not make any additional checks than the test without the plugin. ## File path: pkg/events/event_cache.go ## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +// TODO this should be configurable? +const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channelEventChannel // channelling input events + store EventStore // storing