This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new e5441a3 feat: implement schema API (#601)
e5441a3 is described below
commit e5441a3d0877017f17e96ac44d2a804a509676e7
Author: Hoshea Jiang <[email protected]>
AuthorDate: Thu Aug 5 20:41:27 2021 +0800
feat: implement schema API (#601)
---
.golangci.yml | 8 ++
pkg/apisix/apisix.go | 29 ++++--
pkg/apisix/cache/cache.go | 8 ++
pkg/apisix/cache/memdb.go | 28 ++++++
pkg/apisix/cache/memdb_test.go | 44 +++++++++
pkg/apisix/cache/schema.go | 10 ++
pkg/apisix/cluster.go | 168 +++++++++++++++++++++++++++++---
pkg/apisix/cluster_test.go | 11 ++-
pkg/apisix/nonexistentclient.go | 28 ++++++
pkg/apisix/plugin.go | 51 ++++++++++
pkg/apisix/plugin_test.go | 105 ++++++++++++++++++++
pkg/apisix/resource_test.go | 1 +
pkg/apisix/schema.go | 92 +++++++++++++++++
pkg/apisix/schema_test.go | 125 ++++++++++++++++++++++++
pkg/apisix/ssl_test.go | 1 +
pkg/apisix/stream_route_test.go | 1 +
pkg/ingress/apisix_cluster_config.go | 2 +-
pkg/ingress/controller.go | 8 +-
pkg/kube/translation/ingress_test.go | 7 +-
pkg/kube/translation/plugin_test.go | 2 +-
pkg/kube/translation/translator_test.go | 7 +-
pkg/types/apisix/v1/types.go | 20 ++++
test/e2e/scaffold/k8s.go | 12 +--
23 files changed, 724 insertions(+), 44 deletions(-)
diff --git a/.golangci.yml b/.golangci.yml
index 7422131..b1c29ba 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -23,3 +23,11 @@ linters-settings:
govet:
disable:
- unsafeptr
+ goimports:
+ local-prefixes: github.com/apache/apisix-ingress-controller
+
+linters:
+ enable:
+ - goimports
+ - govet
+ - gofmt
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index 55733de..84f152d 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -12,6 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+
package apisix
import (
@@ -26,9 +27,9 @@ type APISIX interface {
// Cluster specifies the target cluster to talk.
Cluster(string) Cluster
// AddCluster adds a new cluster.
- AddCluster(*ClusterOptions) error
+ AddCluster(context.Context, *ClusterOptions) error
// UpdateCluster updates an existing cluster.
- UpdateCluster(*ClusterOptions) error
+ UpdateCluster(context.Context, *ClusterOptions) error
// ListClusters lists all APISIX clusters.
ListClusters() []Cluster
}
@@ -54,6 +55,10 @@ type Cluster interface {
Consumer() Consumer
// HealthCheck checks apisix cluster health in realtime.
HealthCheck(context.Context) error
+ // Plugin returns a Plugin interface that can operate Plugin resources.
+ Plugin() Plugin
+ // Schema returns a Schema interface that can fetch schema of APISIX objects.
+ Schema() Schema
}
// Route is the specific client interface to take over the create, update,
@@ -106,7 +111,7 @@ type GlobalRule interface {
Update(context.Context, *v1.GlobalRule) (*v1.GlobalRule, error)
}
-// Consumer it the specific client interface to take over the create, update,
+// Consumer is the specific client interface to take over the create, update,
// list and delete for APISIX Consumer resource.
type Consumer interface {
Get(context.Context, string) (*v1.Consumer, error)
@@ -116,6 +121,16 @@ type Consumer interface {
Update(context.Context, *v1.Consumer) (*v1.Consumer, error)
}
+// Plugin is the specific client interface to fetch APISIX Plugin resource.
+type Plugin interface {
+ List(context.Context) ([]string, error)
+}
+
+// Schema is the specific client interface to fetch the schema of APISIX objects.
+type Schema interface {
+ GetPluginSchema(context.Context, string) (*v1.Schema, error)
+}
+
type apisix struct {
mu sync.RWMutex
nonExistentCluster Cluster
@@ -154,14 +169,14 @@ func (c *apisix) ListClusters() []Cluster {
}
// AddCluster implements APISIX.AddCluster method.
-func (c *apisix) AddCluster(co *ClusterOptions) error {
+func (c *apisix) AddCluster(ctx context.Context, co *ClusterOptions) error {
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.clusters[co.Name]
if ok {
return ErrDuplicatedCluster
}
- cluster, err := newCluster(co)
+ cluster, err := newCluster(ctx, co)
if err != nil {
return err
}
@@ -169,14 +184,14 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
return nil
}
-func (c *apisix) UpdateCluster(co *ClusterOptions) error {
+func (c *apisix) UpdateCluster(ctx context.Context, co *ClusterOptions) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.clusters[co.Name]; !ok {
return ErrClusterNotExist
}
- cluster, err := newCluster(co)
+ cluster, err := newCluster(ctx, co)
if err != nil {
return err
}
diff --git a/pkg/apisix/cache/cache.go b/pkg/apisix/cache/cache.go
index 48fc93b..e14857f 100644
--- a/pkg/apisix/cache/cache.go
+++ b/pkg/apisix/cache/cache.go
@@ -35,6 +35,8 @@ type Cache interface {
InsertGlobalRule(*v1.GlobalRule) error
// InsertConsumer adds or updates consumer to cache.
InsertConsumer(*v1.Consumer) error
+ // InsertSchema adds or updates schema to cache.
+ InsertSchema(*v1.Schema) error
// GetRoute finds the route from cache according to the primary index (id).
GetRoute(string) (*v1.Route, error)
@@ -48,6 +50,8 @@ type Cache interface {
GetGlobalRule(string) (*v1.GlobalRule, error)
// GetConsumer finds the consumer from cache according to the primary index (id).
GetConsumer(string) (*v1.Consumer, error)
+ // GetSchema finds the schema from cache according to the primary index (id).
+ GetSchema(string) (*v1.Schema, error)
// ListRoutes lists all routes in cache.
ListRoutes() ([]*v1.Route, error)
@@ -61,6 +65,8 @@ type Cache interface {
ListGlobalRules() ([]*v1.GlobalRule, error)
// ListConsumers lists all consumer objects in cache.
ListConsumers() ([]*v1.Consumer, error)
+ // ListSchema lists all schema in cache.
+ ListSchema() ([]*v1.Schema, error)
// DeleteRoute deletes the specified route in cache.
DeleteRoute(*v1.Route) error
@@ -74,4 +80,6 @@ type Cache interface {
DeleteGlobalRule(*v1.GlobalRule) error
// DeleteConsumer deletes the specified consumer in cache.
DeleteConsumer(*v1.Consumer) error
+ // DeleteSchema deletes the specified schema in cache.
+ DeleteSchema(*v1.Schema) error
}
diff --git a/pkg/apisix/cache/memdb.go b/pkg/apisix/cache/memdb.go
index 4502034..17f9cd3 100644
--- a/pkg/apisix/cache/memdb.go
+++ b/pkg/apisix/cache/memdb.go
@@ -70,6 +70,10 @@ func (c *dbCache) InsertConsumer(consumer *v1.Consumer) error {
return c.insert("consumer", consumer.DeepCopy())
}
+func (c *dbCache) InsertSchema(schema *v1.Schema) error {
+ return c.insert("schema", schema.DeepCopy())
+}
+
func (c *dbCache) insert(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
@@ -128,6 +132,14 @@ func (c *dbCache) GetConsumer(username string) (*v1.Consumer, error) {
return obj.(*v1.Consumer).DeepCopy(), nil
}
+func (c *dbCache) GetSchema(name string) (*v1.Schema, error) {
+ obj, err := c.get("schema", name)
+ if err != nil {
+ return nil, err
+ }
+ return obj.(*v1.Schema).DeepCopy(), nil
+}
+
func (c *dbCache) get(table, id string) (interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
@@ -216,6 +228,18 @@ func (c *dbCache) ListConsumers() ([]*v1.Consumer, error) {
return consumers, nil
}
+func (c *dbCache) ListSchema() ([]*v1.Schema, error) {
+ raws, err := c.list("schema")
+ if err != nil {
+ return nil, err
+ }
+ schemaList := make([]*v1.Schema, 0, len(raws))
+ for _, raw := range raws {
+ schemaList = append(schemaList, raw.(*v1.Schema).DeepCopy())
+ }
+ return schemaList, nil
+}
+
func (c *dbCache) list(table string) ([]interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
@@ -257,6 +281,10 @@ func (c *dbCache) DeleteConsumer(consumer *v1.Consumer) error {
return c.delete("consumer", consumer)
}
+func (c *dbCache) DeleteSchema(schema *v1.Schema) error {
+ return c.delete("schema", schema)
+}
+
func (c *dbCache) delete(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go
index 46d7816..ff06c9a 100644
--- a/pkg/apisix/cache/memdb_test.go
+++ b/pkg/apisix/cache/memdb_test.go
@@ -345,3 +345,47 @@ func TestMemDBCacheConsumer(t *testing.T) {
}
assert.Error(t, ErrNotFound, c.DeleteConsumer(c4))
}
+
+func TestMemDBCacheSchema(t *testing.T) {
+ c, err := NewMemDBCache()
+ assert.Nil(t, err, "NewMemDBCache")
+
+ s1 := &v1.Schema{
+ Name: "plugins/p1",
+ Content: "plugin schema",
+ }
+ assert.Nil(t, c.InsertSchema(s1), "inserting schema s1")
+
+ s11, err := c.GetSchema("plugins/p1")
+ assert.Nil(t, err)
+ assert.Equal(t, s1, s11)
+
+ s2 := &v1.Schema{
+ Name: "plugins/p2",
+ }
+ s3 := &v1.Schema{
+ Name: "plugins/p3",
+ }
+ assert.Nil(t, c.InsertSchema(s2), "inserting schema s2")
+ assert.Nil(t, c.InsertSchema(s3), "inserting schema s3")
+
+ s22, err := c.GetSchema("plugins/p2")
+ assert.Nil(t, err)
+ assert.Equal(t, s2, s22)
+
+ assert.Nil(t, c.DeleteSchema(s3), "delete schema s3")
+
+ schemaList, err := c.ListSchema()
+ assert.Nil(t, err, "listing schema")
+
+ if schemaList[0].Name > schemaList[1].Name {
+ schemaList[0], schemaList[1] = schemaList[1], schemaList[0]
+ }
+ assert.Equal(t, schemaList[0], s1)
+ assert.Equal(t, schemaList[1], s2)
+
+ s4 := &v1.Schema{
+ Name: "plugins/p4",
+ }
+ assert.Error(t, ErrNotFound, c.DeleteSchema(s4))
+}
diff --git a/pkg/apisix/cache/schema.go b/pkg/apisix/cache/schema.go
index 9925d39..6b9e8ba 100644
--- a/pkg/apisix/cache/schema.go
+++ b/pkg/apisix/cache/schema.go
@@ -106,6 +106,16 @@ var (
},
},
},
+ "schema": {
+ Name: "schema",
+ Indexes: map[string]*memdb.IndexSchema{
+ "id": {
+ Name: "id",
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{Field: "Name"},
+ },
+ },
+ },
},
}
)
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index baad027..34e4492 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -12,6 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+
package apisix
import (
@@ -34,10 +35,12 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
+ "github.com/apache/apisix-ingress-controller/pkg/types"
)
const (
- _defaultTimeout = 5 * time.Second
+ _defaultTimeout = 5 * time.Second
+ _defaultSyncInterval = 6 * time.Hour
_cacheSyncing = iota
_cacheSynced
@@ -72,6 +75,8 @@ type ClusterOptions struct {
AdminKey string
BaseURL string
Timeout time.Duration
+ // SyncInterval is the interval to sync schema.
+ SyncInterval types.TimeDuration
}
type cluster struct {
@@ -90,15 +95,20 @@ type cluster struct {
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
+ plugin Plugin
+ schema Schema
}
-func newCluster(o *ClusterOptions) (Cluster, error) {
+func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
if o.BaseURL == "" {
return nil, errors.New("empty base url")
}
if o.Timeout == time.Duration(0) {
o.Timeout = _defaultTimeout
}
+ if o.SyncInterval.Duration == time.Duration(0) {
+ o.SyncInterval = types.TimeDuration{Duration: _defaultSyncInterval}
+ }
o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")
u, err := url.Parse(o.BaseURL)
@@ -124,18 +134,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.streamRoute = newStreamRouteClient(c)
c.globalRules = newGlobalRuleClient(c)
c.consumer = newConsumerClient(c)
+ c.plugin = newPluginClient(c)
+ c.schema = newSchemaClient(c)
c.cache, err = cache.NewMemDBCache()
if err != nil {
return nil, err
}
- go c.syncCache()
+ go c.syncCache(ctx)
+ go c.syncSchema(ctx, o.SyncInterval.Duration)
return c, nil
}
-func (c *cluster) syncCache() {
+func (c *cluster) syncCache(ctx context.Context) {
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
defer func() {
@@ -161,7 +174,7 @@ func (c *cluster) syncCache() {
err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
// impossibly return: false, nil
// so can safe used
- done, lastSyncErr = c.syncCacheOnce()
+ done, lastSyncErr = c.syncCacheOnce(ctx)
return
})
if err != nil {
@@ -175,33 +188,33 @@ func (c *cluster) syncCache() {
}
}
-func (c *cluster) syncCacheOnce() (bool, error) {
- routes, err := c.route.List(context.TODO())
+func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
+ routes, err := c.route.List(ctx)
if err != nil {
log.Errorf("failed to list route in APISIX: %s", err)
return false, err
}
- upstreams, err := c.upstream.List(context.TODO())
+ upstreams, err := c.upstream.List(ctx)
if err != nil {
log.Errorf("failed to list upstreams in APISIX: %s", err)
return false, err
}
- ssl, err := c.ssl.List(context.TODO())
+ ssl, err := c.ssl.List(ctx)
if err != nil {
log.Errorf("failed to list ssl in APISIX: %s", err)
return false, err
}
- streamRoutes, err := c.streamRoute.List(context.TODO())
+ streamRoutes, err := c.streamRoute.List(ctx)
if err != nil {
log.Errorf("failed to list stream_routes in APISIX: %s", err)
return false, err
}
- globalRules, err := c.globalRules.List(context.TODO())
+ globalRules, err := c.globalRules.List(ctx)
if err != nil {
log.Errorf("failed to list global_rules in APISIX: %s", err)
return false, err
}
- consumers, err := c.consumer.List(context.TODO())
+ consumers, err := c.consumer.List(ctx)
if err != nil {
log.Errorf("failed to list consumers in APISIX: %s", err)
return false, err
@@ -301,6 +314,74 @@ func (c *cluster) HasSynced(ctx context.Context) error {
}
}
+// syncSchema syncs schema from APISIX regularly according to the interval.
+func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+
+ for {
+ if err := c.syncSchemaOnce(ctx); err != nil {
+ log.Warnf("failed to sync schema: %s", err)
+ }
+
+ select {
+ case <-ticker.C:
+ continue
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// syncSchemaOnce syncs schema from APISIX once.
+// It firstly deletes all the schema in the cache,
+// then queries and inserts to the cache.
+func (c *cluster) syncSchemaOnce(ctx context.Context) error {
+ log.Infow("syncing schema", zap.String("cluster", c.name))
+
+ schemaList, err := c.cache.ListSchema()
+ if err != nil {
+ log.Errorf("failed to list schema in the cache: %s", err)
+ return err
+ }
+ for _, s := range schemaList {
+ if err := c.cache.DeleteSchema(s); err != nil {
+ log.Warnw("failed to delete schema in cache",
+ zap.String("schemaName", s.Name),
+ zap.String("schemaContent", s.Content),
+ zap.String("error", err.Error()),
+ )
+ }
+ }
+
+ // update plugins' schema.
+ pluginList, err := c.plugin.List(ctx)
+ if err != nil {
+ log.Errorf("failed to list plugin names in APISIX: %s", err)
+ return err
+ }
+ for _, p := range pluginList {
+ ps, err := c.schema.GetPluginSchema(ctx, p)
+ if err != nil {
+ log.Warnw("failed to get plugin schema",
+ zap.String("plugin", p),
+ zap.String("error", err.Error()),
+ )
+ continue
+ }
+
+ if err := c.cache.InsertSchema(ps); err != nil {
+ log.Warnw("failed to insert schema to cache",
+ zap.String("plugin", p),
+ zap.String("cluster", c.name),
+ zap.String("error", err.Error()),
+ )
+ continue
+ }
+ }
+ return nil
+}
+
// Route implements Cluster.Route method.
func (c *cluster) Route() Route {
return c.route
@@ -331,6 +412,16 @@ func (c *cluster) Consumer() Consumer {
return c.consumer
}
+// Plugin implements Cluster.Plugin method.
+func (c *cluster) Plugin() Plugin {
+ return c.plugin
+}
+
+// Schema implements Cluster.Schema method.
+func (c *cluster) Schema() Schema {
+ return c.schema
+}
+
// HealthCheck implements Cluster.HealthCheck method.
func (c *cluster) HealthCheck(ctx context.Context) (err error) {
if c.cacheSyncErr != nil {
@@ -550,3 +641,56 @@ func readBody(r io.ReadCloser, url string) string {
}
return string(data)
}
+
+// getSchema returns the schema of APISIX object.
+func (c *cluster) getSchema(ctx context.Context, url string) (string, error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return "", err
+ }
+ resp, err := c.do(req)
+ if err != nil {
+ return "", err
+ }
+ defer drainBody(resp.Body, url)
+ if resp.StatusCode != http.StatusOK {
+ if resp.StatusCode == http.StatusNotFound {
+ return "", cache.ErrNotFound
+ } else {
+ err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
+ }
+ return "", err
+ }
+
+ return readBody(resp.Body, url), nil
+}
+
+// getList returns a list of string.
+func (c *cluster) getList(ctx context.Context, url string) ([]string, error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := c.do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer drainBody(resp.Body, url)
+ if resp.StatusCode != http.StatusOK {
+ if resp.StatusCode == http.StatusNotFound {
+ return nil, cache.ErrNotFound
+ } else {
+ err = multierr.Append(err, fmt.Errorf("unexpected status code %d", resp.StatusCode))
+ err = multierr.Append(err, fmt.Errorf("error message: %s", readBody(resp.Body, url)))
+ }
+ return nil, err
+ }
+
+ var listResponse []string
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&listResponse); err != nil {
+ return nil, err
+ }
+ return listResponse, nil
+}
diff --git a/pkg/apisix/cluster_test.go b/pkg/apisix/cluster_test.go
index 523b62e..9f31e3e 100644
--- a/pkg/apisix/cluster_test.go
+++ b/pkg/apisix/cluster_test.go
@@ -19,15 +19,16 @@ import (
"context"
"testing"
- v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/stretchr/testify/assert"
+
+ v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
func TestAddCluster(t *testing.T) {
apisix, err := NewClient()
assert.Nil(t, err)
- err = apisix.AddCluster(&ClusterOptions{
+ err = apisix.AddCluster(context.Background(), &ClusterOptions{
BaseURL: "http://service1:9080/apisix/admin",
})
assert.Nil(t, err)
@@ -35,13 +36,13 @@ func TestAddCluster(t *testing.T) {
clusters := apisix.ListClusters()
assert.Len(t, clusters, 1)
- err = apisix.AddCluster(&ClusterOptions{
+ err = apisix.AddCluster(context.Background(), &ClusterOptions{
Name: "service2",
BaseURL: "http://service2:9080/apisix/admin",
})
assert.Nil(t, err)
- err = apisix.AddCluster(&ClusterOptions{
+ err = apisix.AddCluster(context.Background(), &ClusterOptions{
Name: "service2",
AdminKey: "http://service3:9080/apisix/admin",
})
@@ -55,7 +56,7 @@ func TestNonExistentCluster(t *testing.T) {
apisix, err := NewClient()
assert.Nil(t, err)
- err = apisix.AddCluster(&ClusterOptions{
+ err = apisix.AddCluster(context.Background(), &ClusterOptions{
BaseURL: "http://service1:9080/apisix/admin",
})
assert.Nil(t, err)
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index 160a571..d0d1d4d 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -35,6 +35,8 @@ func newNonExistentCluster() *nonExistentCluster {
streamRoute: &dummyStreamRoute{},
globalRule: &dummyGlobalRule{},
consumer: &dummyConsumer{},
+ plugin: &dummyPlugin{},
+ schema: &dummySchema{},
},
}
}
@@ -46,6 +48,8 @@ type embedDummyResourceImplementer struct {
streamRoute StreamRoute
globalRule GlobalRule
consumer Consumer
+ plugin Plugin
+ schema Schema
}
type dummyRoute struct{}
@@ -180,6 +184,18 @@ func (f *dummyConsumer) Update(_ context.Context, _ *v1.Consumer) (*v1.Consumer,
return nil, ErrClusterNotExist
}
+type dummyPlugin struct{}
+
+func (f *dummyPlugin) List(_ context.Context) ([]string, error) {
+ return nil, ErrClusterNotExist
+}
+
+type dummySchema struct{}
+
+func (f *dummySchema) GetPluginSchema(_ context.Context, _ string) (*v1.Schema, error) {
+ return nil, ErrClusterNotExist
+}
+
func (nc *nonExistentCluster) Route() Route {
return nc.route
}
@@ -204,6 +220,14 @@ func (nc *nonExistentCluster) Consumer() Consumer {
return nc.consumer
}
+func (nc *nonExistentCluster) Plugin() Plugin {
+ return nc.plugin
+}
+
+func (nc *nonExistentCluster) Schema() Schema {
+ return nc.schema
+}
+
func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}
@@ -226,21 +250,25 @@ func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return
func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error { return nil }
+func (c *dummyCache) InsertSchema(_ *v1.Schema) error { return nil }
func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error) { return nil, cache.ErrNotFound }
+func (c *dummyCache) GetSchema(_ string) (*v1.Schema, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil }
func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil }
func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil }
func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error) { return nil, nil }
func (c *dummyCache) ListGlobalRules() ([]*v1.GlobalRule, error) { return nil, nil }
func (c *dummyCache) ListConsumers() ([]*v1.Consumer, error) { return nil, nil }
+func (c *dummyCache) ListSchema() ([]*v1.Schema, error) { return nil, nil }
func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil }
func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil }
func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil }
func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil }
+func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil }
diff --git a/pkg/apisix/plugin.go b/pkg/apisix/plugin.go
new file mode 100644
index 0000000..03acce8
--- /dev/null
+++ b/pkg/apisix/plugin.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "context"
+
+ "go.uber.org/zap"
+
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+)
+
+type pluginClient struct {
+ url string
+ cluster *cluster
+}
+
+func newPluginClient(c *cluster) Plugin {
+ return &pluginClient{
+ url: c.baseURL + "/plugins",
+ cluster: c,
+ }
+}
+
+// List returns the names of all plugins.
+func (p *pluginClient) List(ctx context.Context) ([]string, error) {
+ log.Debugw("try to list plugins' names in APISIX",
+ zap.String("cluster", "default"),
+ zap.String("url", p.url),
+ )
+ pluginList, err := p.cluster.getList(ctx, p.url+"/list")
+ if err != nil {
+ log.Errorf("failed to list plugins' names: %s", err)
+ return nil, err
+ }
+ log.Debugf("plugin list: %v", pluginList)
+ return pluginList, nil
+}
diff --git a/pkg/apisix/plugin_test.go b/pkg/apisix/plugin_test.go
new file mode 100644
index 0000000..3ee6c71
--- /dev/null
+++ b/pkg/apisix/plugin_test.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/url"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "golang.org/x/net/nettest"
+)
+
+type fakeAPISIXPluginSrv struct {
+ plugins []string
+}
+
+var fakePluginNames = []string{
+ "plugin-1",
+ "plugin-2",
+ "plugin-3",
+}
+
+func (srv *fakeAPISIXPluginSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+
+ if !strings.HasPrefix(r.URL.Path, "/apisix/admin/plugins") {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ if r.Method == http.MethodGet {
+ data, _ := json.Marshal(srv.plugins)
+ _, _ = w.Write(data)
+ w.WriteHeader(http.StatusOK)
+ return
+ }
+}
+
+func runFakePluginSrv(t *testing.T) *http.Server {
+ srv := &fakeAPISIXPluginSrv{
+ plugins: fakePluginNames,
+ }
+
+ ln, _ := nettest.NewLocalListener("tcp")
+
+ httpSrv := &http.Server{
+ Addr: ln.Addr().String(),
+ Handler: srv,
+ }
+
+ go func() {
+ if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+ t.Errorf("failed to run http server: %s", err)
+ }
+ }()
+
+ return httpSrv
+}
+
+func TestPluginClient(t *testing.T) {
+ srv := runFakePluginSrv(t)
+ defer func() {
+ assert.Nil(t, srv.Shutdown(context.Background()))
+ }()
+
+ u := url.URL{
+ Scheme: "http",
+ Host: srv.Addr,
+ Path: "/apisix/admin",
+ }
+
+ closedCh := make(chan struct{})
+ close(closedCh)
+ cli := newPluginClient(&cluster{
+ baseURL: u.String(),
+ cli: http.DefaultClient,
+ cache: &dummyCache{},
+ cacheSynced: closedCh,
+ })
+
+ // List
+ objs, err := cli.List(context.Background())
+ assert.Nil(t, err)
+ assert.Len(t, objs, len(fakePluginNames))
+ for i := range fakePluginNames {
+ assert.Equal(t, objs[i], fakePluginNames[i])
+ }
+}
diff --git a/pkg/apisix/resource_test.go b/pkg/apisix/resource_test.go
index 6723727..c640b97 100644
--- a/pkg/apisix/resource_test.go
+++ b/pkg/apisix/resource_test.go
@@ -19,6 +19,7 @@ import (
"testing"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+
"github.com/stretchr/testify/assert"
)
diff --git a/pkg/apisix/schema.go b/pkg/apisix/schema.go
new file mode 100644
index 0000000..1727124
--- /dev/null
+++ b/pkg/apisix/schema.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apisix
+
+import (
+ "context"
+
+ "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+ "github.com/apache/apisix-ingress-controller/pkg/id"
+ "github.com/apache/apisix-ingress-controller/pkg/log"
+ v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+
+ "go.uber.org/zap"
+)
+
+type schemaClient struct {
+ url string
+ cluster *cluster
+}
+
+func newSchemaClient(c *cluster) Schema {
+ return &schemaClient{
+ url: c.baseURL + "/schema/",
+ cluster: c,
+ }
+}
+
+// getSchema returns APISIX object's schema.
+func (sc schemaClient) getSchema(ctx context.Context, name string) (*v1.Schema, error) {
+ log.Debugw("try to look up schema",
+ zap.String("name", name),
+ zap.String("url", sc.url),
+ zap.String("cluster", "default"),
+ )
+
+ sid := id.GenID(name)
+ schema, err := sc.cluster.cache.GetSchema(sid)
+ if err == nil {
+ return schema, nil
+ }
+ if err == cache.ErrNotFound {
+ log.Debugw("failed to find schema in cache, will try to lookup from APISIX",
+ zap.String("name", name),
+ zap.Error(err),
+ )
+ } else {
+ log.Errorw("failed to find schema in cache, will try to lookup from APISIX",
+ zap.String("name", name),
+ zap.Error(err),
+ )
+ }
+
+ url := sc.url + "/" + name
+ content, err := sc.cluster.getSchema(ctx, url)
+ if err != nil {
+ log.Errorw("failed to get schema from APISIX",
+ zap.String("name", name),
+ zap.String("url", url),
+ zap.String("cluster", "default"),
+ zap.Error(err),
+ )
+ return nil, err
+ }
+
+ schema = &v1.Schema{
+ Name: name,
+ Content: content,
+ }
+ if err := sc.cluster.cache.InsertSchema(schema); err != nil {
+ log.Errorf("failed to reflect schema create to cache: %s", err)
+ return nil, err
+ }
+ return schema, nil
+}
+
+// GetPluginSchema returns plugin's schema.
+func (sc schemaClient) GetPluginSchema(ctx context.Context, pluginName string) (*v1.Schema, error) {
+ return sc.getSchema(ctx, "plugins/"+pluginName)
+}
diff --git a/pkg/apisix/schema_test.go b/pkg/apisix/schema_test.go
new file mode 100644
index 0000000..cc782fb
--- /dev/null
+++ b/pkg/apisix/schema_test.go
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+ "context"
+ "net/http"
+ "net/url"
+ "strings"
+ "testing"
+
+ "golang.org/x/net/nettest"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// fakeAPISIXSchemaSrv is an in-memory HTTP handler that stands in for the
+// APISIX admin schema endpoint in tests.
+type fakeAPISIXSchemaSrv struct {
+ // schema maps a schema name (the request path below
+ // /apisix/admin/schema/) to the response body served for it.
+ schema map[string]string
+}
+
+// testData holds the schema bodies served by the fake schema server, keyed
+// by schema name.
+var testData = map[string]string{
+ // plugins' schema
+ "plugins/key-auth": `{"$comment":"this is a mark for our
injected plugin
schema","type":"object","additionalProperties":false,"properties":{"disable":{"type":"boolean"},"header":{"default":"apikey","type":"string"}}}`,
+ "plugins/batch-requests": `{"$comment":"this is a mark for our
injected plugin
schema","type":"object","additionalProperties":false,"properties":{"disable":{"type":"boolean"}}}`,
+ "plugins/ext-plugin-pre-req":
`{"properties":{"disable":{"type":"boolean"},"extra_info":{"items":{"type":"string","minLength":1,"maxLength":64},"minItems":1,"type":"array"},"conf":{"items":{"properties":{"value":{"type":"string"},"name":{"type":"string","minLength":1,"maxLength":128}},"type":"object"},"minItems":1,"type":"array"}},"$comment":"this
is a mark for our injected plugin schema","type":"object"}`,
+}
+
+// errMsg is the body the fake schema server writes when the requested
+// schema name is not present in its map.
+const errMsg = `{"error_msg":"not found schema"}`
+
+// ServeHTTP answers GET requests under /apisix/admin/schema with the body
+// registered for the requested schema name, or with errMsg when the name is
+// unknown; both cases use status 200. Paths outside that prefix get 404 and
+// an empty schema name gets 400. Non-GET requests are not emulated and
+// receive an empty response.
+func (srv *fakeAPISIXSchemaSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	if !strings.HasPrefix(r.URL.Path, "/apisix/admin/schema") {
+		w.WriteHeader(http.StatusNotFound)
+		return
+	}
+	if r.Method != http.MethodGet {
+		return
+	}
+
+	name := strings.Trim(strings.TrimPrefix(r.URL.Path, "/apisix/admin/schema/"), "/")
+	if len(name) < 1 {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	// The status must be sent before the body: net/http ignores (and warns
+	// about) a WriteHeader call issued after the first Write.
+	w.WriteHeader(http.StatusOK)
+	body, ok := srv.schema[name]
+	if !ok {
+		body = errMsg
+	}
+	_, _ = w.Write([]byte(body))
+}
+
+// runFakeSchemaSrv starts a fake APISIX schema server that serves testData
+// on a local TCP listener and returns it. The test is aborted if the
+// listener cannot be created. The caller is responsible for shutting the
+// returned server down.
+func runFakeSchemaSrv(t *testing.T) *http.Server {
+	t.Helper()
+
+	ln, err := nettest.NewLocalListener("tcp")
+	if err != nil {
+		// Without a listener there is no address to serve on; continuing
+		// would dereference a nil listener below.
+		t.Fatalf("failed to create local listener: %s", err)
+	}
+
+	httpSrv := &http.Server{
+		Addr:    ln.Addr().String(),
+		Handler: &fakeAPISIXSchemaSrv{schema: testData},
+	}
+
+	go func() {
+		// http.ErrServerClosed is the normal result of Shutdown.
+		if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+			t.Errorf("failed to run http server: %s", err)
+		}
+	}()
+
+	return httpSrv
+}
+
+// TestSchemaClient runs the schema client against the fake schema server.
+// Every entry of testData must come back with its name and content, and an
+// unknown plugin must yield the server's errMsg body without an error.
+func TestSchemaClient(t *testing.T) {
+	srv := runFakeSchemaSrv(t)
+	defer func() {
+		assert.Nil(t, srv.Shutdown(context.Background()))
+	}()
+
+	u := url.URL{
+		Scheme: "http",
+		Host:   srv.Addr,
+		Path:   "/apisix/admin",
+	}
+
+	closedCh := make(chan struct{})
+	close(closedCh)
+	cli := newSchemaClient(&cluster{
+		baseURL:     u.String(),
+		cli:         http.DefaultClient,
+		cache:       &dummyCache{},
+		cacheSynced: closedCh,
+	})
+
+	// Test `GetPluginSchema`.
+	for name, content := range testData {
+		pluginName := strings.TrimPrefix(name, "plugins/")
+
+		schema, err := cli.GetPluginSchema(context.Background(), pluginName)
+		// Skip the field checks when the call failed, otherwise a nil
+		// schema would panic instead of reporting a test failure.
+		if !assert.Nil(t, err) || !assert.NotNil(t, schema) {
+			continue
+		}
+		// assert.Equal takes (expected, actual).
+		assert.Equal(t, name, schema.Name)
+		assert.Equal(t, content, schema.Content)
+	}
+
+	// Test getting a non-existent plugin's schema.
+	schema, err := cli.GetPluginSchema(context.Background(), "not-a-plugin")
+	if assert.Nil(t, err) && assert.NotNil(t, schema) {
+		assert.Equal(t, errMsg, schema.Content)
+	}
+}
diff --git a/pkg/apisix/ssl_test.go b/pkg/apisix/ssl_test.go
index 4f8585c..41602a7 100644
--- a/pkg/apisix/ssl_test.go
+++ b/pkg/apisix/ssl_test.go
@@ -28,6 +28,7 @@ import (
"testing"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
)
diff --git a/pkg/apisix/stream_route_test.go b/pkg/apisix/stream_route_test.go
index 35411ca..778131b 100644
--- a/pkg/apisix/stream_route_test.go
+++ b/pkg/apisix/stream_route_test.go
@@ -27,6 +27,7 @@ import (
"testing"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"
)
diff --git a/pkg/ingress/apisix_cluster_config.go
b/pkg/ingress/apisix_cluster_config.go
index 9af9f3d..835203b 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -136,7 +136,7 @@ func (c *apisixClusterConfigController) sync(ctx
context.Context, ev *types.Even
)
// TODO we may first call AddCluster.
// Since now we already have the default cluster, we just call
UpdateCluster.
- if err := c.controller.apisix.UpdateCluster(clusterOpts); err
!= nil {
+ if err := c.controller.apisix.UpdateCluster(ctx, clusterOpts);
err != nil {
log.Errorw("failed to update cluster",
zap.String("cluster_name", acc.Name),
zap.Error(err),
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 9897256..4d59b52 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -21,8 +21,6 @@ import (
"sync"
"time"
- apisixcache
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
- configv1
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -39,8 +37,10 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/api"
"github.com/apache/apisix-ingress-controller/pkg/apisix"
+ apisixcache
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
+ configv1
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
apisixscheme
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
listersv1
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
listersv2alpha1
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1"
@@ -379,7 +379,7 @@ func (c *Controller) run(ctx context.Context) {
AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
}
- err := c.apisix.AddCluster(clusterOpts)
+ err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
// TODO give up the leader role
log.Errorf("failed to add default cluster: %s", err)
@@ -391,7 +391,7 @@ func (c *Controller) run(ctx context.Context) {
log.Errorf("failed to wait the default cluster to be ready:
%s", err)
// re-create apisix cluster, used in next c.run
- if err = c.apisix.UpdateCluster(clusterOpts); err != nil {
+ if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
log.Errorf("failed to update default cluster: %s", err)
return
}
diff --git a/pkg/kube/translation/ingress_test.go
b/pkg/kube/translation/ingress_test.go
index 57d119b..feb0026 100644
--- a/pkg/kube/translation/ingress_test.go
+++ b/pkg/kube/translation/ingress_test.go
@@ -16,21 +16,20 @@ package translation
import (
"context"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
"testing"
- extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
- networkingv1beta1 "k8s.io/api/networking/v1beta1"
-
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
+ extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
+ networkingv1beta1 "k8s.io/api/networking/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
fakeapisix
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/fake"
apisixinformers
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
)
diff --git a/pkg/kube/translation/plugin_test.go
b/pkg/kube/translation/plugin_test.go
index 2cfa206..9a2642c 100644
--- a/pkg/kube/translation/plugin_test.go
+++ b/pkg/kube/translation/plugin_test.go
@@ -16,7 +16,6 @@ package translation
import (
"context"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
"testing"
"github.com/stretchr/testify/assert"
@@ -28,6 +27,7 @@ import (
"k8s.io/client-go/tools/cache"
"github.com/apache/apisix-ingress-controller/pkg/id"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
configv2alpha1
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
apisixfake
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/fake"
apisixinformers
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
diff --git a/pkg/kube/translation/translator_test.go
b/pkg/kube/translation/translator_test.go
index f744403..19fe3cf 100644
--- a/pkg/kube/translation/translator_test.go
+++ b/pkg/kube/translation/translator_test.go
@@ -18,19 +18,18 @@ import (
"context"
"testing"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
- discoveryv1 "k8s.io/api/discovery/v1"
-
- apisixv1
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
+ discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
configv1
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+ apisixv1
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
func TestTranslateUpstreamConfig(t *testing.T) {
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 9c27333..2858d53 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -463,3 +463,23 @@ func ComposeConsumerName(namespace, name string) string {
return buf.String()
}
+
+// Schema represents the schema of APISIX objects.
+type Schema struct {
+ // Name is the schema's name, e.g. "plugins/key-auth" for a plugin schema.
+ Name string `json:"name,omitempty" yaml:"name,omitempty"`
+ // Content is the schema body as returned by APISIX.
+ // NOTE(review): presumably a JSON schema document - confirm.
+ Content string `json:"content,omitempty" yaml:"content,omitempty"`
+}
+
+// DeepCopyInto overwrites *out with a copy of *s.
+//
+// Schema holds only string fields, so a struct assignment is a complete
+// deep copy. Unlike a JSON round trip through the omitempty tags, it also
+// clears fields of out whose counterpart in s is empty. A nil receiver or a
+// nil out is a no-op.
+func (s *Schema) DeepCopyInto(out *Schema) {
+	if s == nil || out == nil {
+		return
+	}
+	*out = *s
+}
+
+// DeepCopy returns a newly allocated Schema populated from s through
+// DeepCopyInto, or nil when s is nil.
+func (s *Schema) DeepCopy() *Schema {
+	var dup *Schema
+	if s != nil {
+		dup = &Schema{}
+		s.DeepCopyInto(dup)
+	}
+	return dup
+}
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index 941ec41..cbbe869 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -238,7 +238,7 @@ func (s *Scaffold) ListApisixUpstreams() ([]*v1.Upstream,
error) {
if err != nil {
return nil, err
}
- err = cli.AddCluster(&apisix.ClusterOptions{
+ err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
BaseURL: u.String(),
AdminKey: s.opts.APISIXAdminAPIKey,
})
@@ -259,7 +259,7 @@ func (s *Scaffold) ListApisixGlobalRules()
([]*v1.GlobalRule, error) {
if err != nil {
return nil, err
}
- err = cli.AddCluster(&apisix.ClusterOptions{
+ err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
BaseURL: u.String(),
AdminKey: s.opts.APISIXAdminAPIKey,
})
@@ -280,7 +280,7 @@ func (s *Scaffold) ListApisixRoutes() ([]*v1.Route, error) {
if err != nil {
return nil, err
}
- err = cli.AddCluster(&apisix.ClusterOptions{
+ err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
BaseURL: u.String(),
AdminKey: s.opts.APISIXAdminAPIKey,
})
@@ -301,7 +301,7 @@ func (s *Scaffold) ListApisixConsumers() ([]*v1.Consumer,
error) {
if err != nil {
return nil, err
}
- err = cli.AddCluster(&apisix.ClusterOptions{
+ err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
BaseURL: u.String(),
AdminKey: s.opts.APISIXAdminAPIKey,
})
@@ -322,7 +322,7 @@ func (s *Scaffold) ListApisixStreamRoutes()
([]*v1.StreamRoute, error) {
if err != nil {
return nil, err
}
- err = cli.AddCluster(&apisix.ClusterOptions{
+ err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
BaseURL: u.String(),
AdminKey: s.opts.APISIXAdminAPIKey,
})
@@ -343,7 +343,7 @@ func (s *Scaffold) ListApisixSsl() ([]*v1.Ssl, error) {
if err != nil {
return nil, err
}
- err = cli.AddCluster(&apisix.ClusterOptions{
+ err = cli.AddCluster(context.Background(), &apisix.ClusterOptions{
BaseURL: u.String(),
AdminKey: s.opts.APISIXAdminAPIKey,
})