[TC-109] Enhance event logging to bring it more in line with the Java version of Traffic Monitor, plus a few bug fixes around management of peer states, the health protocol, and opening log files.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/0da25442 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/0da25442 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/0da25442 Branch: refs/heads/master Commit: 0da254428cc60a5846843f52788cd6fb1e6cbdda Parents: a00660a Author: Jeff Elsloo <jeffrey_els...@cable.comcast.com> Authored: Fri Jan 20 16:04:06 2017 -0700 Committer: David Neuman <david.neuma...@gmail.com> Committed: Mon Jan 23 10:25:41 2017 -0700 ---------------------------------------------------------------------- traffic_monitor/experimental/common/log/log.go | 32 ++++--- .../experimental/common/util/join.go | 60 ++++++++++++++ .../experimental/conf/traffic_monitor.cfg | 1 + .../experimental/traffic_monitor/cache/data.go | 11 --- .../traffic_monitor/config/config.go | 10 +++ .../traffic_monitor/deliveryservice/stat.go | 17 +--- .../experimental/traffic_monitor/enum/enum.go | 8 ++ .../traffic_monitor/health/cache_health.go | 35 +++++--- .../traffic_monitor/health/event.go | 36 ++++++++ .../traffic_monitor/manager/datarequest.go | 12 ++- .../traffic_monitor/manager/healthresult.go | 8 +- .../traffic_monitor/manager/manager.go | 9 +- .../traffic_monitor/manager/peer.go | 87 +++++++++++++++----- .../traffic_monitor/peer/crstates.go | 44 ++++++++-- .../experimental/traffic_monitor/peer/peer.go | 14 ++-- .../traffic_monitor/threadsafe/events.go | 19 +++-- .../traffic_monitor/traffic_monitor.go | 22 +++-- 17 files changed, 315 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/common/log/log.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/common/log/log.go b/traffic_monitor/experimental/common/log/log.go index 0fd33c9..b3b567b 100644 --- a/traffic_monitor/experimental/common/log/log.go +++ b/traffic_monitor/experimental/common/log/log.go @@ -9,9 +9,9 @@ package log * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,7 +20,6 @@ package log * under the License. */ - import ( "fmt" "io" @@ -33,40 +32,49 @@ var ( Info *log.Logger Warning *log.Logger Error *log.Logger + Event *log.Logger ) -func Init(errW, warnW, infoW, debugW io.Writer) { +func Init(eventW, errW, warnW, infoW, debugW io.Writer) { Debug = log.New(debugW, "DEBUG: ", log.Lshortfile) Info = log.New(infoW, "INFO: ", log.Lshortfile) Warning = log.New(warnW, "WARNING: ", log.Lshortfile) Error = log.New(errW, "ERROR: ", log.Lshortfile) + Event = log.New(eventW, "", 0) } const timeFormat = time.RFC3339Nano +const stackFrame = 3 func Errorf(format string, v ...interface{}) { - Error.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) + Error.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) } func Errorln(v ...interface{}) { - Error.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) + Error.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) } func Warnf(format string, v ...interface{}) { - Warning.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) + Warning.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) } func Warnln(v ...interface{}) { - Warning.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) + Warning.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) } func Infof(format string, v ...interface{}) { - Info.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) + Info.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) } func Infoln(v ...interface{}) { - Info.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) + Info.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) } func Debugf(format string, v ...interface{}) { - Debug.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) + Debug.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintf(format, v...)) } func Debugln(v ...interface{}) { - Debug.Output(3, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) + Debug.Output(stackFrame, time.Now().Format(timeFormat)+": "+fmt.Sprintln(v...)) +} + +// event log entries (TM event.log, TR access.log, etc) +func Eventf(t time.Time, format string, v ...interface{}) { + // 1484001185.287 ... + Event.Printf("%.3f %s", float64(t.Unix())+(float64(t.Nanosecond())/1e9), fmt.Sprintf(format, v...)) } // Close calls `Close()` on the given Closer, and logs any error. On error, the context is logged, followed by a colon, the error message, and a newline. This is primarily designed to be used in `defer`, for example, `defer log.Close(resp.Body, "readData fetching /foo/bar")`. http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/common/util/join.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/common/util/join.go b/traffic_monitor/experimental/common/util/join.go new file mode 100644 index 0000000..d5b647b --- /dev/null +++ b/traffic_monitor/experimental/common/util/join.go @@ -0,0 +1,60 @@ +package util + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "fmt" +) + +func JoinErrorsString(errs []error) string { + joined := JoinErrors(errs) + + if joined == nil { + return "" + } + + return joined.Error() +} + +func JoinErrors(errs []error) error { + return JoinErrorsSep(errs, "") +} + +func JoinErrorsSep(errs []error, separator string) error { + if separator == "" { + separator = ", " + } + + joinedErrors := "" + + for _, err := range errs { + if err != nil { + joinedErrors += err.Error() + separator + } + } + + if len(joinedErrors) == 0 { + return nil + } + + joinedErrors = joinedErrors[:len(joinedErrors)-len(separator)] // strip trailing separator + + return fmt.Errorf("%s", joinedErrors) +} http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/conf/traffic_monitor.cfg ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/conf/traffic_monitor.cfg b/traffic_monitor/experimental/conf/traffic_monitor.cfg index f2c25a0..0ab06bb 100644 --- a/traffic_monitor/experimental/conf/traffic_monitor.cfg +++ b/traffic_monitor/experimental/conf/traffic_monitor.cfg @@ -4,6 +4,7 @@ "monitor_config_polling_interval_ms": 5000, "http_timeout_ms": 2000, "peer_polling_interval_ms": 5000, + "peer_optimistic": true, "max_events": 200, "max_stat_history": 5, "max_health_history": 5, http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/cache/data.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/cache/data.go b/traffic_monitor/experimental/traffic_monitor/cache/data.go index e22e174..811babb 100644 --- a/traffic_monitor/experimental/traffic_monitor/cache/data.go +++ b/traffic_monitor/experimental/traffic_monitor/cache/data.go @@ -50,17 +50,6 @@ func (a AvailableStatuses) Copy() AvailableStatuses { return b } -// Event represents an event change in aggregated data. For example, a cache being marked as unavailable. -type Event struct { - Index uint64 `json:"index"` - Time int64 `json:"time"` - Description string `json:"description"` - Name enum.CacheName `json:"name"` - Hostname enum.CacheName `json:"hostname"` - Type string `json:"type"` - Available bool `json:"isAvailable"` -} - // ResultHistory is a map of cache names, to an array of result history from each cache. type ResultHistory map[enum.CacheName][]Result http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/config/config.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/config/config.go b/traffic_monitor/experimental/traffic_monitor/config/config.go index 24ba42a..1941ec6 100644 --- a/traffic_monitor/experimental/traffic_monitor/config/config.go +++ b/traffic_monitor/experimental/traffic_monitor/config/config.go @@ -46,6 +46,7 @@ type Config struct { MonitorConfigPollingInterval time.Duration `json:"-"` HTTPTimeout time.Duration `json:"-"` PeerPollingInterval time.Duration `json:"-"` + PeerOptimistic bool `json:"peer_optimistic"` MaxEvents uint64 `json:"max_events"` MaxStatHistory uint64 `json:"max_stat_history"` MaxHealthHistory uint64 `json:"max_health_history"` @@ -55,6 +56,7 @@ type Config struct { LogLocationWarning string `json:"log_location_warning"` LogLocationInfo string `json:"log_location_info"` LogLocationDebug string `json:"log_location_debug"` + LogLocationEvent string `json:"log_location_event"` ServeReadTimeout time.Duration `json:"-"` ServeWriteTimeout time.Duration `json:"-"` HealthToStatRatio uint64 `json:"health_to_stat_ratio"` @@ -69,6 +71,7 @@ var DefaultConfig = Config{ MonitorConfigPollingInterval: 5 * time.Second, HTTPTimeout: 2 * time.Second, PeerPollingInterval: 5 * time.Second, + PeerOptimistic: true, MaxEvents: 200, MaxStatHistory: 5, MaxHealthHistory: 5, @@ -78,6 +81,7 @@ var DefaultConfig = Config{ LogLocationWarning: LogLocationStdout, LogLocationInfo: LogLocationNull, LogLocationDebug: LogLocationNull, + LogLocationEvent: LogLocationStdout, ServeReadTimeout: 10 * time.Second, ServeWriteTimeout: 10 * time.Second, HealthToStatRatio: 4, @@ -94,6 +98,7 @@ func (c *Config) MarshalJSON() ([]byte, error) { MonitorConfigPollingIntervalMs uint64 `json:"monitor_config_polling_interval_ms"` HTTPTimeoutMS uint64 `json:"http_timeout_ms"` PeerPollingIntervalMs uint64 `json:"peer_polling_interval_ms"` + PeerOptimistic bool `json:"peer_optimistic"` HealthFlushIntervalMs uint64 `json:"health_flush_interval_ms"` StatFlushIntervalMs uint64 `json:"stat_flush_interval_ms"` ServeReadTimeoutMs uint64 `json:"serve_read_timeout_ms"` @@ -105,6 +110,7 @@ func (c *Config) MarshalJSON() ([]byte, error) { MonitorConfigPollingIntervalMs: uint64(c.MonitorConfigPollingInterval / time.Millisecond), HTTPTimeoutMS: uint64(c.HTTPTimeout / time.Millisecond), PeerPollingIntervalMs: uint64(c.PeerPollingInterval / time.Millisecond), + PeerOptimistic: bool(true), HealthFlushIntervalMs: uint64(c.HealthFlushInterval / time.Millisecond), StatFlushIntervalMs: uint64(c.StatFlushInterval / time.Millisecond), Alias: (*Alias)(c), @@ -120,6 +126,7 @@ func (c *Config) UnmarshalJSON(data []byte) error { MonitorConfigPollingIntervalMs *uint64 `json:"monitor_config_polling_interval_ms"` HTTPTimeoutMS *uint64 `json:"http_timeout_ms"` PeerPollingIntervalMs *uint64 `json:"peer_polling_interval_ms"` + PeerOptimistic *bool `json:"peer_optimistic"` HealthFlushIntervalMs *uint64 `json:"health_flush_interval_ms"` StatFlushIntervalMs *uint64 `json:"stat_flush_interval_ms"` ServeReadTimeoutMs *uint64 `json:"serve_read_timeout_ms"` @@ -159,6 +166,9 @@ func (c *Config) UnmarshalJSON(data []byte) error { if aux.ServeWriteTimeoutMs != nil { c.ServeWriteTimeout = time.Duration(*aux.ServeWriteTimeoutMs) * time.Millisecond } + if aux.PeerOptimistic != nil { + c.PeerOptimistic = *aux.PeerOptimistic + } return nil } http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go b/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go index 538eb02..3b7f35c 100644 --- a/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go +++ b/traffic_monitor/experimental/traffic_monitor/deliveryservice/stat.go @@ -22,6 +22,7 @@ package deliveryservice import ( "fmt" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/util" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache" dsdata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservicedata" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum" @@ -253,20 +254,6 @@ func addLastStat(lastData LastStatData, newStat int64, newStatTime time.Time) (L return lastData, nil } -func combineErrs(errs []error) error { - combinedErr := "" - for _, err := range errs { - if err != nil { - combinedErr += err.Error() + ", " - } - } - if len(combinedErr) == 0 { - return nil - } - combinedErr = combinedErr[:len(combinedErr)-2] // strip trailing ', ' - return fmt.Errorf("%s", combinedErr) -} - func addLastStats(lastData LastStatsData, newStats dsdata.StatCacheStats, newStatsTime time.Time) (LastStatsData, error) { errs := []error{nil, nil, nil, nil, nil} lastData.Bytes, errs[0] = addLastStat(lastData.Bytes, newStats.OutBytes.Value, newStatsTime) @@ -274,7 +261,7 @@ func addLastStats(lastData LastStatsData, newStats dsdata.StatCacheStats, newSta lastData.Status3xx, errs[2] = addLastStat(lastData.Status3xx, newStats.Status3xx.Value, newStatsTime) lastData.Status4xx, errs[3] = addLastStat(lastData.Status4xx, newStats.Status4xx.Value, newStatsTime) lastData.Status5xx, errs[4] = addLastStat(lastData.Status5xx, newStats.Status5xx.Value, newStatsTime) - return lastData, combineErrs(errs) + return lastData, util.JoinErrors(errs) } func addLastStatsToStatCacheStats(s dsdata.StatCacheStats, l LastStatsData) dsdata.StatCacheStats { http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/enum/enum.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/enum/enum.go b/traffic_monitor/experimental/traffic_monitor/enum/enum.go index 733bba5..ce838ac 100644 --- a/traffic_monitor/experimental/traffic_monitor/enum/enum.go +++ b/traffic_monitor/experimental/traffic_monitor/enum/enum.go @@ -49,6 +49,14 @@ const ( CacheTypeInvalid = CacheType("") ) +func (c CacheName) String() string { + return string(c) +} + +func (t TrafficMonitorName) String() string { + return string(t) +} + // String returns a string representation of this cache type. func (t CacheType) String() string { switch t { http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/health/cache_health.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/health/cache_health.go b/traffic_monitor/experimental/traffic_monitor/health/cache_health.go index b57d143..7e9af51 100644 --- a/traffic_monitor/experimental/traffic_monitor/health/cache_health.go +++ b/traffic_monitor/experimental/traffic_monitor/health/cache_health.go @@ -127,30 +127,39 @@ func cacheCapacityKbps(result cache.Result) int64 { func EvalCache(result cache.Result, mc *traffic_ops.TrafficMonitorConfigMap) (bool, string) { toServer := mc.TrafficServer[string(result.ID)] status := enum.CacheStatusFromString(toServer.Status) - if status == enum.CacheStatusInvalid { - log.Errorf("Cache %v got invalid status from Traffic Ops '%v' - treating as Reported\n", result.ID, toServer.Status) - } params := mc.Profile[toServer.Profile].Parameters kbpsThreshold, hasKbpsThreshold := getKbpsThreshold(params.HealthThresholdAvailableBandwidthInKbps) queryTimeThreshold, hasQueryTimeThreshold := getQueryThreshold(int64(params.HealthThresholdQueryTime)) + availability := "available" + if !result.Available { + availability = "unavailable" + } + switch { + case status == enum.CacheStatusInvalid: + log.Errorf("Cache %v got invalid status from Traffic Ops '%v' - treating as OFFLINE\n", result.ID, toServer.Status) + return false, getEventDescription(status, availability+"; invalid status") case status == enum.CacheStatusAdminDown: - return false, "set to " + status.String() + return false, getEventDescription(status, availability) case status == enum.CacheStatusOffline: - log.Errorf("Cache %v set to offline, but still polled\n", result.ID) - return false, "set to " + status.String() + log.Errorf("Cache %v set to OFFLINE, but still polled\n", result.ID) + return false, getEventDescription(status, availability) case status == enum.CacheStatusOnline: - return true, "set to " + status.String() + return true, getEventDescription(status, availability) case result.Error != nil: - return false, fmt.Sprintf("error: %v", result.Error) + return false, getEventDescription(status, fmt.Sprintf("%v", result.Error)) case result.Vitals.LoadAvg > params.HealthThresholdLoadAvg && params.HealthThresholdLoadAvg != 0: - return false, fmt.Sprintf("load average %f exceeds threshold %f", result.Vitals.LoadAvg, params.HealthThresholdLoadAvg) - case hasKbpsThreshold && result.Vitals.KbpsOut > cacheCapacityKbps(result)-kbpsThreshold: - return false, fmt.Sprintf("%dkbps exceeds max %dkbps", result.Vitals.KbpsOut, kbpsThreshold) + return false, getEventDescription(status, fmt.Sprintf("loadavg too high (%.5f > %.5f)", result.Vitals.LoadAvg, params.HealthThresholdLoadAvg)) + case hasKbpsThreshold && cacheCapacityKbps(result)-result.Vitals.KbpsOut < kbpsThreshold: + return false, getEventDescription(status, fmt.Sprintf("availableBandwidthInKbps too low (%d < %d)", cacheCapacityKbps(result)-result.Vitals.KbpsOut, kbpsThreshold)) case hasQueryTimeThreshold && result.RequestTime > queryTimeThreshold: - return false, fmt.Sprintf("request time %v exceeds max %v", result.RequestTime, queryTimeThreshold) + return false, getEventDescription(status, fmt.Sprintf("queryTime too high (%.5f > %.5f)", float64(result.RequestTime.Nanoseconds())/1e6, float64(queryTimeThreshold.Nanoseconds())/1e6)) default: - return result.Available, "reported" + return result.Available, getEventDescription(status, availability) } } + +func getEventDescription(status enum.CacheStatus, message string) string { + return fmt.Sprintf("%s - %s", status, message) +} http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/health/event.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/health/event.go b/traffic_monitor/experimental/traffic_monitor/health/event.go new file mode 100644 index 0000000..47563fa --- /dev/null +++ b/traffic_monitor/experimental/traffic_monitor/health/event.go @@ -0,0 +1,36 @@ +package health + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "time" +) + +// Event represents an event change in aggregated data. For example, a cache being marked as unavailable. +type Event struct { + Time time.Time `json:"-"` + Index uint64 `json:"index"` + Unix int64 `json:"time"` + Description string `json:"description"` + Name string `json:"name"` + Hostname string `json:"hostname"` + Type string `json:"type"` + Available bool `json:"isAvailable"` +} http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go index b3beee8..e4a35ed 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go @@ -35,6 +35,7 @@ import ( ds "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservice" dsdata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/deliveryservicedata" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/srvhttp" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe" @@ -45,7 +46,7 @@ import ( // JSONEvents represents the structure we wish to serialize to JSON, for Events. type JSONEvents struct { - Events []cache.Event `json:"events"` + Events []health.Event `json:"events"` } // CacheState represents the available state of a cache. @@ -612,7 +613,7 @@ func srvPeerStates(params url.Values, errorCount threadsafe.Uint, path string, t HandleErr(errorCount, path, err) return []byte(err.Error()), http.StatusBadRequest } - bytes, err := json.Marshal(createAPIPeerStates(peerStates.Get(), filter, params)) + bytes, err := json.Marshal(createAPIPeerStates(peerStates.GetCrstates(), filter, params)) return WrapErrCode(errorCount, path, bytes, err) } @@ -965,11 +966,16 @@ func createCacheStatuses( log.Warnf("cache not in statuses %s\n", cacheName) } else { statusString := statusVal.Status + " - " - if statusVal.Available { + + // this should match the event string, use as the default if possible + if statusVal.Why != "" { + statusString = statusVal.Why + } else if statusVal.Available { statusString += "available" } else { statusString += fmt.Sprintf("unavailable (%s)", statusVal.Why) } + status = &statusString } http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go index 9c150e4..ed9f9fc 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go @@ -86,9 +86,9 @@ func StartHealthResultManager( fetchCount threadsafe.Uint, errorCount threadsafe.Uint, cfg config.Config, -) (DurationMapThreadsafe, threadsafe.Events, threadsafe.CacheAvailableStatus, threadsafe.ResultHistory) { + events threadsafe.Events, +) (DurationMapThreadsafe, threadsafe.CacheAvailableStatus, threadsafe.ResultHistory) { lastHealthDurations := NewDurationMapThreadsafe() - events := threadsafe.NewEvents(cfg.MaxEvents) localCacheStatus := threadsafe.NewCacheAvailableStatus() healthHistory := threadsafe.NewResultHistory() go healthResultManagerListen( @@ -106,7 +106,7 @@ func StartHealthResultManager( localCacheStatus, cfg, ) - return lastHealthDurations, events, localCacheStatus, healthHistory + return lastHealthDurations, localCacheStatus, healthHistory } func healthResultManagerListen( @@ -235,7 +235,7 @@ func processHealthResult( isAvailable, whyAvailable := health.EvalCache(healthResult, &monitorConfigCopy) if available, ok := localStates.GetCache(healthResult.ID); !ok || available.IsAvailable != isAvailable { log.Infof("Changing state for %s was: %t now: %t because %s error: %v", healthResult.ID, prevResult.Available, isAvailable, whyAvailable, healthResult.Error) - events.Add(cache.Event{Time: time.Now().Unix(), Description: whyAvailable, Name: healthResult.ID, Hostname: healthResult.ID, Type: toDataCopy.ServerTypes[healthResult.ID].String(), Available: isAvailable}) + events.Add(health.Event{Time: healthResult.Time, Unix: healthResult.Time.Unix(), Description: whyAvailable, Name: healthResult.ID.String(), Hostname: healthResult.ID.String(), Type: toDataCopy.ServerTypes[healthResult.ID].String(), Available: isAvailable}) } localCacheStatus[healthResult.ID] = cache.AvailableStatus{ http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/manager/manager.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go index e794b7c..26ebb37 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go @@ -63,6 +63,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData) // TODO investigate whether a unique client per cache to be polled is faster sharedClient := &http.Client{ Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}, + Timeout: cfg.HTTPTimeout, } localStates := peer.NewCRStatesThreadsafe() // this is the local state as discoverer by this traffic_monitor @@ -86,6 +87,8 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData) go cacheStatPoller.Poll() go peerPoller.Poll() + events := threadsafe.NewEvents(cfg.MaxEvents) + cachesChanged := make(chan struct{}) monitorConfig := StartMonitorConfigManager( @@ -103,6 +106,9 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData) peerHandler.ResultChannel, localStates, peerStates, + events, + cfg.PeerOptimistic, + toData, ) statInfoHistory, statResultHistory, statMaxKbpses, _, lastKbpsStats, dsStats, unpolledCaches := StartStatHistoryManager( @@ -116,7 +122,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData) monitorConfig, ) - lastHealthDurations, events, localCacheStatus, healthHistory := StartHealthResultManager( + lastHealthDurations, localCacheStatus, healthHistory := StartHealthResultManager( cacheHealthHandler.ResultChannel, toData, localStates, @@ -126,6 +132,7 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData) fetchCount, errorCount, cfg, + events, ) StartOpsConfigManager( http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/manager/peer.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/peer.go b/traffic_monitor/experimental/traffic_monitor/manager/peer.go index 5a94e70..2f8cbdb 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/peer.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/peer.go @@ -20,11 +20,18 @@ package manager */ import ( + "fmt" "sort" + "strings" + "time" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/util" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/enum" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health" "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/peer" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/threadsafe" + todata "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/trafficopsdata" ) // StartPeerManager listens for peer results, and when it gets one, it adds it to the peerStates list, and optimistically combines the good results into combinedStates @@ -32,43 +39,83 @@ func StartPeerManager( peerChan <-chan peer.Result, localStates peer.CRStatesThreadsafe, peerStates peer.CRStatesPeersThreadsafe, + events threadsafe.Events, + peerOptimistic bool, + toData todata.TODataThreadsafe, ) peer.CRStatesThreadsafe { combinedStates := peer.NewCRStatesThreadsafe() + overrideMap := map[enum.CacheName]bool{} + go func() { - for crStatesResult := range peerChan { - peerStates.Set(crStatesResult.ID, crStatesResult.PeerStats) - combineCrStates(peerStates.Get(), localStates.Get(), combinedStates) - crStatesResult.PollFinished <- crStatesResult.PollID + for peerResult := range peerChan { + comparePeerState(events, peerResult, peerStates) + peerStates.Set(peerResult) + combineCrStates(events, peerOptimistic, peerStates, localStates.Get(), combinedStates, overrideMap, toData) + peerResult.PollFinished <- peerResult.PollID } }() return combinedStates } +func comparePeerState(events threadsafe.Events, result peer.Result, peerStates peer.CRStatesPeersThreadsafe) { + if result.Available != peerStates.GetPeerAvailability(result.ID) { + events.Add(health.Event{Time: result.Time, Unix: result.Time.Unix(), Description: util.JoinErrorsString(result.Errors), Name: result.ID.String(), Hostname: result.ID.String(), Type: "Peer", Available: result.Available}) + } +} + // TODO JvD: add deliveryservice stuff -func combineCrStates(peerStates map[enum.TrafficMonitorName]peer.Crstates, localStates peer.Crstates, combinedStates peer.CRStatesThreadsafe) { +func combineCrStates(events threadsafe.Events, peerOptimistic bool, peerStates peer.CRStatesPeersThreadsafe, localStates peer.Crstates, combinedStates peer.CRStatesThreadsafe, overrideMap map[enum.CacheName]bool, toData todata.TODataThreadsafe) { + toDataCopy := toData.Get() + for cacheName, localCacheState := range localStates.Caches { // localStates gets pruned when servers are disabled, it's the source of truth - downVotes := 0 // TODO JvD: change to use parameter when deciding to be optimistic or pessimistic. + var overrideCondition string available := false + override := overrideMap[cacheName] + if localCacheState.IsAvailable { - // log.Infof(cacheName, " is available locally - setting to IsAvailable: true") available = true // we don't care about the peers, we got a "good one", and we're optimistic - } else { - downVotes++ // localStates says it's not happy - for _, peerCrStates := range peerStates { - if peerCrStates.Caches[cacheName].IsAvailable { - // log.Infoln(cacheName, "- locally we think it's down, but", peerName, "says IsAvailable: ", peerCrStates.Caches[cacheName].IsAvailable, "trusting the peer.") - available = true // we don't care about the peers, we got a "good one", and we're optimistic - break // one peer that thinks we're good is all we need. + + if override { + overrideCondition = "cleared; healthy locally" + overrideMap[cacheName] = false + } + } else if peerOptimistic { + if !peerStates.HasAvailablePeers() { + if override { + overrideCondition = "irrelevant; no peers online" + overrideMap[cacheName] = false + } + } else { + onlineOnPeers := make([]string, 0) + + for peer, peerCrStates := range peerStates.GetCrstates() { + if peerStates.GetPeerAvailability(peer) { + if peerCrStates.Caches[cacheName].IsAvailable { + onlineOnPeers = append(onlineOnPeers, peer.String()) + } + } + } + + if len(onlineOnPeers) > 0 { + available = true + + if !override { + overrideCondition = fmt.Sprintf("detected; healthy on (at least) %s", strings.Join(onlineOnPeers, ", ")) + overrideMap[cacheName] = true + } } else { - // log.Infoln(cacheName, "- locally we think it's down, and", peerName, "says IsAvailable: ", peerCrStates.Caches[cacheName].IsAvailable, "down voting") - downVotes++ // peerStates for this peer doesn't like it + if override { + overrideCondition = "irrelevant; not online on any peers" + overrideMap[cacheName] = false + } } } } - if downVotes > len(peerStates) { - // log.Infoln(cacheName, "-", downVotes, "down votes, setting to IsAvailable: false") - available = false + + if overrideCondition != "" { + events.Add(health.Event{Time: time.Now(), Unix: time.Now().Unix(), Description: fmt.Sprintf("Health protocol override condition %s", overrideCondition), Name: cacheName.String(), Hostname: cacheName.String(), Type: toDataCopy.ServerTypes[cacheName].String(), Available: available}) } + combinedStates.SetCache(cacheName, peer.IsAvailable{IsAvailable: available}) } @@ -79,7 +126,7 @@ func combineCrStates(peerStates map[enum.TrafficMonitorName]peer.Crstates, local } deliveryService.DisabledLocations = localDeliveryService.DisabledLocations - for peerName, iPeerStates := range peerStates { + for peerName, iPeerStates := range peerStates.GetCrstates() { peerDeliveryService, ok := iPeerStates.Deliveryservice[deliveryServiceName] if !ok { log.Warnf("local delivery service %s not found in peer %s\n", deliveryServiceName, peerName) http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/peer/crstates.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go b/traffic_monitor/experimental/traffic_monitor/peer/crstates.go index ef5176a..cac90e1 100644 --- a/traffic_monitor/experimental/traffic_monitor/peer/crstates.go +++ b/traffic_monitor/experimental/traffic_monitor/peer/crstates.go @@ -177,17 +177,18 @@ func (t *CRStatesThreadsafe) DeleteDeliveryService(name enum.DeliveryServiceName // CRStatesPeersThreadsafe provides safe access for multiple goroutines to read a map of Traffic Monitor peers to their returned Crstates, with a single goroutine writer. // This could be made lock-free, if the performance was necessary type CRStatesPeersThreadsafe struct { - crStates map[enum.TrafficMonitorName]Crstates - m *sync.RWMutex + crStates map[enum.TrafficMonitorName]Crstates + peerStates map[enum.TrafficMonitorName]bool + m *sync.RWMutex } // NewCRStatesPeersThreadsafe creates a new CRStatesPeers object safe for multiple goroutine readers and a single writer. func NewCRStatesPeersThreadsafe() CRStatesPeersThreadsafe { - return CRStatesPeersThreadsafe{m: &sync.RWMutex{}, crStates: map[enum.TrafficMonitorName]Crstates{}} + return CRStatesPeersThreadsafe{m: &sync.RWMutex{}, crStates: map[enum.TrafficMonitorName]Crstates{}, peerStates: map[enum.TrafficMonitorName]bool{}} } -// Get returns the internal Traffic Monitor peer Crstates data. This MUST NOT be modified. -func (t *CRStatesPeersThreadsafe) Get() map[enum.TrafficMonitorName]Crstates { +// GetCrstates returns the internal Traffic Monitor peer Crstates data. This MUST NOT be modified. +func (t *CRStatesPeersThreadsafe) GetCrstates() map[enum.TrafficMonitorName]Crstates { t.m.RLock() m := map[enum.TrafficMonitorName]Crstates{} for k, v := range t.crStates { @@ -197,9 +198,36 @@ func (t *CRStatesPeersThreadsafe) Get() map[enum.TrafficMonitorName]Crstates { return m } -// Set sets the internal Traffic Monitor peer Crstates data. This MUST NOT be called by multiple goroutines. -func (t *CRStatesPeersThreadsafe) Set(peerName enum.TrafficMonitorName, peerState Crstates) { +// GetPeerAvailability returns the state of the given peer +func (t *CRStatesPeersThreadsafe) GetPeerAvailability(peer enum.TrafficMonitorName) bool { + t.m.RLock() + availability := t.peerStates[peer] + t.m.RUnlock() + return availability +} + +// HasAvailablePeers returns true if at least one peer is online +func (t *CRStatesPeersThreadsafe) HasAvailablePeers() bool { + availablePeers := false + + t.m.RLock() + + for _, available := range t.peerStates { + if available { + availablePeers = true + break + } + } + + t.m.RUnlock() + + return availablePeers +} + +// Set sets the internal Traffic Monitor peer state and Crstates data. This MUST NOT be called by multiple goroutines. +func (t *CRStatesPeersThreadsafe) Set(result Result) { t.m.Lock() - t.crStates[peerName] = peerState + t.crStates[result.ID] = result.PeerStates + t.peerStates[result.ID] = result.Available t.m.Unlock() } http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/peer/peer.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/peer/peer.go b/traffic_monitor/experimental/traffic_monitor/peer/peer.go index c8958c1..66755f8 100644 --- a/traffic_monitor/experimental/traffic_monitor/peer/peer.go +++ b/traffic_monitor/experimental/traffic_monitor/peer/peer.go @@ -8,9 +8,9 @@ package peer * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,7 +19,6 @@ package peer * under the License. */ - import ( "encoding/json" "io" @@ -44,9 +43,10 @@ type Result struct { ID enum.TrafficMonitorName Available bool Errors []error - PeerStats Crstates + PeerStates Crstates PollID uint64 PollFinished chan<- uint64 + Time time.Time } // Handle handles a response from a polled Traffic Monitor peer, parsing the data and forwarding it to the ResultChannel. @@ -57,6 +57,7 @@ func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, err Errors: []error{}, PollID: pollID, PollFinished: pollFinished, + Time: time.Now(), } if err != nil { @@ -65,10 +66,11 @@ func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, err if r != nil { dec := json.NewDecoder(r) + err = dec.Decode(&result.PeerStates) - if err := dec.Decode(&result.PeerStats); err == io.EOF { + if err == nil { result.Available = true - } else if err != nil { + } else { result.Errors = append(result.Errors, err) } } http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go b/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go index 6f92481..fe78ba5 100644 --- a/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go +++ b/traffic_monitor/experimental/traffic_monitor/threadsafe/events.go @@ -22,19 +22,20 @@ package threadsafe import ( "sync" - "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/cache" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/common/log" + "github.com/apache/incubator-trafficcontrol/traffic_monitor/experimental/traffic_monitor/health" ) // Events provides safe access for multiple goroutines readers and a single writer to a stored Events slice. type Events struct { - events *[]cache.Event + events *[]health.Event m *sync.RWMutex nextIndex *uint64 max uint64 } -func copyEvents(a []cache.Event) []cache.Event { - b := make([]cache.Event, len(a), len(a)) +func copyEvents(a []health.Event) []health.Event { + b := make([]health.Event, len(a), len(a)) copy(b, a) return b } @@ -42,21 +43,23 @@ func copyEvents(a []cache.Event) []cache.Event { // NewEvents creates a new single-writer-multiple-reader Threadsafe object func NewEvents(maxEvents uint64) Events { i := uint64(0) - return Events{m: &sync.RWMutex{}, events: &[]cache.Event{}, nextIndex: &i, max: maxEvents} + return Events{m: &sync.RWMutex{}, events: &[]health.Event{}, nextIndex: &i, max: maxEvents} } // Get returns the internal slice of Events for reading. This MUST NOT be modified. If modification is necessary, copy the slice. -func (o *Events) Get() []cache.Event { +func (o *Events) Get() []health.Event { o.m.RLock() defer o.m.RUnlock() return *o.events } // Add adds the given event. This is threadsafe for one writer, multiple readers. This MUST NOT be called by multiple threads, as it non-atomically fetches and adds. -func (o *Events) Add(e cache.Event) { +func (o *Events) Add(e health.Event) { + // host="hostname", type=EDGE, available=true, msg="REPORTED - available" + log.Eventf(e.Time, "host=\"%s\", type=%s, available=%t, msg=\"%s\"", e.Hostname, e.Type, e.Available, e.Description) events := copyEvents(*o.events) e.Index = *o.nextIndex - events = append([]cache.Event{e}, events...) + events = append([]health.Event{e}, events...) if len(events) > int(o.max) { events = (events)[:o.max-1] } http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/0da25442/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go b/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go index 671e10e..f97cf3d 100644 --- a/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go +++ b/traffic_monitor/experimental/traffic_monitor/traffic_monitor.go @@ -92,27 +92,31 @@ func getLogWriter(location string) (io.Writer, error) { case config.LogLocationNull: return ioutil.Discard, nil default: - return os.Open(location) + return os.OpenFile(location, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) } } -func getLogWriters(errLoc, warnLoc, infoLoc, debugLoc string) (io.Writer, io.Writer, io.Writer, io.Writer, error) { +func getLogWriters(eventLoc, errLoc, warnLoc, infoLoc, debugLoc string) (io.Writer, io.Writer, io.Writer, io.Writer, io.Writer, error) { + eventW, err := getLogWriter(eventLoc) + if err != nil { + return nil, nil, nil, nil, nil, fmt.Errorf("getting log event writer %v: %v", eventLoc, err) + } errW, err := getLogWriter(errLoc) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("getting log error writer %v: %v", errLoc, err) + return nil, nil, nil, nil, nil, fmt.Errorf("getting log error writer %v: %v", errLoc, err) } warnW, err := getLogWriter(warnLoc) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("getting log warning writer %v: %v", warnLoc, err) + return nil, nil, nil, nil, nil, fmt.Errorf("getting log warning writer %v: %v", warnLoc, err) } infoW, err := getLogWriter(infoLoc) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("getting log info writer %v: %v", infoLoc, err) + return nil, nil, nil, nil, nil, fmt.Errorf("getting log info writer %v: %v", infoLoc, err) } debugW, err := getLogWriter(debugLoc) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("getting log debug writer %v: %v", debugLoc, err) + return nil, nil, nil, nil, nil, fmt.Errorf("getting log debug writer %v: %v", debugLoc, err) } - return errW, warnW, infoW, debugW, nil + return eventW, errW, warnW, infoW, debugW, nil } func main() { @@ -140,12 +144,12 @@ func main() { os.Exit(1) } - errW, warnW, infoW, debugW, err := getLogWriters(cfg.LogLocationError, cfg.LogLocationWarning, cfg.LogLocationInfo, cfg.LogLocationDebug) + eventW, errW, warnW, infoW, debugW, err := getLogWriters(cfg.LogLocationEvent, cfg.LogLocationError, cfg.LogLocationWarning, cfg.LogLocationInfo, cfg.LogLocationDebug) if err != nil { fmt.Printf("Error starting service: failed to create log writers: %v\n", err) os.Exit(1) } - log.Init(errW, warnW, infoW, debugW) + log.Init(eventW, errW, warnW, infoW, debugW) log.Infof("Starting with config %+v\n", cfg)