This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 1c712f8 feat: add Prometheus service discovery in Kubernetes (#59)
1c712f8 is described below
commit 1c712f850df1ffbd4b9cbf918929ef2fc4733dc6
Author: kv <[email protected]>
AuthorDate: Wed Aug 11 07:20:16 2021 +0800
feat: add Prometheus service discovery in Kubernetes (#59)
---
configs/satellite_config.yaml | 48 ++++++++++++++++++++
go.sum | 4 ++
internal/satellite/config/override_by_env.go | 2 +
.../satellite/module/gatherer/fetcher_gatherer.go | 18 +++++++-
plugins/fetcher/prometheus/fetcher.go | 51 ++++++++++++++++++----
plugins/fetcher/prometheus/fetcher_test.go | 18 ++++++++
6 files changed, 131 insertions(+), 10 deletions(-)
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index b7b64f6..0ea59ab 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -247,3 +247,51 @@ pipes:
client_name: grpc-client
forwarders:
- plugin_name: nativemeter-grpc-forwarder
+ - common_config:
+ pipe_name: prometheus-fetcher
+ gatherer:
+ fetcher:
+ plugin_name: "prometheus-metrics-fetcher"
+ scrape_configs:
+ - job_name: 'prometheus'
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+ static_configs:
+ - targets:
+ - "127.0.0.1:9100"
+ - job_name: 'prometheus-k8s'
+ metrics_path: '/metrics'
+ scrape_interval: 10s
+ tls_config:
+ ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+ bearer_token_file:
/var/run/secrets/kubernetes.io/serviceaccount/token
+ kubernetes_sd_configs:
+ - role: pod
+ selectors:
+ - role: pod
+ label: "app=prometheus"
+ relabel_configs:
+ - source_labels: [ __meta_kubernetes_pod_name ]
+ separator: ;
+ regex: (.*)
+ target_label: pod
+ replacement: $1
+ action: replace
+ queue:
+ plugin_name: "memory-queue"
+ # The maximum buffer event size.
+ event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
+ processor:
+ filters:
+ sender:
+ fallbacker:
+ plugin_name: none-fallbacker
+ # The time interval between two flush operations, in milliseconds.
+ flush_time: ${SATELLITE_PROMETHEUS_SENDER_FLUSH_TIME:1000}
+ # The maximum buffer elements.
+ max_buffer_size: ${SATELLITE_PROMETHEUS_SENDER_MAX_BUFFER_SIZE:200}
+ # The minimum flush elements.
+ min_flush_events: ${SATELLITE_PROMETHEUS_SENDER_MIN_FLUSH_EVENTS:1}
+ client_name: grpc-client
+ forwarders:
+ - plugin_name: nativecds-grpc-forwarder
diff --git a/go.sum b/go.sum
index 4d603ef..e915536 100644
--- a/go.sum
+++ b/go.sum
@@ -14,6 +14,7 @@ cloud.google.com/go v0.54.0/go.mod
h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bP
cloud.google.com/go v0.56.0/go.mod
h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk=
cloud.google.com/go v0.57.0/go.mod
h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod
h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
+cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8=
cloud.google.com/go v0.65.0/go.mod
h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go/bigquery v1.0.1/go.mod
h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod
h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
@@ -408,6 +409,7 @@ github.com/google/uuid v1.1.1/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod
h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
+github.com/googleapis/gax-go/v2 v2.0.5
h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod
h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.4.1
h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod
h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
@@ -838,6 +840,7 @@ go.opencensus.io v0.21.0/go.mod
h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@@ -1140,6 +1143,7 @@ google.golang.org/api v0.24.0/go.mod
h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
google.golang.org/api v0.28.0/go.mod
h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.29.0/go.mod
h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
google.golang.org/api v0.30.0/go.mod
h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
+google.golang.org/api v0.32.0 h1:Le77IccnTqEa8ryp9wIpX5W3zYm7Gf9LhOp9PHcwFts=
google.golang.org/api v0.32.0/go.mod
h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/appengine v1.1.0/go.mod
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.2.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
diff --git a/internal/satellite/config/override_by_env.go
b/internal/satellite/config/override_by_env.go
index 6ae678f..f8b4858 100644
--- a/internal/satellite/config/override_by_env.go
+++ b/internal/satellite/config/override_by_env.go
@@ -70,6 +70,8 @@ func overrideSlice(m []interface{}, regex *regexp.Regexp)
[]interface{} {
res = append(res, overrideMapStringInterface(val,
regex))
case map[interface{}]interface{}:
res = append(res, overrideMapInterfaceInterface(val,
regex))
+ case string:
+ res = append(res, val)
}
}
return res
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go
b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 3e3a791..ab68276 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -19,6 +19,7 @@ package gatherer
import (
"context"
+ "errors"
"sync"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/apache/skywalking-satellite/internal/satellite/event"
module
"github.com/apache/skywalking-satellite/internal/satellite/module/api"
"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+ processor
"github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
queue "github.com/apache/skywalking-satellite/plugins/queue/api"
@@ -45,9 +47,17 @@ type FetcherGatherer struct {
// metrics
fetchCounter *telemetry.Counter
queueOutputCounter *telemetry.Counter
+
+ // sync invoker
+ processor processor.Processor
}
func (f *FetcherGatherer) Prepare() error {
+ log.Logger.WithField("pipe", f.config.PipeName).Info("fetcher gatherer
module is preparing...")
+ if err := f.runningQueue.Initialize(); err != nil {
+ log.Logger.WithField("pipe", f.config.PipeName).Infof("the %s
queue failed when initializing", f.runningQueue.Name())
+ return err
+ }
f.fetchCounter = telemetry.NewCounter("gatherer_fetch_count", "Total
number of the receiving count in the Gatherer.", "pipe", "status")
f.queueOutputCounter = telemetry.NewCounter("queue_output_count",
"Total number of the output count in the Queue of Gatherer.", "pipe", "status")
return nil
@@ -118,6 +128,10 @@ func (f *FetcherGatherer) Ack(lastOffset event.Offset) {
f.runningQueue.Ack(lastOffset)
}
-func (f *FetcherGatherer) SetProcessor(_ module.Module) error {
- return nil
+func (f *FetcherGatherer) SetProcessor(m module.Module) error {
+ if p, ok := m.(processor.Processor); ok {
+ f.processor = p
+ return nil
+ }
+ return errors.New("set processor only supports to inject processor
module")
}
diff --git a/plugins/fetcher/prometheus/fetcher.go
b/plugins/fetcher/prometheus/fetcher.go
index 1867341..e1f4bdf 100644
--- a/plugins/fetcher/prometheus/fetcher.go
+++ b/plugins/fetcher/prometheus/fetcher.go
@@ -27,6 +27,7 @@ import (
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
+ _ "github.com/prometheus/prometheus/discovery/install" // Imported for its init() side effect, which registers the service discovery implementations.
"github.com/prometheus/prometheus/scrape"
yaml "gopkg.in/yaml.v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -38,10 +39,39 @@ const (
)
type scrapeConfig struct {
- JobName string `yaml:"job_name"
mapstructure:"job_name"`
- ScrapeInterval time.Duration
`yaml:"scrape_interval,omitempty" mapstructure:"scrape_interval,omitempty"`
- StaticConfigs []map[string]interface{} `yaml:"static_configs"
mapstructure:"static_configs"`
- MetricsPath string `yaml:"metrics_path,omitempty"
mapstructure:"metrics_path,omitempty"`
+ JobName string `yaml:"job_name"
mapstructure:"job_name"`
+ ScrapeInterval time.Duration
`yaml:"scrape_interval,omitempty" mapstructure:"scrape_interval,omitempty"`
+ StaticConfigs []map[string]interface{}
`yaml:"static_configs,omitempty" mapstructure:"static_configs,omitempty"`
+ MetricsPath string
`yaml:"metrics_path,omitempty" mapstructure:"metrics_path,omitempty"`
+ TLSConfig tlsConfig
`yaml:"tls_config,omitempty" mapstructure:"tls_config,omitempty"`
+ BearerTokenFile string
`yaml:"bearer_token_file,omitempty" mapstructure:"bearer_token_file,omitempty"`
+ KubernetesSdConfigs []kubernetesSdConfig
`yaml:"kubernetes_sd_configs,omitempty"
mapstructure:"kubernetes_sd_configs,omitempty"`
+ RelabelConfigs []relabelConfig
`yaml:"relabel_configs,omitempty" mapstructure:"relabel_configs,omitempty"`
+}
+
+type tlsConfig struct {
+ CaFile string `yaml:"ca_file" mapstructure:"ca_file"`
+}
+
+// kubernetesSdConfig is one entry of a scrape config's kubernetes_sd_configs list.
+type kubernetesSdConfig struct {
+ Role string `yaml:"role,omitempty"
mapstructure:"role,omitempty"`
+ Selectors []selector `yaml:"selectors,omitempty"
mapstructure:"selectors,omitempty"`
+}
+
+// relabelConfig is one entry of a scrape config's relabel_configs list.
+type relabelConfig struct {
+ SourceLabels []string `yaml:"source_labels,omitempty"
mapstructure:"source_labels,omitempty"`
+ Separator string `yaml:"separator,omitempty"
mapstructure:"separator,omitempty"`
+ Regex string `yaml:"regex,omitempty"
mapstructure:"regex,omitempty"`
+ TargetLabel string `yaml:"target_label,omitempty"
mapstructure:"target_label,omitempty"`
+ Replacement string `yaml:"replacement,omitempty"
mapstructure:"replacement,omitempty"`
+ Action string `yaml:"action,omitempty"
mapstructure:"action,omitempty"`
+}
+
+type selector struct {
+ Role string `yaml:"role,omitempty" mapstructure:"role,omitempty"`
+ Label string `yaml:"label,omitempty" mapstructure:"label,omitempty"`
}
// Fetcher is the struct for Prometheus fetcher
@@ -68,21 +98,27 @@ func (f *Fetcher) Description() string {
func (f *Fetcher) DefaultConfig() string {
return `
-## some config here
scrape_configs:
- job_name: 'prometheus'
metrics_path: '/metrics'
scrape_interval: 10s
static_configs:
- - targets: ['127.0.0.1:2020']
+ - targets: ['127.0.0.1:9100']
`
}
func (f *Fetcher) Prepare() {}
func (f *Fetcher) Fetch(ctx context.Context) {
+ f.OutputChannel = make(chan *v1.SniffData, 100)
+ f.ScrapeConfig(ctx)
+ fetch(ctx, f.ScrapeConfigs, f.OutputChannel)
+}
+
+func (f *Fetcher) ScrapeConfig(ctx context.Context) {
// yaml
configDeclare := make(map[string]interface{})
+ log.Logger.Info(f.ScrapeConfigs)
configDeclare["scrape_configs"] = f.ScrapeConfigsMap
configBytes, err := yaml.Marshal(configDeclare)
if err != nil {
@@ -91,10 +127,9 @@ func (f *Fetcher) Fetch(ctx context.Context) {
log.Logger.Debug(string(configBytes))
configStruct, err := promConfig.Load(string(configBytes))
if err != nil {
- log.Logger.Fatal("prometheus fetcher configure load failed",
err.Error())
+ log.Logger.Fatal("prometheus fetcher configure load failed ",
err.Error())
}
f.ScrapeConfigs = configStruct.ScrapeConfigs
- fetch(ctx, f.ScrapeConfigs, f.OutputChannel)
}
func (f *Fetcher) Channel() <-chan *v1.SniffData {
diff --git a/plugins/fetcher/prometheus/fetcher_test.go
b/plugins/fetcher/prometheus/fetcher_test.go
index 0b47b1f..eacca28 100644
--- a/plugins/fetcher/prometheus/fetcher_test.go
+++ b/plugins/fetcher/prometheus/fetcher_test.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/skywalking-satellite/plugins/fetcher/api"
promcfg "github.com/prometheus/prometheus/config"
+ "github.com/spf13/viper"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v3"
"gotest.tools/assert"
@@ -280,3 +281,20 @@ func verifyTarget1(t *testing.T, em *v1.SniffData) {
assert.Assert(t, is.Contains(histogramElems,
histogram.GetName()), "Mismatch histogram meter")
}
}
+
+type Config map[string]interface{}
+
+func TestFetcher_ScrapeConfig(t *testing.T) {
+ f := &Fetcher{}
+ configYaml := f.DefaultConfig()
+ t.Log(configYaml)
+ // viper
+ v := viper.New()
+ v.SetConfigType("yaml")
+ err := v.ReadConfig(strings.NewReader(configYaml))
+ assert.NilError(t, err, "cannot read default config in the fetcher
plugin")
+ cfg := Config{}
+ if err := v.MergeConfigMap(cfg); err != nil {
+ assert.NilError(t, err, "config merge error")
+ }
+}