This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git
The following commit(s) were added to refs/heads/master by this push:
new 5deff9f feat: handle idempotent key in PrepareConf (#27)
5deff9f is described below
commit 5deff9f2e5ba0b5e5c06d52408b2ebc07a388574
Author: 罗泽轩 <[email protected]>
AuthorDate: Wed Aug 4 09:19:43 2021 +0800
feat: handle idempotent key in PrepareConf (#27)
---
go.mod | 2 +-
go.sum | 4 ++
internal/plugin/conf.go | 104 ++++++++++++++++++++++++++++++-----------
internal/plugin/conf_test.go | 80 ++++++++++++++++++++++++++++++-
internal/plugin/plugin_test.go | 2 +-
5 files changed, 162 insertions(+), 30 deletions(-)
diff --git a/go.mod b/go.mod
index 7e94fc9..18b7b72 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.15
require (
github.com/ReneKroon/ttlcache/v2 v2.4.0
- github.com/api7/ext-plugin-proto v0.1.4
+ github.com/api7/ext-plugin-proto v0.2.1
github.com/google/flatbuffers v2.0.0+incompatible
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
diff --git a/go.sum b/go.sum
index defc2f9..54dfc62 100644
--- a/go.sum
+++ b/go.sum
@@ -22,6 +22,10 @@ github.com/alecthomas/units
v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alvaroloes/enumer v1.1.2/go.mod
h1:FxrjvuXoDAx9isTJrv4c+T410zFi0DtXIT0m65DJ+Wo=
github.com/api7/ext-plugin-proto v0.1.4
h1:gxw+fmtM2UPsSku+mEOFNTQpAGEOKrkWZvQgIcglaG0=
github.com/api7/ext-plugin-proto v0.1.4/go.mod
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
+github.com/api7/ext-plugin-proto v0.2.0
h1:XYws0+h7xHlAjSJ7OwmGm51vZwqeEwzTQrgwU0rY0P0=
+github.com/api7/ext-plugin-proto v0.2.0/go.mod
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
+github.com/api7/ext-plugin-proto v0.2.1
h1:NRz4CxPM10KPHAJSv+5jcOMjQBJN8mninu9V6O62Mxw=
+github.com/api7/ext-plugin-proto v0.2.1/go.mod
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod
h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index cf2e76e..5758f0a 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -19,7 +19,7 @@ package plugin
import (
"strconv"
- "sync/atomic"
+ "sync"
"time"
"github.com/ReneKroon/ttlcache/v2"
@@ -31,35 +31,60 @@ import (
"github.com/apache/apisix-go-plugin-runner/pkg/log"
)
+var (
+ cache *ConfCache
+)
+
type ConfEntry struct {
Name string
Value interface{}
}
type RuleConf []ConfEntry
-var (
- cache *ttlcache.Cache
- cacheCounter uint32 = 0
-)
+type ConfCache struct {
+ lock sync.Mutex
-func InitConfCache(ttl time.Duration) {
- cache = ttlcache.NewCache()
- err := cache.SetTTL(ttl)
- if err != nil {
- log.Fatalf("failed to set global ttl for cache: %s", err)
- }
- cache.SkipTTLExtensionOnHit(false)
- cacheCounter = 0
+ tokenCache *ttlcache.Cache
+ keyCache *ttlcache.Cache
+
+ tokenCounter uint32
}
-func genCacheToken() uint32 {
- return atomic.AddUint32(&cacheCounter, 1)
+func newConfCache(ttl time.Duration) *ConfCache {
+ cc := &ConfCache{
+ tokenCounter: 0,
+ }
+ for _, c := range []**ttlcache.Cache{&cc.tokenCache, &cc.keyCache} {
+ cache := ttlcache.NewCache()
+ err := cache.SetTTL(ttl)
+ if err != nil {
+ log.Fatalf("failed to set global ttl for cache: %s",
err)
+ }
+ cache.SkipTTLExtensionOnHit(false)
+ *c = cache
+ }
+ return cc
}
-func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
- req := pc.GetRootAsReq(buf, 0)
- entries := RuleConf{}
+func (cc *ConfCache) Set(req *pc.Req) (uint32, error) {
+ cc.lock.Lock()
+ defer cc.lock.Unlock()
+
+ key := string(req.Key())
+ // APISIX < 2.9 doesn't send the idempotent key
+ if key != "" {
+ res, err := cc.keyCache.Get(key)
+ if err == nil {
+ return res.(uint32), nil
+ }
+ if err != ttlcache.ErrNotFound {
+ log.Errorf("failed to get cached token with key: %s",
err)
+ // recreate the token
+ }
+ }
+
+ entries := RuleConf{}
te := A6.TextEntry{}
for i := 0; i < req.ConfLength(); i++ {
if req.Conf(&te, i) {
@@ -88,8 +113,37 @@ func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
}
}
- token := genCacheToken()
- err := cache.Set(strconv.FormatInt(int64(token), 10), entries)
+ cc.tokenCounter++
+ token := cc.tokenCounter
+ err := cc.tokenCache.Set(strconv.FormatInt(int64(token), 10), entries)
+ if err != nil {
+ return 0, err
+ }
+
+ err = cc.keyCache.Set(key, token)
+ return token, err
+}
+
+func (cc *ConfCache) SetInTest(token uint32, entries RuleConf) error {
+ return cc.tokenCache.Set(strconv.FormatInt(int64(token), 10), entries)
+}
+
+func (cc *ConfCache) Get(token uint32) (RuleConf, error) {
+ res, err := cc.tokenCache.Get(strconv.FormatInt(int64(token), 10))
+ if err != nil {
+ return nil, err
+ }
+ return res.(RuleConf), err
+}
+
+func InitConfCache(ttl time.Duration) {
+ cache = newConfCache(ttl)
+}
+
+func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
+ req := pc.GetRootAsReq(buf, 0)
+
+ token, err := cache.Set(req)
if err != nil {
return nil, err
}
@@ -103,13 +157,9 @@ func PrepareConf(buf []byte) (*flatbuffers.Builder, error)
{
}
func GetRuleConf(token uint32) (RuleConf, error) {
- res, err := cache.Get(strconv.FormatInt(int64(token), 10))
- if err != nil {
- return nil, err
- }
- return res.(RuleConf), err
+ return cache.Get(token)
}
-func SetRuleConf(token uint32, conf RuleConf) error {
- return cache.Set(strconv.FormatInt(int64(token), 10), conf)
+func SetRuleConfInTest(token uint32, conf RuleConf) error {
+ return cache.SetInTest(token, conf)
}
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index 3950cca..0a44749 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -20,6 +20,7 @@ package plugin
import (
"errors"
"sort"
+ "strconv"
"sync"
"testing"
"time"
@@ -104,7 +105,7 @@ func TestPrepareConfBadConf(t *testing.T) {
assert.Equal(t, 0, len(res))
}
-func TestPrepareConfConcurrently(t *testing.T) {
+func TestPrepareConfConcurrentlyWithoutKey(t *testing.T) {
InitConfCache(10 * time.Millisecond)
builder := flatbuffers.NewBuilder(1024)
@@ -139,6 +140,83 @@ func TestPrepareConfConcurrently(t *testing.T) {
}
}
+func TestPrepareConfConcurrentlyWithTheSameKey(t *testing.T) {
+ InitConfCache(10 * time.Millisecond)
+
+ builder := flatbuffers.NewBuilder(1024)
+ key := builder.CreateString("key")
+ pc.ReqStart(builder)
+ pc.ReqAddKey(builder, key)
+ root := pc.ReqEnd(builder)
+ builder.Finish(root)
+ b := builder.FinishedBytes()
+
+ n := 10
+ var wg sync.WaitGroup
+ res := make([][]byte, n)
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func(i int) {
+ bd, err := PrepareConf(b)
+ assert.Nil(t, err)
+ res[i] = bd.FinishedBytes()[:]
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+
+ tokens := make([]int, n)
+ for i := 0; i < n; i++ {
+ resp := pc.GetRootAsResp(res[i], 0)
+ tokens[i] = int(resp.ConfToken())
+ }
+
+ sort.Ints(tokens)
+ for i := 0; i < n; i++ {
+ assert.Equal(t, 1, tokens[i])
+ }
+}
+
+func TestPrepareConfConcurrentlyWithTheDifferentKey(t *testing.T) {
+ InitConfCache(10 * time.Millisecond)
+
+ builder := flatbuffers.NewBuilder(1024)
+ n := 10
+ var wg sync.WaitGroup
+ var lock sync.Mutex
+ res := make([][]byte, n)
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func(i int) {
+ lock.Lock()
+ key := builder.CreateString(strconv.Itoa(i))
+ pc.ReqStart(builder)
+ pc.ReqAddKey(builder, key)
+ root := pc.ReqEnd(builder)
+ builder.Finish(root)
+ b := builder.FinishedBytes()
+ lock.Unlock()
+
+ bd, err := PrepareConf(b)
+ assert.Nil(t, err)
+ res[i] = bd.FinishedBytes()[:]
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+
+ tokens := make([]int, n)
+ for i := 0; i < n; i++ {
+ resp := pc.GetRootAsResp(res[i], 0)
+ tokens[i] = int(resp.ConfToken())
+ }
+
+ sort.Ints(tokens)
+ for i := 0; i < n; i++ {
+ assert.Equal(t, i+1, tokens[i])
+ }
+}
+
func TestGetRuleConf(t *testing.T) {
InitConfCache(1 * time.Millisecond)
builder := flatbuffers.NewBuilder(1024)
diff --git a/internal/plugin/plugin_test.go b/internal/plugin/plugin_test.go
index 94ac093..e5c860d 100644
--- a/internal/plugin/plugin_test.go
+++ b/internal/plugin/plugin_test.go
@@ -43,7 +43,7 @@ var (
func TestHTTPReqCall(t *testing.T) {
InitConfCache(10 * time.Millisecond)
- SetRuleConf(1, RuleConf{})
+ SetRuleConfInTest(1, RuleConf{})
builder := flatbuffers.NewBuilder(1024)
hrc.ReqStart(builder)