This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new a3279e4  chore: optimize the apisix cluster processing (#414)
a3279e4 is described below

commit a3279e4028643799d0d3e5b0e3298f5366435f51
Author: Alex Zhang <[email protected]>
AuthorDate: Thu May 6 18:14:32 2021 +0800

    chore: optimize the apisix cluster processing (#414)
    
    * chore: optimize the apisix cluster processing
    
    * fix
---
 cmd/ingress/ingress.go         |  7 +++++--
 conf/config-default.yaml       | 15 +++++++++++++--
 pkg/apisix/apisix.go           | 26 ++++++++++++++++++++++++++
 pkg/config/config.go           | 26 ++++++++++++++++++++++++--
 pkg/config/config_test.go      | 14 ++++++++------
 pkg/ingress/apisix_upstream.go |  6 ++++--
 pkg/ingress/controller.go      | 15 ++++++++-------
 pkg/ingress/manifest.go        | 14 ++++++++------
 8 files changed, 96 insertions(+), 27 deletions(-)

diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index f90b50f..df931ef 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -147,8 +147,11 @@ the apisix cluster and others are created`,
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, 
"election-id", config.IngressAPISIXLeader, "election id used for campaign the 
controller leader")
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, 
"ingress-version", config.IngressNetworkingV1, "the supported ingress api group 
version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes 
version v1.19.0 or higher) and \"extensions/v1beta1\"")
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, 
"apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute 
api group version, can be \"apisix.apache.org/v1\" or 
\"apisix.apache.org/v2alpha1\"")
-       cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", 
"", "the base URL for APISIX admin api / manager api")
-       cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, 
"apisix-admin-key", "", "admin key used for the authorization of APISIX admin 
api / manager api")
+       cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", 
"", "the base URL for APISIX admin api / manager api (deprecated, using 
--default-apisix-cluster-base-url instead)")
+       cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, 
"apisix-admin-key", "", "admin key used for the authorization of APISIX admin 
api / manager api (deprecated, using --default-apisix-cluster-admin-key 
instead)")
+       cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, 
"default-apisix-cluster-base-url", "", "the base URL of admin api / manager api 
for the default APISIX cluster")
+       cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, 
"default-apisix-cluster-admin-key", "", "admin key used for the authorization 
of admin api / manager api for the default APISIX cluster")
+       cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, 
"default-apisix-cluster-name", "default", "name of the default apisix cluster")
 
        return cmd
 }
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index e255b18..78c4f5d 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -43,7 +43,7 @@ kubernetes:
   election_id: "ingress-apisix-leader" # the election id for the controller 
leader campaign,
                                        # only the leader will watch and 
delivery resource changes,
                                        # other instances (as candidates) stand 
by.
-  ingress_class: "apisix"              # The class of an Ingress object is set 
using the field
+  ingress_class: "apisix"              # the class of an Ingress object is set 
using the field
                                        # IngressClassName in Kubernetes 
clusters version v1.18.0
                                        # or higher or the annotation 
"kubernetes.io/ingress.class"
                                        # (deprecated).
@@ -56,5 +56,16 @@ kubernetes:
                                                      # default is 
"apisix.apache.org/v2alpha1".
 # APISIX related configurations.
 apisix:
-  base_url: "http://127.0.0.1:9080/apisix/admin" # the APISIX admin api / 
manager api
+  base_url: "http://127.0.0.1:9080/apisix/admin" # (Deprecated, use 
default_cluster_base_url) the APISIX admin api / manager api
                                                  # base url, it's required.
+
+  default_cluster_base_url: "http://127.0.0.1:9080/apisix/admin" # The base 
url of admin api / manager api
+                                                                 # of the 
default APISIX cluster
+
+  admin_key: "" # (Deprecated, use default_cluster_admin_key) the admin key 
used for the authentication of
+                # admin api / manager api in the default APISIX cluster, by 
default this field is unset.
+
+  default_cluster_admin_key: "" # the admin key used for the authentication of 
admin api / manager api in the
+                                # default APISIX cluster, by default this 
field is unset.
+
+  default_cluster_name: "default" # name of the default APISIX cluster.
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index e407b7e..e6c346c 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -16,6 +16,7 @@ package apisix
 
 import (
        "context"
+       "sync"
 
        v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
@@ -26,6 +27,8 @@ type APISIX interface {
        Cluster(string) Cluster
        // AddCluster adds a new cluster.
        AddCluster(*ClusterOptions) error
+       // UpdateCluster updates an existing cluster.
+       UpdateCluster(*ClusterOptions) error
        // ListClusters lists all APISIX clusters.
        ListClusters() []Cluster
 }
@@ -98,6 +101,7 @@ type GlobalRule interface {
 }
 
 type apisix struct {
+       mu                 sync.RWMutex
        nonExistentCluster Cluster
        clusters           map[string]Cluster
 }
@@ -112,6 +116,8 @@ func NewClient() (APISIX, error) {
 
 // Cluster implements APISIX.Cluster method.
 func (c *apisix) Cluster(name string) Cluster {
+       c.mu.RLock()
+       defer c.mu.RUnlock()
        cluster, ok := c.clusters[name]
        if !ok {
                return c.nonExistentCluster
@@ -121,6 +127,8 @@ func (c *apisix) Cluster(name string) Cluster {
 
 // ListClusters implements APISIX.ListClusters method.
 func (c *apisix) ListClusters() []Cluster {
+       c.mu.RLock()
+       defer c.mu.RUnlock()
        clusters := make([]Cluster, 0, len(c.clusters))
        for _, cluster := range c.clusters {
                clusters = append(clusters, cluster)
@@ -130,6 +138,8 @@ func (c *apisix) ListClusters() []Cluster {
 
 // AddCluster implements APISIX.AddCluster method.
 func (c *apisix) AddCluster(co *ClusterOptions) error {
+       c.mu.Lock()
+       defer c.mu.Unlock()
        _, ok := c.clusters[co.Name]
        if ok {
                return ErrDuplicatedCluster
@@ -144,3 +154,19 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
        c.clusters[co.Name] = cluster
        return nil
 }
+
+func (c *apisix) UpdateCluster(co *ClusterOptions) error {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if _, ok := c.clusters[co.Name]; !ok {
+               return ErrClusterNotExist
+       }
+
+       cluster, err := newCluster(co)
+       if err != nil {
+               return err
+       }
+
+       c.clusters[co.Name] = cluster
+       return nil
+}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 72043be..aa7f681 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -78,8 +78,20 @@ type KubernetesConfig struct {
 
 // APISIXConfig contains all APISIX related config items.
 type APISIXConfig struct {
-       BaseURL string `json:"base_url" yaml:"base_url"`
+       // DefaultClusterName is the name of default cluster.
+       DefaultClusterName string `json:"default_cluster_name"`
+       // DefaultClusterBaseURL is the base url configuration for the default 
cluster.
+       DefaultClusterBaseURL string `json:"default_cluster_base_url" 
yaml:"default_cluster_base_url"`
+       // DefaultClusterAdminKey is the admin key for the default cluster.
        // TODO: Obsolete the plain way to specify admin_key, which is insecure.
+       DefaultClusterAdminKey string `json:"default_cluster_admin_key" 
yaml:"default_cluster_admin_key"`
+       // BaseURL is same to DefaultClusterBaseURL.
+       // Deprecated: use DefaultClusterBaseURL instead. BaseURL will be 
removed
+       // once v1.0.0 is released.
+       BaseURL string `json:"base_url" yaml:"base_url"`
+       // AdminKey is same to DefaultClusterAdminKey.
+       // Deprecated: use DefaultClusterAdminKey instead. AdminKey will be 
removed
+       // once v1.0.0 is released.
        AdminKey string `json:"admin_key" yaml:"admin_key"`
 }
 
@@ -130,7 +142,17 @@ func (cfg *Config) Validate() error {
        if cfg.Kubernetes.ResyncInterval.Duration < _minimalResyncInterval {
                return errors.New("controller resync interval too small")
        }
-       if cfg.APISIX.BaseURL == "" {
+       if cfg.APISIX.DefaultClusterAdminKey == "" {
+               cfg.APISIX.DefaultClusterAdminKey = cfg.APISIX.AdminKey
+       }
+       if cfg.APISIX.DefaultClusterBaseURL == "" {
+               cfg.APISIX.DefaultClusterBaseURL = cfg.APISIX.BaseURL
+       }
+       if cfg.APISIX.DefaultClusterName == "" {
+               cfg.APISIX.DefaultClusterName = "default"
+       }
+
+       if cfg.APISIX.DefaultClusterBaseURL == "" {
                return errors.New("apisix base url is required")
        }
        switch cfg.Kubernetes.IngressVersion {
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 6c814f7..93be369 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -42,8 +42,9 @@ func TestNewConfigFromFile(t *testing.T) {
                        ApisixRouteVersion: ApisixRouteV2alpha1,
                },
                APISIX: APISIXConfig{
-                       BaseURL:  "http://127.0.0.1:8080/apisix",
-                       AdminKey: "123456",
+                       DefaultClusterName:     "default",
+                       DefaultClusterBaseURL:  "http://127.0.0.1:8080/apisix",
+                       DefaultClusterAdminKey: "123456",
                },
        }
 
@@ -80,8 +81,8 @@ kubernetes:
   ingress_class: apisix
   ingress_version: networking/v1
 apisix:
-  base_url: http://127.0.0.1:8080/apisix
-  admin_key: "123456"
+  default_cluster_base_url: http://127.0.0.1:8080/apisix
+  default_cluster_admin_key: "123456"
 `
        tmpYAML, err := ioutil.TempFile("/tmp", "config-*.yaml")
        assert.Nil(t, err, "failed to create temporary yaml configuration file: 
", err)
@@ -101,7 +102,7 @@ apisix:
 func TestConfigDefaultValue(t *testing.T) {
        yamlData := `
 apisix:
-  base_url: http://127.0.0.1:8080/apisix
+  default_cluster_base_url: http://127.0.0.1:8080/apisix
 `
        tmpYAML, err := ioutil.TempFile("/tmp", "config-*.yaml")
        assert.Nil(t, err, "failed to create temporary yaml configuration file: 
", err)
@@ -116,7 +117,8 @@ apisix:
        assert.Nil(t, newCfg.Validate(), "failed to validate config")
 
        defaultCfg := NewDefaultConfig()
-       defaultCfg.APISIX.BaseURL = "http://127.0.0.1:8080/apisix"
+       defaultCfg.APISIX.DefaultClusterBaseURL = "http://127.0.0.1:8080/apisix"
+       defaultCfg.APISIX.DefaultClusterName = "default"
 
        assert.Equal(t, defaultCfg, newCfg, "bad configuration")
 }
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index 634b60d..9c9d8eb 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -128,10 +128,11 @@ func (c *apisixUpstreamController) sync(ctx 
context.Context, ev *types.Event) er
                return err
        }
 
+       clusterName := c.controller.cfg.APISIX.DefaultClusterName
        for _, port := range svc.Spec.Ports {
                upsName := apisixv1.ComposeUpstreamName(namespace, name, 
port.Port)
                // TODO: multiple cluster
-               ups, err := c.controller.apisix.Cluster("").Upstream().Get(ctx, 
upsName)
+               ups, err := 
c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName)
                if err != nil {
                        if err == apisixcache.ErrNotFound {
                                continue
@@ -169,11 +170,12 @@ func (c *apisixUpstreamController) sync(ctx 
context.Context, ev *types.Event) er
                        zap.Any("upstream", newUps),
                        zap.Any("ApisixUpstream", au),
                )
-               if _, err := 
c.controller.apisix.Cluster("").Upstream().Update(ctx, newUps); err != nil {
+               if _, err := 
c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != 
nil {
                        log.Errorw("failed to update upstream",
                                zap.Error(err),
                                zap.Any("upstream", newUps),
                                zap.Any("ApisixUpstream", au),
+                               zap.String("cluster", clusterName),
                        )
                        c.controller.recorderEvent(au, corev1.EventTypeWarning, 
_resourceSyncAborted, err)
                        recordStatus(au, _resourceSyncAborted, err, 
metav1.ConditionFalse)
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 89876e2..7b2c9f9 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -318,9 +318,9 @@ func (c *Controller) run(ctx context.Context) {
        c.metricsCollector.ResetLeader(true)
 
        err := c.apisix.AddCluster(&apisix.ClusterOptions{
-               Name:     "",
-               AdminKey: c.cfg.APISIX.AdminKey,
-               BaseURL:  c.cfg.APISIX.BaseURL,
+               Name:     c.cfg.APISIX.DefaultClusterName,
+               AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
+               BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
        })
        if err != nil && err != apisix.ErrDuplicatedCluster {
                // TODO give up the leader role.
@@ -328,7 +328,7 @@ func (c *Controller) run(ctx context.Context) {
                return
        }
 
-       if err := c.apisix.Cluster("").HasSynced(ctx); err != nil {
+       if err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
                // TODO give up the leader role.
                log.Errorf("failed to wait the default cluster to be ready: 
%s", err)
                return
@@ -400,12 +400,13 @@ func (c *Controller) syncSSL(ctx context.Context, ssl 
*apisixv1.Ssl, event types
        var (
                err error
        )
+       clusterName := c.cfg.APISIX.DefaultClusterName
        if event == types.EventDelete {
-               err = c.apisix.Cluster("").SSL().Delete(ctx, ssl)
+               err = c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl)
        } else if event == types.EventUpdate {
-               _, err = c.apisix.Cluster("").SSL().Update(ctx, ssl)
+               _, err = c.apisix.Cluster(clusterName).SSL().Update(ctx, ssl)
        } else {
-               _, err = c.apisix.Cluster("").SSL().Create(ctx, ssl)
+               _, err = c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl)
        }
        return err
 }
diff --git a/pkg/ingress/manifest.go b/pkg/ingress/manifest.go
index 25901d1..8e3034c 100644
--- a/pkg/ingress/manifest.go
+++ b/pkg/ingress/manifest.go
@@ -111,14 +111,16 @@ func (m *manifest) diff(om *manifest) (added, updated, 
deleted *manifest) {
 
 func (c *Controller) syncManifests(ctx context.Context, added, updated, 
deleted *manifest) error {
        var merr *multierror.Error
+
+       clusterName := c.cfg.APISIX.DefaultClusterName
        if deleted != nil {
                for _, r := range deleted.routes {
-                       if err := c.apisix.Cluster("").Route().Delete(ctx, r); 
err != nil {
+                       if err := 
c.apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
                                merr = multierror.Append(merr, err)
                        }
                }
                for _, u := range deleted.upstreams {
-                       if err := c.apisix.Cluster("").Upstream().Delete(ctx, 
u); err != nil {
+                       if err := 
c.apisix.Cluster(clusterName).Upstream().Delete(ctx, u); err != nil {
                                merr = multierror.Append(merr, err)
                        }
                }
@@ -126,24 +128,24 @@ func (c *Controller) syncManifests(ctx context.Context, 
added, updated, deleted
        if added != nil {
                // Should create upstreams firstly due to the dependencies.
                for _, u := range added.upstreams {
-                       if _, err := 
c.apisix.Cluster("").Upstream().Create(ctx, u); err != nil {
+                       if _, err := 
c.apisix.Cluster(clusterName).Upstream().Create(ctx, u); err != nil {
                                merr = multierror.Append(merr, err)
                        }
                }
                for _, r := range added.routes {
-                       if _, err := c.apisix.Cluster("").Route().Create(ctx, 
r); err != nil {
+                       if _, err := 
c.apisix.Cluster(clusterName).Route().Create(ctx, r); err != nil {
                                merr = multierror.Append(merr, err)
                        }
                }
        }
        if updated != nil {
                for _, r := range updated.upstreams {
-                       if _, err := 
c.apisix.Cluster("").Upstream().Update(ctx, r); err != nil {
+                       if _, err := 
c.apisix.Cluster(clusterName).Upstream().Update(ctx, r); err != nil {
                                merr = multierror.Append(merr, err)
                        }
                }
                for _, r := range updated.routes {
-                       if _, err := c.apisix.Cluster("").Route().Update(ctx, 
r); err != nil {
+                       if _, err := 
c.apisix.Cluster(clusterName).Route().Update(ctx, r); err != nil {
                                merr = multierror.Append(merr, err)
                        }
                }

Reply via email to