This is an automated email from the ASF dual-hosted git repository.

soulbird pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f8d06f  feat: support get response body by extra_info (#107)
8f8d06f is described below

commit 8f8d06fa2faea1b5074aabd337475c867ff3dfff
Author: soulbird <[email protected]>
AuthorDate: Wed Sep 28 09:23:45 2022 +0800

    feat: support get response body by extra_info (#107)
    
    * feat: support get response body by extra_info
    
    Co-authored-by: soulbird <[email protected]>
---
 go.mod                         |   2 +-
 go.sum                         |   4 +-
 internal/http/response.go      | 122 +++++++++++++++++++++++++++++
 internal/http/response_test.go | 174 +++++++++++++++++++++++++++++++++++++++++
 internal/plugin/plugin.go      |   1 +
 pkg/http/http.go               |  17 ++++
 pkg/httptest/recorder.go       |  23 +++++-
 7 files changed, 339 insertions(+), 4 deletions(-)

diff --git a/go.mod b/go.mod
index 8f963cb..4f7915f 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.15
 
 require (
        github.com/ReneKroon/ttlcache/v2 v2.4.0
-       github.com/api7/ext-plugin-proto v0.5.0
+       github.com/api7/ext-plugin-proto v0.6.0
        github.com/google/flatbuffers v2.0.0+incompatible
        github.com/spf13/cobra v1.2.1
        github.com/stretchr/testify v1.7.0
diff --git a/go.sum b/go.sum
index 57151a2..ff5ba33 100644
--- a/go.sum
+++ b/go.sum
@@ -43,8 +43,8 @@ github.com/ReneKroon/ttlcache/v2 v2.4.0 
h1:KywGhjik+ZFTDXMNLiPECSzmdx2yNvAlDNKES
 github.com/ReneKroon/ttlcache/v2 v2.4.0/go.mod 
h1:zbo6Pv/28e21Z8CzzqgYRArQYGYtjONRxaAKGxzQvG4=
 github.com/alvaroloes/enumer v1.1.2/go.mod 
h1:FxrjvuXoDAx9isTJrv4c+T410zFi0DtXIT0m65DJ+Wo=
 github.com/antihax/optional v1.0.0/go.mod 
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/api7/ext-plugin-proto v0.5.0 
h1:YcqPfluhA6qkFO8tZX2fPyKNgfB/s+MF/dK+k4QnzzY=
-github.com/api7/ext-plugin-proto v0.5.0/go.mod 
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
+github.com/api7/ext-plugin-proto v0.6.0 
h1:xmgcKwWRiM9EpBIs1wYJ7Ife/YnLl4IL2NEy4417g60=
+github.com/api7/ext-plugin-proto v0.6.0/go.mod 
h1:8dbdAgCESeqwZ0IXirbjLbshEntmdrAX3uet+LW3jVU=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod 
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod 
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod 
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
diff --git a/internal/http/response.go b/internal/http/response.go
index 19e04ef..7c97f42 100644
--- a/internal/http/response.go
+++ b/internal/http/response.go
@@ -19,10 +19,17 @@ package http
 
 import (
         "bytes"
+       "encoding/binary"
+       "io"
+       "net"
         "net/http"
         "sync"
 
+       "github.com/apache/apisix-go-plugin-runner/internal/util"
+       "github.com/apache/apisix-go-plugin-runner/pkg/common"
+       ei "github.com/api7/ext-plugin-proto/go/A6/ExtraInfo"
+
         pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
+       "github.com/apache/apisix-go-plugin-runner/pkg/log"
         "github.com/api7/ext-plugin-proto/go/A6"
         hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPRespCall"
         flatbuffers "github.com/google/flatbuffers/go"
@@ -31,12 +38,73 @@ import (
 type Response struct {
         r *hrc.Req
 
+       // conn is the connection installed by BindConn; askExtraInfo writes
+       // ExtraInfo requests to it and reads the replies from it.
+       conn            net.Conn
+       // extraInfoHeader is the scratch buffer for the RPC frame header. It is
+       // allocated lazily by askExtraInfo and is not cleared by Reset.
+       extraInfoHeader []byte
+
         hdr    *Header
         rawHdr http.Header
 
         statusCode int
 
         body *bytes.Buffer
+
+       // vars caches the values already fetched by Var, keyed by variable name.
+       vars map[string][]byte
+       // originBody caches the body fetched by ReadBody; it is read-only
+       originBody []byte
+}
+
+// askExtraInfo sends an ExtraInfo request for infoType over the bound
+// connection and returns the result bytes carried by the reply.
+//
+// builder must already hold the serialized table that info points to; the
+// request itself is finished on the same builder. The frame header is
+// util.HeaderLen bytes: a big-endian length whose first byte is replaced by
+// the RPC type.
+//
+// Every failure, including a missing connection, is reported as
+// common.ErrConnClosed.
+func (r *Response) askExtraInfo(builder *flatbuffers.Builder,
+       infoType ei.Info, info flatbuffers.UOffsetT) ([]byte, error) {
+
+       c := r.conn
+       if c == nil {
+               // No connection has been bound (or Reset already dropped it).
+               // Fail with the documented error instead of panicking on a nil conn.
+               return nil, common.ErrConnClosed
+       }
+
+       ei.ReqStart(builder)
+       ei.ReqAddInfoType(builder, infoType)
+       ei.ReqAddInfo(builder, info)
+       eiRes := ei.ReqEnd(builder)
+       builder.Finish(eiRes)
+
+       // Reuse one header buffer per Response instead of allocating per call.
+       if len(r.extraInfoHeader) == 0 {
+               r.extraInfoHeader = make([]byte, util.HeaderLen)
+       }
+       header := r.extraInfoHeader
+
+       out := builder.FinishedBytes()
+       size := len(out)
+       // Write the length first, then overwrite its top byte with the RPC type.
+       binary.BigEndian.PutUint32(header, uint32(size))
+       header[0] = util.RPCExtraInfo
+
+       n, err := c.Write(header)
+       if err != nil {
+               util.WriteErr(n, err)
+               return nil, common.ErrConnClosed
+       }
+
+       n, err = c.Write(out)
+       if err != nil {
+               util.WriteErr(n, err)
+               return nil, common.ErrConnClosed
+       }
+
+       // A single Read may return fewer bytes than requested even when more
+       // are on the way. io.ReadFull keeps reading until the buffer is full,
+       // so a reply split across several reads is not mistaken for a
+       // truncated one.
+       n, err = io.ReadFull(c, header)
+       if util.ReadErr(n, err, util.HeaderLen) {
+               return nil, common.ErrConnClosed
+       }
+
+       // Split the header back into RPC type and payload length.
+       ty := header[0]
+       header[0] = 0
+       length := binary.BigEndian.Uint32(header)
+
+       log.Infof("receive rpc type: %d data length: %d", ty, length)
+
+       buf := make([]byte, length)
+       n, err = io.ReadFull(c, buf)
+       if util.ReadErr(n, err, int(length)) {
+               return nil, common.ErrConnClosed
+       }
+
+       resp := ei.GetRootAsResp(buf, 0)
+       res := resp.ResultBytes()
+       return res, nil
 }
 
 func (r *Response) ID() uint32 {
@@ -75,6 +143,53 @@ func (r *Response) Write(b []byte) (int, error) {
        return r.body.Write(b)
 }
 
+func (r *Response) Var(name string) ([]byte, error) {
+       if r.vars == nil {
+               r.vars = map[string][]byte{}
+       }
+
+       var v []byte
+       var found bool
+
+       if v, found = r.vars[name]; !found {
+               var err error
+
+               builder := util.GetBuilder()
+               varName := builder.CreateString(name)
+               ei.VarStart(builder)
+               ei.VarAddName(builder, varName)
+               varInfo := ei.VarEnd(builder)
+               v, err = r.askExtraInfo(builder, ei.InfoVar, varInfo)
+               util.PutBuilder(builder)
+
+               if err != nil {
+                       return nil, err
+               }
+
+               r.vars[name] = v
+       }
+       return v, nil
+}
+
+// ReadBody returns the origin response body.
+//
+// A non-empty body fetched earlier is served from r.originBody; otherwise the
+// body is requested through askExtraInfo with ei.InfoRespBody and remembered.
+// An empty result is therefore requested again on the next call.
+//
+// NOTE(review): the payload table is built with ei.ReqBodyStart/ReqBodyEnd
+// although the info type is InfoRespBody - confirm this is intended.
+func (r *Response) ReadBody() ([]byte, error) {
+       if cached := r.originBody; len(cached) != 0 {
+               return cached, nil
+       }
+
+       builder := util.GetBuilder()
+       ei.ReqBodyStart(builder)
+       bodyInfo := ei.ReqBodyEnd(builder)
+       body, err := r.askExtraInfo(builder, ei.InfoRespBody, bodyInfo)
+       util.PutBuilder(builder)
+       if err != nil {
+               return nil, err
+       }
+
+       r.originBody = body
+       return body, nil
+}
+
 func (r *Response) WriteHeader(statusCode int) {
        if r.statusCode != 0 {
                // official WriteHeader can't override written status
@@ -158,10 +273,17 @@ func (r *Response) FetchChanges(builder 
*flatbuffers.Builder) bool {
        return true
 }
 
+// BindConn stores c as the connection that askExtraInfo uses to send
+// ExtraInfo requests and read their replies. Reset clears it again.
+func (r *Response) BindConn(c net.Conn) {
+       r.conn = c
+}
+
+// Reset drops the per-call state: body, status code, header wrapper, bound
+// connection, cached variables and cached origin body. extraInfoHeader is not
+// cleared, so its buffer stays allocated for the next askExtraInfo call.
 func (r *Response) Reset() {
         r.body = nil
         r.statusCode = 0
         r.hdr = nil
+       r.conn = nil
+       r.vars = nil
+       r.originBody = nil
 }
 
 var respPool = sync.Pool{
diff --git a/internal/http/response_test.go b/internal/http/response_test.go
index 710cf7b..128bf73 100644
--- a/internal/http/response_test.go
+++ b/internal/http/response_test.go
@@ -18,9 +18,14 @@
 package http
 
 import (
+       "encoding/binary"
+       "net"
        "net/http"
        "testing"
 
+       "github.com/apache/apisix-go-plugin-runner/pkg/common"
+       ei "github.com/api7/ext-plugin-proto/go/A6/ExtraInfo"
+
        "github.com/apache/apisix-go-plugin-runner/internal/util"
        "github.com/api7/ext-plugin-proto/go/A6"
        hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPRespCall"
@@ -187,3 +192,172 @@ func TestResponse_Write(t *testing.T) {
        assert.Equal(t, []byte("hello world"), resp.BodyBytes())
        ReuseResponse(r)
 }
+
+// TestResponse_Var checks that Var fetches a variable through the ExtraInfo
+// RPC. The fake server answers exactly once, so the second lookup in the
+// loop has to be served from the cache.
+func TestResponse_Var(t *testing.T) {
+       out := buildRespReq(respReqOpt{})
+       r := CreateResponse(out)
+
+       cc, sc := net.Pipe()
+       // Close the client end on exit so the pipe does not outlive the test.
+       defer cc.Close()
+       r.BindConn(cc)
+
+       go func() {
+               // Closing the server end on every exit path turns an early
+               // return into an error on the client instead of a hung test.
+               defer sc.Close()
+
+               header := make([]byte, util.HeaderLen)
+               n, err := sc.Read(header)
+               if util.ReadErr(n, err, util.HeaderLen) {
+                       return
+               }
+
+               ty := header[0]
+               assert.Equal(t, byte(util.RPCExtraInfo), ty)
+               header[0] = 0
+               length := binary.BigEndian.Uint32(header)
+
+               buf := make([]byte, length)
+               n, err = sc.Read(buf)
+               if util.ReadErr(n, err, int(length)) {
+                       return
+               }
+
+               req := ei.GetRootAsReq(buf, 0)
+               info := getVarInfo(t, req)
+               assert.Equal(t, "request_time", string(info.Name()))
+
+               builder := util.GetBuilder()
+               res := builder.CreateByteVector([]byte("1.0"))
+               ei.RespStart(builder)
+               ei.RespAddResult(builder, res)
+               eiRes := ei.RespEnd(builder)
+               builder.Finish(eiRes)
+               out := builder.FinishedBytes()
+               size := len(out)
+               binary.BigEndian.PutUint32(header, uint32(size))
+               header[0] = util.RPCExtraInfo
+
+               n, err = sc.Write(header)
+               if err != nil {
+                       util.WriteErr(n, err)
+                       return
+               }
+
+               n, err = sc.Write(out)
+               if err != nil {
+                       util.WriteErr(n, err)
+                       return
+               }
+       }()
+
+       for i := 0; i < 2; i++ {
+               v, err := r.Var("request_time")
+               assert.Nil(t, err)
+               assert.Equal(t, "1.0", string(v))
+       }
+}
+
+// TestResponse_Var_FailedToSendExtraInfoReq checks that Var reports
+// common.ErrConnClosed when the peer goes away after reading only the frame
+// header, so the payload write fails.
+func TestResponse_Var_FailedToSendExtraInfoReq(t *testing.T) {
+       out := buildRespReq(respReqOpt{})
+       r := CreateResponse(out)
+
+       cc, sc := net.Pipe()
+       // Close the client end on exit so the pipe does not outlive the test.
+       defer cc.Close()
+       r.BindConn(cc)
+
+       go func() {
+               // The server end is closed on every path, including a failed
+               // header read, so the client can never block forever.
+               defer sc.Close()
+
+               header := make([]byte, util.HeaderLen)
+               n, err := sc.Read(header)
+               // The result is intentionally unused: the connection is closed
+               // right after this call either way.
+               util.ReadErr(n, err, util.HeaderLen)
+       }()
+
+       _, err := r.Var("request_time")
+       assert.Equal(t, common.ErrConnClosed, err)
+}
+
+// TestResponse_FailedToReadExtraInfoResp checks that Var reports
+// common.ErrConnClosed when the peer consumes the whole request and then
+// closes the connection without answering.
+func TestResponse_FailedToReadExtraInfoResp(t *testing.T) {
+       out := buildRespReq(respReqOpt{})
+       r := CreateResponse(out)
+
+       cc, sc := net.Pipe()
+       // Close the client end on exit so the pipe does not outlive the test.
+       defer cc.Close()
+       r.BindConn(cc)
+
+       go func() {
+               // The server end is closed on every path, so an early return
+               // surfaces as an error on the client instead of a hung test.
+               defer sc.Close()
+
+               header := make([]byte, util.HeaderLen)
+               n, err := sc.Read(header)
+               if util.ReadErr(n, err, util.HeaderLen) {
+                       return
+               }
+
+               ty := header[0]
+               assert.Equal(t, byte(util.RPCExtraInfo), ty)
+               header[0] = 0
+               length := binary.BigEndian.Uint32(header)
+
+               buf := make([]byte, length)
+               n, err = sc.Read(buf)
+               // The result is intentionally unused: the connection is closed
+               // without a reply either way.
+               util.ReadErr(n, err, int(length))
+       }()
+
+       _, err := r.Var("request_time")
+       assert.Equal(t, common.ErrConnClosed, err)
+}
+
+// TestRead checks that ReadBody requests the body with ei.InfoRespBody and
+// returns the bytes sent back by the fake server.
+func TestRead(t *testing.T) {
+       out := buildRespReq(respReqOpt{})
+       r := CreateResponse(out)
+
+       cc, sc := net.Pipe()
+       // Close the client end on exit so the pipe does not outlive the test.
+       defer cc.Close()
+       r.BindConn(cc)
+
+       go func() {
+               // Closing the server end on every exit path turns an early
+               // return into an error on the client instead of a hung test.
+               defer sc.Close()
+
+               header := make([]byte, util.HeaderLen)
+               n, err := sc.Read(header)
+               if util.ReadErr(n, err, util.HeaderLen) {
+                       return
+               }
+
+               ty := header[0]
+               assert.Equal(t, byte(util.RPCExtraInfo), ty)
+               header[0] = 0
+               length := binary.BigEndian.Uint32(header)
+
+               buf := make([]byte, length)
+               n, err = sc.Read(buf)
+               if util.ReadErr(n, err, int(length)) {
+                       return
+               }
+
+               req := ei.GetRootAsReq(buf, 0)
+               assert.Equal(t, ei.InfoRespBody, req.InfoType())
+
+               builder := util.GetBuilder()
+               res := builder.CreateByteVector([]byte("Hello, Go Runner"))
+               ei.RespStart(builder)
+               ei.RespAddResult(builder, res)
+               eiRes := ei.RespEnd(builder)
+               builder.Finish(eiRes)
+               out := builder.FinishedBytes()
+               size := len(out)
+               binary.BigEndian.PutUint32(header, uint32(size))
+               header[0] = util.RPCExtraInfo
+
+               n, err = sc.Write(header)
+               if err != nil {
+                       util.WriteErr(n, err)
+                       return
+               }
+
+               n, err = sc.Write(out)
+               if err != nil {
+                       util.WriteErr(n, err)
+                       return
+               }
+       }()
+
+       v, err := r.ReadBody()
+       assert.Nil(t, err)
+       assert.Equal(t, "Hello, Go Runner", string(v))
+}
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index 118f51c..a1bff16 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -210,6 +210,7 @@ func (ph *responsePhase) builder(id uint32, resp 
*inHTTP.Response) *flatbuffers.
 
 func HTTPRespCall(buf []byte, conn net.Conn) (*flatbuffers.Builder, error) {
        resp := inHTTP.CreateResponse(buf)
+       resp.BindConn(conn)
        defer inHTTP.ReuseResponse(resp)
 
        token := resp.ConfToken()
diff --git a/pkg/http/http.go b/pkg/http/http.go
index ca3cbd7..5b4e93c 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -95,6 +95,23 @@ type Response interface {
        // It allows you to add or set response headers before reaching the client.
        Header() Header
 
+       // Var returns the value of an Nginx variable, like `r.Var("request_time")`.
+       //
+       // To fetch the value, the runner looks up the request's cache first. If it is
+       // not found there, the runner asks APISIX for it. If the RPC call fails,
+       // the error pkg/common.ErrConnClosed is returned.
+       Var(name string) ([]byte, error)
+
+       // ReadBody returns the origin HTTP response body.
+       //
+       // To fetch the value, the runner looks up the request's cache first. If it is
+       // not found there, the runner asks APISIX for it. If the RPC call fails,
+       // the error pkg/common.ErrConnClosed is returned.
+       //
+       // It is not named `Body` because `Body` was already taken by earlier
+       // implementations of this interface.
+       ReadBody() ([]byte, error)
+
        // Write rewrites the origin response data.
        //
        // Unlike `ResponseWriter.Write`, we don't need to 
WriteHeader(http.StatusOK)
diff --git a/pkg/httptest/recorder.go b/pkg/httptest/recorder.go
index 0e36c9c..9453ca5 100644
--- a/pkg/httptest/recorder.go
+++ b/pkg/httptest/recorder.go
@@ -34,10 +34,15 @@ type ResponseRecorder struct {
        // It is an internal detail.
        HeaderMap pkgHTTP.Header
 
-       // body is the buffer to which the Handler's Write calls are sent.
+       // Body is the buffer to which the Handler's Write calls are sent.
        // If nil, the Writes are silently discarded.
        Body *bytes.Buffer
 
+       // OriginBody is the response body received by APISIX from upstream.
+       OriginBody []byte
+
+       Vars map[string][]byte
+
        statusCode int
        id         uint32
 }
@@ -87,6 +92,22 @@ func (rw *ResponseRecorder) Write(buf []byte) (int, error) {
        return rw.Body.Write(buf)
 }
 
+// Var implements pkgHTTP.Response.
+//
+// It returns whatever is stored in Vars under key; an unknown key yields a nil
+// value and no error. Vars is allocated on first use.
+func (rw *ResponseRecorder) Var(key string) ([]byte, error) {
+       if rw.Vars != nil {
+               return rw.Vars[key], nil
+       }
+
+       // First use: create the map so Vars is non-nil from here on. A fresh
+       // map holds no entry for key, hence the nil value.
+       rw.Vars = make(map[string][]byte)
+       return nil, nil
+}
+
+// ReadBody implements pkgHTTP.Response.
+//
+// It returns OriginBody and never fails. A nil OriginBody is first replaced by
+// an empty, non-nil slice.
+func (rw *ResponseRecorder) ReadBody() ([]byte, error) {
+       if rw.OriginBody != nil {
+               return rw.OriginBody, nil
+       }
+
+       rw.OriginBody = []byte{}
+       return rw.OriginBody, nil
+}
+
 // WriteHeader implements pkgHTTP.Response.
 // The statusCode is only allowed to be written once.
 func (rw *ResponseRecorder) WriteHeader(code int) {

Reply via email to