This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e1a5044da1a0a316dd70538930aee0f6c607cf02
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 29 12:05:22 2021 +0800

    Fix some test issues
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/liaison/grpc/grpc.go      | 14 +++++++-------
 banyand/liaison/grpc/grpc_test.go | 35 ++++++++++++++++++++---------------
 2 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index e84f599..58092a1 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,14 +20,15 @@ package grpc
 import (
        "context"
        "fmt"
+       "io"
+       "net"
+
        v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        flatbuffers "github.com/google/flatbuffers/go"
        grpclib "google.golang.org/grpc"
-       "io"
-       "net"
 )
 
 type Server struct {
@@ -54,7 +55,7 @@ func (s *Server) FlagSet() *run.FlagSet {
 func (s *Server) Validate() error {
        return nil
 }
-func init(){
+func init() {
        //encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
 }
 func (s *Server) Serve() error {
@@ -64,7 +65,6 @@ func (s *Server) Serve() error {
                s.log.Fatal("Failed to listen", logger.Error(err))
        }
 
-
        s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{}))
        //s.ser = grpclib.NewServer()
 
@@ -88,19 +88,19 @@ type TraceServer struct {
 func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
        for {
                writeEntity, err := TraceWriteServer.Recv()
-               fmt.Println(123, writeEntity)
                if err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
+               fmt.Println(123, writeEntity)
                t.writeData = append(t.writeData, writeEntity)
                builder := flatbuffers.NewBuilder(0)
                v1.WriteResponseStart(builder)
                builder.Finish(v1.WriteResponseEnd(builder))
-               if error := TraceWriteServer.Send(builder); error != nil {
-                       return error
+               if errSend := TraceWriteServer.Send(builder); errSend != nil {
+                       return errSend
                }
                //writeEntity.Entity().Fields()
                //writeEntity.MetaData(nil).Group()
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index a4c2ee4..7f865ee 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -19,14 +19,15 @@ package grpc
 
 import (
        "context"
-       v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
-       "github.com/apache/skywalking-banyandb/pkg/convert"
-       flatbuffers "github.com/google/flatbuffers/go"
-       "google.golang.org/grpc"
        "io"
        "log"
        "testing"
        "time"
+
+       v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       flatbuffers "github.com/google/flatbuffers/go"
+       "google.golang.org/grpc"
 )
 
 var serverAddr = "localhost:17912"
@@ -35,6 +36,7 @@ type ComponentBuilderFunc func(*flatbuffers.Builder)
 type writeEntityBuilder struct {
        *flatbuffers.Builder
 }
+
 func NewCriteriaBuilder() *writeEntityBuilder {
        return &writeEntityBuilder{
                flatbuffers.NewBuilder(1024),
@@ -77,7 +79,7 @@ func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...inte
        }
 }
 
-func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT  {
+func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT {
        var ValueTypeOffset flatbuffers.UOffsetT
        var valType v1.ValueType
        switch v := val.(type) {
@@ -159,7 +161,7 @@ func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntit
        return v1.GetRootAsWriteEntity(buf, 0)
 }
 
-func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
+func runWrite(writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
        builder := flatbuffers.NewBuilder(0)
        metaData := writeEntity.MetaData(nil)
        entityValue := writeEntity.Entity(nil)
@@ -270,7 +272,7 @@ func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
        position := v1.WriteEntityEnd(builder)
        builder.Finish(position)
 
-       return  builder, nil
+       return builder, nil
 }
 
 func Test_grpc_write(t *testing.T) {
@@ -281,7 +283,7 @@ func Test_grpc_write(t *testing.T) {
        defer conn.Close()
 
        client := v1.NewTraceClient(conn)
-       ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
+       ctx := context.Background()
        b := NewCriteriaBuilder()
        binary := byte(12)
        entity := b.Build(
@@ -292,26 +294,29 @@ func Test_grpc_write(t *testing.T) {
        if e != nil {
                log.Fatalf("Failed to connect: %v", err)
        }
-       defer cancel()
        stream, er := client.Write(ctx)
        if er != nil {
                log.Fatalf("%v.runWrite(_) = _, %v", client, err)
        }
+       waitc := make(chan struct{})
        go func() {
                for {
-                       writeResponse, err := stream.Recv()
-                       if err == io.EOF {
+                       writeResponse, errRecv := stream.Recv()
+                       if errRecv == io.EOF {
                                // read done.
+                               close(waitc)
                                return
                        }
-                       if err != nil {
+                       if errRecv != nil {
                                log.Fatalf("Failed to receive data : %v", err)
                        }
-                       println( writeResponse)
+                       println(writeResponse)
                }
        }()
-       if error := stream.Send(builder); error != nil {
-               log.Fatalf("Failed to send a note: %v", err)
+       if errSend := stream.Send(builder); errSend != nil {
+               log.Fatalf("Failed to send a note: %v", errSend)
        }
+
        stream.CloseSend()
+       <-waitc
 }

Reply via email to