This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new 5deff9f  feat: handle idempotent key in PrepareConf (#27)
5deff9f is described below

commit 5deff9f2e5ba0b5e5c06d52408b2ebc07a388574
Author: 罗泽轩 <[email protected]>
AuthorDate: Wed Aug 4 09:19:43 2021 +0800

    feat: handle idempotent key in PrepareConf (#27)
---
 go.mod                         |   2 +-
 go.sum                         |   4 ++
 internal/plugin/conf.go        | 104 ++++++++++++++++++++++++++++++-----------
 internal/plugin/conf_test.go   |  80 ++++++++++++++++++++++++++++++-
 internal/plugin/plugin_test.go |   2 +-
 5 files changed, 162 insertions(+), 30 deletions(-)

diff --git a/go.mod b/go.mod
index 7e94fc9..18b7b72 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.15
 
 require (
        github.com/ReneKroon/ttlcache/v2 v2.4.0
-       github.com/api7/ext-plugin-proto v0.1.4
+       github.com/api7/ext-plugin-proto v0.2.1
        github.com/google/flatbuffers v2.0.0+incompatible
        github.com/spf13/cobra v1.1.3
        github.com/stretchr/testify v1.7.0
diff --git a/go.sum b/go.sum
index defc2f9..54dfc62 100644
--- a/go.sum
+++ b/go.sum
@@ -22,6 +22,10 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/alvaroloes/enumer v1.1.2/go.mod h1:FxrjvuXoDAx9isTJrv4c+T410zFi0DtXIT0m65DJ+Wo=
 github.com/api7/ext-plugin-proto v0.1.4 h1:gxw+fmtM2UPsSku+mEOFNTQpAGEOKrkWZvQgIcglaG0=
 github.com/api7/ext-plugin-proto v0.1.4/go.mod h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
+github.com/api7/ext-plugin-proto v0.2.0 h1:XYws0+h7xHlAjSJ7OwmGm51vZwqeEwzTQrgwU0rY0P0=
+github.com/api7/ext-plugin-proto v0.2.0/go.mod h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
+github.com/api7/ext-plugin-proto v0.2.1 h1:NRz4CxPM10KPHAJSv+5jcOMjQBJN8mninu9V6O62Mxw=
+github.com/api7/ext-plugin-proto v0.2.1/go.mod h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index cf2e76e..5758f0a 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -19,7 +19,7 @@ package plugin
 
 import (
        "strconv"
-       "sync/atomic"
+       "sync"
        "time"
 
        "github.com/ReneKroon/ttlcache/v2"
@@ -31,35 +31,60 @@ import (
        "github.com/apache/apisix-go-plugin-runner/pkg/log"
 )
 
+var (
+       cache *ConfCache
+)
+
 type ConfEntry struct {
        Name  string
        Value interface{}
 }
 type RuleConf []ConfEntry
 
-var (
-       cache        *ttlcache.Cache
-       cacheCounter uint32 = 0
-)
+type ConfCache struct {
+       lock sync.Mutex
 
-func InitConfCache(ttl time.Duration) {
-       cache = ttlcache.NewCache()
-       err := cache.SetTTL(ttl)
-       if err != nil {
-               log.Fatalf("failed to set global ttl for cache: %s", err)
-       }
-       cache.SkipTTLExtensionOnHit(false)
-       cacheCounter = 0
+       tokenCache *ttlcache.Cache
+       keyCache   *ttlcache.Cache
+
+       tokenCounter uint32
 }
 
-func genCacheToken() uint32 {
-       return atomic.AddUint32(&cacheCounter, 1)
+func newConfCache(ttl time.Duration) *ConfCache {
+       cc := &ConfCache{
+               tokenCounter: 0,
+       }
+       for _, c := range []**ttlcache.Cache{&cc.tokenCache, &cc.keyCache} {
+               cache := ttlcache.NewCache()
+               err := cache.SetTTL(ttl)
+               if err != nil {
+                       log.Fatalf("failed to set global ttl for cache: %s", err)
+               }
+               cache.SkipTTLExtensionOnHit(false)
+               *c = cache
+       }
+       return cc
 }
 
-func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
-       req := pc.GetRootAsReq(buf, 0)
-       entries := RuleConf{}
+func (cc *ConfCache) Set(req *pc.Req) (uint32, error) {
+       cc.lock.Lock()
+       defer cc.lock.Unlock()
+
+       key := string(req.Key())
+       // APISIX < 2.9 doesn't send the idempotent key
+       if key != "" {
+               res, err := cc.keyCache.Get(key)
+               if err == nil {
+                       return res.(uint32), nil
+               }
 
+               if err != ttlcache.ErrNotFound {
+                       log.Errorf("failed to get cached token with key: %s", err)
+                       // recreate the token
+               }
+       }
+
+       entries := RuleConf{}
        te := A6.TextEntry{}
        for i := 0; i < req.ConfLength(); i++ {
                if req.Conf(&te, i) {
@@ -88,8 +113,37 @@ func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
                }
        }
 
-       token := genCacheToken()
-       err := cache.Set(strconv.FormatInt(int64(token), 10), entries)
+       cc.tokenCounter++
+       token := cc.tokenCounter
+       err := cc.tokenCache.Set(strconv.FormatInt(int64(token), 10), entries)
+       if err != nil {
+               return 0, err
+       }
+
+       err = cc.keyCache.Set(key, token)
+       return token, err
+}
+
+func (cc *ConfCache) SetInTest(token uint32, entries RuleConf) error {
+       return cc.tokenCache.Set(strconv.FormatInt(int64(token), 10), entries)
+}
+
+func (cc *ConfCache) Get(token uint32) (RuleConf, error) {
+       res, err := cc.tokenCache.Get(strconv.FormatInt(int64(token), 10))
+       if err != nil {
+               return nil, err
+       }
+       return res.(RuleConf), err
+}
+
+func InitConfCache(ttl time.Duration) {
+       cache = newConfCache(ttl)
+}
+
+func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
+       req := pc.GetRootAsReq(buf, 0)
+
+       token, err := cache.Set(req)
        if err != nil {
                return nil, err
        }
@@ -103,13 +157,9 @@ func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
 }
 
 func GetRuleConf(token uint32) (RuleConf, error) {
-       res, err := cache.Get(strconv.FormatInt(int64(token), 10))
-       if err != nil {
-               return nil, err
-       }
-       return res.(RuleConf), err
+       return cache.Get(token)
 }
 
-func SetRuleConf(token uint32, conf RuleConf) error {
-       return cache.Set(strconv.FormatInt(int64(token), 10), conf)
+func SetRuleConfInTest(token uint32, conf RuleConf) error {
+       return cache.SetInTest(token, conf)
 }
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index 3950cca..0a44749 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -20,6 +20,7 @@ package plugin
 import (
        "errors"
        "sort"
+       "strconv"
        "sync"
        "testing"
        "time"
@@ -104,7 +105,7 @@ func TestPrepareConfBadConf(t *testing.T) {
        assert.Equal(t, 0, len(res))
 }
 
-func TestPrepareConfConcurrently(t *testing.T) {
+func TestPrepareConfConcurrentlyWithoutKey(t *testing.T) {
        InitConfCache(10 * time.Millisecond)
 
        builder := flatbuffers.NewBuilder(1024)
@@ -139,6 +140,83 @@ func TestPrepareConfConcurrently(t *testing.T) {
        }
 }
 
+func TestPrepareConfConcurrentlyWithTheSameKey(t *testing.T) {
+       InitConfCache(10 * time.Millisecond)
+
+       builder := flatbuffers.NewBuilder(1024)
+       key := builder.CreateString("key")
+       pc.ReqStart(builder)
+       pc.ReqAddKey(builder, key)
+       root := pc.ReqEnd(builder)
+       builder.Finish(root)
+       b := builder.FinishedBytes()
+
+       n := 10
+       var wg sync.WaitGroup
+       res := make([][]byte, n)
+       for i := 0; i < n; i++ {
+               wg.Add(1)
+               go func(i int) {
+                       bd, err := PrepareConf(b)
+                       assert.Nil(t, err)
+                       res[i] = bd.FinishedBytes()[:]
+                       wg.Done()
+               }(i)
+       }
+       wg.Wait()
+
+       tokens := make([]int, n)
+       for i := 0; i < n; i++ {
+               resp := pc.GetRootAsResp(res[i], 0)
+               tokens[i] = int(resp.ConfToken())
+       }
+
+       sort.Ints(tokens)
+       for i := 0; i < n; i++ {
+               assert.Equal(t, 1, tokens[i])
+       }
+}
+
+func TestPrepareConfConcurrentlyWithTheDifferentKey(t *testing.T) {
+       InitConfCache(10 * time.Millisecond)
+
+       builder := flatbuffers.NewBuilder(1024)
+       n := 10
+       var wg sync.WaitGroup
+       var lock sync.Mutex
+       res := make([][]byte, n)
+       for i := 0; i < n; i++ {
+               wg.Add(1)
+               go func(i int) {
+                       lock.Lock()
+                       key := builder.CreateString(strconv.Itoa(i))
+                       pc.ReqStart(builder)
+                       pc.ReqAddKey(builder, key)
+                       root := pc.ReqEnd(builder)
+                       builder.Finish(root)
+                       b := builder.FinishedBytes()
+                       lock.Unlock()
+
+                       bd, err := PrepareConf(b)
+                       assert.Nil(t, err)
+                       res[i] = bd.FinishedBytes()[:]
+                       wg.Done()
+               }(i)
+       }
+       wg.Wait()
+
+       tokens := make([]int, n)
+       for i := 0; i < n; i++ {
+               resp := pc.GetRootAsResp(res[i], 0)
+               tokens[i] = int(resp.ConfToken())
+       }
+
+       sort.Ints(tokens)
+       for i := 0; i < n; i++ {
+               assert.Equal(t, i+1, tokens[i])
+       }
+}
+
 func TestGetRuleConf(t *testing.T) {
        InitConfCache(1 * time.Millisecond)
        builder := flatbuffers.NewBuilder(1024)
diff --git a/internal/plugin/plugin_test.go b/internal/plugin/plugin_test.go
index 94ac093..e5c860d 100644
--- a/internal/plugin/plugin_test.go
+++ b/internal/plugin/plugin_test.go
@@ -43,7 +43,7 @@ var (
 
 func TestHTTPReqCall(t *testing.T) {
        InitConfCache(10 * time.Millisecond)
-       SetRuleConf(1, RuleConf{})
+       SetRuleConfInTest(1, RuleConf{})
 
        builder := flatbuffers.NewBuilder(1024)
        hrc.ReqStart(builder)

Reply via email to