buptubuntu commented on a change in pull request #2497:
URL: https://github.com/apache/thrift/pull/2497#discussion_r780645102



##########
File path: lib/go/thrift/simple_server.go
##########
@@ -193,7 +195,13 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
        }
        if client != nil {
                p.wg.Add(1)
+               p.clients[client] = struct{}{}
                go func() {
+                       defer func() {
+                               p.mu.Lock()
+                               defer p.mu.Unlock()
+                               delete(p.clients, client)

Review comment:
       I measured the memory cost per goroutine using the code below:
   ```golang
   package main
   
   import (
        "context"
        "fmt"
   
        "runtime"
        "sync"
   )
   
   // main reports the average growth of runtime.MemStats.Sys per goroutine
   // after starting numGoroutines goroutines, each of which creates and
   // cancels its own context and then blocks.
   func main() {
        const numGoroutines = 1000000 // 1M
   
        // sysBytes runs a GC cycle and returns the Sys field of the runtime
        // memory statistics.
        sysBytes := func() uint64 {
                var stats runtime.MemStats
                runtime.GC()
                runtime.ReadMemStats(&stats)
                return stats.Sys
        }
   
        var (
                started sync.WaitGroup
                // never is intentionally left nil: a receive from a nil channel
                // blocks forever, so every goroutine is still alive when memory
                // is sampled the second time.
                never <-chan interface{}
        )
        started.Add(numGoroutines)
   
        baseline := sysBytes()
        for spawned := 0; spawned < numGoroutines; spawned++ {
                ctx, cancel := context.WithCancel(context.Background())
                go func() {
                        started.Done()
                        cancel()
                        <-ctx.Done()
                        <-never
                }()
        }
        started.Wait()
        grown := sysBytes() - baseline
   
        fmt.Printf("%.3fkb", float64(grown)/numGoroutines/1024)
   }
   ```
   
   The cost is as follows:
   
   goroutines | total mem | mem per goroutine
   -- | -- | --
   10000 | 35M | 3.5K
   1000 | 5.3M | 5.3K
   
   

##########
File path: lib/go/thrift/simple_server.go
##########
@@ -193,7 +195,13 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
        }
        if client != nil {
                p.wg.Add(1)
+               p.clients[client] = struct{}{}
                go func() {
+                       defer func() {
+                               p.mu.Lock()
+                               defer p.mu.Unlock()
+                               delete(p.clients, client)

Review comment:
       I have also measured the cost per client using the code below:
   
   ```golang
   package main
   
   import (
        "context"
        "fmt"
        "net"
        _ "net/http/pprof"
        "runtime"
        "strings"
        "time"
   
        "github.com/apache/thrift/lib/go/thrift"
   )
   
   // main estimates how much memory TSimpleServer spends per connected client.
   //
   // It serves on an ephemeral localhost port with a processor whose handler
   // sleeps for an hour, dials numGoroutines TCP connections without sending
   // any data, and prints the growth of runtime.MemStats.Sys per connection in
   // units of 1024 bytes.
   func main() {
        // memConsumed runs a GC cycle and returns the Sys field of the runtime
        // memory statistics.
        memConsumed := func() uint64 {
                runtime.GC()
                var s runtime.MemStats
                runtime.ReadMemStats(&s)
                return s.Sys
        }
   
        ln, err := net.Listen("tcp", "localhost:0")
        if err != nil {
                // Every later step uses ln, so there is nothing to measure.
                fmt.Println("listen:", err)
                return
        }
   
        proc := &mockProcessor{
                ProcessFunc: func(in, out thrift.TProtocol) (bool, thrift.TException) {
                        // The read result is ignored; the handler then sleeps,
                        // presumably to keep the per-client goroutine alive while
                        // memory is sampled.
                        in.ReadMessageBegin(context.Background())
                        time.Sleep(time.Hour)
                        return false, nil
                },
        }
   
        trans := &mockServerTransport{
                ListenFunc: func() error {
                        return nil
                },
                AcceptFunc: func() (thrift.TTransport, error) {
                        conn, err := ln.Accept()
                        if err != nil {
                                return nil, err
                        }
                        return thrift.NewTSocketFromConnTimeout(conn, 0), nil
                },
                CloseFunc: func() error {
                        return nil
                },
                InterruptFunc: func() error {
                        return ln.Close()
                },
        }
   
        serv := thrift.NewTSimpleServer2(proc, trans)
        go serv.Serve()
        time.Sleep(time.Second)
   
        const numGoroutines = 1000
        // conns keeps every dialed connection referenced until main returns.
        conns := make([]net.Conn, 0, numGoroutines)
   
        // The listener address never changes, so derive the port once instead
        // of on every iteration. Taking everything after the last ':' also
        // works for bracketed IPv6 addresses such as "[::1]:1234".
        addr := ln.Addr().String()
        port := addr[strings.LastIndex(addr, ":")+1:]
   
        before := memConsumed()
   
        for i := 0; i < numGoroutines; i++ {
                netConn, err := net.Dial("tcp", "localhost:"+port)
                if err != nil {
                        // A failed dial would make the per-client average meaningless.
                        fmt.Println("dial:", err)
                        return
                }
                time.Sleep(time.Millisecond)
                conns = append(conns, netConn)
        }
   
        after := memConsumed()
   
        time.Sleep(time.Second)
        fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1024)
   }
   
   type mockProcessor struct {
        ProcessFunc func(in, out thrift.TProtocol) (bool, thrift.TException)
   }
   
   func (m *mockProcessor) Process(ctx context.Context, in, out 
thrift.TProtocol) (bool, thrift.TException) {
        return m.ProcessFunc(in, out)
   }
   
   func (m *mockProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
        return map[string]thrift.TProcessorFunction{
                "mock": thrift.WrappedTProcessorFunction{
                        Wrapped: func(ctx context.Context, seqId int32, in, out 
thrift.TProtocol) (bool, thrift.TException) {
                                return m.ProcessFunc(in, out)
                        },
                },
        }
   }
   
   func (m *mockProcessor) AddToProcessorMap(name string, processorFunc 
thrift.TProcessorFunction) {}
   
   // mockWrappedProcessorContextKey is a private key type for context values,
   // so the keys below cannot collide with keys defined by other packages.
   type mockWrappedProcessorContextKey int
   
   const (
        // processorName is the context key under which the processor name is
        // stored.
        processorName mockWrappedProcessorContextKey = iota
   )
   
   // setMockWrappableProcessorName sets the "name" of the TProcessorFunction to
   // call on a mockWrappableProcessor when calling Process.
   //
   // In a normal TProcessor, the request name is read from the request itself
   // which happens in TProcessor.Process, so it is not passed into the call to
   // Process itself, to get around this in testing, mockWrappableProcessor calls
   // getMockWrappableProcessorName to get the name to use from the context
   // object.
   func setMockWrappableProcessorName(ctx context.Context, name string) context.Context {
        return context.WithValue(ctx, processorName, name)
   }
   
   // getMockWrappableProcessorName gets the "name" of the TProcessorFunction to
   // call on a mockWrappableProcessor when calling Process.
   //
   // The boolean is false when no name is stored on ctx or the stored value is
   // not a string.
   func getMockWrappableProcessorName(ctx context.Context) (string, bool) {
        val, ok := ctx.Value(processorName).(string)
        return val, ok
   }
   
   // mockWrappableProcessor can be used to create a mock object that fufills 
the
   // TProcessor interface in testing.
   type mockWrappableProcessor struct {
        ProcessorFuncs map[string]thrift.TProcessorFunction
   }
   
   // Process calls the TProcessorFunction assigned to the "name" set on the
   // context object by setMockWrappableProcessorName.
   //
   // If no name is set on the context or there is no TProcessorFunction mapped 
to
   // that name, the call will panic.
   func (p *mockWrappableProcessor) Process(ctx context.Context, in, out 
thrift.TProtocol) (bool, thrift.TException) {
        name, ok := getMockWrappableProcessorName(ctx)
        if !ok {
                panic("MockWrappableProcessorName not set on context")
        }
        processor, ok := p.ProcessorMap()[name]
        if !ok {
                panic(fmt.Sprintf("No processor set for name %q", name))
        }
        return processor.Process(ctx, 0, in, out)
   }
   
   func (p *mockWrappableProcessor) ProcessorMap() 
map[string]thrift.TProcessorFunction {
        return p.ProcessorFuncs
   }
   
   func (p *mockWrappableProcessor) AddToProcessorMap(name string, 
processorFunc thrift.TProcessorFunction) {
        p.ProcessorFuncs[name] = processorFunc
   }
   
   // Compile-time assertions that both mock processor types satisfy
   // thrift.TProcessor.
   var (
        _ thrift.TProcessor = (*mockProcessor)(nil)
        _ thrift.TProcessor = (*mockWrappableProcessor)(nil)
   )
   
   // mockServerTransport is a server transport stub: each method invokes the
   // function stored in the field of the matching name.
   type mockServerTransport struct {
        ListenFunc    func() error
        AcceptFunc    func() (thrift.TTransport, error)
        CloseFunc     func() error
        InterruptFunc func() error
   }
   
   // Listen runs ListenFunc and returns its error.
   func (st *mockServerTransport) Listen() error {
        return st.ListenFunc()
   }
   
   // Accept runs AcceptFunc and returns its transport and error unchanged.
   func (st *mockServerTransport) Accept() (thrift.TTransport, error) {
        return st.AcceptFunc()
   }
   
   // Close runs CloseFunc and returns its error.
   func (st *mockServerTransport) Close() error {
        return st.CloseFunc()
   }
   
   // Interrupt runs InterruptFunc and returns its error.
   func (st *mockServerTransport) Interrupt() error {
        return st.InterruptFunc()
   }
   
   // mockTTransport embeds a thrift.TTransport and replaces its Close method
   // with a no-op.
   type mockTTransport struct {
        thrift.TTransport
   }
   
   // Close does nothing and always returns nil.
   func (*mockTTransport) Close() error {
        return nil
   }
   ```
   The cost is as follows:
   
   
   clients | total mem | mem per client
   -- | -- | --
   10000 | 157M | 15.7K
   1000 | 24M | 24K
   
   

##########
File path: lib/go/thrift/simple_server.go
##########
@@ -193,7 +195,13 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
        }
        if client != nil {
                p.wg.Add(1)
+               p.clients[client] = struct{}{}
                go func() {
+                       defer func() {
+                               p.mu.Lock()
+                               defer p.mu.Unlock()
+                               delete(p.clients, client)

Review comment:
       For the CPU cost: the additional goroutine does very little (it only 
closes the client), and that work was simply moved out of the Stop function, so 
I think we can ignore it.

##########
File path: lib/go/thrift/simple_server.go
##########
@@ -234,7 +250,9 @@ func (p *TSimpleServer) Stop() error {
        }
        atomic.StoreInt32(&p.closed, 1)
        p.serverTransport.Interrupt()
+       close(p.stopChan)
        p.wg.Wait()
+       p.stopChan = make(chan struct{})

Review comment:
       Because a stopped server can serve again later, and a closed channel 
cannot be closed again, we have to make a new channel.

##########
File path: lib/go/thrift/socket.go
##########
@@ -195,8 +195,9 @@ func (p *TSocket) IsOpen() bool {
 // Closes the socket.
 func (p *TSocket) Close() error {
        // Close the socket
-       if p.conn != nil {
-               err := p.conn.Close()
+       conn := p.conn

Review comment:
       go test --race ./lib/go/thrift/...
   ok      github.com/apache/thrift/lib/go/thrift  2.343s
   
   seems ok

##########
File path: lib/go/thrift/socket.go
##########
@@ -195,8 +195,9 @@ func (p *TSocket) IsOpen() bool {
 // Closes the socket.
 func (p *TSocket) Close() error {
        // Close the socket
-       if p.conn != nil {
-               err := p.conn.Close()
+       conn := p.conn

Review comment:
       go test --race ./lib/go/thrift/...
   ok      github.com/apache/thrift/lib/go/thrift  2.343s
   
   seems ok

##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func 
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
        runtime.Gosched()
        serv.Stop()
 }
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               t.Errorf("error when listen")

Review comment:
       done

##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func 
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
        runtime.Gosched()
        serv.Stop()
 }
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               t.Errorf("error when listen")
+       }
+       proc := &mockProcessor{
+               ProcessFunc: func(in, out TProtocol) (bool, TException) {
+                       in.ReadMessageBegin(context.Background())
+                       return false, nil
+               },
+       }
+
+       trans := &mockServerTransport{
+               ListenFunc: func() error {
+                       return nil
+               },
+               AcceptFunc: func() (TTransport, error) {
+                       conn, err := ln.Accept()
+                       if err != nil {
+                               // t.Errorf("error accept connection")

Review comment:
       done

##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func 
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
        runtime.Gosched()
        serv.Stop()
 }
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               t.Errorf("error when listen")
+       }
+       proc := &mockProcessor{
+               ProcessFunc: func(in, out TProtocol) (bool, TException) {
+                       in.ReadMessageBegin(context.Background())
+                       return false, nil
+               },
+       }
+
+       trans := &mockServerTransport{
+               ListenFunc: func() error {
+                       return nil
+               },
+               AcceptFunc: func() (TTransport, error) {
+                       conn, err := ln.Accept()
+                       if err != nil {
+                               // t.Errorf("error accept connection")
+                               return nil, err
+                       }
+                       return NewTSocketFromConnTimeout(conn, 0), nil
+               },
+               CloseFunc: func() error {
+                       return nil
+               },
+               InterruptFunc: func() error {
+                       return ln.Close()
+               },
+       }
+
+       serv := NewTSimpleServer2(proc, trans)
+       go serv.Serve()
+
+       port := strings.Split(ln.Addr().String(), ":")[1]
+       netConn, err := net.Dial("tcp", "localhost:"+port)

Review comment:
       done

##########
File path: lib/go/thrift/simple_server_test.go
##########
@@ -154,3 +158,50 @@ func 
TestNoHangDuringStopFromDanglingLockAcquireDuringAcceptLoop(t *testing.T) {
        runtime.Gosched()
        serv.Stop()
 }
+
+func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               t.Errorf("error when listen")
+       }
+       proc := &mockProcessor{
+               ProcessFunc: func(in, out TProtocol) (bool, TException) {
+                       in.ReadMessageBegin(context.Background())
+                       return false, nil
+               },
+       }
+
+       trans := &mockServerTransport{
+               ListenFunc: func() error {
+                       return nil
+               },
+               AcceptFunc: func() (TTransport, error) {
+                       conn, err := ln.Accept()
+                       if err != nil {
+                               // t.Errorf("error accept connection")
+                               return nil, err
+                       }
+                       return NewTSocketFromConnTimeout(conn, 0), nil

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to