This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c712f8  feat: add Prometheus service discovery in Kubernetes (#59)
1c712f8 is described below

commit 1c712f850df1ffbd4b9cbf918929ef2fc4733dc6
Author: kv <[email protected]>
AuthorDate: Wed Aug 11 07:20:16 2021 +0800

    feat: add Prometheus service discovery in Kubernetes (#59)
---
 configs/satellite_config.yaml                      | 48 ++++++++++++++++++++
 go.sum                                             |  4 ++
 internal/satellite/config/override_by_env.go       |  2 +
 .../satellite/module/gatherer/fetcher_gatherer.go  | 18 +++++++-
 plugins/fetcher/prometheus/fetcher.go              | 51 ++++++++++++++++++----
 plugins/fetcher/prometheus/fetcher_test.go         | 18 ++++++++
 6 files changed, 131 insertions(+), 10 deletions(-)

diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index b7b64f6..0ea59ab 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -247,3 +247,51 @@ pipes:
       client_name: grpc-client
       forwarders:
         - plugin_name: nativemeter-grpc-forwarder
+  - common_config:
+      pipe_name: prometheus-fetcher
+    gatherer:
+      fetcher:
+        plugin_name: "prometheus-metrics-fetcher"
+        scrape_configs:
+          - job_name: 'prometheus'
+            metrics_path: '/metrics'
+            scrape_interval: 10s
+            static_configs:
+              - targets:
+                  - "127.0.0.1:9100"
+          - job_name: 'prometheus-k8s'
+            metrics_path: '/metrics'
+            scrape_interval: 10s
+            tls_config:
+              ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+            bearer_token_file: 
/var/run/secrets/kubernetes.io/serviceaccount/token
+            kubernetes_sd_configs:
+              - role: pod
+                selectors:
+                  - role: pod
+                    label: "app=prometheus"
+            relabel_configs:
+              - source_labels: [ __meta_kubernetes_pod_name ]
+                separator: ;
+                regex: (.*)
+                target_label: pod
+                replacement: $1
+                action: replace
+      queue:
+        plugin_name: "memory-queue"
+        # The maximum buffer event size.
+        event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations. And the time unit is 
millisecond.
+      flush_time: ${SATELLITE_PROMETHEUS_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_PROMETHEUS_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_PROMETHEUS_SENDER_MIN_FLUSH_EVENTS:1}
+      client_name: grpc-client
+      forwarders:
+        - plugin_name: nativecds-grpc-forwarder
diff --git a/go.sum b/go.sum
index 4d603ef..e915536 100644
--- a/go.sum
+++ b/go.sum
@@ -14,6 +14,7 @@ cloud.google.com/go v0.54.0/go.mod 
h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bP
 cloud.google.com/go v0.56.0/go.mod 
h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk=
 cloud.google.com/go v0.57.0/go.mod 
h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
 cloud.google.com/go v0.62.0/go.mod 
h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
+cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8=
 cloud.google.com/go v0.65.0/go.mod 
h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
 cloud.google.com/go/bigquery v1.0.1/go.mod 
h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
 cloud.google.com/go/bigquery v1.3.0/go.mod 
h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
@@ -408,6 +409,7 @@ github.com/google/uuid v1.1.1/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
 github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
 github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod 
h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
+github.com/googleapis/gax-go/v2 v2.0.5 
h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod 
h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
 github.com/googleapis/gnostic v0.4.1 
h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
 github.com/googleapis/gnostic v0.4.1/go.mod 
h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
@@ -838,6 +840,7 @@ go.opencensus.io v0.21.0/go.mod 
h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.uber.org/atomic v1.3.2/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@@ -1140,6 +1143,7 @@ google.golang.org/api v0.24.0/go.mod 
h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M
 google.golang.org/api v0.28.0/go.mod 
h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
 google.golang.org/api v0.29.0/go.mod 
h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM=
 google.golang.org/api v0.30.0/go.mod 
h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
+google.golang.org/api v0.32.0 h1:Le77IccnTqEa8ryp9wIpX5W3zYm7Gf9LhOp9PHcwFts=
 google.golang.org/api v0.32.0/go.mod 
h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
 google.golang.org/appengine v1.1.0/go.mod 
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.2.0/go.mod 
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
diff --git a/internal/satellite/config/override_by_env.go 
b/internal/satellite/config/override_by_env.go
index 6ae678f..f8b4858 100644
--- a/internal/satellite/config/override_by_env.go
+++ b/internal/satellite/config/override_by_env.go
@@ -70,6 +70,8 @@ func overrideSlice(m []interface{}, regex *regexp.Regexp) 
[]interface{} {
                        res = append(res, overrideMapStringInterface(val, 
regex))
                case map[interface{}]interface{}:
                        res = append(res, overrideMapInterfaceInterface(val, 
regex))
+               case string:
+                       res = append(res, val)
                }
        }
        return res
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go 
b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 3e3a791..ab68276 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -19,6 +19,7 @@ package gatherer
 
 import (
        "context"
+       "errors"
        "sync"
        "time"
 
@@ -26,6 +27,7 @@ import (
        "github.com/apache/skywalking-satellite/internal/satellite/event"
        module 
"github.com/apache/skywalking-satellite/internal/satellite/module/api"
        
"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
+       processor 
"github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
        "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
        fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
        queue "github.com/apache/skywalking-satellite/plugins/queue/api"
@@ -45,9 +47,17 @@ type FetcherGatherer struct {
        // metrics
        fetchCounter       *telemetry.Counter
        queueOutputCounter *telemetry.Counter
+
+       // sync invoker
+       processor processor.Processor
 }
 
 func (f *FetcherGatherer) Prepare() error {
+       log.Logger.WithField("pipe", f.config.PipeName).Info("fetcher gatherer 
module is preparing...")
+       if err := f.runningQueue.Initialize(); err != nil {
+               log.Logger.WithField("pipe", f.config.PipeName).Infof("the %s 
queue failed when initializing", f.runningQueue.Name())
+               return err
+       }
        f.fetchCounter = telemetry.NewCounter("gatherer_fetch_count", "Total 
number of the receiving count in the Gatherer.", "pipe", "status")
        f.queueOutputCounter = telemetry.NewCounter("queue_output_count", 
"Total number of the output count in the Queue of Gatherer.", "pipe", "status")
        return nil
@@ -118,6 +128,10 @@ func (f *FetcherGatherer) Ack(lastOffset event.Offset) {
        f.runningQueue.Ack(lastOffset)
 }
 
-func (f *FetcherGatherer) SetProcessor(_ module.Module) error {
-       return nil
+func (f *FetcherGatherer) SetProcessor(m module.Module) error {
+       if p, ok := m.(processor.Processor); ok {
+               f.processor = p
+               return nil
+       }
+       return errors.New("set processor only supports to inject processor 
module")
 }
diff --git a/plugins/fetcher/prometheus/fetcher.go 
b/plugins/fetcher/prometheus/fetcher.go
index 1867341..e1f4bdf 100644
--- a/plugins/fetcher/prometheus/fetcher.go
+++ b/plugins/fetcher/prometheus/fetcher.go
@@ -27,6 +27,7 @@ import (
 
        promConfig "github.com/prometheus/prometheus/config"
        "github.com/prometheus/prometheus/discovery"
+       _ "github.com/prometheus/prometheus/discovery/install" // Need the 
init() func in this package to register service discovery implement.
        "github.com/prometheus/prometheus/scrape"
        yaml "gopkg.in/yaml.v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -38,10 +39,39 @@ const (
 )
 
 type scrapeConfig struct {
-       JobName        string                   `yaml:"job_name" 
mapstructure:"job_name"`
-       ScrapeInterval time.Duration            
`yaml:"scrape_interval,omitempty" mapstructure:"scrape_interval,omitempty"`
-       StaticConfigs  []map[string]interface{} `yaml:"static_configs" 
mapstructure:"static_configs"`
-       MetricsPath    string                   `yaml:"metrics_path,omitempty" 
mapstructure:"metrics_path,omitempty"`
+       JobName             string                   `yaml:"job_name" 
mapstructure:"job_name"`
+       ScrapeInterval      time.Duration            
`yaml:"scrape_interval,omitempty" mapstructure:"scrape_interval,omitempty"`
+       StaticConfigs       []map[string]interface{} 
`yaml:"static_configs,omitempty" mapstructure:"static_configs,omitempty"`
+       MetricsPath         string                   
`yaml:"metrics_path,omitempty" mapstructure:"metrics_path,omitempty"`
+       TLSConfig           tlsConfig                
`yaml:"tls_config,omitempty" mapstructure:"tls_config,omitempty"`
+       BearerTokenFile     string                   
`yaml:"bearer_token_file,omitempty" mapstructure:"bearer_token_file,omitempty"`
+       KubernetesSdConfigs []kubernetesSdConfig     
`yaml:"kubernetes_sd_configs,omitempty" 
mapstructure:"kubernetes_sd_configs,omitempty"`
+       RelabelConfigs      []relabelConfig          
`yaml:"relabel_configs,omitempty" mapstructure:"relabel_configs,omitempty"`
+}
+
+type tlsConfig struct {
+       CaFile string `yaml:"ca_file" mapstructure:"ca_file"`
+}
+
+// kubernetes_sd_configs []kubernetesSdConfig
+type kubernetesSdConfig struct {
+       Role      string     `yaml:"role,omitempty" 
mapstructure:"role,omitempty"`
+       Selectors []selector `yaml:"selectors,omitempty" 
mapstructure:"selectors,omitempty"`
+}
+
+// relabel_configs []relabelConfig
+type relabelConfig struct {
+       SourceLabels []string `yaml:"source_labels,omitempty" 
mapstructure:"source_labels,omitempty"`
+       Separator    string   `yaml:"separator,omitempty" 
mapstructure:"separator,omitempty"`
+       Regex        string   `yaml:"regex,omitempty" 
mapstructure:"regex,omitempty"`
+       TargetLabel  string   `yaml:"target_label,omitempty" 
mapstructure:"target_label,omitempty"`
+       Replacement  string   `yaml:"replacement,omitempty" 
mapstructure:"replacement,omitempty"`
+       Action       string   `yaml:"action,omitempty" 
mapstructure:"action,omitempty"`
+}
+
+type selector struct {
+       Role  string `yaml:"role,omitempty" mapstructure:"role,omitempty"`
+       Label string `yaml:"label,omitempty" mapstructure:"label,omitempty"`
 }
 
 // Fetcher is the struct for Prometheus fetcher
@@ -68,21 +98,27 @@ func (f *Fetcher) Description() string {
 
 func (f *Fetcher) DefaultConfig() string {
        return `
-## some config here
 scrape_configs:
  - job_name: 'prometheus'
    metrics_path: '/metrics'
    scrape_interval: 10s
    static_configs:
-   - targets: ['127.0.0.1:2020']
+   - targets: ['127.0.0.1:9100']
 `
 }
 
 func (f *Fetcher) Prepare() {}
 
 func (f *Fetcher) Fetch(ctx context.Context) {
+       f.OutputChannel = make(chan *v1.SniffData, 100)
+       f.ScrapeConfig(ctx)
+       fetch(ctx, f.ScrapeConfigs, f.OutputChannel)
+}
+
+func (f *Fetcher) ScrapeConfig(ctx context.Context) {
        // yaml
        configDeclare := make(map[string]interface{})
+       log.Logger.Info(f.ScrapeConfigs)
        configDeclare["scrape_configs"] = f.ScrapeConfigsMap
        configBytes, err := yaml.Marshal(configDeclare)
        if err != nil {
@@ -91,10 +127,9 @@ func (f *Fetcher) Fetch(ctx context.Context) {
        log.Logger.Debug(string(configBytes))
        configStruct, err := promConfig.Load(string(configBytes))
        if err != nil {
-               log.Logger.Fatal("prometheus fetcher configure load failed", 
err.Error())
+               log.Logger.Fatal("prometheus fetcher configure load failed ", 
err.Error())
        }
        f.ScrapeConfigs = configStruct.ScrapeConfigs
-       fetch(ctx, f.ScrapeConfigs, f.OutputChannel)
 }
 
 func (f *Fetcher) Channel() <-chan *v1.SniffData {
diff --git a/plugins/fetcher/prometheus/fetcher_test.go 
b/plugins/fetcher/prometheus/fetcher_test.go
index 0b47b1f..eacca28 100644
--- a/plugins/fetcher/prometheus/fetcher_test.go
+++ b/plugins/fetcher/prometheus/fetcher_test.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/skywalking-satellite/plugins/fetcher/api"
 
        promcfg "github.com/prometheus/prometheus/config"
+       "github.com/spf13/viper"
        "github.com/stretchr/testify/require"
        yaml "gopkg.in/yaml.v3"
        "gotest.tools/assert"
@@ -280,3 +281,20 @@ func verifyTarget1(t *testing.T, em *v1.SniffData) {
                assert.Assert(t, is.Contains(histogramElems, 
histogram.GetName()), "Mismatch histogram meter")
        }
 }
+
+type Config map[string]interface{}
+
+func TestFetcher_ScrapeConfig(t *testing.T) {
+       f := &Fetcher{}
+       configYaml := f.DefaultConfig()
+       t.Log(configYaml)
+       // viper
+       v := viper.New()
+       v.SetConfigType("yaml")
+       err := v.ReadConfig(strings.NewReader(configYaml))
+       assert.NilError(t, err, "cannot read default config in the fetcher 
plugin")
+       cfg := Config{}
+       if err := v.MergeConfigMap(cfg); err != nil {
+               assert.NilError(t, err, "config merge error")
+       }
+}

Reply via email to