This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push:
new 66fe3fc feat: impl http.Hijacker (#188)
66fe3fc is described below
commit 66fe3fcc6357cc78f0eef2be372f2a7a70e9b7a8
Author: Like <[email protected]>
AuthorDate: Sat Jul 13 14:01:23 2024 +0800
feat: impl http.Hijacker (#188)
---
CHANGES.md | 1 +
plugins/mux/serve_interceptor.go | 41 ++++++++++++++++++++++--
test/plugins/scenarios/mux/config/excepted.yml | 24 +++++++++++---
test/plugins/scenarios/mux/go.mod | 5 ++-
test/plugins/scenarios/mux/go.sum | 2 ++
test/plugins/scenarios/mux/main.go | 44 ++++++++++++++++++++++++++
6 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 83685e8..26f08f9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
* Support http headers collection for Gin.
* Support higher versions of grpc.
* Support [go-elasticsearchv8](https://github.com/elastic/go-elasticsearch)
database client framework.
+* Support `http.Hijacker` interface for mux plugin.
### Bug Fixes
* Fix panic error when root span finished.
diff --git a/plugins/mux/serve_interceptor.go b/plugins/mux/serve_interceptor.go
index 7078d08..01b2dee 100644
--- a/plugins/mux/serve_interceptor.go
+++ b/plugins/mux/serve_interceptor.go
@@ -40,8 +40,8 @@ func (n *ServeHTTPInterceptor) BeforeInvoke(invocation
operator.Invocation) erro
return err
}
- writer := invocation.Args()[0].(http.ResponseWriter)
- invocation.ChangeArg(0, &writerWrapper{ResponseWriter: writer,
statusCode: http.StatusOK})
+ rw := newResponseWriter(invocation.Args()[0])
+ invocation.ChangeArg(0, rw)
invocation.SetContext(s)
return nil
}
@@ -54,10 +54,32 @@ func (n *ServeHTTPInterceptor) AfterInvoke(invocation
operator.Invocation, resul
if wrapped, ok := invocation.Args()[0].(*writerWrapper); ok {
span.Tag(tracing.TagStatusCode, fmt.Sprintf("%d",
wrapped.statusCode))
}
+ if wrapped, ok := invocation.Args()[0].(*writerWrapperWithHijacker); ok
{
+ span.Tag(tracing.TagStatusCode, fmt.Sprintf("%d",
wrapped.writer.statusCode))
+ }
span.End()
return nil
}
+func newResponseWriter(val interface{}) http.ResponseWriter {
+ var rw http.ResponseWriter
+ sourceWriter := val.(http.ResponseWriter)
+ switch val.(type) {
+ case http.Hijacker:
+ rw = newWriterWrapperWithHijacker(sourceWriter,
sourceWriter.(http.Hijacker))
+ default:
+ rw = newWriterWrapper(rw)
+ }
+ return rw
+}
+
+// newWriterWrapper builds a writerWrapper around writer. The recorded status
+// code starts at http.StatusOK and is replaced whenever WriteHeader is called.
+func newWriterWrapper(writer http.ResponseWriter) *writerWrapper {
+	wrapped := new(writerWrapper)
+	wrapped.ResponseWriter = writer
+	wrapped.statusCode = http.StatusOK
+	return wrapped
+}
+
type writerWrapper struct {
http.ResponseWriter
statusCode int
@@ -68,3 +90,18 @@ func (w *writerWrapper) WriteHeader(statusCode int) {
w.statusCode = statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
+
+// newWriterWrapperWithHijacker wraps writer in a status-recording
+// writerWrapper and pairs it with hijacker. The same writerWrapper is stored
+// both as the embedded ResponseWriter and as the writer field, so the recorded
+// status code stays reachable through a concrete type.
+func newWriterWrapperWithHijacker(writer http.ResponseWriter, hijacker http.Hijacker) *writerWrapperWithHijacker {
+	statusRecorder := newWriterWrapper(writer)
+	result := &writerWrapperWithHijacker{Hijacker: hijacker}
+	result.writer = statusRecorder
+	result.ResponseWriter = statusRecorder
+	return result
+}
+
+type writerWrapperWithHijacker struct { // response writer that records the status code and also implements http.Hijacker
+ http.ResponseWriter // set by the constructor to the same *writerWrapper as the writer field
+ writer *writerWrapper // status code cache; AfterInvoke reads writer.statusCode to tag the span
+ http.Hijacker // the Hijacker passed to the constructor (the source writer in newResponseWriter)
+}
diff --git a/test/plugins/scenarios/mux/config/excepted.yml
b/test/plugins/scenarios/mux/config/excepted.yml
index 43b2361..670692f 100644
--- a/test/plugins/scenarios/mux/config/excepted.yml
+++ b/test/plugins/scenarios/mux/config/excepted.yml
@@ -16,8 +16,25 @@
segmentItems:
- serviceName: mux
- segmentSize: ge 3
+ segmentSize: ge 4
segments:
+ - segmentId: not null
+ spans:
+ - operationName: GET:/ws
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 5017
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - { key: http.method, value: GET }
+ - { key: url, value: 'localhost:8080/ws' }
+ - { key: status_code, value: '200' }
- segmentId: not null
spans:
- operationName: GET:/provider/test
@@ -72,6 +89,5 @@ segmentItems:
parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: mux,
traceId: not null }
-
-meterItems: []
-logItems: []
\ No newline at end of file
+meterItems: [ ]
+logItems: [ ]
diff --git a/test/plugins/scenarios/mux/go.mod
b/test/plugins/scenarios/mux/go.mod
index 7551e44..ad02ad8 100644
--- a/test/plugins/scenarios/mux/go.mod
+++ b/test/plugins/scenarios/mux/go.mod
@@ -2,4 +2,7 @@ module test/plugins/scenarios/mux
go 1.18
-require github.com/gorilla/mux v1.8.0 // indirect
+require (
+ github.com/gorilla/mux v1.8.0 // indirect
+ github.com/gorilla/websocket v1.5.3 // indirect
+)
diff --git a/test/plugins/scenarios/mux/go.sum
b/test/plugins/scenarios/mux/go.sum
index 5350288..ac0d4be 100644
--- a/test/plugins/scenarios/mux/go.sum
+++ b/test/plugins/scenarios/mux/go.sum
@@ -1,2 +1,4 @@
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod
h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/gorilla/websocket v1.5.3
h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod
h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
diff --git a/test/plugins/scenarios/mux/main.go
b/test/plugins/scenarios/mux/main.go
index 464d463..b7bc30f 100644
--- a/test/plugins/scenarios/mux/main.go
+++ b/test/plugins/scenarios/mux/main.go
@@ -18,9 +18,11 @@
package main
import (
+ "github.com/gorilla/websocket"
"io"
"log"
"net/http"
+ "net/url"
"time"
_ "github.com/apache/skywalking-go"
@@ -30,6 +32,8 @@ import (
func provider(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Millisecond * 10)
+ // test ws
+ connectWs()
w.Write([]byte("success"))
}
@@ -54,11 +58,51 @@ func health(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("success"))
}
+var upgrader = websocket.Upgrader{}
+
+// ws upgrades the request through the package-level upgrader and then echoes
+// every received message back to the peer until a read or a write fails.
+// ISSUE: https://github.com/apache/skywalking-go/pull/188
+// It tests the cast from http.ResponseWriter to http.Hijacker.
+func ws(w http.ResponseWriter, r *http.Request) {
+	conn, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		log.Print("upgrade:", err)
+		return
+	}
+	defer conn.Close()
+	for {
+		msgType, payload, readErr := conn.ReadMessage()
+		if readErr != nil {
+			log.Println("read:", readErr)
+			return
+		}
+		log.Printf("recv: %s", payload)
+		if writeErr := conn.WriteMessage(msgType, payload); writeErr != nil {
+			log.Println("write:", writeErr)
+			return
+		}
+	}
+}
+
+// connectWs dials the /ws endpoint on localhost:8080, sends one text message
+// and closes the connection. A failed dial ends the process via log.Fatal; a
+// failed write is only logged.
+func connectWs() {
+	u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"}
+	log.Printf("connecting to %s", u.String())
+	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+	if err != nil {
+		log.Fatal("dial:", err)
+	}
+	// Close exactly once, via defer, on every return path.
+	defer c.Close()
+	if err := c.WriteMessage(websocket.TextMessage, []byte("hello from mux test")); err != nil {
+		log.Println("write:", err)
+	}
+}
+
func main() {
r := mux.NewRouter()
r.Path("/health").HandlerFunc(health)
r.Path("/consumer").HandlerFunc(consumer)
r.PathPrefix("/provider").Path("/{var}").HandlerFunc(provider)
+ r.Path("/ws").HandlerFunc(ws)
// Bind to a port and pass our router in
log.Fatal(http.ListenAndServe(":8080", r))