This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-go-plugin-runner.git


The following commit(s) were added to refs/heads/master by this push:
     new 746d738  fix: we can't implement limit-req now (#17)
746d738 is described below

commit 746d738ebab2d474d3646fad737a51c76e2de159
Author: 罗泽轩 <[email protected]>
AuthorDate: Mon Jul 5 15:19:26 2021 +0800

    fix: we can't implement limit-req now (#17)
---
 cmd/go-runner/main.go                   |  1 +
 cmd/go-runner/plugins/limit_req.go      | 77 ---------------------------------
 cmd/go-runner/plugins/limit_req_test.go | 70 ------------------------------
 internal/plugin/plugin.go               |  2 +
 internal/server/server.go               | 13 +++++-
 pkg/http/http.go                        |  6 +--
 pkg/runner/runner.go                    |  2 +-
 7 files changed, 16 insertions(+), 155 deletions(-)

diff --git a/cmd/go-runner/main.go b/cmd/go-runner/main.go
index 109e219..ea2f5c2 100644
--- a/cmd/go-runner/main.go
+++ b/cmd/go-runner/main.go
@@ -26,6 +26,7 @@ import (
        "github.com/thediveo/enumflag"
        "go.uber.org/zap/zapcore"
 
+       _ "github.com/apache/apisix-go-plugin-runner/cmd/go-runner/plugins"
        "github.com/apache/apisix-go-plugin-runner/pkg/log"
        "github.com/apache/apisix-go-plugin-runner/pkg/runner"
 )
diff --git a/cmd/go-runner/plugins/limit_req.go b/cmd/go-runner/plugins/limit_req.go
deleted file mode 100644
index 0a8320f..0000000
--- a/cmd/go-runner/plugins/limit_req.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package plugins
-
-import (
-       "encoding/json"
-       "net/http"
-       "time"
-
-       "golang.org/x/time/rate"
-
-       pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http"
-       "github.com/apache/apisix-go-plugin-runner/pkg/log"
-       "github.com/apache/apisix-go-plugin-runner/pkg/plugin"
-)
-
-func init() {
-       err := plugin.RegisterPlugin(&LimitReq{})
-       if err != nil {
-               log.Fatalf("failed to register plugin limit-req: %s", err)
-       }
-}
-
-// LimitReq is a demo for a real world plugin
-type LimitReq struct {
-}
-
-type LimitReqConf struct {
-       Burst int     `json:"burst"`
-       Rate  float64 `json:"rate"`
-
-       limiter *rate.Limiter
-}
-
-func (p *LimitReq) Name() string {
-       return "limit-req"
-}
-
-// ParseConf is called when the configuration is changed. And its output is unique per route.
-func (p *LimitReq) ParseConf(in []byte) (interface{}, error) {
-       conf := LimitReqConf{}
-       err := json.Unmarshal(in, &conf)
-       if err != nil {
-               return nil, err
-       }
-
-       limiter := rate.NewLimiter(rate.Limit(conf.Rate), conf.Burst)
-       // the conf can be used to store route scope data
-       conf.limiter = limiter
-       return conf, nil
-}
-
-// Filter is called when a request hits the route
-func (p *LimitReq) Filter(conf interface{}, w http.ResponseWriter, r pkgHTTP.Request) {
-       li := conf.(LimitReqConf).limiter
-       rs := li.Reserve()
-       if !rs.OK() {
-               // limit rate exceeded
-               log.Infof("limit req rate exceeded")
-               // stop filters with this response
-               w.WriteHeader(http.StatusServiceUnavailable)
-               return
-       }
-       time.Sleep(rs.Delay())
-}
diff --git a/cmd/go-runner/plugins/limit_req_test.go b/cmd/go-runner/plugins/limit_req_test.go
deleted file mode 100644
index baab79b..0000000
--- a/cmd/go-runner/plugins/limit_req_test.go
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package plugins
-
-import (
-       "net/http"
-       "net/http/httptest"
-       "sync"
-       "testing"
-       "time"
-
-       "github.com/stretchr/testify/assert"
-)
-
-func TestLimitReq(t *testing.T) {
-       in := []byte(`{"rate":5,"burst":1}`)
-       lr := &LimitReq{}
-       conf, err := lr.ParseConf(in)
-       assert.Nil(t, err)
-
-       start := time.Now()
-       n := 6
-       var wg sync.WaitGroup
-       res := make([]*http.Response, n)
-       for i := 0; i < n; i++ {
-               wg.Add(1)
-               go func(i int) {
-                       w := httptest.NewRecorder()
-                       lr.Filter(conf, w, nil)
-                       resp := w.Result()
-                       res[i] = resp
-                       wg.Done()
-               }(i)
-       }
-       wg.Wait()
-
-       rejectN := 0
-       for _, r := range res {
-               if r.StatusCode == 503 {
-                       rejectN++
-               }
-       }
-       assert.Equal(t, 0, rejectN)
-       t.Logf("Start: %v, now: %v", start, time.Now())
-       assert.True(t, time.Now().Sub(start) >= 1*time.Second)
-}
-
-func TestLimitReq_YouShouldNotPass(t *testing.T) {
-       in := []byte(`{}`)
-       lr := &LimitReq{}
-       conf, err := lr.ParseConf(in)
-       assert.Nil(t, err)
-
-       w := httptest.NewRecorder()
-       lr.Filter(conf, w, nil)
-       resp := w.Result()
-       assert.Equal(t, 503, resp.StatusCode)
-}
diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go
index 9b88ca9..ce8a620 100644
--- a/internal/plugin/plugin.go
+++ b/internal/plugin/plugin.go
@@ -59,6 +59,8 @@ var (
 )
 
 func RegisterPlugin(name string, pc ParseConfFunc, sv FilterFunc) error {
+       log.Infof("register plugin %s", name)
+
        if name == "" {
                return ErrMissingName
        }
diff --git a/internal/server/server.go b/internal/server/server.go
index 6e3ca58..5ac6dc6 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -26,6 +26,7 @@ import (
        "syscall"
        "time"
 
+       "github.com/ReneKroon/ttlcache/v2"
        flatbuffers "github.com/google/flatbuffers/go"
 
        "github.com/apache/apisix-go-plugin-runner/internal/plugin"
@@ -67,12 +68,17 @@ func readErr(n int, err error, required int) bool {
 
 func writeErr(n int, err error) {
        if err != nil {
+               // TODO: solve "write: broken pipe" with context
                log.Errorf("write: %s", err)
        }
 }
 
 func generateErrorReport(err error) (ty byte, out []byte) {
-       log.Errorf("%s", err)
+       if err == ttlcache.ErrNotFound {
+               log.Warnf("%s", err)
+       } else {
+               log.Errorf("%s", err)
+       }
 
        ty = RPCError
        bd := ReportError(err)
@@ -220,13 +226,16 @@ func Run() {
 
        go func() {
                for {
+                       conn, err := l.Accept()
+
                        select {
                        case <-done:
+                               // don't report the "use of closed network connection" error when the server
+                               // is exiting.
                                return
                        default:
                        }
 
-                       conn, err := l.Accept()
                        if err != nil {
                                log.Errorf("accept: %s", err)
                                continue
diff --git a/pkg/http/http.go b/pkg/http/http.go
index cf00132..b5d79e6 100644
--- a/pkg/http/http.go
+++ b/pkg/http/http.go
@@ -26,17 +26,13 @@ import (
 //
 // 1. We need to record any change to the request headers. As the Request.Header
 // is not an interface, there is not way to inject our special tracker.
+//
 // 2. As the author of fasthttp pointed out, "headers are stored in a map[string][]string.
 // So the server must parse all the headers, ...". The official API is suboptimal, which
 // is even worse in our case as it is not a real HTTP server.
 type Request interface {
        // ID returns the request id
        ID() uint32
-       // ConfToken returns the token represents the configuration of current route.
-       // Each route have its unique token, so we can use it to distinguish different
-       // route in the same plugin. When the configuration of a route changed, the token
-       // will change too.
-       ConfToken() uint32
 
        // SrcIP returns the client's IP
        SrcIP() net.IP
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index 0405097..e5efa34 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -31,7 +31,7 @@ type RunnerConfig struct {
        LogOutput zapcore.WriteSyncer
 }
 
-// Run starts the runner and listen the socket
+// Run starts the runner and listen the socket configured by environment variable "APISIX_LISTEN_ADDRESS"
 func Run(cfg RunnerConfig) {
        if cfg.LogOutput == nil {
                cfg.LogOutput = os.Stdout

Reply via email to