buptubuntu commented on a change in pull request #2497:
URL: https://github.com/apache/thrift/pull/2497#discussion_r780645102
##########
File path: lib/go/thrift/simple_server.go
##########
@@ -193,7 +195,13 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
}
if client != nil {
p.wg.Add(1)
+ p.clients[client] = struct{}{}
go func() {
+ defer func() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ delete(p.clients, client)
Review comment:
I have tested the mem the goroutine cost using the code below:
```golang
package main
import (
"context"
"fmt"
"runtime"
"sync"
)
func main() {
memConsumed := func() uint64 {
runtime.GC()
var s runtime.MemStats
runtime.ReadMemStats(&s)
return s.Sys
}
var channel <-chan interface{}
var wg sync.WaitGroup
const numGoroutines = 1000000 // 1M
wg.Add(numGoroutines)
before := memConsumed()
for i := numGoroutines; i > 0; i-- {
ctx, cancel := context.WithCancel(context.Background())
go func() {
wg.Done()
cancel()
<-ctx.Done()
<-channel
}()
}
wg.Wait()
after := memConsumed()
fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1024)
}
```
cost as below:
<html xmlns:v="urn:schemas-microsoft-com:vml"
xmlns:o="urn:schemas-microsoft-com:office:office"
xmlns:x="urn:schemas-microsoft-com:office:excel"
xmlns="http://www.w3.org/TR/REC-html40">
<head>
<meta name=ProgId content=Excel.Sheet>
<meta name=Generator content="Microsoft Excel 15">
<link id=Main-File rel=Main-File
href="file:////Users/buptubuntu/Library/Group%20Containers/UBF8T346G9.Office/TemporaryItems/msohtmlclip/clip.htm">
<link rel=File-List
href="file:////Users/buptubuntu/Library/Group%20Containers/UBF8T346G9.Office/TemporaryItems/msohtmlclip/clip_filelist.xml">
</head>
<body link="#0563C1" vlink="#954F72">
goroutines | total mem | mem per goroutine
-- | -- | --
10000 | 35M | 3.5K
1000 | 5.3M | 5.3K
</body>
</html>
##########
File path: lib/go/thrift/simple_server.go
##########
@@ -193,7 +195,13 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
}
if client != nil {
p.wg.Add(1)
+ p.clients[client] = struct{}{}
go func() {
+ defer func() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ delete(p.clients, client)
Review comment:
I have also tested the cost per client using code below:
```golang
package main
import (
"context"
"fmt"
"net"
_ "net/http/pprof"
"runtime"
"strings"
"time"
"github.com/apache/thrift/lib/go/thrift"
)
func main() {
memConsumed := func() uint64 {
runtime.GC()
var s runtime.MemStats
runtime.ReadMemStats(&s)
return s.Sys
}
ln, _ := net.Listen("tcp", "localhost:0")
proc := &mockProcessor{
ProcessFunc: func(in, out thrift.TProtocol) (bool,
thrift.TException) {
in.ReadMessageBegin(context.Background())
time.Sleep(time.Hour)
return false, nil
},
}
trans := &mockServerTransport{
ListenFunc: func() error {
return nil
},
AcceptFunc: func() (thrift.TTransport, error) {
conn, err := ln.Accept()
if err != nil {
// t.Errorf("error accept connection")
return nil, err
}
return thrift.NewTSocketFromConnTimeout(conn, 0), nil
},
CloseFunc: func() error {
return nil
},
InterruptFunc: func() error {
return ln.Close()
},
}
serv := thrift.NewTSimpleServer2(proc, trans)
go serv.Serve()
time.Sleep(time.Second)
const numGoroutines = 1000
conns := make([]net.Conn, 0, numGoroutines)
before := memConsumed()
for i := 0; i < numGoroutines; i++ {
port := strings.Split(ln.Addr().String(), ":")[1]
netConn, _ := net.Dial("tcp", "localhost:"+port)
time.Sleep(time.Millisecond)
conns = append(conns, netConn)
}
after := memConsumed()
time.Sleep(time.Second)
fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1024)
}
type mockProcessor struct {
ProcessFunc func(in, out thrift.TProtocol) (bool, thrift.TException)
}
func (m *mockProcessor) Process(ctx context.Context, in, out
thrift.TProtocol) (bool, thrift.TException) {
return m.ProcessFunc(in, out)
}
func (m *mockProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
return map[string]thrift.TProcessorFunction{
"mock": thrift.WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out
thrift.TProtocol) (bool, thrift.TException) {
return m.ProcessFunc(in, out)
},
},
}
}
func (m *mockProcessor) AddToProcessorMap(name string, processorFunc
thrift.TProcessorFunction) {}
type mockWrappedProcessorContextKey int
const (
processorName mockWrappedProcessorContextKey = iota
)
// setMockWrappableProcessorName sets the "name" of the TProcessorFunction to
// call on a mockWrappableProcessor when calling Process.
//
// In a normal TProcessor, the request name is read from the request itself
// which happens in TProcessor.Process, so it is not passed into the call to
// Process itself, to get around this in testing, mockWrappableProcessor
calls
// getMockWrappableProcessorName to get the name to use from the context
// object.
func setMockWrappableProcessorName(ctx context.Context, name string)
context.Context {
return context.WithValue(ctx, processorName, name)
}
// getMockWrappableProcessorName gets the "name" of the TProcessorFunction to
// call on a mockWrappableProcessor when calling Process.
func getMockWrappableProcessorName(ctx context.Context) (string, bool) {
val, ok := ctx.Value(processorName).(string)
return val, ok
}
// mockWrappableProcessor can be used to create a mock object that fufills
the
// TProcessor interface in testing.
type mockWrappableProcessor struct {
ProcessorFuncs map[string]thrift.TProcessorFunction
}
// Process calls the TProcessorFunction assigned to the "name" set on the
// context object by setMockWrappableProcessorName.
//
// If no name is set on the context or there is no TProcessorFunction mapped
to
// that name, the call will panic.
func (p *mockWrappableProcessor) Process(ctx context.Context, in, out
thrift.TProtocol) (bool, thrift.TException) {
name, ok := getMockWrappableProcessorName(ctx)
if !ok {
panic("MockWrappableProcessorName not set on context")
}
processor, ok := p.ProcessorMap()[name]
if !ok {
panic(fmt.Sprintf("No processor set for name %q", name))
}
return processor.Process(ctx, 0, in, out)
}
func (p *mockWrappableProcessor) ProcessorMap()
map[string]thrift.TProcessorFunction {
return p.ProcessorFuncs
}
func (p *mockWrappableProcessor) AddToProcessorMap(name string,
processorFunc thrift.TProcessorFunction) {
p.ProcessorFuncs[name] = processorFunc
}
var (
_ thrift.TProcessor = (*mockProcessor)(nil)
_ thrift.TProcessor = (*mockWrappableProcessor)(nil)
)
type mockServerTransport struct {
ListenFunc func() error
AcceptFunc func() (thrift.TTransport, error)
CloseFunc func() error
InterruptFunc func() error
}
func (m *mockServerTransport) Listen() error {
return m.ListenFunc()
}
func (m *mockServerTransport) Accept() (thrift.TTransport, error) {
return m.AcceptFunc()
}
func (m *mockServerTransport) Close() error {
return m.CloseFunc()
}
func (m *mockServerTransport) Interrupt() error {
return m.InterruptFunc()
}
type mockTTransport struct {
thrift.TTransport
}
func (m *mockTTransport) Close() error {
return nil
}
```
cost as below:
<html xmlns:v="urn:schemas-microsoft-com:vml"
xmlns:o="urn:schemas-microsoft-com:office:office"
xmlns:x="urn:schemas-microsoft-com:office:excel"
xmlns="http://www.w3.org/TR/REC-html40">
<head>
<meta name=ProgId content=Excel.Sheet>
<meta name=Generator content="Microsoft Excel 15">
<link id=Main-File rel=Main-File
href="file:////Users/buptubuntu/Library/Group%20Containers/UBF8T346G9.Office/TemporaryItems/msohtmlclip/clip.htm">
<link rel=File-List
href="file:////Users/buptubuntu/Library/Group%20Containers/UBF8T346G9.Office/TemporaryItems/msohtmlclip/clip_filelist.xml">
</head>
<body link="#0563C1" vlink="#954F72">
clients | total mem | mem per client
-- | -- | --
10000 | 157M | 15.7K
1000 | 24M | 24K
</body>
</html>
##########
File path: lib/go/thrift/simple_server.go
##########
@@ -193,7 +195,13 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
}
if client != nil {
p.wg.Add(1)
+ p.clients[client] = struct{}{}
go func() {
+ defer func() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ delete(p.clients, client)
Review comment:
For the cpu cost, as the operation of the additional goroutine is quite
simple(close the client) and we just move it from the stop function,i think we
can just ignore that
##########
File path: lib/go/thrift/simple_server.go
##########
@@ -234,7 +250,9 @@ func (p *TSimpleServer) Stop() error {
}
atomic.StoreInt32(&p.closed, 1)
p.serverTransport.Interrupt()
+ close(p.stopChan)
p.wg.Wait()
+ p.stopChan = make(chan struct{})
Review comment:
Because is stopped server can serve later, and the closed channel can
not be closed again, so we have to make a new channel
##########
File path: lib/go/thrift/socket.go
##########
@@ -195,8 +195,9 @@ func (p *TSocket) IsOpen() bool {
// Closes the socket.
func (p *TSocket) Close() error {
// Close the socket
- if p.conn != nil {
- err := p.conn.Close()
+ conn := p.conn
Review comment:
go test --race ./lib/go/thrift/...
ok github.com/apache/thrift/lib/go/thrift 2.343s
seems ok
##########
File path: lib/go/thrift/socket.go
##########
@@ -195,8 +195,9 @@ func (p *TSocket) IsOpen() bool {
// Closes the socket.
func (p *TSocket) Close() error {
// Close the socket
- if p.conn != nil {
- err := p.conn.Close()
+ conn := p.conn
Review comment:
go test --race ./lib/go/thrift/...
ok github.com/apache/thrift/lib/go/thrift 2.343s
seems ok
##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
runtime.Gosched()
serv.Stop()
}
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ t.Errorf("error when listen")
Review comment:
done
##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
runtime.Gosched()
serv.Stop()
}
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ t.Errorf("error when listen")
+ }
+ proc := &mockProcessor{
+ ProcessFunc: func(in, out TProtocol) (bool, TException) {
+ in.ReadMessageBegin(context.Background())
+ return false, nil
+ },
+ }
+
+ trans := &mockServerTransport{
+ ListenFunc: func() error {
+ return nil
+ },
+ AcceptFunc: func() (TTransport, error) {
+ conn, err := ln.Accept()
+ if err != nil {
+ // t.Errorf("error accept connection")
Review comment:
done
##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
runtime.Gosched()
serv.Stop()
}
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ t.Errorf("error when listen")
+ }
+ proc := &mockProcessor{
+ ProcessFunc: func(in, out TProtocol) (bool, TException) {
+ in.ReadMessageBegin(context.Background())
+ return false, nil
+ },
+ }
+
+ trans := &mockServerTransport{
+ ListenFunc: func() error {
+ return nil
+ },
+ AcceptFunc: func() (TTransport, error) {
+ conn, err := ln.Accept()
+ if err != nil {
+ // t.Errorf("error accept connection")
+ return nil, err
+ }
+ return NewTSocketFromConnTimeout(conn, 0), nil
+ },
+ CloseFunc: func() error {
+ return nil
+ },
+ InterruptFunc: func() error {
+ return ln.Close()
+ },
+ }
+
+ serv := NewTSimpleServer2(proc, trans)
+ go serv.Serve()
+
+ port := strings.Split(ln.Addr().String(), ":")[1]
+ netConn, err := net.Dial("tcp", "localhost:"+port)
Review comment:
done
##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
runtime.Gosched()
serv.Stop()
}
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ t.Errorf("error when listen")
+ }
+ proc := &mockProcessor{
+ ProcessFunc: func(in, out TProtocol) (bool, TException) {
+ in.ReadMessageBegin(context.Background())
+ return false, nil
+ },
+ }
+
+ trans := &mockServerTransport{
+ ListenFunc: func() error {
+ return nil
+ },
+ AcceptFunc: func() (TTransport, error) {
+ conn, err := ln.Accept()
+ if err != nil {
+ // t.Errorf("error accept connection")
+ return nil, err
+ }
+ return NewTSocketFromConnTimeout(conn, 0), nil
Review comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]