This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/liasion in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e1a5044da1a0a316dd70538930aee0f6c607cf02 Author: Gao Hongtao <[email protected]> AuthorDate: Tue Jun 29 12:05:22 2021 +0800 Fix some test issues Signed-off-by: Gao Hongtao <[email protected]> --- banyand/liaison/grpc/grpc.go | 14 +++++++------- banyand/liaison/grpc/grpc_test.go | 35 ++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go index e84f599..58092a1 100644 --- a/banyand/liaison/grpc/grpc.go +++ b/banyand/liaison/grpc/grpc.go @@ -20,14 +20,15 @@ package grpc import ( "context" "fmt" + "io" + "net" + v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" flatbuffers "github.com/google/flatbuffers/go" grpclib "google.golang.org/grpc" - "io" - "net" ) type Server struct { @@ -54,7 +55,7 @@ func (s *Server) FlagSet() *run.FlagSet { func (s *Server) Validate() error { return nil } -func init(){ +func init() { //encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{}) } func (s *Server) Serve() error { @@ -64,7 +65,6 @@ func (s *Server) Serve() error { s.log.Fatal("Failed to listen", logger.Error(err)) } - s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{})) //s.ser = grpclib.NewServer() @@ -88,19 +88,19 @@ type TraceServer struct { func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error { for { writeEntity, err := TraceWriteServer.Recv() - fmt.Println(123, writeEntity) if err == io.EOF { return nil } if err != nil { return err } + fmt.Println(123, writeEntity) t.writeData = append(t.writeData, writeEntity) builder := flatbuffers.NewBuilder(0) v1.WriteResponseStart(builder) builder.Finish(v1.WriteResponseEnd(builder)) - if error := TraceWriteServer.Send(builder); error != nil { - return error + if errSend := TraceWriteServer.Send(builder); errSend != nil { + return errSend } //writeEntity.Entity().Fields() //writeEntity.MetaData(nil).Group() diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go index a4c2ee4..7f865ee 100644 --- a/banyand/liaison/grpc/grpc_test.go +++ b/banyand/liaison/grpc/grpc_test.go @@ -19,14 +19,15 @@ package grpc import ( "context" - v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" - "github.com/apache/skywalking-banyandb/pkg/convert" - flatbuffers "github.com/google/flatbuffers/go" - "google.golang.org/grpc" "io" "log" "testing" "time" + + v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" + flatbuffers "github.com/google/flatbuffers/go" + "google.golang.org/grpc" ) var serverAddr = "localhost:17912" @@ -35,6 +36,7 @@ type ComponentBuilderFunc func(*flatbuffers.Builder) type writeEntityBuilder struct { *flatbuffers.Builder } + func NewCriteriaBuilder() *writeEntityBuilder { return &writeEntityBuilder{ flatbuffers.NewBuilder(1024), @@ -77,7 +79,7 @@ func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...inte } } -func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT { +func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT { var ValueTypeOffset flatbuffers.UOffsetT var valType v1.ValueType switch v := val.(type) { @@ -159,7 +161,7 @@ func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntit return v1.GetRootAsWriteEntity(buf, 0) } -func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) { +func runWrite(writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) { builder := flatbuffers.NewBuilder(0) metaData := writeEntity.MetaData(nil) entityValue := writeEntity.Entity(nil) @@ -270,7 +272,7 @@ func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) { position := v1.WriteEntityEnd(builder) builder.Finish(position) - return builder, nil + return builder, nil } func Test_grpc_write(t *testing.T) { @@ -281,7 +283,7 @@ func Test_grpc_write(t *testing.T) { defer conn.Close() client := v1.NewTraceClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second) + ctx := context.Background() b := NewCriteriaBuilder() binary := byte(12) entity := b.Build( @@ -292,26 +294,29 @@ func Test_grpc_write(t *testing.T) { if e != nil { log.Fatalf("Failed to connect: %v", err) } - defer cancel() stream, er := client.Write(ctx) if er != nil { log.Fatalf("%v.runWrite(_) = _, %v", client, err) } + waitc := make(chan struct{}) go func() { for { - writeResponse, err := stream.Recv() - if err == io.EOF { + writeResponse, errRecv := stream.Recv() + if errRecv == io.EOF { // read done. + close(waitc) return } - if err != nil { + if errRecv != nil { log.Fatalf("Failed to receive data : %v", err) } - println( writeResponse) + println(writeResponse) } }() - if error := stream.Send(builder); error != nil { - log.Fatalf("Failed to send a note: %v", err) + if errSend := stream.Send(builder); errSend != nil { + log.Fatalf("Failed to send a note: %v", errSend) } + stream.CloseSend() + <-waitc }
