wilfred-s commented on a change in pull request #141: URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r437121702
########## File path: pkg/events/event_publisher_test.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "testing" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "gotest.tools/assert" +) + +type eventStoreForTest struct { + events chan Event +} + +func eventToRecord(event Event) *si.EventRecord { + source, ok := event.GetSource().(string) + if !ok { + panic("Expected string in tests") + } + return &si.EventRecord{ + ObjectID: source, + GroupID: event.GetGroup(), + Reason: event.GetReason(), + Message: event.GetMessage(), + } +} + +func (es *eventStoreForTest) Store(e Event) { + es.events <- e +} + +func (es *eventStoreForTest) CollectEvents() []*si.EventRecord { + records := make([]*si.EventRecord, 0) + for { + select { + case event := <-es.events: + records = append(records, eventToRecord(event)) + default: + return records + } + } +} + +type eventPluginForTest struct { + records chan *si.EventRecord +} + +// create and register mocked event plugin +func createEventPluginForTest(t *testing.T) eventPluginForTest { + eventPlugin := eventPluginForTest{ + records: make(chan *si.EventRecord, 
3), + } + plugins.RegisterSchedulerPlugin(&eventPlugin) + if plugins.GetEventPlugin() == nil { + t.Fatalf("Event plugin should have been registered!") + } + return eventPlugin +} + +func (ep *eventPluginForTest) SendEvent(events []*si.EventRecord) error { + for _, event := range events { + ep.records <- event + } + return nil +} + +func (ep *eventPluginForTest) getNextEventRecord() *si.EventRecord { + select { + case record := <- ep.records: + return record + default: + return nil + } +} + +func TestServiceStartStopWithoutEventPlugin(t *testing.T) { + store := &eventStoreForTest{ + events: make(chan Event, 2), + } + publisher := NewShimPublisher(store) + publisher.StartService() + assert.Equal(t, publisher.GetEventStore(), store) + time.Sleep(100 * time.Millisecond) + publisher.Stop() +} + +func TestServiceStartStopWithEventPlugin(t *testing.T) { + createEventPluginForTest(t) Review comment: How do we test that the plugin is set correctly? This does not make any additional checks than the test without the plugin. ########## File path: pkg/events/event_cache.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +// TODO this should be configurable? +const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channel EventChannel // channelling input events + store EventStore // storing events + publishers []EventPublisher // publishing events to sinks + started bool // whether the service is started + stopped bool // whether the service is stopped + + sync.Mutex +} + +func GetEventCache() *EventCache { + once.Do(func(){ + store := newEventStoreImpl() + pub := newShimPublisher(store) + + cache = &EventCache{ + channel: newEventChannelImpl(), + store: store, + publishers: []EventPublisher{pub}, + started: false, + stopped: false, + } + }) + return cache +} + +// TODO consider thread pool +func (ec *EventCache) StartService() { + ec.Lock() + defer ec.Unlock() + + ec.started = true Review comment: You should not mark it as started until all handling has been performed. If error states are introduced later on you have marked the service already started before the work has been done. ########## File path: pkg/events/events.go ########## @@ -0,0 +1,92 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "fmt" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "go.uber.org/zap" +) + +type Event interface { + GetSource() interface{} + GetGroup() string + GetReason() string + GetMessage() string +} + +func toEventMessage(e Event) (*si.EventRecord, error) { + eventType, id, err := convertSourceToTypeAndID(e.GetSource()) + if err != nil { + return nil, err + } + return &si.EventRecord{ + Type: eventType, + ObjectID: id, + GroupID: e.GetGroup(), + Reason: e.GetReason(), + Message: e.GetMessage(), + }, nil +} + +func convertSourceToTypeAndID(obj interface{}) (si.EventRecord_Type, string, error) { + // TODO other type checks + if ask, ok := obj.(*si.AllocationAsk); ok { + return si.EventRecord_REQUEST, ask.AllocationKey, nil + } + log.Logger().Warn("Could not convert source object to EventMessageType", zap.Any("object", obj)) + + // TODO should add UNKNOWN request? + return si.EventRecord_REQUEST, "", fmt.Errorf("could not convert source object to EventMessageType") +} + +type baseEvent struct { + source interface{} Review comment: This means that the garbage collector will never release the source object for a cleanup unless the event has been removed. Tracking the source object pointer will thus have a direct extended impact on memory use of the core. We need a real, specific reason why we would need to track the object and not an ID or something like that. ########## File path: pkg/events/event_channel.go ########## @@ -0,0 +1,74 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "fmt" + "strconv" + "sync" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +// TODO should configure the size of the channel +const eventChannelSize = 100000 + +// wrapping the channel into a struct - so that the underlying implementation can be changed +type eventChannel struct { + events chan Event Review comment: This would be one place to use an atomic int. Normally I would say that atomic is not the right thing to use but in this case we only need to know that there is a guarantee that the update will be applied and that we always look at the right value. ########## File path: pkg/events/event_store.go ########## @@ -0,0 +1,73 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "go.uber.org/zap" +) + +type EventStore interface { + Store(Event) + CollectEvents() []*si.EventRecord +} + +type defaultEventStore struct { + eventMap map[interface{}]Event + + sync.RWMutex +} + +func newEventStoreImpl() EventStore { + return &defaultEventStore{ + eventMap: make(map[interface{}]Event), + } +} + +func (es *defaultEventStore) Store(event Event) { + es.Lock() + defer es.Unlock() + + es.eventMap[event.GetSource()] = event +} + +func (es *defaultEventStore) CollectEvents() []*si.EventRecord { + es.Lock() + defer es.Unlock() + + messages := make([]*si.EventRecord, 0) + + // collect events + for _, v := range es.eventMap { + message, err := toEventMessage(v) + if err != nil { + log.Logger().Warn("Could not translate object to EventMessage", zap.Any("object", v)) + continue + } + messages = append(messages, message) + } + + // clear map + es.eventMap = make(map[interface{}]Event) Review comment: This means we can never use more than one publisher for events, that is a huge limitation. ########## File path: pkg/events/event_cache.go ########## @@ -0,0 +1,143 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +const defaultEventChannelSize = 100000 + +const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channel EventChannel // channelling input events + store EventStore // storing events + publishers []EventPublisher // publishing events to sinks + started bool // whether the service is started + stopped bool // whether the service is stopped + + sync.Mutex +} + +func GetEventCache() *EventCache { + once.Do(func(){ + store := newEventStoreImpl() + + cache = &EventCache{ + channel: newEventChannelImpl(defaultEventChannelSize), + store: store, + publishers: make([]EventPublisher, 0), + started: false, + stopped: false, + } + }) + return cache +} + +// TODO consider thread pool +func (ec *EventCache) StartService() { + ec.Lock() + defer ec.Unlock() + + ec.started = true + + // start main event processing thread + go ec.processEvent() +} + +func (ec *EventCache) Stop() { + ec.Lock() + defer ec.Unlock() + + if ec.started { + if ec.stopped { + panic("could not stop EventCache service") Review comment: We should never panic in production code. ########## File path: pkg/events/event_cache_test.go ########## @@ -0,0 +1,112 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "testing" + "time" + + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "gotest.tools/assert" +) + +func TestSingleEvent(t *testing.T) { + cache := GetEventCache() + store := cache.GetEventStore() + + cache.StartService() + + ask := si.AllocationAsk{ + AllocationKey: "alloc1", + } + event := baseEvent{ + source: &ask, + group: "app1", + reason: "reason", + message: "message", + } + cache.AddEvent(&event) + + // wait for cache to process the event + time.Sleep(2 * sleepTimeInterval) + + records := store.CollectEvents() + assert.Equal(t, len(records), 1) + record := records[0] + assert.Equal(t, record.Type, si.EventRecord_REQUEST) + assert.Equal(t, record.ObjectID, "alloc1") + assert.Equal(t, record.GroupID, "app1") + assert.Equal(t, record.Message, "message") + assert.Equal(t, record.Reason, "reason") +} + +func TestMultipleEvents(t *testing.T) { + cache := GetEventCache() + store := cache.GetEventStore() + + cache.StartService() + + ask1 := si.AllocationAsk{ + AllocationKey: "alloc1", + } + ask2 := si.AllocationAsk{ + AllocationKey: "alloc2", + } + event1 := baseEvent{ + source: &ask1, + group: "app1", + reason: "reason1", + message: "message1", + } + event2 := baseEvent{ + source: &ask1, + group: "app1", + reason: "reason2", + message: "message2", + } + event3 := baseEvent{ + source: &ask2, + group: "app2", + reason: "reason3", + message: "message3", + 
} + cache.AddEvent(&event1) + cache.AddEvent(&event2) + cache.AddEvent(&event3) + + // wait for cache to process the event + time.Sleep(2 * sleepTimeInterval) + + records := store.CollectEvents() + assert.Equal(t, len(records), 2) + for _, record := range records { + assert.Equal(t, record.Type, si.EventRecord_REQUEST) + if record.ObjectID == "alloc1" { + assert.Equal(t, record.GroupID, "app1") + assert.Equal(t, record.Message, "message2") + assert.Equal(t, record.Reason, "reason2") + } else if record.ObjectID == "alloc2" { + assert.Equal(t, record.GroupID, "app2") + assert.Equal(t, record.Message, "message3") + assert.Equal(t, record.Reason, "reason3") + } else { + t.Fatalf("Unexpected allocation found") + } + } +} Review comment: Missing newline at the end of file; all files with the ⊖ at the end need to be fixed. ########## File path: pkg/common/resources/resources.go ########## @@ -367,13 +367,19 @@ func subNonNegative(left, right *Resource) (*Resource, string) { // Check if smaller fitin larger, negative values will be treated as 0 // A nil resource is treated as an empty resource (zero) func FitIn(larger, smaller *Resource) bool { + fitIn, _ := FitInWithExplanation(larger, smaller) + return fitIn +} + +// TODO test this +func FitInWithExplanation(larger, smaller *Resource) (bool, string) { Review comment: I do not see why we would do that here. We know exactly what happens: fit or not. Looking at the individual values is **not** a good idea. Ranging over a map (the quantities in the resource) is **guaranteed** to be random (part of the Go language definition). So a call like this: ``` larger = "first": 5, "second": 5 smaller = "first": 10, "second": 10 FitIn(larger, smaller) ``` Such a call will return different results as the explanation when run multiple times. You cannot rely on the specific details returned. 
########## File path: pkg/events/event_publisher.go ########## @@ -0,0 +1,89 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "go.uber.org/zap" +) + +type EventPublisher interface { + StartService() + Stop() + GetEventStore() EventStore Review comment: This seems to be only used for test. Do we really need to expose this because of that? If needed please annotate that it is only supposed to be used in test, otherwise do not export and use the object directly in the tests ########## File path: pkg/events/event_publisher.go ########## @@ -0,0 +1,89 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "go.uber.org/zap" +) + +type EventPublisher interface { + StartService() + Stop() + GetEventStore() EventStore +} + +type shimPublisher struct { + store EventStore + stopped bool Review comment: This is used to signal not really to show a state, `stop` would be more appropriate. ########## File path: pkg/events/event_cache_test.go ########## @@ -0,0 +1,112 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "testing" + "time" + + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "gotest.tools/assert" Review comment: Check lint this should not pass the lint check ########## File path: pkg/events/event_cache.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +// TODO this should be configurable? 
+const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channel EventChannel // channelling input events + store EventStore // storing events + publishers []EventPublisher // publishing events to sinks + started bool // whether the service is started + stopped bool // whether the service is stopped + + sync.Mutex +} + +func GetEventCache() *EventCache { + once.Do(func(){ + store := newEventStoreImpl() + pub := newShimPublisher(store) + + cache = &EventCache{ + channel: newEventChannelImpl(), + store: store, + publishers: []EventPublisher{pub}, + started: false, + stopped: false, + } + }) + return cache +} + +// TODO consider thread pool +func (ec *EventCache) StartService() { + ec.Lock() + defer ec.Unlock() + + ec.started = true + + // start main event processing thread + go ec.processEvent() + + // start event publishers + for _, publisher := range ec.publishers { + publisher.StartService() + } +} + +func (ec *EventCache) Stop() { + ec.Lock() + defer ec.Unlock() + + if ec.started { + if ec.stopped { + panic("could not stop EventCache service") + } + ec.stopped = true + + // stop event publishers + for _, publisher := range ec.publishers { + publisher.Stop() + } + } +} + +func (ec *EventCache) AddPublisher(pub EventPublisher) { + ec.Lock() + defer ec.Unlock() + + if ec.started { Review comment: The event cache gets started as part of `startAllServicesWithParameters`. That means we have a race condition here. If the publishers are not added early enough in the startup cycle of the rest of the code the fact that the cache is started will prevent them from being added. We need to be more flexible and handle it better. ########## File path: pkg/events/event_cache.go ########## @@ -0,0 +1,143 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +const defaultEventChannelSize = 100000 + +const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second Review comment: Why is this in the cache and not in the publisher? Each publisher could use its own interval, we should not have the cache define this. ########## File path: pkg/events/event_cache.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +// TODO this should be configurable? +const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channel EventChannel // channelling input events + store EventStore // storing events + publishers []EventPublisher // publishing events to sinks + started bool // whether the service is started + stopped bool // whether the service is stopped + + sync.Mutex +} + +func GetEventCache() *EventCache { + once.Do(func(){ + store := newEventStoreImpl() + pub := newShimPublisher(store) + + cache = &EventCache{ + channel: newEventChannelImpl(), + store: store, + publishers: []EventPublisher{pub}, + started: false, + stopped: false, + } + }) + return cache +} + +// TODO consider thread pool +func (ec *EventCache) StartService() { + ec.Lock() + defer ec.Unlock() + + ec.started = true + + // start main event processing thread + go ec.processEvent() + + // start event publishers + for _, publisher := range ec.publishers { + publisher.StartService() + } +} + +func (ec *EventCache) Stop() { + ec.Lock() + defer ec.Unlock() + + if ec.started { + if ec.stopped { + panic("could not stop EventCache service") + } + ec.stopped = true Review comment: If the event cache has been stopped, `started` is still true, so calling stop again will panic because `stopped` is also set. We should not use two flags; you cannot be in two states at the same time. ########## File path: pkg/events/event_publisher_test.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "testing" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "gotest.tools/assert" +) + +type eventStoreForTest struct { + events chan Event +} + +func eventToRecord(event Event) *si.EventRecord { + source, ok := event.GetSource().(string) + if !ok { + panic("Expected string in tests") + } Review comment: This should be a test failure not a panic ########## File path: pkg/events/event_publisher_test.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "testing" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si" + "gotest.tools/assert" +) + +type eventStoreForTest struct { + events chan Event +} + +func eventToRecord(event Event) *si.EventRecord { + source, ok := event.GetSource().(string) + if !ok { + panic("Expected string in tests") + } + return &si.EventRecord{ + ObjectID: source, + GroupID: event.GetGroup(), + Reason: event.GetReason(), + Message: event.GetMessage(), + } +} + +func (es *eventStoreForTest) Store(e Event) { + es.events <- e +} + +func (es *eventStoreForTest) CollectEvents() []*si.EventRecord { + records := make([]*si.EventRecord, 0) + for { + select { + case event := <-es.events: + records = append(records, eventToRecord(event)) + default: + return records + } + } +} + +type eventPluginForTest struct { + records chan *si.EventRecord +} + +// create and register mocked event plugin +func createEventPluginForTest(t *testing.T) eventPluginForTest { Review comment: This does not look right: pass in `test` for the failure. Make it a utility function and fail the test if a nil gets returned from here. ########## File path: pkg/events/event_cache.go ########## @@ -0,0 +1,144 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" +) + +// TODO this should be configurable? +const sleepTimeInterval = 10 * time.Millisecond +const pushEventInterval = 2 * time.Second + +var once sync.Once +var cache *EventCache + +type EventCache struct { + channel EventChannel // channelling input events + store EventStore // storing events + publishers []EventPublisher // publishing events to sinks + started bool // whether the service is started + stopped bool // whether the service is stopped + + sync.Mutex +} + +func GetEventCache() *EventCache { + once.Do(func(){ + store := newEventStoreImpl() + pub := newShimPublisher(store) + + cache = &EventCache{ + channel: newEventChannelImpl(), + store: store, + publishers: []EventPublisher{pub}, + started: false, + stopped: false, + } + }) + return cache +} + +// TODO consider thread pool +func (ec *EventCache) StartService() { + ec.Lock() + defer ec.Unlock() + + ec.started = true + + // start main event processing thread + go ec.processEvent() + + // start event publishers + for _, publisher := range ec.publishers { + publisher.StartService() + } +} + +func (ec *EventCache) Stop() { + ec.Lock() + defer ec.Unlock() + + if ec.started { + if ec.stopped { + panic("could not stop EventCache service") + } + ec.stopped = true + + // stop event publishers + for _, publisher := range ec.publishers { + publisher.Stop() + } + } +} + +func (ec *EventCache) AddPublisher(pub EventPublisher) { + ec.Lock() + defer ec.Unlock() + + if ec.started { + log.Logger().Error("Added Publisher to a running EventCache!") + } else { + ec.publishers = append(ec.publishers, pub) + } +} + +func (ec *EventCache) AddEvent(event Event) { + ec.channel.AddEvent(event) +} + +func (ec *EventCache) IsStarted() bool { + ec.Lock() + defer ec.Unlock() + + return ec.started +} + +func (ec *EventCache) 
isStopped() bool { + ec.Lock() + defer ec.Unlock() + + return ec.stopped +} + +func (ec *EventCache) processEvent() { + for { + // break the main loop if stopped + if ec.isStopped() { + break + } + for { + // do not process any new event if the process has been stopped + if ec.isStopped() { + break + } + // TODO for debugging: add time info about how long did this step take + event, ok := ec.channel.GetNextEvent() Review comment: If `GetNextEvent()` returns, it has an event. That event is either nil or has a value. Why do we need the extra boolean? ########## File path: pkg/events/event_publisher.go ########## @@ -0,0 +1,89 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package events + +import ( + "sync" + "time" + + "github.com/apache/incubator-yunikorn-core/pkg/log" + "github.com/apache/incubator-yunikorn-core/pkg/plugins" + "go.uber.org/zap" +) + +type EventPublisher interface { + StartService() + Stop() + GetEventStore() EventStore +} + +type shimPublisher struct { + store EventStore + stopped bool + + sync.Mutex +} + +func NewShimPublisher(event EventStore) EventPublisher { + return &shimPublisher{ + store: event, + stopped: false, + } +} + +func (sp *shimPublisher) isStopped() bool { + sp.Lock() + defer sp.Unlock() + + return sp.stopped +} + +func (sp *shimPublisher) StartService() { + go func () { + for { + if sp.isStopped() { + break + } + if eventPlugin := plugins.GetEventPlugin(); eventPlugin != nil { + messages := sp.store.CollectEvents() + log.Logger().Debug("Sending events", zap.Int("number of messages", len(messages))) Review comment: How do we handle requests from a certain point onwards? Do we always get all events, or is there a limit (like 500 or possible)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org