wilfred-s commented on a change in pull request #141:
URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r437121702



##########
File path: pkg/events/event_publisher_test.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "gotest.tools/assert"
+)
+
+type eventStoreForTest struct {
+       events chan Event
+}
+
+func eventToRecord(event Event) *si.EventRecord {
+       source, ok := event.GetSource().(string)
+       if !ok {
+               panic("Expected string in tests")
+       }
+       return &si.EventRecord{
+               ObjectID: source,
+               GroupID: event.GetGroup(),
+               Reason: event.GetReason(),
+               Message: event.GetMessage(),
+       }
+}
+
+func (es *eventStoreForTest) Store(e Event) {
+       es.events <- e
+}
+
+func (es *eventStoreForTest) CollectEvents() []*si.EventRecord {
+       records := make([]*si.EventRecord, 0)
+       for {
+               select {
+                       case event := <-es.events:
+                               records = append(records, eventToRecord(event))
+                       default:
+                               return records
+               }
+       }
+}
+
+type eventPluginForTest struct {
+       records chan *si.EventRecord
+}
+
+// create and register mocked event plugin
+func createEventPluginForTest(t *testing.T) eventPluginForTest {
+       eventPlugin := eventPluginForTest{
+               records: make(chan *si.EventRecord, 3),
+       }
+       plugins.RegisterSchedulerPlugin(&eventPlugin)
+       if plugins.GetEventPlugin() == nil {
+               t.Fatalf("Event plugin should have been registered!")
+       }
+       return eventPlugin
+}
+
+func (ep *eventPluginForTest) SendEvent(events []*si.EventRecord) error {
+       for _, event := range events {
+               ep.records <- event
+       }
+       return nil
+}
+
+func (ep *eventPluginForTest) getNextEventRecord() *si.EventRecord {
+       select {
+               case record := <- ep.records:
+                       return record
+               default:
+                       return nil
+       }
+}
+
+func TestServiceStartStopWithoutEventPlugin(t *testing.T) {
+       store := &eventStoreForTest{
+               events: make(chan Event, 2),
+       }
+       publisher := NewShimPublisher(store)
+       publisher.StartService()
+       assert.Equal(t, publisher.GetEventStore(), store)
+       time.Sleep(100 * time.Millisecond)
+       publisher.Stop()
+}
+
+func TestServiceStartStopWithEventPlugin(t *testing.T) {
+       createEventPluginForTest(t)

Review comment:
       How do we test that the plugin is set correctly? This test does not check anything more than the test without the plugin does.

##########
File path: pkg/events/event_cache.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+// TODO this should be configurable?
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+       channel    EventChannel     // channelling input events
+       store      EventStore       // storing events
+       publishers []EventPublisher // publishing events to sinks
+       started    bool             // whether the service is started
+       stopped    bool             // whether the service is stopped
+
+       sync.Mutex
+}
+
+func GetEventCache() *EventCache {
+       once.Do(func(){
+               store := newEventStoreImpl()
+               pub := newShimPublisher(store)
+
+               cache = &EventCache{
+                       channel:    newEventChannelImpl(),
+                       store:      store,
+                       publishers: []EventPublisher{pub},
+                       started:    false,
+                       stopped:    false,
+               }
+       })
+       return cache
+}
+
+// TODO consider thread pool
+func (ec *EventCache) StartService() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       ec.started = true

Review comment:
       You should not mark it as started until all handling has been performed.
   If error states are introduced later on, the service will already have been marked as started before the work has been done.

##########
File path: pkg/events/events.go
##########
@@ -0,0 +1,92 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "fmt"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "go.uber.org/zap"
+)
+
+type Event interface {
+       GetSource() interface{}
+       GetGroup() string
+       GetReason() string
+       GetMessage() string
+}
+
+func toEventMessage(e Event) (*si.EventRecord, error) {
+       eventType, id, err := convertSourceToTypeAndID(e.GetSource())
+       if err != nil {
+               return nil, err
+       }
+       return &si.EventRecord{
+               Type:     eventType,
+               ObjectID: id,
+               GroupID:  e.GetGroup(),
+               Reason:   e.GetReason(),
+               Message:  e.GetMessage(),
+       }, nil
+}
+
+func convertSourceToTypeAndID(obj interface{}) (si.EventRecord_Type, string, 
error) {
+       // TODO other type checks
+       if ask, ok := obj.(*si.AllocationAsk); ok {
+               return si.EventRecord_REQUEST, ask.AllocationKey, nil
+       }
+       log.Logger().Warn("Could not convert source object to 
EventMessageType", zap.Any("object", obj))
+
+       // TODO should add UNKNOWN request?
+       return si.EventRecord_REQUEST, "", fmt.Errorf("could not convert source 
object to EventMessageType")
+}
+
+type baseEvent struct {
+       source  interface{}

Review comment:
       This means that the garbage collector will never release the source object for cleanup unless the event has been removed. Tracking the source object pointer will thus have a direct, prolonged impact on the memory use of the core.
   We need a specific reason why we would need to track the object rather than an ID or something similar.

##########
File path: pkg/events/event_channel.go
##########
@@ -0,0 +1,74 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "fmt"
+       "strconv"
+       "sync"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+// TODO should configure the size of the channel
+const eventChannelSize = 100000
+
+// wrapping the channel into a struct - so that the underlying implementation 
can be changed
+type eventChannel struct {
+       events       chan Event

Review comment:
       This would be one place to use an atomic int.
   Normally I would say that atomic is not the right thing to use, but in this case we only need a guarantee that the update will be applied and that we always read the current value.

##########
File path: pkg/events/event_store.go
##########
@@ -0,0 +1,73 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "go.uber.org/zap"
+)
+
+type EventStore interface {
+       Store(Event)
+       CollectEvents() []*si.EventRecord
+}
+
+type defaultEventStore struct {
+       eventMap map[interface{}]Event
+
+       sync.RWMutex
+}
+
+func newEventStoreImpl() EventStore {
+       return &defaultEventStore{
+               eventMap: make(map[interface{}]Event),
+       }
+}
+
+func (es *defaultEventStore) Store(event Event) {
+       es.Lock()
+       defer es.Unlock()
+
+       es.eventMap[event.GetSource()] = event
+}
+
+func (es *defaultEventStore) CollectEvents() []*si.EventRecord {
+       es.Lock()
+       defer es.Unlock()
+
+       messages := make([]*si.EventRecord, 0)
+
+       // collect events
+       for _, v := range es.eventMap {
+               message, err := toEventMessage(v)
+               if err != nil {
+                       log.Logger().Warn("Could not translate object to 
EventMessage", zap.Any("object", v))
+                       continue
+               }
+               messages = append(messages, message)
+       }
+
+       // clear map
+       es.eventMap = make(map[interface{}]Event)

Review comment:
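       This means we can never use more than one publisher for events: collecting the events clears the map, so the first publisher to collect takes all of them and the others get nothing. That is a huge limitation.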

##########
File path: pkg/events/event_cache.go
##########
@@ -0,0 +1,143 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+const defaultEventChannelSize = 100000
+
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+       channel    EventChannel     // channelling input events
+       store      EventStore       // storing events
+       publishers []EventPublisher // publishing events to sinks
+       started    bool             // whether the service is started
+       stopped    bool             // whether the service is stopped
+
+       sync.Mutex
+}
+
+func GetEventCache() *EventCache {
+       once.Do(func(){
+               store := newEventStoreImpl()
+
+               cache = &EventCache{
+                       channel:    
newEventChannelImpl(defaultEventChannelSize),
+                       store:      store,
+                       publishers: make([]EventPublisher, 0),
+                       started:    false,
+                       stopped:    false,
+               }
+       })
+       return cache
+}
+
+// TODO consider thread pool
+func (ec *EventCache) StartService() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       ec.started = true
+
+       // start main event processing thread
+       go ec.processEvent()
+}
+
+func (ec *EventCache) Stop() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       if ec.started {
+               if ec.stopped {
+                       panic("could not stop EventCache service")

Review comment:
       We should never panic in production code.

##########
File path: pkg/events/event_cache_test.go
##########
@@ -0,0 +1,112 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "gotest.tools/assert"
+)
+
+func TestSingleEvent(t *testing.T) {
+       cache := GetEventCache()
+       store := cache.GetEventStore()
+
+       cache.StartService()
+
+       ask := si.AllocationAsk{
+               AllocationKey: "alloc1",
+       }
+       event := baseEvent{
+               source:  &ask,
+               group:   "app1",
+               reason:  "reason",
+               message: "message",
+       }
+       cache.AddEvent(&event)
+
+       // wait for cache to process the event
+       time.Sleep(2 * sleepTimeInterval)
+
+       records := store.CollectEvents()
+       assert.Equal(t, len(records), 1)
+       record := records[0]
+       assert.Equal(t, record.Type, si.EventRecord_REQUEST)
+       assert.Equal(t, record.ObjectID, "alloc1")
+       assert.Equal(t, record.GroupID, "app1")
+       assert.Equal(t, record.Message, "message")
+       assert.Equal(t, record.Reason, "reason")
+}
+
+func TestMultipleEvents(t *testing.T) {
+       cache := GetEventCache()
+       store := cache.GetEventStore()
+
+       cache.StartService()
+
+       ask1 := si.AllocationAsk{
+               AllocationKey: "alloc1",
+       }
+       ask2 := si.AllocationAsk{
+               AllocationKey: "alloc2",
+       }
+       event1 := baseEvent{
+               source:  &ask1,
+               group:   "app1",
+               reason:  "reason1",
+               message: "message1",
+       }
+       event2 := baseEvent{
+               source:  &ask1,
+               group:   "app1",
+               reason:  "reason2",
+               message: "message2",
+       }
+       event3 := baseEvent{
+               source:  &ask2,
+               group:   "app2",
+               reason:  "reason3",
+               message: "message3",
+       }
+       cache.AddEvent(&event1)
+       cache.AddEvent(&event2)
+       cache.AddEvent(&event3)
+
+       // wait for cache to process the event
+       time.Sleep(2 * sleepTimeInterval)
+
+       records := store.CollectEvents()
+       assert.Equal(t, len(records), 2)
+       for _, record := range records {
+               assert.Equal(t, record.Type, si.EventRecord_REQUEST)
+               if record.ObjectID == "alloc1" {
+                       assert.Equal(t, record.GroupID, "app1")
+                       assert.Equal(t, record.Message, "message2")
+                       assert.Equal(t, record.Reason, "reason2")
+               } else if record.ObjectID == "alloc2" {
+                       assert.Equal(t, record.GroupID, "app2")
+                       assert.Equal(t, record.Message, "message3")
+                       assert.Equal(t, record.Reason, "reason3")
+               } else {
+                       t.Fatalf("Unexpected allocation found")
+               }
+       }
+}

Review comment:
       Missing newline at the end of the file.
   All files marked with ⊖ at the end need to be fixed.

##########
File path: pkg/common/resources/resources.go
##########
@@ -367,13 +367,19 @@ func subNonNegative(left, right *Resource) (*Resource, 
string) {
 // Check if smaller fitin larger, negative values will be treated as 0
 // A nil resource is treated as an empty resource (zero)
 func FitIn(larger, smaller *Resource) bool {
+       fitIn, _ := FitInWithExplanation(larger, smaller)
+       return fitIn
+}
+
+// TODO test this
+func FitInWithExplanation(larger, smaller *Resource) (bool, string)  {

Review comment:
       I do not see why we would do that here. We know exactly what happens: it either fits or it does not.
   
   Looking at the individual values is **not** a good idea. The iteration order when ranging over a map (the quantities in the resource) is **not** guaranteed to be the same from one run to the next (the Go language specification leaves it unspecified, and the runtime deliberately randomizes it).
   So a call like this:
   ```
   larger = "first": 5, "second": 5
   smaller = "first": 10, "second": 10
   FitInWithExplanation(larger, smaller)
   ```
   can return a different explanation each time it is run, because both quantities fail to fit.
   You cannot rely on the specific details returned.

##########
File path: pkg/events/event_publisher.go
##########
@@ -0,0 +1,89 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+       "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+       "go.uber.org/zap"
+)
+
+type EventPublisher interface {
+       StartService()
+       Stop()
+       GetEventStore() EventStore

Review comment:
       This seems to be used only in tests. Do we really need to expose it just for that?
   If it is needed, please annotate that it is only supposed to be used in tests; otherwise, do not export it and use the object directly in the tests.

##########
File path: pkg/events/event_publisher.go
##########
@@ -0,0 +1,89 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+       "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+       "go.uber.org/zap"
+)
+
+type EventPublisher interface {
+       StartService()
+       Stop()
+       GetEventStore() EventStore
+}
+
+type shimPublisher struct {
+       store EventStore
+       stopped bool

Review comment:
       This is used as a signal, not really to represent a state; `stop` would be a more appropriate name.

##########
File path: pkg/events/event_cache_test.go
##########
@@ -0,0 +1,112 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "gotest.tools/assert"

Review comment:
       Check lint: this should not pass the lint check.

##########
File path: pkg/events/event_cache.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+// TODO this should be configurable?
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+       channel    EventChannel     // channelling input events
+       store      EventStore       // storing events
+       publishers []EventPublisher // publishing events to sinks
+       started    bool             // whether the service is started
+       stopped    bool             // whether the service is stopped
+
+       sync.Mutex
+}
+
+func GetEventCache() *EventCache {
+       once.Do(func(){
+               store := newEventStoreImpl()
+               pub := newShimPublisher(store)
+
+               cache = &EventCache{
+                       channel:    newEventChannelImpl(),
+                       store:      store,
+                       publishers: []EventPublisher{pub},
+                       started:    false,
+                       stopped:    false,
+               }
+       })
+       return cache
+}
+
+// TODO consider thread pool
+func (ec *EventCache) StartService() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       ec.started = true
+
+       // start main event processing thread
+       go ec.processEvent()
+
+       // start event publishers
+       for _, publisher := range ec.publishers {
+               publisher.StartService()
+       }
+}
+
+func (ec *EventCache) Stop() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       if ec.started {
+               if ec.stopped {
+                       panic("could not stop EventCache service")
+               }
+               ec.stopped = true
+
+               // stop event publishers
+               for _, publisher := range ec.publishers {
+                       publisher.Stop()
+               }
+       }
+}
+
+func (ec *EventCache) AddPublisher(pub EventPublisher) {
+       ec.Lock()
+       defer ec.Unlock()
+
+       if ec.started  {

Review comment:
       The event cache gets started as part of `startAllServicesWithParameters`. That means we have a race condition here: if the publishers are not added early enough in the startup cycle of the rest of the code, the fact that the cache has already been started will prevent them from being added.
   
   We need to be more flexible and handle this better.

##########
File path: pkg/events/event_cache.go
##########
@@ -0,0 +1,143 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+const defaultEventChannelSize = 100000
+
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second

Review comment:
       Why is this in the cache and not in the publisher?
   Each publisher could use its own interval; the cache should not define this.

##########
File path: pkg/events/event_cache.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+// TODO this should be configurable?
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+       channel    EventChannel     // channelling input events
+       store      EventStore       // storing events
+       publishers []EventPublisher // publishing events to sinks
+       started    bool             // whether the service is started
+       stopped    bool             // whether the service is stopped
+
+       sync.Mutex
+}
+
+func GetEventCache() *EventCache {
+       once.Do(func(){
+               store := newEventStoreImpl()
+               pub := newShimPublisher(store)
+
+               cache = &EventCache{
+                       channel:    newEventChannelImpl(),
+                       store:      store,
+                       publishers: []EventPublisher{pub},
+                       started:    false,
+                       stopped:    false,
+               }
+       })
+       return cache
+}
+
+// TODO consider thread pool
+func (ec *EventCache) StartService() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       ec.started = true
+
+       // start main event processing thread
+       go ec.processEvent()
+
+       // start event publishers
+       for _, publisher := range ec.publishers {
+               publisher.StartService()
+       }
+}
+
+func (ec *EventCache) Stop() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       if ec.started {
+               if ec.stopped {
+                       panic("could not stop EventCache service")
+               }
+               ec.stopped = true

Review comment:
       If the event cache has been stopped, `started` is still true, so calling stop again will panic because `stopped` is also set.
   We should not use two flags; the service cannot be in two states at the same time.

##########
File path: pkg/events/event_publisher_test.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "gotest.tools/assert"
+)
+
+type eventStoreForTest struct {
+       events chan Event
+}
+
+func eventToRecord(event Event) *si.EventRecord {
+       source, ok := event.GetSource().(string)
+       if !ok {
+               panic("Expected string in tests")
+       }

Review comment:
       This should be a test failure, not a panic.

##########
File path: pkg/events/event_publisher_test.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+       "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+       "gotest.tools/assert"
+)
+
+type eventStoreForTest struct {
+       events chan Event
+}
+
+func eventToRecord(event Event) *si.EventRecord {
+       source, ok := event.GetSource().(string)
+       if !ok {
+               panic("Expected string in tests")
+       }
+       return &si.EventRecord{
+               ObjectID: source,
+               GroupID: event.GetGroup(),
+               Reason: event.GetReason(),
+               Message: event.GetMessage(),
+       }
+}
+
+func (es *eventStoreForTest) Store(e Event) {
+       es.events <- e
+}
+
+func (es *eventStoreForTest) CollectEvents() []*si.EventRecord {
+       records := make([]*si.EventRecord, 0)
+       for {
+               select {
+                       case event := <-es.events:
+                               records = append(records, eventToRecord(event))
+                       default:
+                               return records
+               }
+       }
+}
+
+type eventPluginForTest struct {
+       records chan *si.EventRecord
+}
+
+// create and register mocked event plugin
+func createEventPluginForTest(t *testing.T) eventPluginForTest {

Review comment:
       This does not look right: pass in `test` for the failure. Make it a
utility function, and fail the test if nil is returned from here.

##########
File path: pkg/events/event_cache.go
##########
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+// TODO this should be configurable?
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+       channel    EventChannel     // channelling input events
+       store      EventStore       // storing events
+       publishers []EventPublisher // publishing events to sinks
+       started    bool             // whether the service is started
+       stopped    bool             // whether the service is stopped
+
+       sync.Mutex
+}
+
+func GetEventCache() *EventCache {
+       once.Do(func(){
+               store := newEventStoreImpl()
+               pub := newShimPublisher(store)
+
+               cache = &EventCache{
+                       channel:    newEventChannelImpl(),
+                       store:      store,
+                       publishers: []EventPublisher{pub},
+                       started:    false,
+                       stopped:    false,
+               }
+       })
+       return cache
+}
+
+// TODO consider thread pool
+func (ec *EventCache) StartService() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       ec.started = true
+
+       // start main event processing thread
+       go ec.processEvent()
+
+       // start event publishers
+       for _, publisher := range ec.publishers {
+               publisher.StartService()
+       }
+}
+
+func (ec *EventCache) Stop() {
+       ec.Lock()
+       defer ec.Unlock()
+
+       if ec.started {
+               if ec.stopped {
+                       panic("could not stop EventCache service")
+               }
+               ec.stopped = true
+
+               // stop event publishers
+               for _, publisher := range ec.publishers {
+                       publisher.Stop()
+               }
+       }
+}
+
+func (ec *EventCache) AddPublisher(pub EventPublisher) {
+       ec.Lock()
+       defer ec.Unlock()
+
+       if ec.started  {
+               log.Logger().Error("Added Publisher to a running EventCache!")
+       } else {
+               ec.publishers = append(ec.publishers, pub)
+       }
+}
+
+func (ec *EventCache) AddEvent(event Event) {
+       ec.channel.AddEvent(event)
+}
+
+func (ec *EventCache) IsStarted() bool {
+       ec.Lock()
+       defer ec.Unlock()
+
+       return ec.started
+}
+
+func (ec *EventCache) isStopped() bool {
+       ec.Lock()
+       defer ec.Unlock()
+
+       return ec.stopped
+}
+
+func (ec *EventCache) processEvent() {
+       for {
+               // break the main loop if stopped
+               if ec.isStopped() {
+                       break
+               }
+               for {
+                       // do not process any new event if the process has been 
stopped
+                       if ec.isStopped() {
+                               break
+                       }
+                       // TODO for debugging: add time info about how long did 
this step take
+                       event, ok := ec.channel.GetNextEvent()

Review comment:
       If `GetNextEvent()` returns, it has an event: that event is either nil or
has a value. Why do we need the extra boolean?

##########
File path: pkg/events/event_publisher.go
##########
@@ -0,0 +1,89 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-yunikorn-core/pkg/log"
+       "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+       "go.uber.org/zap"
+)
+
+type EventPublisher interface {
+       StartService()
+       Stop()
+       GetEventStore() EventStore
+}
+
+type shimPublisher struct {
+       store EventStore
+       stopped bool
+
+       sync.Mutex
+}
+
+func NewShimPublisher(event EventStore) EventPublisher {
+       return &shimPublisher{
+               store:   event,
+               stopped: false,
+       }
+}
+
+func (sp *shimPublisher) isStopped() bool {
+       sp.Lock()
+       defer sp.Unlock()
+
+       return sp.stopped
+}
+
+func (sp *shimPublisher) StartService() {
+       go func () {
+               for {
+                       if sp.isStopped() {
+                               break
+                       }
+                       if eventPlugin := plugins.GetEventPlugin(); eventPlugin 
!= nil {
+                               messages := sp.store.CollectEvents()
+                               log.Logger().Debug("Sending events", 
zap.Int("number of messages", len(messages)))

Review comment:
       How do we handle requests from a certain point onwards?
   Do we always get all events, or is there a limit (like 500 or so)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to