This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 580e7d4 feat: expose more prometheus metrics (#670)
580e7d4 is described below
commit 580e7d4117f9e3c2a8ed6c313b857beed0e2dd6a
Author: Sindweller <[email protected]>
AuthorDate: Fri Nov 19 17:22:23 2021 +0800
feat: expose more prometheus metrics (#670)
---
pkg/apisix/cluster.go | 48 +++++++++++++++++++++-----
pkg/apisix/cluster_test.go | 17 ++++++----
pkg/apisix/consumer.go | 16 ++++++---
pkg/apisix/consumer_test.go | 10 +++---
pkg/apisix/global_rule.go | 16 ++++++---
pkg/apisix/global_rule_test.go | 10 +++---
pkg/apisix/plugin.go | 2 +-
pkg/apisix/plugin_test.go | 10 +++---
pkg/apisix/route.go | 16 ++++++---
pkg/apisix/route_test.go | 10 +++---
pkg/apisix/schema.go | 4 +--
pkg/apisix/schema_test.go | 10 +++---
pkg/apisix/ssl.go | 15 +++++---
pkg/apisix/ssl_test.go | 10 +++---
pkg/apisix/stream_route.go | 15 +++++---
pkg/apisix/stream_route_test.go | 10 +++---
pkg/apisix/upstream.go | 15 +++++---
pkg/apisix/upstream_test.go | 10 +++---
pkg/ingress/apisix_cluster_config.go | 9 +++++
pkg/ingress/apisix_consumer.go | 10 ++++--
pkg/ingress/apisix_route.go | 9 +++++
pkg/ingress/apisix_tls.go | 10 ++++--
pkg/ingress/apisix_upstream.go | 8 +++++
pkg/ingress/controller.go | 19 ++++++-----
pkg/ingress/endpoint.go | 10 ++++--
pkg/ingress/endpointslice.go | 10 ++++--
pkg/ingress/ingress.go | 8 +++++
pkg/ingress/pod.go | 6 ++++
pkg/ingress/pod_test.go | 4 +++
pkg/ingress/secret.go | 10 ++++--
pkg/metrics/prometheus.go | 66 +++++++++++++++++++++++++++++++-----
pkg/metrics/prometheus_test.go | 46 ++++++++++++++++++++++++-
test/e2e/scaffold/k8s.go | 31 ++++++++++-------
33 files changed, 379 insertions(+), 121 deletions(-)
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index ef4ef98..42a7f7a 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -77,7 +77,8 @@ type ClusterOptions struct {
BaseURL string
Timeout time.Duration
// SyncInterval is the interval to sync schema.
- SyncInterval types.TimeDuration
+ SyncInterval types.TimeDuration
+ MetricsCollector metrics.Collector
}
type cluster struct {
@@ -129,7 +130,7 @@ func newCluster(ctx context.Context, o *ClusterOptions)
(Cluster, error) {
},
cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
- metricsCollector: metrics.NewPrometheusCollector(),
+ metricsCollector: o.MetricsCollector,
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
@@ -160,11 +161,13 @@ func (c *cluster) syncCache(ctx context.Context) {
zap.String("cost_time",
time.Since(now).String()),
zap.String("cluster", c.name),
)
+ c.metricsCollector.IncrCacheSyncOperation("success")
} else {
log.Errorw("failed to sync cache",
zap.String("cost_time",
time.Since(now).String()),
zap.String("cluster", c.name),
)
+ c.metricsCollector.IncrCacheSyncOperation("failure")
}
}()
@@ -486,15 +489,19 @@ func (c *cluster) do(req *http.Request) (*http.Response,
error) {
return c.cli.Do(req)
}
-func (c *cluster) getResource(ctx context.Context, url string) (*getResponse,
error) {
+func (c *cluster) getResource(ctx context.Context, url, resource string)
(*getResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "get")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
+
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
@@ -515,15 +522,19 @@ func (c *cluster) getResource(ctx context.Context, url
string) (*getResponse, er
return &res, nil
}
-func (c *cluster) listResource(ctx context.Context, url string)
(*listResponse, error) {
+func (c *cluster) listResource(ctx context.Context, url, resource string)
(*listResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "list")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
+
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
err = multierr.Append(err, fmt.Errorf("unexpected status code
%d", resp.StatusCode))
@@ -540,15 +551,18 @@ func (c *cluster) listResource(ctx context.Context, url
string) (*listResponse,
return &list, nil
}
-func (c *cluster) createResource(ctx context.Context, url string, body
io.Reader) (*createResponse, error) {
+func (c *cluster) createResource(ctx context.Context, url, resource string,
body io.Reader) (*createResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "create")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
defer drainBody(resp.Body, url)
@@ -566,15 +580,19 @@ func (c *cluster) createResource(ctx context.Context, url
string, body io.Reader
return &cr, nil
}
-func (c *cluster) updateResource(ctx context.Context, url string, body
io.Reader) (*updateResponse, error) {
+func (c *cluster) updateResource(ctx context.Context, url, resource string,
body io.Reader) (*updateResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return nil, err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "update")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
+
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK && resp.StatusCode !=
http.StatusCreated {
@@ -590,15 +608,19 @@ func (c *cluster) updateResource(ctx context.Context, url
string, body io.Reader
return &ur, nil
}
-func (c *cluster) deleteResource(ctx context.Context, url string) error {
+func (c *cluster) deleteResource(ctx context.Context, url, resource string)
error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
if err != nil {
return err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "delete")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
+
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK && resp.StatusCode !=
http.StatusNoContent && resp.StatusCode != http.StatusNotFound {
@@ -648,15 +670,19 @@ func readBody(r io.ReadCloser, url string) string {
}
// getSchema returns the schema of APISIX object.
-func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
+func (c *cluster) getSchema(ctx context.Context, url, resource string)
(string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return "", err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getSchema")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
+
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
@@ -672,15 +698,19 @@ func (c *cluster) getSchema(ctx context.Context, url
string) (string, error) {
}
// getList returns a list of string.
-func (c *cluster) getList(ctx context.Context, url string) ([]string, error) {
+func (c *cluster) getList(ctx context.Context, url, resource string)
([]string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
+ start := time.Now()
resp, err := c.do(req)
if err != nil {
return nil, err
}
+ c.metricsCollector.RecordAPISIXLatency(time.Since(start), "getList")
+ c.metricsCollector.RecordAPISIXCode(resp.StatusCode, resource)
+
defer drainBody(resp.Body, url)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
diff --git a/pkg/apisix/cluster_test.go b/pkg/apisix/cluster_test.go
index 9f31e3e..cd6f323 100644
--- a/pkg/apisix/cluster_test.go
+++ b/pkg/apisix/cluster_test.go
@@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -29,7 +30,8 @@ func TestAddCluster(t *testing.T) {
assert.Nil(t, err)
err = apisix.AddCluster(context.Background(), &ClusterOptions{
- BaseURL: "http://service1:9080/apisix/admin",
+ BaseURL: "http://service1:9080/apisix/admin",
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)
@@ -37,14 +39,16 @@ func TestAddCluster(t *testing.T) {
assert.Len(t, clusters, 1)
err = apisix.AddCluster(context.Background(), &ClusterOptions{
- Name: "service2",
- BaseURL: "http://service2:9080/apisix/admin",
+ Name: "service2",
+ BaseURL: "http://service2:9080/apisix/admin",
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)
err = apisix.AddCluster(context.Background(), &ClusterOptions{
- Name: "service2",
- AdminKey: "http://service3:9080/apisix/admin",
+ Name: "service2",
+ AdminKey: "http://service3:9080/apisix/admin",
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Equal(t, ErrDuplicatedCluster, err)
@@ -57,7 +61,8 @@ func TestNonExistentCluster(t *testing.T) {
assert.Nil(t, err)
err = apisix.AddCluster(context.Background(), &ClusterOptions{
- BaseURL: "http://service1:9080/apisix/admin",
+ BaseURL: "http://service1:9080/apisix/admin",
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
assert.Nil(t, err)
diff --git a/pkg/apisix/consumer.go b/pkg/apisix/consumer.go
index 7f7a53a..01557ed 100644
--- a/pkg/apisix/consumer.go
+++ b/pkg/apisix/consumer.go
@@ -65,7 +65,8 @@ func (r *consumerClient) Get(ctx context.Context, name
string) (*v1.Consumer, er
// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + name
- resp, err := r.cluster.getResource(ctx, url)
+ resp, err := r.cluster.getResource(ctx, url, "consumer")
+ r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("consumer not found",
@@ -109,7 +110,8 @@ func (r *consumerClient) List(ctx context.Context)
([]*v1.Consumer, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
- consumerItems, err := r.cluster.listResource(ctx, r.url)
+ consumerItems, err := r.cluster.listResource(ctx, r.url, "consumer")
+ r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
log.Errorf("failed to list consumers: %s", err)
return nil, err
@@ -153,7 +155,8 @@ func (r *consumerClient) Create(ctx context.Context, obj
*v1.Consumer) (*v1.Cons
url := r.url + "/" + obj.Username
log.Debugw("creating consumer", zap.ByteString("body", data),
zap.String("url", url))
- resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
+ resp, err := r.cluster.createResource(ctx, url, "consumer",
bytes.NewReader(data))
+ r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
log.Errorf("failed to create consumer: %s", err)
return nil, err
@@ -180,9 +183,11 @@ func (r *consumerClient) Delete(ctx context.Context, obj
*v1.Consumer) error {
return err
}
url := r.url + "/" + obj.Username
- if err := r.cluster.deleteResource(ctx, url); err != nil {
+ if err := r.cluster.deleteResource(ctx, url, "consumer"); err != nil {
+ r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
return err
}
+ r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err := r.cluster.cache.DeleteConsumer(obj); err != nil {
log.Errorf("failed to reflect consumer delete to cache: %s",
err)
if err != cache.ErrNotFound {
@@ -208,7 +213,8 @@ func (r *consumerClient) Update(ctx context.Context, obj
*v1.Consumer) (*v1.Cons
}
url := r.url + "/" + obj.Username
log.Debugw("updating username", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ resp, err := r.cluster.updateResource(ctx, url, "consumer",
bytes.NewReader(body))
+ r.cluster.metricsCollector.IncrAPISIXRequest("consumer")
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/consumer_test.go b/pkg/apisix/consumer_test.go
index e029d13..95bb0b4 100644
--- a/pkg/apisix/consumer_test.go
+++ b/pkg/apisix/consumer_test.go
@@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -151,10 +152,11 @@ func TestConsumerClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newConsumerClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// Create
diff --git a/pkg/apisix/global_rule.go b/pkg/apisix/global_rule.go
index e2e9108..df9586f 100644
--- a/pkg/apisix/global_rule.go
+++ b/pkg/apisix/global_rule.go
@@ -67,7 +67,8 @@ func (r *globalRuleClient) Get(ctx context.Context, name
string) (*v1.GlobalRule
// TODO Add mutex here to avoid dog-pile effect.
url := r.url + "/" + rid
- resp, err := r.cluster.getResource(ctx, url)
+ resp, err := r.cluster.getResource(ctx, url, "globalRule")
+ r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("global_rule not found",
@@ -111,7 +112,8 @@ func (r *globalRuleClient) List(ctx context.Context)
([]*v1.GlobalRule, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
- globalRuleItems, err := r.cluster.listResource(ctx, r.url)
+ globalRuleItems, err := r.cluster.listResource(ctx, r.url, "globalRule")
+ r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
log.Errorf("failed to list global_rules: %s", err)
return nil, err
@@ -155,7 +157,8 @@ func (r *globalRuleClient) Create(ctx context.Context, obj
*v1.GlobalRule) (*v1.
url := r.url + "/" + obj.ID
log.Debugw("creating global_rule", zap.ByteString("body", data),
zap.String("url", url))
- resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
+ resp, err := r.cluster.createResource(ctx, url, "globalRule",
bytes.NewReader(data))
+ r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
log.Errorf("failed to create global_rule: %s", err)
return nil, err
@@ -182,9 +185,11 @@ func (r *globalRuleClient) Delete(ctx context.Context, obj
*v1.GlobalRule) error
return err
}
url := r.url + "/" + obj.ID
- if err := r.cluster.deleteResource(ctx, url); err != nil {
+ if err := r.cluster.deleteResource(ctx, url, "globalRule"); err != nil {
+ r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
return err
}
+ r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err := r.cluster.cache.DeleteGlobalRule(obj); err != nil {
log.Errorf("failed to reflect global_rule delete to cache: %s",
err)
if err != cache.ErrNotFound {
@@ -210,7 +215,8 @@ func (r *globalRuleClient) Update(ctx context.Context, obj
*v1.GlobalRule) (*v1.
}
url := r.url + "/" + obj.ID
log.Debugw("updating global_rule", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ resp, err := r.cluster.updateResource(ctx, url, "globalRule",
bytes.NewReader(body))
+ r.cluster.metricsCollector.IncrAPISIXRequest("globalRule")
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/global_rule_test.go b/pkg/apisix/global_rule_test.go
index 01a4d38..770fc65 100644
--- a/pkg/apisix/global_rule_test.go
+++ b/pkg/apisix/global_rule_test.go
@@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -151,10 +152,11 @@ func TestGlobalRuleClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newGlobalRuleClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// Create
diff --git a/pkg/apisix/plugin.go b/pkg/apisix/plugin.go
index 03acce8..92a9c96 100644
--- a/pkg/apisix/plugin.go
+++ b/pkg/apisix/plugin.go
@@ -41,7 +41,7 @@ func (p *pluginClient) List(ctx context.Context) ([]string,
error) {
zap.String("cluster", "default"),
zap.String("url", p.url),
)
- pluginList, err := p.cluster.getList(ctx, p.url+"/list")
+ pluginList, err := p.cluster.getList(ctx, p.url+"/list", "plugin")
if err != nil {
log.Errorf("failed to list plugins' names: %s", err)
return nil, err
diff --git a/pkg/apisix/plugin_test.go b/pkg/apisix/plugin_test.go
index 3ee6c71..159e887 100644
--- a/pkg/apisix/plugin_test.go
+++ b/pkg/apisix/plugin_test.go
@@ -22,6 +22,7 @@ import (
"strings"
"testing"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
@@ -89,10 +90,11 @@ func TestPluginClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newPluginClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// List
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
index e4c0ef2..f1ab4a8 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/route.go
@@ -68,7 +68,8 @@ func (r *routeClient) Get(ctx context.Context, name string)
(*v1.Route, error) {
// TODO Add mutex here to avoid dog-pile effection.
url := r.url + "/" + rid
- resp, err := r.cluster.getResource(ctx, url)
+ resp, err := r.cluster.getResource(ctx, url, "route")
+ r.cluster.metricsCollector.IncrAPISIXRequest("route")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("route not found",
@@ -112,7 +113,8 @@ func (r *routeClient) List(ctx context.Context)
([]*v1.Route, error) {
zap.String("cluster", "default"),
zap.String("url", r.url),
)
- routeItems, err := r.cluster.listResource(ctx, r.url)
+ routeItems, err := r.cluster.listResource(ctx, r.url, "route")
+ r.cluster.metricsCollector.IncrAPISIXRequest("route")
if err != nil {
log.Errorf("failed to list routes: %s", err)
return nil, err
@@ -156,7 +158,8 @@ func (r *routeClient) Create(ctx context.Context, obj
*v1.Route) (*v1.Route, err
url := r.url + "/" + obj.ID
log.Debugw("creating route", zap.ByteString("body", data),
zap.String("url", url))
- resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
+ resp, err := r.cluster.createResource(ctx, url, "route",
bytes.NewReader(data))
+ r.cluster.metricsCollector.IncrAPISIXRequest("route")
if err != nil {
log.Errorf("failed to create route: %s", err)
return nil, err
@@ -184,9 +187,11 @@ func (r *routeClient) Delete(ctx context.Context, obj
*v1.Route) error {
return err
}
url := r.url + "/" + obj.ID
- if err := r.cluster.deleteResource(ctx, url); err != nil {
+ if err := r.cluster.deleteResource(ctx, url, "route"); err != nil {
+ r.cluster.metricsCollector.IncrAPISIXRequest("route")
return err
}
+ r.cluster.metricsCollector.IncrAPISIXRequest("route")
if err := r.cluster.cache.DeleteRoute(obj); err != nil {
log.Errorf("failed to reflect route delete to cache: %s", err)
if err != cache.ErrNotFound {
@@ -212,7 +217,8 @@ func (r *routeClient) Update(ctx context.Context, obj
*v1.Route) (*v1.Route, err
}
url := r.url + "/" + obj.ID
log.Debugw("updating route", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ resp, err := r.cluster.updateResource(ctx, url, "route",
bytes.NewReader(body))
+ r.cluster.metricsCollector.IncrAPISIXRequest("route")
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/route_test.go b/pkg/apisix/route_test.go
index 9aef459..050c368 100644
--- a/pkg/apisix/route_test.go
+++ b/pkg/apisix/route_test.go
@@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/assert"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
@@ -172,10 +173,11 @@ func TestRouteClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newRouteClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// Create
diff --git a/pkg/apisix/schema.go b/pkg/apisix/schema.go
index 97c6c97..1a292f7 100644
--- a/pkg/apisix/schema.go
+++ b/pkg/apisix/schema.go
@@ -63,8 +63,8 @@ func (sc schemaClient) getSchema(ctx context.Context, name
string) (*v1.Schema,
)
}
- url := sc.url + name
- content, err := sc.cluster.getSchema(ctx, url)
+ url := sc.url + "/" + name
+ content, err := sc.cluster.getSchema(ctx, url, "schema")
if err != nil {
log.Errorw("failed to get schema from APISIX",
zap.String("name", name),
diff --git a/pkg/apisix/schema_test.go b/pkg/apisix/schema_test.go
index 74eccbd..a9cb4a1 100644
--- a/pkg/apisix/schema_test.go
+++ b/pkg/apisix/schema_test.go
@@ -21,6 +21,7 @@ import (
"strings"
"testing"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
)
@@ -105,10 +106,11 @@ func TestSchemaClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newSchemaClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
ctx := context.TODO()
diff --git a/pkg/apisix/ssl.go b/pkg/apisix/ssl.go
index 257117b..390e9cc 100644
--- a/pkg/apisix/ssl.go
+++ b/pkg/apisix/ssl.go
@@ -65,7 +65,7 @@ func (s *sslClient) Get(ctx context.Context, name string)
(*v1.Ssl, error) {
// TODO Add mutex here to avoid dog-pile effection.
url := s.url + "/" + sid
- resp, err := s.cluster.getResource(ctx, url)
+ resp, err := s.cluster.getResource(ctx, url, "ssl")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("ssl not found",
@@ -108,7 +108,8 @@ func (s *sslClient) List(ctx context.Context) ([]*v1.Ssl,
error) {
zap.String("cluster", "default"),
)
- sslItems, err := s.cluster.listResource(ctx, s.url)
+ sslItems, err := s.cluster.listResource(ctx, s.url, "ssl")
+ s.cluster.metricsCollector.IncrAPISIXRequest("ssl")
if err != nil {
log.Errorf("failed to list ssl: %s", err)
return nil, err
@@ -147,7 +148,8 @@ func (s *sslClient) Create(ctx context.Context, obj
*v1.Ssl) (*v1.Ssl, error) {
}
url := s.url + "/" + obj.ID
log.Debugw("creating ssl", zap.ByteString("body", data),
zap.String("url", url))
- resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(data))
+ resp, err := s.cluster.createResource(ctx, url, "ssl",
bytes.NewReader(data))
+ s.cluster.metricsCollector.IncrAPISIXRequest("ssl")
if err != nil {
log.Errorf("failed to create ssl: %s", err)
return nil, err
@@ -174,9 +176,11 @@ func (s *sslClient) Delete(ctx context.Context, obj
*v1.Ssl) error {
return err
}
url := s.url + "/" + obj.ID
- if err := s.cluster.deleteResource(ctx, url); err != nil {
+ if err := s.cluster.deleteResource(ctx, url, "ssl"); err != nil {
+ s.cluster.metricsCollector.IncrAPISIXRequest("ssl")
return err
}
+ s.cluster.metricsCollector.IncrAPISIXRequest("ssl")
if err := s.cluster.cache.DeleteSSL(obj); err != nil {
log.Errorf("failed to reflect ssl delete to cache: %s", err)
if err != cache.ErrNotFound {
@@ -201,7 +205,8 @@ func (s *sslClient) Update(ctx context.Context, obj
*v1.Ssl) (*v1.Ssl, error) {
return nil, err
}
log.Debugw("updating ssl", zap.ByteString("body", data),
zap.String("url", url))
- resp, err := s.cluster.updateResource(ctx, url, bytes.NewReader(data))
+ resp, err := s.cluster.updateResource(ctx, url, "ssl",
bytes.NewReader(data))
+ s.cluster.metricsCollector.IncrAPISIXRequest("ssl")
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/ssl_test.go b/pkg/apisix/ssl_test.go
index 41602a7..c1af804 100644
--- a/pkg/apisix/ssl_test.go
+++ b/pkg/apisix/ssl_test.go
@@ -27,6 +27,7 @@ import (
"strings"
"testing"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/stretchr/testify/assert"
@@ -151,10 +152,11 @@ func TestSSLClient(t *testing.T) {
close(closedCh)
cli := newSSLClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// Create
diff --git a/pkg/apisix/stream_route.go b/pkg/apisix/stream_route.go
index 34ebe33..e835eab 100644
--- a/pkg/apisix/stream_route.go
+++ b/pkg/apisix/stream_route.go
@@ -67,7 +67,7 @@ func (r *streamRouteClient) Get(ctx context.Context, name
string) (*v1.StreamRou
// TODO Add mutex here to avoid dog-pile effection.
url := r.url + "/" + rid
- resp, err := r.cluster.getResource(ctx, url)
+ resp, err := r.cluster.getResource(ctx, url, "streamRoute")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("stream_route not found",
@@ -111,7 +111,8 @@ func (r *streamRouteClient) List(ctx context.Context)
([]*v1.StreamRoute, error)
zap.String("cluster", "default"),
zap.String("url", r.url),
)
- streamRouteItems, err := r.cluster.listResource(ctx, r.url)
+ streamRouteItems, err := r.cluster.listResource(ctx, r.url,
"streamRoute")
+ r.cluster.metricsCollector.IncrAPISIXRequest("streamRoute")
if err != nil {
log.Errorf("failed to list stream_routes: %s", err)
return nil, err
@@ -154,7 +155,8 @@ func (r *streamRouteClient) Create(ctx context.Context, obj
*v1.StreamRoute) (*v
url := r.url + "/" + obj.ID
log.Debugw("creating stream_route", zap.ByteString("body", data),
zap.String("url", url))
- resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
+ resp, err := r.cluster.createResource(ctx, url, "streamRoute",
bytes.NewReader(data))
+ r.cluster.metricsCollector.IncrAPISIXRequest("streamRoute")
if err != nil {
log.Errorf("failed to create stream_route: %s", err)
return nil, err
@@ -181,9 +183,11 @@ func (r *streamRouteClient) Delete(ctx context.Context,
obj *v1.StreamRoute) err
return err
}
url := r.url + "/" + obj.ID
- if err := r.cluster.deleteResource(ctx, url); err != nil {
+ if err := r.cluster.deleteResource(ctx, url, "streamRoute"); err != nil
{
+ r.cluster.metricsCollector.IncrAPISIXRequest("streamRoute")
return err
}
+ r.cluster.metricsCollector.IncrAPISIXRequest("streamRoute")
if err := r.cluster.cache.DeleteStreamRoute(obj); err != nil {
log.Errorf("failed to reflect stream_route delete to cache:
%s", err)
if err != cache.ErrNotFound {
@@ -208,7 +212,8 @@ func (r *streamRouteClient) Update(ctx context.Context, obj
*v1.StreamRoute) (*v
}
url := r.url + "/" + obj.ID
log.Debugw("updating stream_route", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ resp, err := r.cluster.updateResource(ctx, url, "streamRoute",
bytes.NewReader(body))
+ r.cluster.metricsCollector.IncrAPISIXRequest("streamRoute")
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/stream_route_test.go b/pkg/apisix/stream_route_test.go
index 778131b..83326fc 100644
--- a/pkg/apisix/stream_route_test.go
+++ b/pkg/apisix/stream_route_test.go
@@ -26,6 +26,7 @@ import (
"strings"
"testing"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/stretchr/testify/assert"
@@ -151,10 +152,11 @@ func TestStreamRouteClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newStreamRouteClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// Create
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
index 25cf18f..de0f79f 100644
--- a/pkg/apisix/upstream.go
+++ b/pkg/apisix/upstream.go
@@ -64,7 +64,7 @@ func (u *upstreamClient) Get(ctx context.Context, name
string) (*v1.Upstream, er
// TODO Add mutex here to avoid dog-pile effection.
url := u.url + "/" + uid
- resp, err := u.cluster.getResource(ctx, url)
+ resp, err := u.cluster.getResource(ctx, url, "upstream")
if err != nil {
if err == cache.ErrNotFound {
log.Warnw("upstream not found",
@@ -108,7 +108,8 @@ func (u *upstreamClient) List(ctx context.Context)
([]*v1.Upstream, error) {
zap.String("cluster", "default"),
)
- upsItems, err := u.cluster.listResource(ctx, u.url)
+ upsItems, err := u.cluster.listResource(ctx, u.url, "upstream")
+ u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
if err != nil {
log.Errorf("failed to list upstreams: %s", err)
return nil, err
@@ -149,7 +150,8 @@ func (u *upstreamClient) Create(ctx context.Context, obj
*v1.Upstream) (*v1.Upst
url := u.url + "/" + obj.ID
log.Debugw("creating upstream", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := u.cluster.createResource(ctx, url, bytes.NewReader(body))
+ resp, err := u.cluster.createResource(ctx, url, "upstream",
bytes.NewReader(body))
+ u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
if err != nil {
log.Errorf("failed to create upstream: %s", err)
return nil, err
@@ -177,9 +179,11 @@ func (u *upstreamClient) Delete(ctx context.Context, obj
*v1.Upstream) error {
return err
}
url := u.url + "/" + obj.ID
- if err := u.cluster.deleteResource(ctx, url); err != nil {
+ if err := u.cluster.deleteResource(ctx, url, "upstream"); err != nil {
+ u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
return err
}
+ u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
if err := u.cluster.cache.DeleteUpstream(obj); err != nil {
log.Errorf("failed to reflect upstream delete to cache: %s",
err.Error())
if err != cache.ErrNotFound {
@@ -208,7 +212,8 @@ func (u *upstreamClient) Update(ctx context.Context, obj
*v1.Upstream) (*v1.Upst
url := u.url + "/" + obj.ID
log.Debugw("updating upstream", zap.ByteString("body", body),
zap.String("url", url))
- resp, err := u.cluster.updateResource(ctx, url, bytes.NewReader(body))
+ resp, err := u.cluster.updateResource(ctx, url, "upstream",
bytes.NewReader(body))
+ u.cluster.metricsCollector.IncrAPISIXRequest("upstream")
if err != nil {
return nil, err
}
diff --git a/pkg/apisix/upstream_test.go b/pkg/apisix/upstream_test.go
index 13ef65b..c6b75d4 100644
--- a/pkg/apisix/upstream_test.go
+++ b/pkg/apisix/upstream_test.go
@@ -26,6 +26,7 @@ import (
"strings"
"testing"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/stretchr/testify/assert"
@@ -150,10 +151,11 @@ func TestUpstreamClient(t *testing.T) {
closedCh := make(chan struct{})
close(closedCh)
cli := newUpstreamClient(&cluster{
- baseURL: u.String(),
- cli: http.DefaultClient,
- cache: &dummyCache{},
- cacheSynced: closedCh,
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ metricsCollector: metrics.NewPrometheusCollector(),
})
// Create
diff --git a/pkg/ingress/apisix_cluster_config.go
b/pkg/ingress/apisix_cluster_config.go
index ea2146c..6eea19d 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -187,13 +187,16 @@ func (c *apisixClusterConfigController) sync(ctx
context.Context, ev *types.Even
func (c *apisixClusterConfigController) handleSyncErr(obj interface{}, err
error) {
if err == nil {
c.workqueue.Forget(obj)
+
c.controller.MetricsCollector.IncrSyncOperation("clusterConfig", "success")
return
}
log.Warnw("sync ApisixClusterConfig failed, will retry",
zap.Any("object", obj),
zap.Error(err),
)
+
c.workqueue.AddRateLimited(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("clusterConfig",
"failure")
}
func (c *apisixClusterConfigController) onAdd(obj interface{}) {
@@ -211,6 +214,8 @@ func (c *apisixClusterConfigController) onAdd(obj
interface{}) {
Type: types.EventAdd,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("clusterConfig", "add")
}
func (c *apisixClusterConfigController) onUpdate(oldObj, newObj interface{}) {
@@ -233,6 +238,8 @@ func (c *apisixClusterConfigController) onUpdate(oldObj,
newObj interface{}) {
Type: types.EventUpdate,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("clusterConfig", "update")
}
func (c *apisixClusterConfigController) onDelete(obj interface{}) {
@@ -258,4 +265,6 @@ func (c *apisixClusterConfigController) onDelete(obj
interface{}) {
Object: key,
Tombstone: acc,
})
+
+ c.controller.MetricsCollector.IncrEvents("clusterConfig", "delete")
}
diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go
index 28ce7e3..c76270a 100644
--- a/pkg/ingress/apisix_consumer.go
+++ b/pkg/ingress/apisix_consumer.go
@@ -131,11 +131,9 @@ func (c *apisixConsumerController) sync(ctx
context.Context, ev *types.Event) er
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning,
_resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err,
metav1.ConditionFalse, ac.GetGeneration())
- c.controller.metricsCollector.IncrSyncOperation("consumer",
"failure")
return err
}
- c.controller.metricsCollector.IncrSyncOperation("consumer", "success")
c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced,
nil)
return nil
}
@@ -143,6 +141,7 @@ func (c *apisixConsumerController) sync(ctx
context.Context, ev *types.Event) er
func (c *apisixConsumerController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("consumer",
"success")
return
}
log.Warnw("sync ApisixConsumer failed, will retry",
@@ -150,6 +149,7 @@ func (c *apisixConsumerController) handleSyncErr(obj
interface{}, err error) {
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("consumer", "failure")
}
func (c *apisixConsumerController) onAdd(obj interface{}) {
@@ -169,6 +169,8 @@ func (c *apisixConsumerController) onAdd(obj interface{}) {
Type: types.EventAdd,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("consumer", "add")
}
func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
@@ -194,6 +196,8 @@ func (c *apisixConsumerController) onUpdate(oldObj, newObj
interface{}) {
Type: types.EventUpdate,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("consumer", "update")
}
func (c *apisixConsumerController) onDelete(obj interface{}) {
@@ -222,4 +226,6 @@ func (c *apisixConsumerController) onDelete(obj
interface{}) {
Object: key,
Tombstone: ac,
})
+
+ c.controller.MetricsCollector.IncrEvents("consumer", "delete")
}
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index d1af33a..bd95638 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -249,6 +249,7 @@ func (c *apisixRouteController) handleSyncErr(obj
interface{}, errOrigin error)
namespace, name, errLocal := cache.SplitMetaNamespaceKey(event.Key)
if errLocal != nil {
log.Errorf("invalid resource key: %s", event.Key)
+ c.controller.MetricsCollector.IncrSyncOperation("route",
"failure")
return
}
var ar kube.ApisixRoute
@@ -287,6 +288,7 @@ func (c *apisixRouteController) handleSyncErr(obj
interface{}, errOrigin error)
}
}
c.workqueue.Forget(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("route",
"success")
return
}
log.Warnw("sync ApisixRoute failed, will retry",
@@ -315,6 +317,7 @@ func (c *apisixRouteController) handleSyncErr(obj
interface{}, errOrigin error)
)
}
c.workqueue.AddRateLimited(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("route", "failure")
}
func (c *apisixRouteController) onAdd(obj interface{}) {
@@ -337,6 +340,8 @@ func (c *apisixRouteController) onAdd(obj interface{}) {
GroupVersion: ar.GroupVersion(),
},
})
+
+ c.controller.MetricsCollector.IncrEvents("route", "add")
}
func (c *apisixRouteController) onUpdate(oldObj, newObj interface{}) {
@@ -365,6 +370,8 @@ func (c *apisixRouteController) onUpdate(oldObj, newObj
interface{}) {
OldObject: prev,
},
})
+
+ c.controller.MetricsCollector.IncrEvents("route", "update")
}
func (c *apisixRouteController) onDelete(obj interface{}) {
@@ -395,4 +402,6 @@ func (c *apisixRouteController) onDelete(obj interface{}) {
},
Tombstone: ar,
})
+
+ c.controller.MetricsCollector.IncrEvents("route", "delete")
}
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index a289b31..400872a 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -172,7 +172,7 @@ func (c *apisixTlsController) syncSecretSSL(secretKey
string, apisixTlsKey strin
func (c *apisixTlsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
- c.controller.metricsCollector.IncrSyncOperation("ssl",
"success")
+ c.controller.MetricsCollector.IncrSyncOperation("TLS",
"success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
@@ -180,7 +180,7 @@ func (c *apisixTlsController) handleSyncErr(obj
interface{}, err error) {
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
- c.controller.metricsCollector.IncrSyncOperation("ssl", "failure")
+ c.controller.MetricsCollector.IncrSyncOperation("TLS", "failure")
}
func (c *apisixTlsController) onAdd(obj interface{}) {
@@ -199,6 +199,8 @@ func (c *apisixTlsController) onAdd(obj interface{}) {
Type: types.EventAdd,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("TLS", "add")
}
func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
@@ -223,6 +225,8 @@ func (c *apisixTlsController) onUpdate(prev, curr
interface{}) {
Type: types.EventUpdate,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("TLS", "update")
}
func (c *apisixTlsController) onDelete(obj interface{}) {
@@ -253,4 +257,6 @@ func (c *apisixTlsController) onDelete(obj interface{}) {
Object: key,
Tombstone: tls,
})
+
+ c.controller.MetricsCollector.IncrEvents("TLS", "delete")
}
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index f2e6261..35853d3 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -204,6 +204,7 @@ func (c *apisixUpstreamController) sync(ctx
context.Context, ev *types.Event) er
func (c *apisixUpstreamController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("upstream",
"success")
return
}
log.Warnw("sync ApisixUpstream failed, will retry",
@@ -211,6 +212,7 @@ func (c *apisixUpstreamController) handleSyncErr(obj
interface{}, err error) {
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("upstream", "failure")
}
func (c *apisixUpstreamController) onAdd(obj interface{}) {
@@ -229,6 +231,8 @@ func (c *apisixUpstreamController) onAdd(obj interface{}) {
Type: types.EventAdd,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("upstream", "add")
}
func (c *apisixUpstreamController) onUpdate(oldObj, newObj interface{}) {
@@ -254,6 +258,8 @@ func (c *apisixUpstreamController) onUpdate(oldObj, newObj
interface{}) {
Type: types.EventUpdate,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("upstream", "update")
}
func (c *apisixUpstreamController) onDelete(obj interface{}) {
@@ -282,4 +288,6 @@ func (c *apisixUpstreamController) onDelete(obj
interface{}) {
Object: key,
Tombstone: au,
})
+
+ c.controller.MetricsCollector.IncrEvents("upstream", "delete")
}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 14b024b..fe2c812 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -77,7 +77,7 @@ type Controller struct {
podCache types.PodCache
translator translation.Translator
apiServer *api.Server
- metricsCollector metrics.Collector
+ MetricsCollector metrics.Collector
kubeClient *kube.KubeClient
// recorder event
recorder record.EventRecorder
@@ -179,7 +179,7 @@ func NewController(cfg *config.Config) (*Controller, error)
{
cfg: cfg,
apiServer: apiSrv,
apisix: client,
- metricsCollector: metrics.NewPrometheusCollector(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
watchingLabels: watchingLabels,
@@ -313,7 +313,7 @@ func (c *Controller) Run(stop chan struct{}) error {
<-stop
rootCancel()
}()
- c.metricsCollector.ResetLeader(false)
+ c.MetricsCollector.ResetLeader(false)
go func() {
if err := c.apiServer.Run(rootCtx.Done()); err != nil {
@@ -353,7 +353,7 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
- c.metricsCollector.ResetLeader(false)
+ c.MetricsCollector.ResetLeader(false)
},
},
// Set it to false as current leaderelection implementation
will report
@@ -395,9 +395,10 @@ func (c *Controller) run(ctx context.Context) {
defer c.leaderContextCancelFunc()
clusterOpts := &apisix.ClusterOptions{
- Name: c.cfg.APISIX.DefaultClusterName,
- AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
- BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
+ Name: c.cfg.APISIX.DefaultClusterName,
+ AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
+ BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
+ MetricsCollector: c.MetricsCollector,
}
err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
@@ -503,7 +504,7 @@ func (c *Controller) run(ctx context.Context) {
c.apisixConsumerController.run(ctx)
})
- c.metricsCollector.ResetLeader(true)
+ c.MetricsCollector.ResetLeader(true)
log.Infow("controller now is running as leader",
zap.String("namespace", c.namespace),
@@ -655,6 +656,6 @@ func (c *Controller) checkClusterHealth(ctx
context.Context, cancelFunc context.
return
}
log.Debugf("success check health for default cluster")
- c.metricsCollector.IncrCheckClusterHealth(c.name)
+ c.MetricsCollector.IncrCheckClusterHealth(c.name)
}
}
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index 53889f8..bb3d129 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -90,14 +90,14 @@ func (c *endpointsController) sync(ctx context.Context, ev
*types.Event) error {
func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
- c.controller.metricsCollector.IncrSyncOperation("endpoint",
"success")
+ c.controller.MetricsCollector.IncrSyncOperation("endpoints",
"success")
return
}
log.Warnw("sync endpoints failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
- c.controller.metricsCollector.IncrSyncOperation("endpoint", "failure")
+ c.controller.MetricsCollector.IncrSyncOperation("endpoints", "failure")
}
func (c *endpointsController) onAdd(obj interface{}) {
@@ -117,6 +117,8 @@ func (c *endpointsController) onAdd(obj interface{}) {
// TODO pass key.
Object: kube.NewEndpoint(obj.(*corev1.Endpoints)),
})
+
+ c.controller.MetricsCollector.IncrEvents("endpoints", "add")
}
func (c *endpointsController) onUpdate(prev, curr interface{}) {
@@ -143,6 +145,8 @@ func (c *endpointsController) onUpdate(prev, curr
interface{}) {
// TODO pass key.
Object: kube.NewEndpoint(currEp),
})
+
+ c.controller.MetricsCollector.IncrEvents("endpoints", "update")
}
func (c *endpointsController) onDelete(obj interface{}) {
@@ -169,4 +173,6 @@ func (c *endpointsController) onDelete(obj interface{}) {
Type: types.EventDelete,
Object: kube.NewEndpoint(ep),
})
+
+ c.controller.MetricsCollector.IncrEvents("endpoints", "delete")
}
diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go
index 691e4ed..26a7b85 100644
--- a/pkg/ingress/endpointslice.go
+++ b/pkg/ingress/endpointslice.go
@@ -109,14 +109,14 @@ func (c *endpointSliceController) sync(ctx
context.Context, ev *types.Event) err
func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
-
c.controller.metricsCollector.IncrSyncOperation("endpointSlices", "success")
+
c.controller.MetricsCollector.IncrSyncOperation("endpointSlice", "success")
return
}
log.Warnw("sync endpointSlice failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
- c.controller.metricsCollector.IncrSyncOperation("endpointSlices",
"failure")
+ c.controller.MetricsCollector.IncrSyncOperation("endpointSlice",
"failure")
}
func (c *endpointSliceController) onAdd(obj interface{}) {
@@ -149,6 +149,8 @@ func (c *endpointSliceController) onAdd(obj interface{}) {
ServiceName: svcName,
},
})
+
+ c.controller.MetricsCollector.IncrEvents("endpointSlice", "add")
}
func (c *endpointSliceController) onUpdate(prev, curr interface{}) {
@@ -188,6 +190,8 @@ func (c *endpointSliceController) onUpdate(prev, curr
interface{}) {
ServiceName: svcName,
},
})
+
+ c.controller.MetricsCollector.IncrEvents("endpointSlice", "update")
}
func (c *endpointSliceController) onDelete(obj interface{}) {
@@ -224,4 +228,6 @@ func (c *endpointSliceController) onDelete(obj interface{})
{
ServiceName: svcName,
},
})
+
+ c.controller.MetricsCollector.IncrEvents("endpointSlice", "delete")
}
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index 4cc221f..8bbb9cd 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -186,6 +186,7 @@ func (c *ingressController) sync(ctx context.Context, ev
*types.Event) error {
func (c *ingressController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("ingress",
"success")
return
}
log.Warnw("sync ingress failed, will retry",
@@ -193,6 +194,7 @@ func (c *ingressController) handleSyncErr(obj interface{},
err error) {
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
+ c.controller.MetricsCollector.IncrSyncOperation("ingress", "failure")
}
func (c *ingressController) onAdd(obj interface{}) {
@@ -225,6 +227,8 @@ func (c *ingressController) onAdd(obj interface{}) {
GroupVersion: ing.GroupVersion(),
},
})
+
+ c.controller.MetricsCollector.IncrEvents("ingress", "add")
}
func (c *ingressController) onUpdate(oldObj, newObj interface{}) {
@@ -261,6 +265,8 @@ func (c *ingressController) onUpdate(oldObj, newObj
interface{}) {
OldObject: prev,
},
})
+
+ c.controller.MetricsCollector.IncrEvents("ingress", "update")
}
func (c *ingressController) OnDelete(obj interface{}) {
@@ -300,6 +306,8 @@ func (c *ingressController) OnDelete(obj interface{}) {
},
Tombstone: ing,
})
+
+ c.controller.MetricsCollector.IncrEvents("ingress", "delete")
}
func (c *ingressController) isIngressEffective(ing kube.Ingress) bool {
diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go
index efaf881..be961a8 100644
--- a/pkg/ingress/pod.go
+++ b/pkg/ingress/pod.go
@@ -80,6 +80,8 @@ func (c *podController) onAdd(obj interface{}) {
)
}
}
+
+ c.controller.MetricsCollector.IncrEvents("pod", "add")
}
func (c *podController) onUpdate(_, cur interface{}) {
@@ -108,6 +110,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
)
}
}
+
+ c.controller.MetricsCollector.IncrEvents("pod", "update")
}
func (c *podController) onDelete(obj interface{}) {
@@ -133,4 +137,6 @@ func (c *podController) onDelete(obj interface{}) {
zap.Any("pod", pod),
)
}
+
+ c.controller.MetricsCollector.IncrEvents("pod", "delete")
}
diff --git a/pkg/ingress/pod_test.go b/pkg/ingress/pod_test.go
index 7069783..a0d8549 100644
--- a/pkg/ingress/pod_test.go
+++ b/pkg/ingress/pod_test.go
@@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
@@ -33,6 +34,7 @@ func TestPodOnAdd(t *testing.T) {
controller: &Controller{
watchingNamespace: watchingNamespace,
podCache: types.NewPodCache(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
},
}
@@ -74,6 +76,7 @@ func TestPodOnDelete(t *testing.T) {
controller: &Controller{
watchingNamespace: watchingNamespace,
podCache: types.NewPodCache(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
},
}
@@ -118,6 +121,7 @@ func TestPodOnUpdate(t *testing.T) {
controller: &Controller{
watchingNamespace: watchingNamespace,
podCache: types.NewPodCache(),
+ MetricsCollector: metrics.NewPrometheusCollector(),
},
}
diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go
index 839fba3..6b6bb3e 100644
--- a/pkg/ingress/secret.go
+++ b/pkg/ingress/secret.go
@@ -218,7 +218,7 @@ func (c *secretController) sync(ctx context.Context, ev
*types.Event) error {
func (c *secretController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
- c.controller.metricsCollector.IncrSyncOperation("secret",
"success")
+ c.controller.MetricsCollector.IncrSyncOperation("secret",
"success")
return
}
log.Warnw("sync ApisixTls failed, will retry",
@@ -226,7 +226,7 @@ func (c *secretController) handleSyncErr(obj interface{},
err error) {
zap.Error(err),
)
c.workqueue.AddRateLimited(obj)
- c.controller.metricsCollector.IncrSyncOperation("secret", "failure")
+ c.controller.MetricsCollector.IncrSyncOperation("secret", "failure")
}
func (c *secretController) onAdd(obj interface{}) {
@@ -246,6 +246,8 @@ func (c *secretController) onAdd(obj interface{}) {
Type: types.EventAdd,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("secret", "add")
}
func (c *secretController) onUpdate(prev, curr interface{}) {
@@ -271,6 +273,8 @@ func (c *secretController) onUpdate(prev, curr interface{})
{
Type: types.EventUpdate,
Object: key,
})
+
+ c.controller.MetricsCollector.IncrEvents("secret", "update")
}
func (c *secretController) onDelete(obj interface{}) {
@@ -303,4 +307,6 @@ func (c *secretController) onDelete(obj interface{}) {
Object: key,
Tombstone: sec,
})
+
+ c.controller.MetricsCollector.IncrEvents("secret", "delete")
}
diff --git a/pkg/metrics/prometheus.go b/pkg/metrics/prometheus.go
index 56aa1f1..f2a4d27 100644
--- a/pkg/metrics/prometheus.go
+++ b/pkg/metrics/prometheus.go
@@ -35,7 +35,7 @@ type Collector interface {
RecordAPISIXCode(int, string)
// RecordAPISIXLatency records the latency for a round trip from
ingress apisix
// to apisix.
- RecordAPISIXLatency(time.Duration)
+ RecordAPISIXLatency(time.Duration, string)
// IncrAPISIXRequest increases the number of requests to apisix.
IncrAPISIXRequest(string)
// IncrCheckClusterHealth increases the number of cluster health check
operations
@@ -44,19 +44,27 @@ type Collector interface {
// IncrSyncOperation increases the number of sync operations with the
resource
// type label.
IncrSyncOperation(string, string)
+ // IncrCacheSyncOperation increases the number of cache sync operations
with the
+ // resource type label.
+ IncrCacheSyncOperation(string)
+ // IncrEvents increases the number of events handled by controllers
with the
+ // operation label.
+ IncrEvents(string, string)
}
// collector contains necessary messages to collect Prometheus metrics.
type collector struct {
isLeader prometheus.Gauge
- apisixLatency prometheus.Summary
+ apisixLatency *prometheus.SummaryVec
apisixRequests *prometheus.CounterVec
apisixCodes *prometheus.GaugeVec
checkClusterHealth *prometheus.CounterVec
syncOperation *prometheus.CounterVec
+ cacheSyncOperation *prometheus.CounterVec
+ controllerEvents *prometheus.CounterVec
}
-// NewPrometheusCollectors creates the Prometheus metrics collector.
+// NewPrometheusCollector creates the Prometheus metrics collector.
// It also registers all internal metric collector to prometheus,
// so do not call this function duplicately.
func NewPrometheusCollector() Collector {
@@ -83,18 +91,19 @@ func NewPrometheusCollector() Collector {
prometheus.GaugeOpts{
Name: "apisix_bad_status_codes",
Namespace: _namespace,
- Help: "Whether the role of controller
instance is leader",
+ Help: "Status codes of requests to
APISIX",
ConstLabels: constLabels,
},
[]string{"resource", "status_code"},
),
- apisixLatency: prometheus.NewSummary(
+ apisixLatency: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: _namespace,
Name: "apisix_request_latencies",
Help: "Request latencies with APISIX",
ConstLabels: constLabels,
},
+ []string{"operation"},
),
apisixRequests: prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -123,6 +132,24 @@ func NewPrometheusCollector() Collector {
},
[]string{"resource", "result"},
),
+ cacheSyncOperation: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: _namespace,
+ Name: "cache_sync_total",
+ Help: "Number of cache sync operations",
+ ConstLabels: constLabels,
+ },
+ []string{"result"},
+ ),
+ controllerEvents: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: _namespace,
+ Name: "events_total",
+ Help: "Number of events handled by the
controller",
+ ConstLabels: constLabels,
+ },
+ []string{"operation", "resource"},
+ ),
}
// Since we use the DefaultRegisterer, in test cases, the metrics
@@ -133,6 +160,8 @@ func NewPrometheusCollector() Collector {
prometheus.Unregister(collector.apisixRequests)
prometheus.Unregister(collector.checkClusterHealth)
prometheus.Unregister(collector.syncOperation)
+ prometheus.Unregister(collector.cacheSyncOperation)
+ prometheus.Unregister(collector.controllerEvents)
prometheus.MustRegister(
collector.isLeader,
@@ -141,6 +170,8 @@ func NewPrometheusCollector() Collector {
collector.apisixRequests,
collector.checkClusterHealth,
collector.syncOperation,
+ collector.cacheSyncOperation,
+ collector.controllerEvents,
)
return collector
@@ -166,8 +197,8 @@ func (c *collector) RecordAPISIXCode(code int, resource
string) {
// RecordAPISIXLatency records the latency for a complete round trip
// from controller to APISIX.
-func (c *collector) RecordAPISIXLatency(latency time.Duration) {
- c.apisixLatency.Observe(float64(latency.Nanoseconds()))
+func (c *collector) RecordAPISIXLatency(latency time.Duration, resource
string) {
+
c.apisixLatency.WithLabelValues(resource).Observe(float64(latency.Nanoseconds()))
}
// IncrAPISIXRequest increases the number of requests for specific
@@ -191,15 +222,31 @@ func (c *collector) IncrSyncOperation(resource, result
string) {
}).Inc()
}
+// IncrCacheSyncOperation increases the number of cache sync operations for
+// cluster.
+func (c *collector) IncrCacheSyncOperation(result string) {
+ c.cacheSyncOperation.WithLabelValues(result).Inc()
+}
+
+// IncrEvents increases the number of events handled by controllers for
+// specific operation.
+func (c *collector) IncrEvents(resource, operation string) {
+ c.controllerEvents.With(prometheus.Labels{
+ "operation": operation,
+ "resource": resource,
+ }).Inc()
+}
+
// Collect collects the prometheus.Collect.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.isLeader.Collect(ch)
c.apisixLatency.Collect(ch)
c.apisixRequests.Collect(ch)
- c.apisixLatency.Collect(ch)
c.apisixCodes.Collect(ch)
c.checkClusterHealth.Collect(ch)
c.syncOperation.Collect(ch)
+ c.cacheSyncOperation.Collect(ch)
+ c.controllerEvents.Collect(ch)
}
// Describe describes the prometheus.Describe.
@@ -207,8 +254,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.isLeader.Describe(ch)
c.apisixLatency.Describe(ch)
c.apisixRequests.Describe(ch)
- c.apisixLatency.Describe(ch)
c.apisixCodes.Describe(ch)
c.checkClusterHealth.Describe(ch)
c.syncOperation.Describe(ch)
+ c.cacheSyncOperation.Describe(ch)
+ c.controllerEvents.Describe(ch)
}
diff --git a/pkg/metrics/prometheus_test.go b/pkg/metrics/prometheus_test.go
index e62cdd9..09d9d7f 100644
--- a/pkg/metrics/prometheus_test.go
+++ b/pkg/metrics/prometheus_test.go
@@ -83,6 +83,8 @@ func apisixLatencyTestHandler(t *testing.T, metrics
[]*io_prometheus_client.Metr
assert.Equal(t, *m[0].Label[0].Value, "default")
assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
assert.Equal(t, *m[0].Label[1].Value, "")
+ assert.Equal(t, *m[0].Label[2].Name, "operation")
+ assert.Equal(t, *m[0].Label[2].Value, "create")
}
}
@@ -160,18 +162,58 @@ func syncOperationTestHandler(t *testing.T, metrics
[]*io_prometheus_client.Metr
}
}
+func cacheSncOperationTestHandler(t *testing.T, metrics
[]*io_prometheus_client.MetricFamily) func(t *testing.T) {
+ return func(t *testing.T) {
+ metric :=
findMetric("apisix_ingress_controller_cache_sync_total", metrics)
+ assert.NotNil(t, metric)
+ assert.Equal(t, metric.Type.String(), "COUNTER")
+ m := metric.GetMetric()
+ assert.Len(t, m, 1)
+
+ assert.Equal(t, *m[0].Counter.Value, float64(1))
+ assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
+ assert.Equal(t, *m[0].Label[0].Value, "default")
+ assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
+ assert.Equal(t, *m[0].Label[1].Value, "")
+ assert.Equal(t, *m[0].Label[2].Name, "result")
+ assert.Equal(t, *m[0].Label[2].Value, "failure")
+ }
+}
+
+func controllerEventsTestHandler(t *testing.T, metrics
[]*io_prometheus_client.MetricFamily) func(t *testing.T) {
+ return func(t *testing.T) {
+ metric := findMetric("apisix_ingress_controller_events_total",
metrics)
+ assert.NotNil(t, metric)
+ assert.Equal(t, metric.Type.String(), "COUNTER")
+ m := metric.GetMetric()
+ assert.Len(t, m, 1)
+
+ assert.Equal(t, *m[0].Counter.Value, float64(1))
+ assert.Equal(t, *m[0].Label[0].Name, "controller_namespace")
+ assert.Equal(t, *m[0].Label[0].Value, "default")
+ assert.Equal(t, *m[0].Label[1].Name, "controller_pod")
+ assert.Equal(t, *m[0].Label[1].Value, "")
+ assert.Equal(t, *m[0].Label[2].Name, "operation")
+ assert.Equal(t, *m[0].Label[2].Value, "add")
+ assert.Equal(t, *m[0].Label[3].Name, "resource")
+ assert.Equal(t, *m[0].Label[3].Value, "pod")
+ }
+}
+
func TestPrometheusCollector(t *testing.T) {
c := NewPrometheusCollector()
c.ResetLeader(true)
c.RecordAPISIXCode(404, "route")
c.RecordAPISIXCode(500, "upstream")
- c.RecordAPISIXLatency(500 * time.Millisecond)
+ c.RecordAPISIXLatency(500*time.Millisecond, "create")
c.IncrAPISIXRequest("route")
c.IncrAPISIXRequest("route")
c.IncrAPISIXRequest("upstream")
c.IncrCheckClusterHealth("test")
c.IncrSyncOperation("schema", "failure")
c.IncrSyncOperation("endpoint", "success")
+ c.IncrCacheSyncOperation("failure")
+ c.IncrEvents("pod", "add")
metrics, err := prometheus.DefaultGatherer.Gather()
assert.Nil(t, err)
@@ -182,6 +224,8 @@ func TestPrometheusCollector(t *testing.T) {
t.Run("apisix_requests", apisixRequestTestHandler(t, metrics))
t.Run("check_cluster_health_total", checkClusterHealthTestHandler(t,
metrics))
t.Run("sync_operation_total", syncOperationTestHandler(t, metrics))
+ t.Run("cache_sync_total", cacheSncOperationTestHandler(t, metrics))
+ t.Run("events_total", controllerEventsTestHandler(t, metrics))
}
func findMetric(name string, metrics []*io_prometheus_client.MetricFamily)
*io_prometheus_client.MetricFamily {
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index 0d0bd92..8dbb70c 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "github.com/apache/apisix-ingress-controller/pkg/metrics"
"io/ioutil"
"net/http"
"net/url"
@@ -275,8 +276,9 @@ func (s *Scaffold) ListApisixUpstreams() ([]*v1.Upstream,
error) {
return nil, err
}
err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
- BaseURL: u.String(),
- AdminKey: s.opts.APISIXAdminAPIKey,
+ BaseURL: u.String(),
+ AdminKey: s.opts.APISIXAdminAPIKey,
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
if err != nil {
return nil, err
@@ -296,8 +298,9 @@ func (s *Scaffold) ListApisixGlobalRules()
([]*v1.GlobalRule, error) {
return nil, err
}
err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
- BaseURL: u.String(),
- AdminKey: s.opts.APISIXAdminAPIKey,
+ BaseURL: u.String(),
+ AdminKey: s.opts.APISIXAdminAPIKey,
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
if err != nil {
return nil, err
@@ -317,8 +320,9 @@ func (s *Scaffold) ListApisixRoutes() ([]*v1.Route, error) {
return nil, err
}
err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
- BaseURL: u.String(),
- AdminKey: s.opts.APISIXAdminAPIKey,
+ BaseURL: u.String(),
+ AdminKey: s.opts.APISIXAdminAPIKey,
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
if err != nil {
return nil, err
@@ -338,8 +342,9 @@ func (s *Scaffold) ListApisixConsumers() ([]*v1.Consumer,
error) {
return nil, err
}
err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
- BaseURL: u.String(),
- AdminKey: s.opts.APISIXAdminAPIKey,
+ BaseURL: u.String(),
+ AdminKey: s.opts.APISIXAdminAPIKey,
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
if err != nil {
return nil, err
@@ -359,8 +364,9 @@ func (s *Scaffold) ListApisixStreamRoutes()
([]*v1.StreamRoute, error) {
return nil, err
}
err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
- BaseURL: u.String(),
- AdminKey: s.opts.APISIXAdminAPIKey,
+ BaseURL: u.String(),
+ AdminKey: s.opts.APISIXAdminAPIKey,
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
if err != nil {
return nil, err
@@ -380,8 +386,9 @@ func (s *Scaffold) ListApisixSsl() ([]*v1.Ssl, error) {
return nil, err
}
err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
- BaseURL: u.String(),
- AdminKey: s.opts.APISIXAdminAPIKey,
+ BaseURL: u.String(),
+ AdminKey: s.opts.APISIXAdminAPIKey,
+ MetricsCollector: metrics.NewPrometheusCollector(),
})
if err != nil {
return nil, err