This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e186f9  feat: register & run plugin (#3)
5e186f9 is described below

commit 5e186f907ee5d8aa00deed3c9efd887b40abce85
Author: 罗泽轩 <[email protected]>
AuthorDate: Wed Jun 2 21:32:17 2021 +0800

    feat: register & run plugin (#3)
---
 README.md                      |  1 +
 internal/http/response.go      | 10 ++++--
 internal/log/log.go            |  2 +-
 internal/plugin/conf.go        | 26 ++++++++++++--
 internal/plugin/conf_test.go   | 55 +++++++++++++++++++++++++++++
 internal/plugin/plugin.go      | 78 +++++++++++++++++++++++++++++++++++++++---
 internal/plugin/plugin_test.go | 72 ++++++++++++++++++++++++++++++++++++++
 internal/server/server.go      | 14 ++++----
 pkg/plugin/plugin.go           | 44 ++++++++++++++++++++++++
 9 files changed, 286 insertions(+), 16 deletions(-)

diff --git a/README.md b/README.md
index 0b9b041..413bc17 100644
--- a/README.md
+++ b/README.md
@@ -20,3 +20,4 @@
 # Go Plugin Runner for Apache APISIX
 
 [![Go Report 
Card](https://goreportcard.com/badge/github.com/apache/apisix-go-plugin-runner)](https://goreportcard.com/report/github.com/apache/apisix-go-plugin-runner)
+[![Build 
Status](https://github.com/apache/apisix-go-plugin-runner/workflows/unit-test-ci/badge.svg?branch=master)](https://github.com/apache/apisix-go-plugin-runner/actions)
diff --git a/internal/http/response.go b/internal/http/response.go
index d0aa8a4..4b2f64b 100644
--- a/internal/http/response.go
+++ b/internal/http/response.go
@@ -31,7 +31,9 @@ type Response struct {
 }
 
 func (r *Response) Header() http.Header {
-       r.hdr = http.Header{}
+       if r.hdr == nil {
+               r.hdr = http.Header{}
+       }
        return r.hdr
 }
 
@@ -58,8 +60,12 @@ func (r *Response) Reset() {
        r.hdr = nil
 }
 
+func (r *Response) HasChange() bool {
+       return !(r.body == nil && r.code == 0 && len(r.hdr) == 0)
+}
+
 func (r *Response) FetchChanges(id uint32, builder *flatbuffers.Builder) bool {
-       if r.body == nil && r.code == 0 && len(r.hdr) == 0 {
+       if !r.HasChange() {
                return false
        }
 
diff --git a/internal/log/log.go b/internal/log/log.go
index b68ec2a..90f0026 100644
--- a/internal/log/log.go
+++ b/internal/log/log.go
@@ -33,7 +33,7 @@ func newLogger() *zap.SugaredLogger {
                zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
                os.Stdout,
                level)
-       lg := zap.New(core, zap.AddStacktrace(zap.ErrorLevel), zap.AddCaller())
+       lg := zap.New(core, zap.AddStacktrace(zap.ErrorLevel), zap.AddCaller(), 
zap.AddCallerSkip(1))
        return lg.Sugar()
 }
 
diff --git a/internal/plugin/conf.go b/internal/plugin/conf.go
index 5c4dccf..20485fe 100644
--- a/internal/plugin/conf.go
+++ b/internal/plugin/conf.go
@@ -54,13 +54,33 @@ func genCacheToken() uint32 {
 
 func PrepareConf(buf []byte) (*flatbuffers.Builder, error) {
        req := pc.GetRootAsReq(buf, 0)
-       entries := make(RuleConf, req.ConfLength())
+       entries := RuleConf{}
 
        te := A6.TextEntry{}
        for i := 0; i < req.ConfLength(); i++ {
                if req.Conf(&te, i) {
-                       entries[i].Name = string(te.Name())
-                       entries[i].Value = te.Value
+                       name := string(te.Name())
+                       plugin := findPlugin(name)
+                       if plugin == nil {
+                               log.Warnf("can't find plugin %s, skip", name)
+                               continue
+                       }
+
+                       log.Infof("prepare conf for plugin %s", name)
+
+                       v := te.Value()
+                       conf, err := plugin.ParseConf(v)
+                       if err != nil {
+                               log.Errorf(
+                                       "failed to parse configuration for 
plugin %s, configuration: %s",
+                                       name, string(v))
+                               continue
+                       }
+
+                       entries = append(entries, ConfEntry{
+                               Name:  name,
+                               Value: conf,
+                       })
                }
        }
 
diff --git a/internal/plugin/conf_test.go b/internal/plugin/conf_test.go
index aa2ded4..a84dcea 100644
--- a/internal/plugin/conf_test.go
+++ b/internal/plugin/conf_test.go
@@ -15,6 +15,7 @@
 package plugin
 
 import (
+       "errors"
        "sort"
        "sync"
        "testing"
@@ -47,6 +48,59 @@ func TestPrepareConf(t *testing.T) {
        assert.Equal(t, uint32(2), resp.ConfToken())
 }
 
+func prepareConfWithData(builder *flatbuffers.Builder, arg 
...flatbuffers.UOffsetT) {
+       tes := []flatbuffers.UOffsetT{}
+       for i := 0; i < len(arg); i += 2 {
+               A6.TextEntryStart(builder)
+               name := arg[i]
+               value := arg[i+1]
+               A6.TextEntryAddName(builder, name)
+               A6.TextEntryAddValue(builder, value)
+               te := A6.TextEntryEnd(builder)
+               tes = append(tes, te)
+       }
+
+       pc.ReqStartConfVector(builder, len(tes))
+       for i := len(tes) - 1; i >= 0; i-- {
+               builder.PrependUOffsetT(tes[i])
+       }
+       v := builder.EndVector(len(tes))
+
+       pc.ReqStart(builder)
+       pc.ReqAddConf(builder, v)
+       root := pc.ReqEnd(builder)
+       builder.Finish(root)
+       b := builder.FinishedBytes()
+
+       PrepareConf(b)
+}
+
+func TestPrepareConfUnknownPlugin(t *testing.T) {
+       InitConfCache(1 * time.Millisecond)
+       builder := flatbuffers.NewBuilder(1024)
+
+       name := builder.CreateString("xxx")
+       value := builder.CreateString(`{"body":"yes"}`)
+       prepareConfWithData(builder, name, value)
+       res, _ := GetRuleConf(1)
+       assert.Equal(t, 0, len(res))
+}
+
+func TestPrepareConfBadConf(t *testing.T) {
+       InitConfCache(1 * time.Millisecond)
+       builder := flatbuffers.NewBuilder(1024)
+
+       f := func(in []byte) (conf interface{}, err error) {
+               return nil, errors.New("ouch")
+       }
+       RegisterPlugin("bad_conf", f, emptyFilter)
+       name := builder.CreateString("bad_conf")
+       value := builder.CreateString(`{"body":"yes"}`)
+       prepareConfWithData(builder, name, value)
+       res, _ := GetRuleConf(1)
+       assert.Equal(t, 0, len(res))
+}
+
 func TestPrepareConfConcurrently(t *testing.T) {
        InitConfCache(10 * time.Millisecond)
 
@@ -104,6 +158,7 @@ func TestGetRuleConf(t *testing.T) {
 }
 
 func TestGetRuleConfCheckConf(t *testing.T) {
+       RegisterPlugin("echo", emptyParseConf, emptyFilter)
        InitConfCache(1 * time.Millisecond)
        builder := flatbuffers.NewBuilder(1024)
 
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index b7400b0..7fa6136 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -15,18 +15,89 @@
 package plugin
 
 import (
-       "context"
+       "errors"
+       "fmt"
        "net/http"
 
        hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
        flatbuffers "github.com/google/flatbuffers/go"
 
        inHTTP "github.com/apache/apisix-go-plugin-runner/internal/http"
+       "github.com/apache/apisix-go-plugin-runner/internal/log"
        "github.com/apache/apisix-go-plugin-runner/internal/util"
        pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
 )
 
-func handle(conf RuleConf, ctx context.Context, w http.ResponseWriter, r 
pkgHTTP.Request) error {
+type ParseConfFunc func(in []byte) (conf interface{}, err error)
+type FilterFunc func(conf interface{}, w http.ResponseWriter, r 
pkgHTTP.Request)
+
+type pluginOpts struct {
+       ParseConf ParseConfFunc
+       Filter    FilterFunc
+}
+
+type ErrPluginRegistered struct {
+       name string
+}
+
+func (err ErrPluginRegistered) Error() string {
+       return fmt.Sprintf("plugin %s registered", err.name)
+}
+
+var (
+       pluginRegistry = map[string]*pluginOpts{}
+
+       ErrMissingName            = errors.New("missing name")
+       ErrMissingParseConfMethod = errors.New("missing ParseConf method")
+       ErrMissingFilterMethod    = errors.New("missing Filter method")
+)
+
+func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error {
+       if name == "" {
+               return ErrMissingName
+       }
+       if pc == nil {
+               return ErrMissingParseConfMethod
+       }
+       if sv == nil {
+               return ErrMissingFilterMethod
+       }
+
+       opt := &pluginOpts{
+               ParseConf: pc,
+               Filter:    sv,
+       }
+       if _, found := pluginRegistry[name]; found {
+               return ErrPluginRegistered{name}
+       }
+       pluginRegistry[name] = opt
+       return nil
+}
+
+func findPlugin(name string) *pluginOpts {
+       if opt, found := pluginRegistry[name]; found {
+               return opt
+       }
+       return nil
+}
+
+func filter(conf RuleConf, w *inHTTP.Response, r pkgHTTP.Request) error {
+       for _, c := range conf {
+               plugin := findPlugin(c.Name)
+               if plugin == nil {
+                       log.Warnf("can't find plugin %s, skip", c.Name)
+                       continue
+               }
+
+               log.Infof("run plugin %s", c.Name)
+
+               plugin.Filter(c.Value, w, r)
+
+               if w.HasChange() {
+                       // response is generated, no need to continue
+                       break
+               }
+       }
        return nil
 }
 
@@ -61,8 +132,7 @@ func HTTPReqCall(buf []byte) (*flatbuffers.Builder, error) {
                return nil, err
        }
 
-       ctx := context.Background()
-       err = handle(conf, ctx, resp, req)
+       err = filter(conf, resp, req)
        if err != nil {
                return nil, err
        }
diff --git a/internal/plugin/plugin_test.go b/internal/plugin/plugin_test.go
index c433f3f..eb942df 100644
--- a/internal/plugin/plugin_test.go
+++ b/internal/plugin/plugin_test.go
@@ -15,12 +15,26 @@
 package plugin
 
 import (
+       "net/http"
        "testing"
        "time"
 
        hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall"
        flatbuffers "github.com/google/flatbuffers/go"
        "github.com/stretchr/testify/assert"
+
+       inHTTP "github.com/apache/apisix-go-plugin-runner/internal/http"
+       pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+)
+
+var (
+       emptyParseConf = func(in []byte) (conf interface{}, err error) {
+               return string(in), nil
+       }
+
+       emptyFilter = func(conf interface{}, w http.ResponseWriter, r 
pkgHTTP.Request) {
+               return
+       }
 )
 
 func TestHTTPReqCall(t *testing.T) {
@@ -43,3 +57,61 @@ func TestHTTPReqCall(t *testing.T) {
        assert.Equal(t, uint32(233), resp.Id())
        assert.Equal(t, hrc.ActionNONE, resp.ActionType())
 }
+
+func TestRegisterPlugin(t *testing.T) {
+       assert.Equal(t, ErrMissingParseConfMethod,
+               RegisterPlugin("bad_conf", nil, emptyFilter))
+       assert.Equal(t, ErrMissingFilterMethod,
+               RegisterPlugin("bad_conf", emptyParseConf, nil))
+}
+
+func TestFilter(t *testing.T) {
+       fooParseConf := func(in []byte) (conf interface{}, err error) {
+               return "foo", nil
+       }
+       fooFilter := func(conf interface{}, w http.ResponseWriter, r 
pkgHTTP.Request) {
+               w.Header().Add("foo", "bar")
+               assert.Equal(t, "foo", conf.(string))
+       }
+       barParseConf := func(in []byte) (conf interface{}, err error) {
+               return "bar", nil
+       }
+       barFilter := func(conf interface{}, w http.ResponseWriter, r 
pkgHTTP.Request) {
+               r.Header().Set("foo", "bar")
+               assert.Equal(t, "bar", conf.(string))
+       }
+
+       RegisterPlugin("foo", fooParseConf, fooFilter)
+       RegisterPlugin("bar", barParseConf, barFilter)
+
+       builder := flatbuffers.NewBuilder(1024)
+       fooName := builder.CreateString("foo")
+       fooConf := builder.CreateString("foo")
+       barName := builder.CreateString("bar")
+       barConf := builder.CreateString("bar")
+       prepareConfWithData(builder, fooName, fooConf, barName, barConf)
+
+       res, _ := GetRuleConf(1)
+       hrc.ReqStart(builder)
+       hrc.ReqAddId(builder, 233)
+       hrc.ReqAddConfToken(builder, 1)
+       r := hrc.ReqEnd(builder)
+       builder.Finish(r)
+       out := builder.FinishedBytes()
+
+       req := inHTTP.CreateRequest(out)
+       resp := inHTTP.CreateResponse()
+       filter(res, resp, req)
+
+       assert.Equal(t, "bar", resp.Header().Get("foo"))
+       assert.Equal(t, "", req.Header().Get("foo"))
+
+       req = inHTTP.CreateRequest(out)
+       resp = inHTTP.CreateResponse()
+       prepareConfWithData(builder, barName, barConf, fooName, fooConf)
+       res, _ = GetRuleConf(2)
+       filter(res, resp, req)
+
+       assert.Equal(t, "bar", resp.Header().Get("foo"))
+       assert.Equal(t, "bar", req.Header().Get("foo"))
+}
diff --git a/internal/server/server.go b/internal/server/server.go
index fe2dcea..25e9718 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -16,7 +16,6 @@ package server
 
 import (
        "encoding/binary"
-       "errors"
        "fmt"
        "io"
        "net"
@@ -27,10 +26,11 @@ import (
        "syscall"
        "time"
 
+       flatbuffers "github.com/google/flatbuffers/go"
+
        "github.com/apache/apisix-go-plugin-runner/internal/log"
        "github.com/apache/apisix-go-plugin-runner/internal/plugin"
        "github.com/apache/apisix-go-plugin-runner/internal/util"
-       flatbuffers "github.com/google/flatbuffers/go"
 )
 
 const (
@@ -48,11 +48,13 @@ const (
 )
 
 func readErr(n int, err error, required int) bool {
-       if n < required {
-               err = errors.New("truncated")
+       if 0 < n && n < required {
+               err = fmt.Errorf("truncated, only get the first %d bytes", n)
        }
-       if err != nil && err != io.EOF {
-               log.Errorf("read: %s", err)
+       if err != nil {
+               if err != io.EOF {
+                       log.Errorf("read: %s", err)
+               }
                return true
        }
        return false
diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go
new file mode 100644
index 0000000..7ea4bd8
--- /dev/null
+++ b/pkg/plugin/plugin.go
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package plugin
+
+import (
+       "net/http"
+
+       "github.com/apache/apisix-go-plugin-runner/internal/plugin"
+       pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+)
+
+// PluginOpts represents the attributes of the Plugin
+type PluginOpts struct {
+       // Name (required) is the plguin name
+       Name string
+       // ParseConf (required) is the method to parse given plugin 
configuration. When the
+       // configuration can't be parsed, it will be skipped.
+       ParseConf func(in []byte) (conf interface{}, err error)
+       // Filter (required) is the method to handle request.
+       // It is like the `http.ServeHTTP`, plus the ctx and the configuration 
created by
+       // ParseConf.
+       //
+       // When the `w` is written, the execution of plugin chain will be 
stopped.
+       // We don't use onion model like Gin/Caddy because we don't serve the 
whole request lifecycle
+       // inside the runner. The plugin is only a filter running at one stage.
+       Filter func(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request)
+}
+
+// RegisterPlugin register a plugin. Plugin which has the same name can't be 
registered twice.
+func RegisterPlugin(opt *PluginOpts) error {
+       return plugin.RegisterPlugin(opt.Name, opt.ParseConf, opt.Filter)
+}

Reply via email to