This is an automated email from the ASF dual-hosted git repository.
soulbird pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git
The following commit(s) were added to refs/heads/master by this push:
new 8f8d06f feat: support get response body by extra_info (#107)
8f8d06f is described below
commit 8f8d06fa2faea1b5074aabd337475c867ff3dfff
Author: soulbird <[email protected]>
AuthorDate: Wed Sep 28 09:23:45 2022 +0800
feat: support get response body by extra_info (#107)
* feat: support get response body by extra_info
Co-authored-by: soulbird <[email protected]>
---
go.mod | 2 +-
go.sum | 4 +-
internal/http/response.go | 122 +++++++++++++++++++++++++++++
internal/http/response_test.go | 174 +++++++++++++++++++++++++++++++++++++++++
internal/plugin/plugin.go | 1 +
pkg/http/http.go | 17 ++++
pkg/httptest/recorder.go | 23 +++++-
7 files changed, 339 insertions(+), 4 deletions(-)
diff --git a/go.mod b/go.mod
index 8f963cb..4f7915f 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.15
require (
github.com/ReneKroon/ttlcache/v2 v2.4.0
- github.com/api7/ext-plugin-proto v0.5.0
+ github.com/api7/ext-plugin-proto v0.6.0
github.com/google/flatbuffers v2.0.0+incompatible
github.com/spf13/cobra v1.2.1
github.com/stretchr/testify v1.7.0
diff --git a/go.sum b/go.sum
index 57151a2..ff5ba33 100644
--- a/go.sum
+++ b/go.sum
@@ -43,8 +43,8 @@ github.com/ReneKroon/ttlcache/v2 v2.4.0
h1:KywGhjik+ZFTDXMNLiPECSzmdx2yNvAlDNKES
github.com/ReneKroon/ttlcache/v2 v2.4.0/go.mod
h1:zbo6Pv/28e21Z8CzzqgYRArQYGYtjONRxaAKGxzQvG4=
github.com/alvaroloes/enumer v1.1.2/go.mod
h1:FxrjvuXoDAx9isTJrv4c+T410zFi0DtXIT0m65DJ+Wo=
github.com/antihax/optional v1.0.0/go.mod
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/api7/ext-plugin-proto v0.5.0
h1:YcqPfluhA6qkFO8tZX2fPyKNgfB/s+MF/dK+k4QnzzY=
-github.com/api7/ext-plugin-proto v0.5.0/go.mod
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
+github.com/api7/ext-plugin-proto v0.6.0
h1:xmgcKwWRiM9EpBIs1wYJ7Ife/YnLl4IL2NEy4417g60=
+github.com/api7/ext-plugin-proto v0.6.0/go.mod
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
diff --git a/internal/http/response.go b/internal/http/response.go
index 19e04ef..7c97f42 100644
--- a/internal/http/response.go
+++ b/internal/http/response.go
@@ -19,10 +19,17 @@ package http
import (
"bytes"
+ "encoding/binary"
+ "net"
"net/http"
"sync"
+ "github.com/apache/apisix-go-plugin-runner/internal/util"
+ "github.com/apache/apisix-go-plugin-runner/pkg/common"
+ ei "github.com/api7/ext-plugin-proto/go/A6/ExtraInfo"
+
pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+ "github.com/apache/apisix-go-plugin-runner/pkg/log"
"github.com/api7/ext-plugin-proto/go/A6"
hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPRespCall"
flatbuffers "github.com/google/flatbuffers/go"
@@ -31,12 +38,73 @@ import (
type Response struct {
r *hrc.Req
+ conn net.Conn
+ extraInfoHeader []byte
+
hdr *Header
rawHdr http.Header
statusCode int
body *bytes.Buffer
+
+ vars map[string][]byte
+ // originBody is read-only
+ originBody []byte
+}
+
+func (r *Response) askExtraInfo(builder *flatbuffers.Builder,
+ infoType ei.Info, info flatbuffers.UOffsetT) ([]byte, error) {
+
+ ei.ReqStart(builder)
+ ei.ReqAddInfoType(builder, infoType)
+ ei.ReqAddInfo(builder, info)
+ eiRes := ei.ReqEnd(builder)
+ builder.Finish(eiRes)
+
+ c := r.conn
+ if len(r.extraInfoHeader) == 0 {
+ r.extraInfoHeader = make([]byte, util.HeaderLen)
+ }
+ header := r.extraInfoHeader
+
+ out := builder.FinishedBytes()
+ size := len(out)
+ binary.BigEndian.PutUint32(header, uint32(size))
+ header[0] = util.RPCExtraInfo
+
+ n, err := c.Write(header)
+ if err != nil {
+ util.WriteErr(n, err)
+ return nil, common.ErrConnClosed
+ }
+
+ n, err = c.Write(out)
+ if err != nil {
+ util.WriteErr(n, err)
+ return nil, common.ErrConnClosed
+ }
+
+ n, err = c.Read(header)
+ if util.ReadErr(n, err, util.HeaderLen) {
+ return nil, common.ErrConnClosed
+ }
+
+ ty := header[0]
+ header[0] = 0
+ length := binary.BigEndian.Uint32(header)
+
+ log.Infof("receive rpc type: %d data length: %d", ty, length)
+
+ buf := make([]byte, length)
+ n, err = c.Read(buf)
+ if util.ReadErr(n, err, int(length)) {
+ return nil, common.ErrConnClosed
+ }
+
+ resp := ei.GetRootAsResp(buf, 0)
+ res := resp.ResultBytes()
+ return res, nil
}
func (r *Response) ID() uint32 {
@@ -75,6 +143,53 @@ func (r *Response) Write(b []byte) (int, error) {
return r.body.Write(b)
}
+func (r *Response) Var(name string) ([]byte, error) {
+ if r.vars == nil {
+ r.vars = map[string][]byte{}
+ }
+
+ var v []byte
+ var found bool
+
+ if v, found = r.vars[name]; !found {
+ var err error
+
+ builder := util.GetBuilder()
+ varName := builder.CreateString(name)
+ ei.VarStart(builder)
+ ei.VarAddName(builder, varName)
+ varInfo := ei.VarEnd(builder)
+ v, err = r.askExtraInfo(builder, ei.InfoVar, varInfo)
+ util.PutBuilder(builder)
+
+ if err != nil {
+ return nil, err
+ }
+
+ r.vars[name] = v
+ }
+ return v, nil
+}
+
+func (r *Response) ReadBody() ([]byte, error) {
+ if len(r.originBody) > 0 {
+ return r.originBody, nil
+ }
+
+ builder := util.GetBuilder()
+ ei.ReqBodyStart(builder)
+ bodyInfo := ei.ReqBodyEnd(builder)
+ v, err := r.askExtraInfo(builder, ei.InfoRespBody, bodyInfo)
+ util.PutBuilder(builder)
+
+ if err != nil {
+ return nil, err
+ }
+
+ r.originBody = v
+ return v, nil
+}
+
func (r *Response) WriteHeader(statusCode int) {
if r.statusCode != 0 {
// official WriteHeader can't override written status
@@ -158,10 +273,17 @@ func (r *Response) FetchChanges(builder
*flatbuffers.Builder) bool {
return true
}
+func (r *Response) BindConn(c net.Conn) {
+ r.conn = c
+}
+
func (r *Response) Reset() {
r.body = nil
r.statusCode = 0
r.hdr = nil
+ r.conn = nil
+ r.vars = nil
+ r.originBody = nil
}
var respPool = sync.Pool{
diff --git a/internal/http/response_test.go b/internal/http/response_test.go
index 710cf7b..128bf73 100644
--- a/internal/http/response_test.go
+++ b/internal/http/response_test.go
@@ -18,9 +18,14 @@
package http
import (
+ "encoding/binary"
+ "net"
"net/http"
"testing"
+ "github.com/apache/apisix-go-plugin-runner/pkg/common"
+ ei "github.com/api7/ext-plugin-proto/go/A6/ExtraInfo"
+
"github.com/apache/apisix-go-plugin-runner/internal/util"
"github.com/api7/ext-plugin-proto/go/A6"
hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPRespCall"
@@ -187,3 +192,172 @@ func TestResponse_Write(t *testing.T) {
assert.Equal(t, []byte("hello world"), resp.BodyBytes())
ReuseResponse(r)
}
+
+func TestResponse_Var(t *testing.T) {
+ out := buildRespReq(respReqOpt{})
+ r := CreateResponse(out)
+
+ cc, sc := net.Pipe()
+ r.BindConn(cc)
+
+ go func() {
+ header := make([]byte, util.HeaderLen)
+ n, err := sc.Read(header)
+ if util.ReadErr(n, err, util.HeaderLen) {
+ return
+ }
+
+ ty := header[0]
+ assert.Equal(t, byte(util.RPCExtraInfo), ty)
+ header[0] = 0
+ length := binary.BigEndian.Uint32(header)
+
+ buf := make([]byte, length)
+ n, err = sc.Read(buf)
+ if util.ReadErr(n, err, int(length)) {
+ return
+ }
+
+ req := ei.GetRootAsReq(buf, 0)
+ info := getVarInfo(t, req)
+ assert.Equal(t, "request_time", string(info.Name()))
+
+ builder := util.GetBuilder()
+ res := builder.CreateByteVector([]byte("1.0"))
+ ei.RespStart(builder)
+ ei.RespAddResult(builder, res)
+ eiRes := ei.RespEnd(builder)
+ builder.Finish(eiRes)
+ out := builder.FinishedBytes()
+ size := len(out)
+ binary.BigEndian.PutUint32(header, uint32(size))
+ header[0] = util.RPCExtraInfo
+
+ n, err = sc.Write(header)
+ if err != nil {
+ util.WriteErr(n, err)
+ return
+ }
+
+ n, err = sc.Write(out)
+ if err != nil {
+ util.WriteErr(n, err)
+ return
+ }
+ }()
+
+ for i := 0; i < 2; i++ {
+ v, err := r.Var("request_time")
+ assert.Nil(t, err)
+ assert.Equal(t, "1.0", string(v))
+ }
+}
+
+func TestResponse_Var_FailedToSendExtraInfoReq(t *testing.T) {
+ out := buildRespReq(respReqOpt{})
+ r := CreateResponse(out)
+
+ cc, sc := net.Pipe()
+ r.BindConn(cc)
+
+ go func() {
+ header := make([]byte, util.HeaderLen)
+ n, err := sc.Read(header)
+ if util.ReadErr(n, err, util.HeaderLen) {
+ return
+ }
+ sc.Close()
+ }()
+
+ _, err := r.Var("request_time")
+ assert.Equal(t, common.ErrConnClosed, err)
+}
+
+func TestResponse_FailedToReadExtraInfoResp(t *testing.T) {
+ out := buildRespReq(respReqOpt{})
+ r := CreateResponse(out)
+
+ cc, sc := net.Pipe()
+ r.BindConn(cc)
+
+ go func() {
+ header := make([]byte, util.HeaderLen)
+ n, err := sc.Read(header)
+ if util.ReadErr(n, err, util.HeaderLen) {
+ return
+ }
+
+ ty := header[0]
+ assert.Equal(t, byte(util.RPCExtraInfo), ty)
+ header[0] = 0
+ length := binary.BigEndian.Uint32(header)
+
+ buf := make([]byte, length)
+ n, err = sc.Read(buf)
+ if util.ReadErr(n, err, int(length)) {
+ return
+ }
+
+ sc.Close()
+ }()
+
+ _, err := r.Var("request_time")
+ assert.Equal(t, common.ErrConnClosed, err)
+}
+
+func TestRead(t *testing.T) {
+ out := buildRespReq(respReqOpt{})
+ r := CreateResponse(out)
+
+ cc, sc := net.Pipe()
+ r.BindConn(cc)
+
+ go func() {
+ header := make([]byte, util.HeaderLen)
+ n, err := sc.Read(header)
+ if util.ReadErr(n, err, util.HeaderLen) {
+ return
+ }
+
+ ty := header[0]
+ assert.Equal(t, byte(util.RPCExtraInfo), ty)
+ header[0] = 0
+ length := binary.BigEndian.Uint32(header)
+
+ buf := make([]byte, length)
+ n, err = sc.Read(buf)
+ if util.ReadErr(n, err, int(length)) {
+ return
+ }
+
+ req := ei.GetRootAsReq(buf, 0)
+ assert.Equal(t, ei.InfoRespBody, req.InfoType())
+
+ builder := util.GetBuilder()
+ res := builder.CreateByteVector([]byte("Hello, Go Runner"))
+ ei.RespStart(builder)
+ ei.RespAddResult(builder, res)
+ eiRes := ei.RespEnd(builder)
+ builder.Finish(eiRes)
+ out := builder.FinishedBytes()
+ size := len(out)
+ binary.BigEndian.PutUint32(header, uint32(size))
+ header[0] = util.RPCExtraInfo
+
+ n, err = sc.Write(header)
+ if err != nil {
+ util.WriteErr(n, err)
+ return
+ }
+
+ n, err = sc.Write(out)
+ if err != nil {
+ util.WriteErr(n, err)
+ return
+ }
+ }()
+
+ v, err := r.ReadBody()
+ assert.Nil(t, err)
+ assert.Equal(t, "Hello, Go Runner", string(v))
+}
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index 118f51c..a1bff16 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -210,6 +210,7 @@ func (ph *responsePhase) builder(id uint32, resp
*inHTTP.Response) *flatbuffers.
func HTTPRespCall(buf []byte, conn net.Conn) (*flatbuffers.Builder, error) {
resp := inHTTP.CreateResponse(buf)
+ resp.BindConn(conn)
defer inHTTP.ReuseResponse(resp)
token := resp.ConfToken()
diff --git a/pkg/http/http.go b/pkg/http/http.go
index ca3cbd7..5b4e93c 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -95,6 +95,23 @@ type Response interface {
// It allows you to add or set response headers before reaching the
client.
Header() Header
+ // Var returns the value of a Nginx variable, like
`r.Var("request_time")`
+ //
+ // To fetch the value, the runner will look up the request's cache
first. If not found,
+ // the runner will ask it from the APISIX. If the RPC call is failed,
an error in
+ // pkg/common.ErrConnClosed type is returned.
+ Var(name string) ([]byte, error)
+
+ // ReadBody returns origin HTTP response body
+ //
+ // To fetch the value, the runner will look up the request's cache
first. If not found,
+ // the runner will ask it from the APISIX. If the RPC call is failed,
an error in
+ // pkg/common.ErrConnClosed type is returned.
+ //
+ // It was not named `Body`
+ // because `Body` was already occupied in earlier interface
implementations.
+ ReadBody() ([]byte, error)
+
// Write rewrites the origin response data.
//
// Unlike `ResponseWriter.Write`, we don't need to
WriteHeader(http.StatusOK)
diff --git a/pkg/httptest/recorder.go b/pkg/httptest/recorder.go
index 0e36c9c..9453ca5 100644
--- a/pkg/httptest/recorder.go
+++ b/pkg/httptest/recorder.go
@@ -34,10 +34,15 @@ type ResponseRecorder struct {
// It is an internal detail.
HeaderMap pkgHTTP.Header
- // body is the buffer to which the Handler's Write calls are sent.
+ // Body is the buffer to which the Handler's Write calls are sent.
// If nil, the Writes are silently discarded.
Body *bytes.Buffer
+ // OriginBody is the response body received by APISIX from upstream.
+ OriginBody []byte
+
+ Vars map[string][]byte
+
statusCode int
id uint32
}
@@ -87,6 +92,22 @@ func (rw *ResponseRecorder) Write(buf []byte) (int, error) {
return rw.Body.Write(buf)
}
+// Var implements pkgHTTP.Response.
+func (rw *ResponseRecorder) Var(key string) ([]byte, error) {
+ if rw.Vars == nil {
+ rw.Vars = make(map[string][]byte)
+ }
+ return rw.Vars[key], nil
+}
+
+// ReadBody implements pkgHTTP.Response.
+func (rw *ResponseRecorder) ReadBody() ([]byte, error) {
+ if rw.OriginBody == nil {
+ rw.OriginBody = make([]byte, 0)
+ }
+ return rw.OriginBody, nil
+}
+
// WriteHeader implements pkgHTTP.Response.
// The statusCode is only allowed to be written once.
func (rw *ResponseRecorder) WriteHeader(code int) {