[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events

2020-06-24 Thread GitBox


wilfred-s commented on a change in pull request #141:
URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444960603



##
File path: pkg/events/events.go
##
@@ -0,0 +1,40 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "time"
+
+   "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+func createEventRecord(recordType si.EventRecord_Type, objectID, groupID, reason, message string) (*si.EventRecord, error) {
+	return &si.EventRecord{
+		Type:     recordType,
+		ObjectID: objectID,
+		GroupID:  groupID,
+		Reason:   reason,
+		Message:  message,

Review comment:
   Sounds good. If that changes in the future, we can adjust what we have.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events

2020-06-24 Thread GitBox


wilfred-s commented on a change in pull request #141:
URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444959075



##
File path: pkg/events/event_publisher.go
##
@@ -0,0 +1,89 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "sync"
+   "time"
+
+   "github.com/apache/incubator-yunikorn-core/pkg/log"
+   "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+   "go.uber.org/zap"
+)
+
+type EventPublisher interface {
+   StartService()
+   Stop()
+   GetEventStore() EventStore
+}
+
+type shimPublisher struct {
+   store EventStore
+   stopped bool
+
+   sync.Mutex
+}
+
+func NewShimPublisher(event EventStore) EventPublisher {
+	return &shimPublisher{
+		store:   event,
+		stopped: false,
+	}
+}
+
+func (sp *shimPublisher) isStopped() bool {
+   sp.Lock()
+   defer sp.Unlock()
+
+   return sp.stopped
+}
+
+func (sp *shimPublisher) StartService() {
+	go func() {
+		for {
+			if sp.isStopped() {
+				break
+			}
+			if eventPlugin := plugins.GetEventPlugin(); eventPlugin != nil {
+				messages := sp.store.CollectEvents()
+				log.Logger().Debug("Sending events", zap.Int("number of messages", len(messages)))

Review comment:
   Not at this point in time; we'll work on this when we see more performance numbers.









[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events

2020-06-24 Thread GitBox


wilfred-s commented on a change in pull request #141:
URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444922722



##
File path: pkg/events/event_cache.go
##
@@ -0,0 +1,117 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "sync"
+
+   "github.com/apache/incubator-yunikorn-core/pkg/log"
+   "github.com/apache/incubator-yunikorn-core/pkg/metrics"
+   "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+// need to change for testing
+var defaultEventChannelSize = 10
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+	channel chan *si.EventRecord // channelling input events
+	store   EventStore           // storing events
+	stop    chan bool            // whether the service is stopped
+
+   sync.Mutex
+}
+
+func GetEventCache() *EventCache {
+	once.Do(func() {
+		store := newEventStoreImpl()
+
+		cache = createEventCacheInternal(store)
+	})
+	return cache
+}
+
+func createEventCacheInternal(store EventStore) *EventCache {
+	return &EventCache{
+		channel: nil,
+		store:   store,
+		stop:    make(chan bool),
+	}
+}
+
+func (ec *EventCache) StartService() {
+   ec.Lock()
+   defer ec.Unlock()
+
+   ec.channel = make(chan *si.EventRecord, defaultEventChannelSize)
+
+   go ec.processEvent()
+}
+
+func (ec *EventCache) IsStarted() bool {
+   ec.Lock()
+   defer ec.Unlock()
+
+   return ec.channel != nil
+}
+
+func (ec *EventCache) Stop() {
+	ec.Lock()
+	defer ec.Unlock()
+
+	ec.stop <- true
+	if ec.channel != nil {
+		close(ec.channel)
+		ec.channel = nil
+	}
+}
+
+func (ec *EventCache) AddEvent(event *si.EventRecord) {
+	metrics.GetEventMetrics().IncEventsCreated()
+	select {
+	case ec.channel <- event:
+		metrics.GetEventMetrics().IncEventsChanneled()
+	default:
+		log.Logger().Debug("could not add Event to channel")
+		metrics.GetEventMetrics().IncEventsNotChanneled()
+	}
+}
+
+func (ec *EventCache) GetEventStore() EventStore {
+   return ec.store
+}
+
+func (ec *EventCache) processEvent() {
+	for {
+		select {
+		case <-ec.stop:
+			return
+		case event, ok := <-ec.channel:
+			if !ok {
+				return
+			}
+			if event != nil {
+				ec.store.Store(event)
+				metrics.GetEventMetrics().IncEventsProcessed()
+			}

Review comment:
   We should not allow nil values, and we do not send them on the sending side: if the event construction fails, we do not send anything. It also does not make sense to send a nil event over the channel.
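   A minimal sketch of that contract, not the PR's code: it assumes a hypothetical `eventRecord` stand-in for `si.EventRecord` and a made-up validation rule (an empty object ID makes construction fail). The sender only channels events whose construction succeeded, so the consumer can drop the `event != nil` check.

```go
package main

import (
	"errors"
	"fmt"
)

// eventRecord is a hypothetical stand-in for si.EventRecord so the sketch is self-contained.
type eventRecord struct {
	ObjectID string
	Message  string
}

// createEventRecord reports construction failures as an error and never
// returns a nil record together with a nil error.
// The "empty object ID is invalid" rule is an assumption for illustration.
func createEventRecord(objectID, message string) (*eventRecord, error) {
	if objectID == "" {
		return nil, errors.New("cannot create event without an object ID")
	}
	return &eventRecord{ObjectID: objectID, Message: message}, nil
}

// addEvent only channels successfully constructed events, so nil never reaches the channel.
// Returns false when construction fails or the channel is full.
func addEvent(ch chan<- *eventRecord, objectID, message string) bool {
	event, err := createEventRecord(objectID, message)
	if err != nil {
		return false
	}
	select {
	case ch <- event:
		return true
	default:
		// channel full: drop the event, like AddEvent in the PR
		return false
	}
}

// processEvents drains the channel until it is closed; no nil check is needed
// because the sending side guarantees non-nil events.
func processEvents(ch <-chan *eventRecord) []string {
	var stored []string
	for event := range ch {
		stored = append(stored, event.ObjectID)
	}
	return stored
}

func main() {
	ch := make(chan *eventRecord, 10)
	addEvent(ch, "alloc1", "ok")
	addEvent(ch, "", "rejected: no object ID")
	close(ch)
	fmt.Println(processEvents(ch)) // prints [alloc1]
}
```

   The consumer here uses `range` instead of the PR's `select` on a stop channel purely to keep the sketch short; the point is where the validation lives.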









[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events

2020-06-23 Thread GitBox


wilfred-s commented on a change in pull request #141:
URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r444627153



##
File path: pkg/events/event_store_test.go
##
@@ -0,0 +1,130 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "testing"
+
+   "gotest.tools/assert"
+
+   "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+func TestStoreAndRetrieveAllocationAsk(t *testing.T) {

Review comment:
   All these tests are independent of the type used.
   Change the name to not include the type info.

##
File path: pkg/scheduler/scheduling_application.go
##
@@ -703,3 +714,10 @@ func (sa *SchedulingApplication) finishRecovery() {
zap.Error(err))
}
 }
+
+func createInsufficientQueueResourcesEvent(ask *si.AllocationAsk, message string) (*si.EventRecord, error) {
+	if ask == nil {
+		return nil, fmt.Errorf("could not create Event object from nil AllocationAsk")
+	}
+	return events.CreateRequestEventRecord(ask.AllocationKey, ask.ApplicationID, "InsufficientQueueResources", message)

Review comment:
   This should call `events.CreateRequestEventRecord()` directly instead of going through this wrapper.

##
File path: pkg/events/event_cache_test.go
##
@@ -0,0 +1,170 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "testing"
+   "time"
+
+   "gotest.tools/assert"
+
+   "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+func TestStartStop(t *testing.T) {
+	cache := GetEventCache()
+	assert.Equal(t, cache.IsStarted(), false, "EventCache should not be started when constructed")
+	// adding event to stopped cache does not cause panic
+	cache.AddEvent(nil)
+	cache.StartService()
+	// add an event
+	cache.AddEvent(nil)
+	assert.Equal(t, cache.IsStarted(), true, "EventCache should have been started")
+	cache.Stop()
+	// adding event to stopped cache does not cause panic
+	cache.AddEvent(nil)
+	assert.Equal(t, cache.IsStarted(), false, "EventCache should have been stopped")
+}
+
+func TestSingleEvent(t *testing.T) {
+   cache := GetEventCache()
+   store := cache.GetEventStore()
+
+   cache.StartService()
+
+	event := si.EventRecord{
+		Type:     si.EventRecord_REQUEST,
+		ObjectID: "alloc1",
+		GroupID:  "app1",
+		Reason:   "reason",
+		Message:  "message",
+	}
+	cache.AddEvent(&event)
+
+	// wait for events to be processed
+	time.Sleep(1 * time.Millisecond)

Review comment:
   This should be smarter: check the number of events in the channel and only proceed once it drops to 0.
   Waiting 1 msec could make the test flaky; use a simple `len(cache.channel)` check in a `waitFor` construct.

##
File path: pkg/events/event_cache.go
##
@@ -0,0 +1,117 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless 

[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #141: [YUNIKORN-117] Create event cache for queue and application events

2020-06-09 Thread GitBox


wilfred-s commented on a change in pull request #141:
URL: https://github.com/apache/incubator-yunikorn-core/pull/141#discussion_r437121702



##
File path: pkg/events/event_publisher_test.go
##
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "testing"
+   "time"
+
+   "github.com/apache/incubator-yunikorn-core/pkg/plugins"
+   "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+   "gotest.tools/assert"
+)
+
+type eventStoreForTest struct {
+   events chan Event
+}
+
+func eventToRecord(event Event) *si.EventRecord {
+	source, ok := event.GetSource().(string)
+	if !ok {
+		panic("Expected string in tests")
+	}
+	return &si.EventRecord{
+		ObjectID: source,
+		GroupID:  event.GetGroup(),
+		Reason:   event.GetReason(),
+		Message:  event.GetMessage(),
+	}
+}
+
+func (es *eventStoreForTest) Store(e Event) {
+	es.events <- e
+}
+
+func (es *eventStoreForTest) CollectEvents() []*si.EventRecord {
+	records := make([]*si.EventRecord, 0)
+	for {
+		select {
+		case event := <-es.events:
+			records = append(records, eventToRecord(event))
+		default:
+			return records
+		}
+	}
+}
+
+type eventPluginForTest struct {
+   records chan *si.EventRecord
+}
+
+// create and register mocked event plugin
+func createEventPluginForTest(t *testing.T) eventPluginForTest {
+	eventPlugin := eventPluginForTest{
+		records: make(chan *si.EventRecord, 3),
+	}
+	plugins.RegisterSchedulerPlugin(&eventPlugin)
+	if plugins.GetEventPlugin() == nil {
+		t.Fatalf("Event plugin should have been registered!")
+	}
+	return eventPlugin
+}
+
+func (ep *eventPluginForTest) SendEvent(events []*si.EventRecord) error {
+	for _, event := range events {
+		ep.records <- event
+	}
+	return nil
+}
+
+func (ep *eventPluginForTest) getNextEventRecord() *si.EventRecord {
+	select {
+	case record := <-ep.records:
+		return record
+	default:
+		return nil
+	}
+}
+
+func TestServiceStartStopWithoutEventPlugin(t *testing.T) {
+	store := &eventStoreForTest{
+		events: make(chan Event, 2),
+	}
+	publisher := NewShimPublisher(store)
+	publisher.StartService()
+	assert.Equal(t, publisher.GetEventStore(), store)
+	time.Sleep(100 * time.Millisecond)
+	publisher.Stop()
+}
+
+func TestServiceStartStopWithEventPlugin(t *testing.T) {
+   createEventPluginForTest(t)

Review comment:
   How do we test that the plugin is set correctly? This test makes no additional checks compared to the test without the plugin.
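   One way to add those checks, sketched with hypothetical stand-ins: `eventRecord` replaces `si.EventRecord`, and `registerEventPlugin`/`getEventPlugin` replace `plugins.RegisterSchedulerPlugin`/`plugins.GetEventPlugin`. The test asserts that the registered plugin is the mock it created and that a publish cycle actually delivers the records to it.

```go
package main

import (
	"fmt"
	"sync"
)

// eventRecord is a hypothetical stand-in for si.EventRecord.
type eventRecord struct{ ObjectID string }

// eventPlugin mirrors the SendEvent method of the mocked plugin in the PR.
type eventPlugin interface {
	SendEvent(events []*eventRecord) error
}

// hypothetical registry standing in for plugins.RegisterSchedulerPlugin / GetEventPlugin
var (
	registryLock sync.Mutex
	registered   eventPlugin
)

func registerEventPlugin(p eventPlugin) {
	registryLock.Lock()
	defer registryLock.Unlock()
	registered = p
}

func getEventPlugin() eventPlugin {
	registryLock.Lock()
	defer registryLock.Unlock()
	return registered
}

// recordingPlugin remembers every record it receives so a test can assert on them.
type recordingPlugin struct {
	mu      sync.Mutex
	records []*eventRecord
}

func (p *recordingPlugin) SendEvent(events []*eventRecord) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.records = append(p.records, events...)
	return nil
}

// received returns the object IDs of all records delivered so far.
func (p *recordingPlugin) received() []string {
	p.mu.Lock()
	defer p.mu.Unlock()
	ids := make([]string, 0, len(p.records))
	for _, r := range p.records {
		ids = append(ids, r.ObjectID)
	}
	return ids
}

// publishOnce models one iteration of the publisher loop:
// push pending events to the plugin if one is registered.
func publishOnce(pending []*eventRecord) bool {
	if plugin := getEventPlugin(); plugin != nil {
		return plugin.SendEvent(pending) == nil
	}
	return false
}

func main() {
	plugin := &recordingPlugin{}
	registerEventPlugin(plugin)
	// extra check 1: the registered plugin is exactly the mock we created
	fmt.Println(getEventPlugin() == eventPlugin(plugin)) // true
	// extra check 2: events published while the plugin is set reach it
	publishOnce([]*eventRecord{{ObjectID: "alloc1"}})
	fmt.Println(plugin.received()) // [alloc1]
}
```

   In the PR's test the equivalent assertions would read the mock's `records` channel via `getNextEventRecord()` after the publisher has run, rather than calling a publish step directly.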

##
File path: pkg/events/event_cache.go
##
@@ -0,0 +1,144 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+   "sync"
+   "time"
+
+   "github.com/apache/incubator-yunikorn-core/pkg/log"
+)
+
+// TODO this should be configurable?
+const sleepTimeInterval = 10 * time.Millisecond
+const pushEventInterval = 2 * time.Second
+
+var once sync.Once
+var cache *EventCache
+
+type EventCache struct {
+   channel EventChannel // channelling input events
+   store  EventStore   // storing