This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dd5087  chore: consumer data structures (#470)
1dd5087 is described below

commit 1dd5087aea443a0aeddb62a8aa0af90aab2bf48e
Author: Alex Zhang <[email protected]>
AuthorDate: Tue May 25 15:27:12 2021 +0800

    chore: consumer data structures (#470)
---
 pkg/apisix/apisix.go                         |  20 ++-
 pkg/apisix/cache/cache.go                    |   8 +
 pkg/apisix/cache/memdb.go                    |  28 ++++
 pkg/apisix/cache/memdb_test.go               |  45 +++++-
 pkg/apisix/cache/schema.go                   |  10 ++
 pkg/apisix/cluster.go                        |  16 ++
 pkg/apisix/consumer.go                       | 224 +++++++++++++++++++++++++++
 pkg/apisix/consumer_test.go                  | 199 ++++++++++++++++++++++++
 pkg/apisix/nonexistentclient.go              |   4 +
 pkg/apisix/resource.go                       |  10 ++
 pkg/types/apisix/v1/types.go                 |   9 ++
 pkg/types/apisix/v1/zz_generated.deepcopy.go |  24 +++
 12 files changed, 591 insertions(+), 6 deletions(-)

diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index a33c716..ffcb405 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -53,7 +53,7 @@ type Cluster interface {
 }
 
 // Route is the specific client interface to take over the create, update,
-// list and delete for APISIX's Route resource.
+// list and delete for APISIX Route resource.
 type Route interface {
        Get(context.Context, string) (*v1.Route, error)
        List(context.Context) ([]*v1.Route, error)
@@ -63,7 +63,7 @@ type Route interface {
 }
 
 // SSL is the specific client interface to take over the create, update,
-// list and delete for APISIX's SSL resource.
+// list and delete for APISIX SSL resource.
 type SSL interface {
        Get(context.Context, string) (*v1.Ssl, error)
        List(context.Context) ([]*v1.Ssl, error)
@@ -73,7 +73,7 @@ type SSL interface {
 }
 
 // Upstream is the specific client interface to take over the create, update,
-// list and delete for APISIX's Upstream resource.
+// list and delete for APISIX Upstream resource.
 type Upstream interface {
        Get(context.Context, string) (*v1.Upstream, error)
        List(context.Context) ([]*v1.Upstream, error)
@@ -83,7 +83,7 @@ type Upstream interface {
 }
 
 // StreamRoute is the specific client interface to take over the create, update,
-// list and delete for APISIX's Stream Route resource.
+// list and delete for APISIX Stream Route resource.
 type StreamRoute interface {
        Get(context.Context, string) (*v1.StreamRoute, error)
        List(context.Context) ([]*v1.StreamRoute, error)
@@ -93,7 +93,7 @@ type StreamRoute interface {
 }
 
 // GlobalRule is the specific client interface to take over the create, update,
-// list and delete for APISIX's Global Rule resource.
+// list and delete for APISIX Global Rule resource.
 type GlobalRule interface {
        Get(context.Context, string) (*v1.GlobalRule, error)
        List(context.Context) ([]*v1.GlobalRule, error)
@@ -102,6 +102,16 @@ type GlobalRule interface {
        Update(context.Context, *v1.GlobalRule) (*v1.GlobalRule, error)
 }
 
+// Consumer is the specific client interface to take over the create, update,
+// list and delete for APISIX Consumer resource.
+type Consumer interface {
+       // Get returns the Consumer identified by the given name.
+       Get(context.Context, string) (*v1.Consumer, error)
+       // List returns all Consumer objects.
+       List(context.Context) ([]*v1.Consumer, error)
+       // Create creates the given Consumer and returns the resulting object.
+       Create(context.Context, *v1.Consumer) (*v1.Consumer, error)
+       // Delete removes the given Consumer.
+       Delete(context.Context, *v1.Consumer) error
+       // Update updates the given Consumer and returns the resulting object.
+       Update(context.Context, *v1.Consumer) (*v1.Consumer, error)
+}
+
 type apisix struct {
        mu                 sync.RWMutex
        nonExistentCluster Cluster
diff --git a/pkg/apisix/cache/cache.go b/pkg/apisix/cache/cache.go
index c213f3e..48fc93b 100644
--- a/pkg/apisix/cache/cache.go
+++ b/pkg/apisix/cache/cache.go
@@ -33,6 +33,8 @@ type Cache interface {
        InsertStreamRoute(*v1.StreamRoute) error
        // InsertGlobalRule adds or updates global_rule to cache.
        InsertGlobalRule(*v1.GlobalRule) error
+       // InsertConsumer adds or updates consumer to cache.
+       InsertConsumer(*v1.Consumer) error
 
        // GetRoute finds the route from cache according to the primary index (id).
        GetRoute(string) (*v1.Route, error)
@@ -44,6 +46,8 @@ type Cache interface {
        GetStreamRoute(string) (*v1.StreamRoute, error)
        // GetGlobalRule finds the global_rule from cache according to the primary index (id).
        GetGlobalRule(string) (*v1.GlobalRule, error)
+       // GetConsumer finds the consumer from cache according to the primary index (id).
+       GetConsumer(string) (*v1.Consumer, error)
 
        // ListRoutes lists all routes in cache.
        ListRoutes() ([]*v1.Route, error)
@@ -55,6 +59,8 @@ type Cache interface {
        ListStreamRoutes() ([]*v1.StreamRoute, error)
        // ListGlobalRules lists all global_rule objects in cache.
        ListGlobalRules() ([]*v1.GlobalRule, error)
+       // ListConsumers lists all consumer objects in cache.
+       ListConsumers() ([]*v1.Consumer, error)
 
        // DeleteRoute deletes the specified route in cache.
        DeleteRoute(*v1.Route) error
@@ -66,4 +72,6 @@ type Cache interface {
        DeleteStreamRoute(*v1.StreamRoute) error
        // DeleteGlobalRule deletes the specified global_rule in cache.
        DeleteGlobalRule(*v1.GlobalRule) error
+       // DeleteConsumer deletes the specified consumer in cache.
+       DeleteConsumer(*v1.Consumer) error
 }
diff --git a/pkg/apisix/cache/memdb.go b/pkg/apisix/cache/memdb.go
index 18de938..4502034 100644
--- a/pkg/apisix/cache/memdb.go
+++ b/pkg/apisix/cache/memdb.go
@@ -66,6 +66,10 @@ func (c *dbCache) InsertGlobalRule(gr *v1.GlobalRule) error {
        return c.insert("global_rule", gr.DeepCopy())
 }
 
+// InsertConsumer stores a deep copy of the consumer in the "consumer" table,
+// so the cached object is detached from the one the caller holds.
+func (c *dbCache) InsertConsumer(consumer *v1.Consumer) error {
+       copied := consumer.DeepCopy()
+       return c.insert("consumer", copied)
+}
+
 func (c *dbCache) insert(table string, obj interface{}) error {
        txn := c.db.Txn(true)
        defer txn.Abort()
@@ -116,6 +120,14 @@ func (c *dbCache) GetGlobalRule(id string) 
(*v1.GlobalRule, error) {
        return obj.(*v1.GlobalRule).DeepCopy(), nil
 }
 
+// GetConsumer looks up the "consumer" table by username and returns a deep
+// copy of the cached object, or the lookup error unchanged.
+func (c *dbCache) GetConsumer(username string) (*v1.Consumer, error) {
+       raw, err := c.get("consumer", username)
+       if err != nil {
+               return nil, err
+       }
+       cached := raw.(*v1.Consumer)
+       return cached.DeepCopy(), nil
+}
+
 func (c *dbCache) get(table, id string) (interface{}, error) {
        txn := c.db.Txn(false)
        defer txn.Abort()
@@ -192,6 +204,18 @@ func (c *dbCache) ListGlobalRules() ([]*v1.GlobalRule, 
error) {
        return globalRules, nil
 }
 
+// ListConsumers returns deep copies of every object stored in the "consumer"
+// table, in the order produced by c.list.
+func (c *dbCache) ListConsumers() ([]*v1.Consumer, error) {
+       objs, err := c.list("consumer")
+       if err != nil {
+               return nil, err
+       }
+       consumers := make([]*v1.Consumer, len(objs))
+       for i := range objs {
+               consumers[i] = objs[i].(*v1.Consumer).DeepCopy()
+       }
+       return consumers, nil
+}
+
 func (c *dbCache) list(table string) ([]interface{}, error) {
        txn := c.db.Txn(false)
        defer txn.Abort()
@@ -229,6 +253,10 @@ func (c *dbCache) DeleteGlobalRule(gr *v1.GlobalRule) 
error {
        return c.delete("global_rule", gr)
 }
 
+// DeleteConsumer removes the given consumer from the "consumer" table and
+// reports whatever error the underlying delete returns.
+func (c *dbCache) DeleteConsumer(consumer *v1.Consumer) error {
+       err := c.delete("consumer", consumer)
+       return err
+}
+
 func (c *dbCache) delete(table string, obj interface{}) error {
        txn := c.db.Txn(true)
        defer txn.Abort()
diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go
index 91858d7..46d7816 100644
--- a/pkg/apisix/cache/memdb_test.go
+++ b/pkg/apisix/cache/memdb_test.go
@@ -284,7 +284,7 @@ func TestMemDBCacheGlobalRule(t *testing.T) {
 
        gr, err = c.GetGlobalRule("3")
        assert.Nil(t, err)
-       assert.Equal(t, gr, gr)
+       assert.Equal(t, gr, gr3)
 
        assert.Nil(t, c.DeleteGlobalRule(gr), "delete global_rule r3")
 
@@ -302,3 +302,46 @@ func TestMemDBCacheGlobalRule(t *testing.T) {
        }
        assert.Error(t, ErrNotFound, c.DeleteGlobalRule(gr4))
 }
+
+// TestMemDBCacheConsumer exercises insert, get, list and delete of consumers
+// against the in-memory cache.
+func TestMemDBCacheConsumer(t *testing.T) {
+       c, err := NewMemDBCache()
+       assert.Nil(t, err, "NewMemDBCache")
+
+       c1 := &v1.Consumer{
+               Username: "jack",
+       }
+       assert.Nil(t, c.InsertConsumer(c1), "inserting consumer c1")
+
+       c11, err := c.GetConsumer("jack")
+       assert.Nil(t, err)
+       assert.Equal(t, c1, c11)
+
+       c2 := &v1.Consumer{
+               Username: "tom",
+       }
+       c3 := &v1.Consumer{
+               Username: "jerry",
+       }
+       assert.Nil(t, c.InsertConsumer(c2), "inserting consumer c2")
+       assert.Nil(t, c.InsertConsumer(c3), "inserting consumer c3")
+
+       c22, err := c.GetConsumer("tom")
+       assert.Nil(t, err)
+       assert.Equal(t, c2, c22)
+
+       assert.Nil(t, c.DeleteConsumer(c3), "delete consumer c3")
+
+       consumers, err := c.ListConsumers()
+       assert.Nil(t, err, "listing consumers")
+       // Stop here when the count is wrong: the indexing below would panic.
+       if !assert.Len(t, consumers, 2, "consumers left after deleting c3") {
+               return
+       }
+
+       // The test does not assume an order from ListConsumers, so normalize
+       // it by username before comparing.
+       if consumers[0].Username > consumers[1].Username {
+               consumers[0], consumers[1] = consumers[1], consumers[0]
+       }
+       assert.Equal(t, c1, consumers[0])
+       assert.Equal(t, c2, consumers[1])
+
+       c4 := &v1.Consumer{
+               Username: "chandler",
+       }
+       // Compare the returned error with the sentinel. Passing ErrNotFound as
+       // the error under test to assert.Error would only check that the
+       // sentinel itself is non-nil, which is always true.
+       assert.Equal(t, ErrNotFound, c.DeleteConsumer(c4), "deleting a consumer that was never inserted")
+}
diff --git a/pkg/apisix/cache/schema.go b/pkg/apisix/cache/schema.go
index c3e8877..9925d39 100644
--- a/pkg/apisix/cache/schema.go
+++ b/pkg/apisix/cache/schema.go
@@ -96,6 +96,16 @@ var (
                                        },
                                },
                        },
+                       "consumer": {
+                               Name: "consumer",
+                               Indexes: map[string]*memdb.IndexSchema{
+                                       "id": {
+                                               Name:    "id",
+                                               Unique:  true,
+                                               Indexer: 
&memdb.StringFieldIndex{Field: "Username"},
+                                       },
+                               },
+                       },
                },
        }
 )
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 846a17a..d41e5dd 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -74,6 +74,7 @@ type cluster struct {
        ssl          SSL
        streamRoute  StreamRoute
        globalRules  GlobalRule
+       consumer     Consumer
 }
 
 func newCluster(o *ClusterOptions) (Cluster, error) {
@@ -105,6 +106,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
        c.ssl = newSSLClient(c)
        c.streamRoute = newStreamRouteClient(c)
        c.globalRules = newGlobalRuleClient(c)
+       c.consumer = newConsumerClient(c)
 
        go c.syncCache()
 
@@ -179,6 +181,11 @@ func (c *cluster) syncCacheOnce() (bool, error) {
                log.Errorf("failed to list global_rules in APISIX: %s", err)
                return false, err
        }
+       consumers, err := c.consumer.List(context.TODO())
+       if err != nil {
+               log.Errorf("failed to list consumers in APISIX: %s", err)
+               return false, err
+       }
 
        for _, r := range routes {
                if err := c.cache.InsertRoute(r); err != nil {
@@ -230,6 +237,15 @@ func (c *cluster) syncCacheOnce() (bool, error) {
                        return false, err
                }
        }
+       for _, consumer := range consumers {
+               if err := c.cache.InsertConsumer(consumer); err != nil {
+                       log.Errorw("failed to insert consumer to cache",
+                               zap.Any("consumer", consumer),
+                               zap.String("cluster", c.name),
+                               zap.String("error", err.Error()),
+                       )
+               }
+       }
        return true, nil
 }
 
diff --git a/pkg/apisix/consumer.go b/pkg/apisix/consumer.go
new file mode 100644
index 0000000..7f7a53a
--- /dev/null
+++ b/pkg/apisix/consumer.go
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+
+       "go.uber.org/zap"
+
+       "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+       "github.com/apache/apisix-ingress-controller/pkg/log"
+       v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+)
+
+// consumerClient implements the Consumer interface for a single cluster.
+type consumerClient struct {
+       // url is the consumers collection endpoint of the cluster.
+       url     string
+       // cluster provides the cache and the resource helpers the methods call.
+       cluster *cluster
+}
+
+// newConsumerClient builds the Consumer client for cluster c. Requests are
+// addressed to the "/consumers" collection under the cluster's base URL.
+func newConsumerClient(c *cluster) Consumer {
+       endpoint := c.baseURL + "/consumers"
+       client := consumerClient{
+               cluster: c,
+               url:     endpoint,
+       }
+       return &client
+}
+
+// Get returns the Consumer.
+// FIXME, currently if caller pass a non-existent resource, the Get always 
passes
+// through cache.
+func (r *consumerClient) Get(ctx context.Context, name string) (*v1.Consumer, 
error) {
+       log.Debugw("try to look up consumer",
+               zap.String("name", name),
+               zap.String("url", r.url),
+               zap.String("cluster", "default"),
+       )
+       consumer, err := r.cluster.cache.GetConsumer(name)
+       if err == nil {
+               return consumer, nil
+       }
+       if err != cache.ErrNotFound {
+               log.Errorw("failed to find consumer in cache, will try to 
lookup from APISIX",
+                       zap.String("name", name),
+                       zap.Error(err),
+               )
+       } else {
+               log.Debugw("consumer not found in cache, will try to lookup 
from APISIX",
+                       zap.String("name", name),
+                       zap.Error(err),
+               )
+       }
+
+       // TODO Add mutex here to avoid dog-pile effect.
+       url := r.url + "/" + name
+       resp, err := r.cluster.getResource(ctx, url)
+       if err != nil {
+               if err == cache.ErrNotFound {
+                       log.Warnw("consumer not found",
+                               zap.String("name", name),
+                               zap.String("url", url),
+                               zap.String("cluster", "default"),
+                       )
+               } else {
+                       log.Errorw("failed to get consumer from APISIX",
+                               zap.String("name", name),
+                               zap.String("url", url),
+                               zap.String("cluster", "default"),
+                               zap.Error(err),
+                       )
+               }
+               return nil, err
+       }
+
+       consumer, err = resp.Item.consumer()
+       if err != nil {
+               log.Errorw("failed to convert consumer item",
+                       zap.String("url", r.url),
+                       zap.String("consumer_key", resp.Item.Key),
+                       zap.String("consumer_value", string(resp.Item.Value)),
+                       zap.Error(err),
+               )
+               return nil, err
+       }
+
+       if err := r.cluster.cache.InsertConsumer(consumer); err != nil {
+               log.Errorf("failed to reflect consumer create to cache: %s", 
err)
+               return nil, err
+       }
+       return consumer, nil
+}
+
+// List is only used in cache warming up. So here just pass through
+// to APISIX.
+// The call fails as a whole when the listing request fails or when any
+// returned item cannot be decoded into a Consumer.
+func (r *consumerClient) List(ctx context.Context) ([]*v1.Consumer, error) {
+       log.Debugw("try to list consumers in APISIX",
+               // Log the name of the cluster actually being queried.
+               zap.String("cluster", r.cluster.name),
+               zap.String("url", r.url),
+       )
+       consumerItems, err := r.cluster.listResource(ctx, r.url)
+       if err != nil {
+               log.Errorf("failed to list consumers: %s", err)
+               return nil, err
+       }
+
+       var items []*v1.Consumer
+       for i, item := range consumerItems.Node.Items {
+               consumer, err := item.consumer()
+               if err != nil {
+                       log.Errorw("failed to convert consumer item",
+                               zap.String("url", r.url),
+                               zap.String("consumer_key", item.Key),
+                               zap.String("consumer_value", string(item.Value)),
+                               zap.Error(err),
+                       )
+                       return nil, err
+               }
+
+               items = append(items, consumer)
+               log.Debugf("list consumer #%d, body: %s", i, string(item.Value))
+       }
+
+       return items, nil
+}
+
+// Create sends the JSON-encoded consumer to <url>/<username> through
+// createResource and stores the consumer returned by APISIX in the cache.
+// It returns early with the error from HasSynced when that call fails, and
+// also fails on encoding, request, decoding or cache insertion errors.
+func (r *consumerClient) Create(ctx context.Context, obj *v1.Consumer) (*v1.Consumer, error) {
+       log.Debugw("try to create consumer",
+               zap.String("name", obj.Username),
+               zap.Any("plugins", obj.Plugins),
+               // Log the name of the cluster actually being written to.
+               zap.String("cluster", r.cluster.name),
+               zap.String("url", r.url),
+       )
+
+       if err := r.cluster.HasSynced(ctx); err != nil {
+               return nil, err
+       }
+       data, err := json.Marshal(obj)
+       if err != nil {
+               return nil, err
+       }
+
+       url := r.url + "/" + obj.Username
+       log.Debugw("creating consumer", zap.ByteString("body", data), zap.String("url", url))
+       resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
+       if err != nil {
+               log.Errorf("failed to create consumer: %s", err)
+               return nil, err
+       }
+
+       consumer, err := resp.Item.consumer()
+       if err != nil {
+               return nil, err
+       }
+       if err := r.cluster.cache.InsertConsumer(consumer); err != nil {
+               log.Errorf("failed to reflect consumer create to cache: %s", err)
+               return nil, err
+       }
+       return consumer, nil
+}
+
+// Delete removes the consumer at <url>/<username> through deleteResource and
+// then drops it from the cache. It returns early with the error from
+// HasSynced when that call fails. A cache.ErrNotFound from the cache is
+// logged but not treated as a failure.
+func (r *consumerClient) Delete(ctx context.Context, obj *v1.Consumer) error {
+       log.Debugw("try to delete consumer",
+               zap.String("name", obj.Username),
+               // Log the name of the cluster actually being written to.
+               zap.String("cluster", r.cluster.name),
+               zap.String("url", r.url),
+       )
+       if err := r.cluster.HasSynced(ctx); err != nil {
+               return err
+       }
+       url := r.url + "/" + obj.Username
+       if err := r.cluster.deleteResource(ctx, url); err != nil {
+               return err
+       }
+       if err := r.cluster.cache.DeleteConsumer(obj); err != nil {
+               log.Errorf("failed to reflect consumer delete to cache: %s", err)
+               if err != cache.ErrNotFound {
+                       return err
+               }
+       }
+       return nil
+}
+
+// Update sends the JSON-encoded consumer to <url>/<username> through
+// updateResource and stores the consumer returned by APISIX in the cache.
+// It returns early with the error from HasSynced when that call fails, and
+// also fails on encoding, request, decoding or cache insertion errors.
+func (r *consumerClient) Update(ctx context.Context, obj *v1.Consumer) (*v1.Consumer, error) {
+       log.Debugw("try to update consumer",
+               zap.String("name", obj.Username),
+               zap.Any("plugins", obj.Plugins),
+               // Log the name of the cluster actually being written to.
+               zap.String("cluster", r.cluster.name),
+               zap.String("url", r.url),
+       )
+       if err := r.cluster.HasSynced(ctx); err != nil {
+               return nil, err
+       }
+       body, err := json.Marshal(obj)
+       if err != nil {
+               return nil, err
+       }
+       url := r.url + "/" + obj.Username
+       // The object being updated is a consumer, not a username.
+       log.Debugw("updating consumer", zap.ByteString("body", body), zap.String("url", url))
+       resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
+       if err != nil {
+               return nil, err
+       }
+       consumer, err := resp.Item.consumer()
+       if err != nil {
+               return nil, err
+       }
+       if err := r.cluster.cache.InsertConsumer(consumer); err != nil {
+               log.Errorf("failed to reflect consumer update to cache: %s", err)
+               return nil, err
+       }
+       return consumer, nil
+}
diff --git a/pkg/apisix/consumer_test.go b/pkg/apisix/consumer_test.go
new file mode 100644
index 0000000..e029d13
--- /dev/null
+++ b/pkg/apisix/consumer_test.go
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package apisix
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "net/url"
+       "sort"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "golang.org/x/net/nettest"
+
+       v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+)
+
+// fakeAPISIXConsumerSrv is an in-memory HTTP handler standing in for the
+// consumers endpoint under "/apisix/admin/consumers" in the tests of this file.
+type fakeAPISIXConsumerSrv struct {
+       // consumer maps a resource key to the raw JSON body stored for it.
+       consumer map[string]json.RawMessage
+}
+
+// ServeHTTP emulates the consumers endpoint:
+//   - GET lists every stored consumer, sorted by key;
+//   - DELETE removes the addressed consumer, or answers 404 when it is unknown;
+//   - PUT stores the request body under the last path segment and answers 201;
+//   - PATCH replaces the body of an existing consumer, or answers 404.
+//
+// Paths outside "/apisix/admin/consumers" are answered with 404. All methods
+// share one key namespace, so a consumer stored by PUT is visible to DELETE
+// and PATCH.
+func (srv *fakeAPISIXConsumerSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       defer r.Body.Close()
+
+       const keyPrefix = "/apisix/admin/consumers/"
+
+       if !strings.HasPrefix(r.URL.Path, "/apisix/admin/consumers") {
+               w.WriteHeader(http.StatusNotFound)
+               return
+       }
+
+       // A switch keeps each method self-contained; no branch can fall into
+       // the handling of another method.
+       switch r.Method {
+       case http.MethodGet:
+               resp := fakeListResp{
+                       Count: strconv.Itoa(len(srv.consumer)),
+                       Node: fakeNode{
+                               Key: "/apisix/consumers",
+                       },
+               }
+               // Map iteration order is random; sort for a stable listing.
+               keys := make([]string, 0, len(srv.consumer))
+               for key := range srv.consumer {
+                       keys = append(keys, key)
+               }
+               sort.Strings(keys)
+               for _, key := range keys {
+                       resp.Node.Items = append(resp.Node.Items, fakeItem{
+                               Key:   key,
+                               Value: srv.consumer[key],
+                       })
+               }
+               w.WriteHeader(http.StatusOK)
+               // Errors are ignored on purpose: this is a best-effort test fake.
+               data, _ := json.Marshal(resp)
+               _, _ = w.Write(data)
+
+       case http.MethodDelete:
+               key := keyPrefix + strings.TrimPrefix(r.URL.Path, keyPrefix)
+               code := http.StatusNotFound
+               if _, ok := srv.consumer[key]; ok {
+                       delete(srv.consumer, key)
+                       code = http.StatusOK
+               }
+               w.WriteHeader(code)
+
+       case http.MethodPut:
+               paths := strings.Split(r.URL.Path, "/")
+               key := keyPrefix + paths[len(paths)-1]
+               body, _ := ioutil.ReadAll(r.Body)
+               srv.consumer[key] = body
+               w.WriteHeader(http.StatusCreated)
+               resp := fakeCreateResp{
+                       Action: "create",
+                       Node: fakeItem{
+                               Key:   key,
+                               Value: json.RawMessage(body),
+                       },
+               }
+               data, _ := json.Marshal(resp)
+               _, _ = w.Write(data)
+
+       case http.MethodPatch:
+               // Use the same key namespace as PUT and DELETE; otherwise a
+               // consumer stored by PUT could never be found here.
+               key := keyPrefix + strings.TrimPrefix(r.URL.Path, keyPrefix)
+               if _, ok := srv.consumer[key]; !ok {
+                       w.WriteHeader(http.StatusNotFound)
+                       return
+               }
+
+               body, _ := ioutil.ReadAll(r.Body)
+               srv.consumer[key] = body
+
+               w.WriteHeader(http.StatusOK)
+               output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, key, string(body))
+               _, _ = w.Write([]byte(output))
+       }
+}
+
+// runFakeConsumerSrv starts a fakeAPISIXConsumerSrv on a local TCP listener
+// and returns the running *http.Server; its Addr holds the listener address.
+// The caller is responsible for shutting the server down. The test is aborted
+// when the listener cannot be created.
+func runFakeConsumerSrv(t *testing.T) *http.Server {
+       t.Helper()
+
+       srv := &fakeAPISIXConsumerSrv{
+               consumer: make(map[string]json.RawMessage),
+       }
+
+       // Check the error: a nil listener would panic on ln.Addr() below.
+       ln, err := nettest.NewLocalListener("tcp")
+       if err != nil {
+               t.Fatalf("failed to create local listener: %s", err)
+       }
+
+       httpSrv := &http.Server{
+               Addr:    ln.Addr().String(),
+               Handler: srv,
+       }
+
+       go func() {
+               // http.ErrServerClosed is the normal result of Shutdown.
+               if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
+                       t.Errorf("failed to run http server: %s", err)
+               }
+       }()
+
+       return httpSrv
+}
+
+// TestConsumerClient drives the consumer client through create, list, delete
+// and update against the fake consumers server, using a dummy cache so every
+// call reaches the server.
+func TestConsumerClient(t *testing.T) {
+       srv := runFakeConsumerSrv(t)
+       defer func() {
+               assert.Nil(t, srv.Shutdown(context.Background()))
+       }()
+
+       u := url.URL{
+               Scheme: "http",
+               Host:   srv.Addr,
+               Path:   "/apisix/admin",
+       }
+
+       // An already closed channel is handed over as cacheSynced.
+       closedCh := make(chan struct{})
+       close(closedCh)
+       cli := newConsumerClient(&cluster{
+               baseURL:     u.String(),
+               cli:         http.DefaultClient,
+               cache:       &dummyCache{},
+               cacheSynced: closedCh,
+       })
+
+       // Create
+       obj, err := cli.Create(context.Background(), &v1.Consumer{
+               Username: "1",
+       })
+       assert.Nil(t, err)
+       if assert.NotNil(t, obj) {
+               assert.Equal(t, "1", obj.Username)
+       }
+
+       obj, err = cli.Create(context.Background(), &v1.Consumer{
+               Username: "2",
+       })
+       assert.Nil(t, err)
+       if assert.NotNil(t, obj) {
+               assert.Equal(t, "2", obj.Username)
+       }
+
+       // List
+       objs, err := cli.List(context.Background())
+       assert.Nil(t, err)
+       // Stop when the count is wrong: the indexing below would panic.
+       if !assert.Len(t, objs, 2) {
+               return
+       }
+       assert.Equal(t, "1", objs[0].Username)
+       assert.Equal(t, "2", objs[1].Username)
+
+       // Delete then List
+       assert.Nil(t, cli.Delete(context.Background(), objs[0]))
+       objs, err = cli.List(context.Background())
+       assert.Nil(t, err)
+       if !assert.Len(t, objs, 1) {
+               return
+       }
+       assert.Equal(t, "2", objs[0].Username)
+
+       // Patch then List
+       _, err = cli.Update(context.Background(), &v1.Consumer{
+               Username: "2",
+               Plugins: map[string]interface{}{
+                       "prometheus": struct{}{},
+               },
+       })
+       assert.Nil(t, err)
+       objs, err = cli.List(context.Background())
+       assert.Nil(t, err)
+       if !assert.Len(t, objs, 1) {
+               return
+       }
+       assert.Equal(t, "2", objs[0].Username)
+       // Verify that the update took effect, not only that the consumer
+       // still exists.
+       assert.Contains(t, objs[0].Plugins, "prometheus")
+}
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index d1a2c45..63c12a4 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -193,18 +193,22 @@ func (c *dummyCache) InsertSSL(_ *v1.Ssl) error           
             { return
 func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error              { 
return nil }
 func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error        { 
return nil }
 func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error          { 
return nil }
+func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error              { 
return nil }
 func (c *dummyCache) GetRoute(_ string) (*v1.Route, error)             { 
return nil, cache.ErrNotFound }
 func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error)                 { 
return nil, cache.ErrNotFound }
 func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error)       { 
return nil, cache.ErrNotFound }
 func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { 
return nil, cache.ErrNotFound }
 func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error)   { 
return nil, cache.ErrNotFound }
+func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error)       { 
return nil, cache.ErrNotFound }
 func (c *dummyCache) ListRoutes() ([]*v1.Route, error)                 { 
return nil, nil }
 func (c *dummyCache) ListSSL() ([]*v1.Ssl, error)                      { 
return nil, nil }
 func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error)           { 
return nil, nil }
 func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error)     { 
return nil, nil }
 func (c *dummyCache) ListGlobalRules() ([]*v1.GlobalRule, error)       { 
return nil, nil }
+func (c *dummyCache) ListConsumers() ([]*v1.Consumer, error)           { 
return nil, nil }
 func (c *dummyCache) DeleteRoute(_ *v1.Route) error                    { 
return nil }
 func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error                        { 
return nil }
 func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error              { 
return nil }
 func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error        { 
return nil }
 func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error          { 
return nil }
+func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error              { 
return nil }
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index d24791f..9efde6c 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -147,3 +147,13 @@ func (i *item) globalRule() (*v1.GlobalRule, error) {
        }
        return &globalRule, nil
 }
+
+// consumer decodes item.Value and converts it to v1.Consumer.
+func (i *item) consumer() (*v1.Consumer, error) {
+       log.Debugf("got consumer: %s", string(i.Value))
+       var consumer v1.Consumer
+       if err := json.Unmarshal(i.Value, &consumer); err != nil {
+               return nil, err
+       }
+       return &consumer, nil
+}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 4dfc71f..153e97d 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -322,6 +322,15 @@ type GlobalRule struct {
        Plugins Plugins `json:"plugins,omitempty" yaml:"plugins,omitempty"`
 }
 
+// Consumer represents the consumer object in APISIX.
+// +k8s:deepcopy-gen=true
+type Consumer struct {
+       // Username is the consumer's name; it is always serialized.
+       Username string            `json:"username" yaml:"username"`
+       // Desc is an optional description, omitted from output when empty.
+       Desc     string            `json:"desc,omitempty" yaml:"desc,omitempty"`
+       // Labels are optional key/value labels, omitted from output when empty.
+       Labels   map[string]string `json:"labels,omitempty" 
yaml:"labels,omitempty"`
+       // Plugins holds the consumer's plugins, omitted from output when empty.
+       Plugins  Plugins           `json:"plugins,omitempty" 
yaml:"plugins,omitempty"`
+}
+
 // NewDefaultUpstream returns an empty Upstream with default values.
 func NewDefaultUpstream() *Upstream {
        return &Upstream{
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go 
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 9ba4c4f..249f427 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -21,6 +21,30 @@ limitations under the License.
 package v1
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Consumer) DeepCopyInto(out *Consumer) {
+       *out = *in
+       if in.Labels != nil {
+               in, out := &in.Labels, &out.Labels
+               *out = make(map[string]string, len(*in))
+               for key, val := range *in {
+                       (*out)[key] = val
+               }
+       }
+       in.Plugins.DeepCopyInto(&out.Plugins)
+       return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Consumer.
+func (in *Consumer) DeepCopy() *Consumer {
+       if in == nil {
+               return nil
+       }
+       out := new(Consumer)
+       in.DeepCopyInto(out)
+       return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *CorsConfig) DeepCopyInto(out *CorsConfig) {
        *out = *in
        return

Reply via email to