This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/liasion in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit dfddd49875f8d4750b2f3fa061b12b231dfd0e57 Author: Qiuxia Fan <[email protected]> AuthorDate: Mon Jun 28 18:08:55 2021 +0800 fix runWrite --- api/fbs/Makefile | 2 +- api/fbs/v1/Trace_grpc.go | 164 ++++++++++++++++++++ banyand/liaison/grpc/grpc.go | 169 +++++++-------------- banyand/liaison/grpc/grpc_test.go | 305 ++++++++++++++++++++++++++++++++++---- banyand/liaison/liaison.go | 1 + banyand/series/service.go | 8 +- pkg/convert/number.go | 8 + 7 files changed, 513 insertions(+), 144 deletions(-) diff --git a/api/fbs/Makefile b/api/fbs/Makefile index 660aae1..825c495 100644 --- a/api/fbs/Makefile +++ b/api/fbs/Makefile @@ -21,4 +21,4 @@ fbs := $(wildcard $(VERSION)/*.fbs) .PHONY: generate generate: $(fbs) - flatc --go --gen-onefile --go-namespace $(VERSION) -o $(VERSION) $^ + flatc --go --gen-onefile --grpc --go-namespace $(VERSION) -o $(VERSION) $^ diff --git a/api/fbs/v1/Trace_grpc.go b/api/fbs/v1/Trace_grpc.go new file mode 100644 index 0000000..7d2cb69 --- /dev/null +++ b/api/fbs/v1/Trace_grpc.go @@ -0,0 +1,164 @@ +//Generated by gRPC Go plugin +//If you make any local changes, they will be lost +//source: rpc + +package v1 + +import ( + context "context" + + flatbuffers "github.com/google/flatbuffers/go" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Client API for Trace service +type TraceClient interface { + Query(ctx context.Context, in *flatbuffers.Builder, + opts ...grpc.CallOption) (*QueryResponse, error) + Write(ctx context.Context, + opts ...grpc.CallOption) (Trace_WriteClient, error) +} + +type traceClient struct { + cc grpc.ClientConnInterface +} + +func NewTraceClient(cc grpc.ClientConnInterface) TraceClient { + return &traceClient{cc} +} + +func (c *traceClient) Query(ctx context.Context, in *flatbuffers.Builder, + opts ...grpc.CallOption) (*QueryResponse, error) { + out := new(QueryResponse) + err := c.cc.Invoke(ctx, "/banyandb.v1.Trace/Query", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *traceClient) Write(ctx context.Context, + opts ...grpc.CallOption) (Trace_WriteClient, error) { + stream, err := c.cc.NewStream(ctx, &_Trace_serviceDesc.Streams[0], "/banyandb.v1.Trace/Write", opts...) + if err != nil { + return nil, err + } + x := &traceWriteClient{stream} + return x, nil +} + +type Trace_WriteClient interface { + Send(*flatbuffers.Builder) error + Recv() (*WriteResponse, error) + grpc.ClientStream +} + +type traceWriteClient struct { + grpc.ClientStream +} + +func (x *traceWriteClient) Send(m *flatbuffers.Builder) error { + return x.ClientStream.SendMsg(m) +} + +func (x *traceWriteClient) Recv() (*WriteResponse, error) { + m := new(WriteResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for Trace service +type TraceServer interface { + Query(context.Context, *EntityCriteria) (*flatbuffers.Builder, error) + Write(Trace_WriteServer) error + mustEmbedUnimplementedTraceServer() +} + +type UnimplementedTraceServer struct { +} + +func (UnimplementedTraceServer) Query(context.Context, *EntityCriteria) (*flatbuffers.Builder, error) { + return nil, status.Errorf(codes.Unimplemented, "method Query not implemented") +} + +func (UnimplementedTraceServer) Write(Trace_WriteServer) error { + return status.Errorf(codes.Unimplemented, "method Write not implemented") +} + +func (UnimplementedTraceServer) mustEmbedUnimplementedTraceServer() {} + +type UnsafeTraceServer interface { + mustEmbedUnimplementedTraceServer() +} + +func RegisterTraceServer(s grpc.ServiceRegistrar, srv TraceServer) { + s.RegisterService(&_Trace_serviceDesc, srv) +} + +func _Trace_Query_Handler(srv interface{}, ctx context.Context, + dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EntityCriteria) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TraceServer).Query(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/banyandb.v1.Trace/Query", + } + + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TraceServer).Query(ctx, req.(*EntityCriteria)) + } + return interceptor(ctx, in, info, handler) +} +func _Trace_Write_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TraceServer).Write(&traceWriteServer{stream}) +} + +type Trace_WriteServer interface { + Send(*flatbuffers.Builder) error + Recv() (*WriteEntity, error) + grpc.ServerStream +} + +type traceWriteServer struct { + grpc.ServerStream +} + +func (x *traceWriteServer) Send(m *flatbuffers.Builder) error { + return x.ServerStream.SendMsg(m) +} + +func (x *traceWriteServer) Recv() (*WriteEntity, error) { + m := new(WriteEntity) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Trace_serviceDesc = grpc.ServiceDesc{ + ServiceName: "banyandb.v1.Trace", + HandlerType: (*TraceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Query", + Handler: _Trace_Query_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Write", + Handler: _Trace_Write_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, +} diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go index 0e82346..4156efb 100644 --- a/banyand/liaison/grpc/grpc.go +++ b/banyand/liaison/grpc/grpc.go @@ -20,18 +20,15 @@ package grpc import ( "context" "fmt" - "net" - "strconv" - "time" - - flatbuffers "github.com/google/flatbuffers/go" - grpclib "google.golang.org/grpc" - "google.golang.org/grpc/encoding" - v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" + flatbuffers "github.com/google/flatbuffers/go" + grpclib "google.golang.org/grpc" + "io" + "net" + "sync" ) type Server struct { @@ -39,6 +36,7 @@ type Server struct { log *logger.Logger ser *grpclib.Server pipeline queue.Queue + writeEntity *v1.WriteEntity } func NewServer(ctx context.Context, pipeline queue.Queue) *Server { @@ -58,7 +56,9 @@ func (s *Server) FlagSet() *run.FlagSet { func (s *Server) Validate() error { return nil } - +func init(){ + //encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{}) +} func (s *Server) Serve() error { s.log = logger.GetLogger("grpc") lis, err := net.Listen("tcp", s.addr) @@ -66,8 +66,11 @@ func (s *Server) Serve() error { s.log.Fatal("Failed to listen", logger.Error(err)) } - encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{}) - s.ser = grpclib.NewServer() + + s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{})) + //s.ser = grpclib.NewServer() + + v1.RegisterTraceServer(s.ser, &TraceServer{}) return s.ser.Serve(lis) } @@ -77,113 +80,49 @@ func (s *Server) GracefulStop() { s.ser.GracefulStop() } -func WriteTraces(writeEntity *v1.WriteEntity) []byte { - fmt.Println("Write called...") - builder := flatbuffers.NewBuilder(0) - metaData := writeEntity.MetaData(nil) - entityValue := writeEntity.Entity(nil) - // Serialize MetaData - group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name())) - v1.MetadataStart(builder) - v1.MetadataAddGroup(builder, group) - v1.MetadataAddName(builder, name) - v1.MetadataEnd(builder) - // Serialize Fields - v1.FieldStart(builder) - var fieldList []flatbuffers.UOffsetT - for i := 0; i < entityValue.FieldsLength(); i++ { - var f v1.Field - var str string - if ok := entityValue.Fields(&f, i); ok { - unionValueType := new(flatbuffers.Table) - if f.Value(unionValueType) { - valueType := f.ValueType() - if valueType == v1.ValueTypeString { - unionStr := new(v1.String) - unionStr.Init(unionValueType.Bytes, unionValueType.Pos) - v1.FieldAddValueType(builder, v1.ValueTypeString) - str = string(unionStr.Value()) - } else if valueType == v1.ValueTypeInt { - unionInt := new(v1.Int) - unionInt.Init(unionValueType.Bytes, unionValueType.Pos) - v1.FieldAddValueType(builder, v1.ValueTypeInt) - field := flatbuffers.UOffsetT(unionInt.Value()) - v1.FieldAddValue(builder, field) - fieldList = append(fieldList, field) - } else if valueType == v1.ValueTypeStringArray { - unionStrArray := new(v1.StringArray) - unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos) - l := unionStrArray.ValueLength() - if l == 1 { - str += string(unionStrArray.Value(0)) - } else { - str += "[" - for j := 0; j < l; j++ { - str += string(unionStrArray.Value(j)) - } - str += "]" - } - } else if valueType == v1.ValueTypeIntArray { - unionIntArray := new(v1.IntArray) - unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos) - l := unionIntArray.ValueLength() - if l == 1 { - str += strconv.FormatInt(unionIntArray.Value(0), 10) - } else if l > 1{ - str += "[" - for j := 0; j < l; j++ { - str += strconv.FormatInt(unionIntArray.Value(j), 10) - } - str += "]" - } - } - if valueType == v1.ValueTypeIntArray || valueType == v1.ValueTypeStringArray || valueType == v1.ValueTypeString { - field := builder.CreateString(str) - v1.FieldAddValue(builder, field) - fieldList = append(fieldList, field) - } - } - } - } - v1.FieldEnd(builder) - // Serialize EntityValue - dataBinaryLength := entityValue.DataBinaryLength() - v1.EntityStartDataBinaryVector(builder, dataBinaryLength) - for i := dataBinaryLength; i >= 0; i-- { - builder.PrependByte(byte(i)) - } - dataBinary := builder.EndVector(dataBinaryLength) - entityId := builder.CreateString(string(entityValue.EntityId())) - v1.EntityValueStart(builder) - v1.EntityValueAddEntityId(builder, entityId) - time := uint64(time.Now().UnixNano()) - v1.EntityValueAddTimestampNanoseconds(builder, time) - v1.EntityValueAddDataBinary(builder, dataBinary) - v1.EntityValueStartFieldsVector(builder, len(fieldList)) - for val := range fieldList { - builder.PrependUOffsetT(flatbuffers.UOffsetT(val)) - } - fields := builder.EndVector(len(fieldList)) - v1.EntityValueAddFields(builder, fields) - v1.EntityValueEnd(builder) - trace := v1.WriteEntityEnd(builder) - - builder.Finish(trace) +//var _ gomock.TestHelper = (*TraceServer)(nil) - return builder.Bytes[builder.Head():] +type TraceServer struct { + v1.UnimplementedTraceServer + writeData []*v1.WriteEntity + mu sync.Mutex } -type ComponentBuilderFunc func(*flatbuffers.Builder) - -func ReadTraces(funcs ...ComponentBuilderFunc) *v1.Entity { - b := flatbuffers.NewBuilder(1024) - v1.EntityStart(b) - for _, fun := range funcs { - fun(b) +func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error { + for { + writeEntity, err := TraceWriteServer.Recv() + fmt.Println(writeEntity) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + t.writeData = append(t.writeData, writeEntity) + builder := flatbuffers.NewBuilder(0) + v1.WriteResponseStart(builder) + builder.Finish(v1.WriteResponseEnd(builder)) + if err := TraceWriteServer.Send(builder); err != nil { + return err + } + //writeEntity.Entity().Fields() + //writeEntity.MetaData(nil).Group() + //serviceID+instanceID + //seriesID := hash(fieds, f1, f2) + //shardID := shardingFunc(seriesID, shardNum) + //queue + //for _, l := range t.writeData { + // if err := TraceWriteServer.Send(l); err != nil { + // return err + // } + //} } - entityOffset := v1.EntityEnd(b) - b.Finish(entityOffset) +} + +func (t *TraceServer) Query(ctx context.Context, entityCriteria *v1.EntityCriteria) (*flatbuffers.Builder, error) { + b := flatbuffers.NewBuilder(0) + v1.EntityCriteriaStart(b) + b.Finish(v1.EntityCriteriaEnd(b)) - buf := b.Bytes[b.Head():] - return v1.GetRootAsEntity(buf, 0) + return b, nil } diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go index a5bf5ca..b7d1ef1 100644 --- a/banyand/liaison/grpc/grpc_test.go +++ b/banyand/liaison/grpc/grpc_test.go @@ -15,28 +15,140 @@ // specific language governing permissions and limitations // under the License. -package grpc_test +package grpc import ( - flatbuffers "github.com/google/flatbuffers/go" - + "context" + "fmt" v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" - "github.com/stretchr/testify/assert" + "github.com/apache/skywalking-banyandb/pkg/convert" + flatbuffers "github.com/google/flatbuffers/go" + "google.golang.org/grpc" + "io" + "log" "testing" + "time" ) +var serverAddr = "localhost:17912" + type ComponentBuilderFunc func(*flatbuffers.Builder) -type criteriaBuilder struct { +type writeEntityBuilder struct { *flatbuffers.Builder } -func NewCriteriaBuilder() *criteriaBuilder { - return &criteriaBuilder{ +func NewCriteriaBuilder() *writeEntityBuilder { + return &writeEntityBuilder{ flatbuffers.NewBuilder(1024), } } +func (b *writeEntityBuilder) BuildMetaData(group, name string) ComponentBuilderFunc { + g, n := b.Builder.CreateString(group), b.Builder.CreateString(name) + v1.MetadataStart(b.Builder) + v1.MetadataAddGroup(b.Builder, g) + v1.MetadataAddName(b.Builder, n) + metadata := v1.MetadataEnd(b.Builder) + return func(b *flatbuffers.Builder) { + v1.WriteEntityAddMetaData(b, metadata) + } +} + +func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...interface{}) ComponentBuilderFunc { + entityId := b.Builder.CreateString(id) + binaryOffset := b.Builder.CreateByteVector(binary) + l := len(items) + var fieldOffsets []flatbuffers.UOffsetT + for i := 0; i < l; i++ { + o := b.BuildField(items[i]) + fieldOffsets = append(fieldOffsets, o) + } + v1.EntityValueStartFieldsVector(b.Builder, len(fieldOffsets)) + for i := 0; i < len(fieldOffsets); i++ { + b.PrependUOffsetT(fieldOffsets[i]) + } + fields := b.EndVector(len(fieldOffsets)) + v1.EntityValueStart(b.Builder) + v1.EntityValueAddEntityId(b.Builder, entityId) + t := uint64(time.Now().UnixNano()) + v1.EntityValueAddTimestampNanoseconds(b.Builder, t) + v1.EntityValueAddDataBinary(b.Builder, binaryOffset) + v1.EntityValueAddFields(b.Builder, fields) + entity := v1.EntityValueEnd(b.Builder) + return func(b *flatbuffers.Builder) { + v1.WriteEntityAddEntity(b, entity) + } +} + +func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT { + var ValueTypeOffset flatbuffers.UOffsetT + var valType v1.ValueType + switch v := val.(type) { + case int: + ValueTypeOffset = b.buildInt(int64(v)) + valType = v1.ValueTypeInt + case []int: + ValueTypeOffset = b.buildInt(convert.IntToInt64(v...)...) + valType = v1.ValueTypeIntArray + case int64: + ValueTypeOffset = b.buildInt(v) + valType = v1.ValueTypeInt + case []int64: + ValueTypeOffset = b.buildInt(v...) + valType = v1.ValueTypeIntArray + case string: + ValueTypeOffset = b.buildStrValueType(v) + valType = v1.ValueTypeString + case []string: + ValueTypeOffset = b.buildStrValueType(v...) + valType = v1.ValueTypeStringArray + default: + panic("not supported values") + } + + v1.FieldStart(b.Builder) + v1.FieldAddValue(b.Builder, ValueTypeOffset) + v1.FieldAddValueType(b.Builder, valType) + return v1.FieldEnd(b.Builder) +} + +func (b *writeEntityBuilder) buildStrValueType(values ...string) flatbuffers.UOffsetT { + var strOffsets []flatbuffers.UOffsetT + for i := 0; i < len(values); i++ { + strOffsets = append(strOffsets, b.CreateString(values[i])) + } + v1.StringArrayStartValueVector(b.Builder, len(values)) + for i := 0; i < len(strOffsets); i++ { + b.Builder.PrependUOffsetT(strOffsets[i]) + } + int64Arr := b.Builder.EndVector(len(values)) + v1.IntArrayStart(b.Builder) + v1.IntArrayAddValue(b.Builder, int64Arr) + return v1.IntArrayEnd(b.Builder) +} + +func (b *writeEntityBuilder) buildInt(values ...int64) flatbuffers.UOffsetT { + v1.IntArrayStartValueVector(b.Builder, len(values)) + for i := 0; i < len(values); i++ { + b.Builder.PrependInt64(values[i]) + } + int64Arr := b.Builder.EndVector(len(values)) -func (b *criteriaBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity { + v1.IntArrayStart(b.Builder) + v1.IntArrayAddValue(b.Builder, int64Arr) + return v1.IntArrayEnd(b.Builder) +} + +func (b *writeEntityBuilder) BuildDataBinary(binary []byte) flatbuffers.UOffsetT { + dataBinaryLength := len(binary) + v1.EntityStartDataBinaryVector(b.Builder, dataBinaryLength) + for i := dataBinaryLength; i >= 0; i-- { + b.Builder.PrependByte(byte(i)) + } + dataBinaryOffset := b.Builder.EndVector(dataBinaryLength) + + return dataBinaryOffset +} + +func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity { v1.WriteEntityStart(b.Builder) for _, fun := range funcs { fun(b.Builder) @@ -47,21 +159,164 @@ func (b *criteriaBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity { buf := b.Bytes[b.Head():] return v1.GetRootAsWriteEntity(buf, 0) } -func BenchmarkWriteTraces(t *testing.T) { - tester := assert.New(t) - builder := NewCriteriaBuilder() - entity := builder.Build( - ) - res := grpc.WriteTraces(entity) - //tester.NoError(err) - tester.NotNil(res) - //tester.NoError(plan.Validate()) + +func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) { + builder := flatbuffers.NewBuilder(0) + metaData := writeEntity.MetaData(nil) + entityValue := writeEntity.Entity(nil) + // Serialize MetaData + group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name())) + v1.MetadataStart(builder) + v1.MetadataAddGroup(builder, group) + v1.MetadataAddName(builder, name) + v1.MetadataEnd(builder) + // Serialize Fields + var fieldList []flatbuffers.UOffsetT + for i := 0; i < 1; i++ { + var field v1.Field + var str string + if ok := entityValue.Fields(&field, i); ok { + unionValueType := new(flatbuffers.Table) + if field.Value(unionValueType) { + valueType := field.ValueType() + if valueType == v1.ValueTypeString { + unionStr := new(v1.String) + unionStr.Init(unionValueType.Bytes, unionValueType.Pos) + v1.FieldStart(builder) + v1.FieldAddValueType(builder, v1.ValueTypeString) + v1.FieldEnd(builder) + str = string(unionStr.Value()) + f := builder.CreateString(str) + fieldList = append(fieldList, f) + } else if valueType == v1.ValueTypeInt { + unionInt := new(v1.Int) + unionInt.Init(unionValueType.Bytes, unionValueType.Pos) + v1.FieldStart(builder) + v1.FieldAddValueType(builder, v1.ValueTypeInt) + v1.FieldEnd(builder) + f := flatbuffers.UOffsetT(unionInt.Value()) + //v1.IntAddValue(builder, int64(f)) + fieldList = append(fieldList, f) + } else if valueType == v1.ValueTypeStringArray { + unionStrArray := new(v1.StringArray) + unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos) + v1.FieldStart(builder) + v1.FieldAddValueType(builder, v1.ValueTypeStringArray) + v1.FieldEnd(builder) + l := unionStrArray.ValueLength() + var offsets []flatbuffers.UOffsetT + for j := 0; j < l; j++ { + v := builder.CreateString(string(unionStrArray.Value(j))) + v1.StringArrayStart(builder) + v1.StringArrayAddValue(builder, v) + offset := v1.StringArrayEnd(builder) + offsets = append(offsets, offset) + } + v1.StringArrayStartValueVector(builder, l) + for o := range offsets { + builder.PrependUOffsetT(flatbuffers.UOffsetT(o)) + } + f := builder.EndVector(l) + fieldList = append(fieldList, f) + } else if valueType == v1.ValueTypeIntArray { + unionIntArray := new(v1.IntArray) + unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos) + v1.FieldStart(builder) + v1.FieldAddValueType(builder, v1.ValueTypeIntArray) + v1.FieldEnd(builder) + l := unionIntArray.ValueLength() + var offsets []flatbuffers.UOffsetT + for j := 0; j < l; j++ { + v1.IntArrayStart(builder) + v1.IntArrayAddValue(builder, flatbuffers.UOffsetT(unionIntArray.Value(j))) + offset := v1.StringArrayEnd(builder) + offsets = append(offsets, offset) + } + v1.IntArrayStartValueVector(builder, len(offsets)) + for o := range offsets { + builder.PrependUOffsetT(flatbuffers.UOffsetT(o)) + } + f := builder.EndVector(len(offsets)) + fieldList = append(fieldList, f) + } + } + } + } + v1.FieldStart(builder) + for field := range fieldList { + v1.FieldAddValue(builder, flatbuffers.UOffsetT(field)) + } + v1.FieldEnd(builder) + // Serialize EntityValue + dataBinaryLength := 10 + v1.EntityStartDataBinaryVector(builder, dataBinaryLength) + for i := dataBinaryLength; i >= 0; i-- { + builder.PrependByte(byte(i)) + } + dataBinaryP := builder.EndVector(dataBinaryLength) + v1.EntityValueStartFieldsVector(builder, len(fieldList)) + for val := range fieldList { + builder.PrependUOffsetT(flatbuffers.UOffsetT(val)) + } + fieldsP := builder.EndVector(len(fieldList)) + entityId := builder.CreateString(string(entityValue.EntityId())) + v1.EntityValueStart(builder) + v1.EntityValueAddEntityId(builder, entityId) + time := uint64(time.Now().UnixNano()) + v1.EntityValueAddTimestampNanoseconds(builder, time) + v1.EntityValueAddDataBinary(builder, dataBinaryP) + v1.EntityValueAddFields(builder, fieldsP) + v1.EntityValueEnd(builder) + v1.WriteEntityStart(builder) + position := v1.WriteEntityEnd(builder) + builder.Finish(position) + + return builder, nil } -func BenchmarkReadTraces(t *testing.T) { - tester := assert.New(t) - builder := NewCriteriaBuilder() - entity := builder.Build() - res := grpc.ReadTraces(entity) - tester.NotNil(res) -} \ No newline at end of file +func Test_grpc_write(t *testing.T) { + conn, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.CustomCodecCallOption{Codec: flatbuffers.FlatbuffersCodec{}})) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + client := v1.NewTraceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + b := NewCriteriaBuilder() + binary := byte(123) + entity := b.Build( + b.BuildEntity("entityId", []byte{binary}, "service_name", "endpoint_id"), + b.BuildMetaData("default", "trace"), + ) + fmt.Println(entity) + builder, e := runWrite(entity) + if e != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer cancel() + stream, er := client.Write(ctx) + if er != nil { + log.Fatalf("%v.runWrite(_) = _, %v", client, err) + } + //waitc := make(chan struct{}) + go func() { + for { + writeResponse, err := stream.Recv() + if err == io.EOF { + // read done. + //close(waitc) + return + } + if err != nil { + log.Fatalf("Failed to receive data : %v", err) + } + println( writeResponse) + } + }() + if error := stream.Send(builder); error != nil { + log.Fatalf("Failed to send a note: %v", err) + } + stream.CloseSend() + //<-waitc +} diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go index 76e8d14..117b361 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/liaison.go @@ -19,6 +19,7 @@ package liaison import ( "context" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/run" diff --git a/banyand/series/service.go b/banyand/series/service.go index a62c7d0..beb3361 100644 --- a/banyand/series/service.go +++ b/banyand/series/service.go @@ -20,6 +20,10 @@ package series import ( "bytes" "context" + "time" + + "go.uber.org/multierr" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" v1 "github.com/apache/skywalking-banyandb/api/fbs/v1" @@ -27,14 +31,12 @@ import ( "github.com/apache/skywalking-banyandb/banyand/series/schema/sw" "github.com/apache/skywalking-banyandb/banyand/storage" "github.com/apache/skywalking-banyandb/pkg/run" - "go.uber.org/multierr" - "time" ) var _ Service = (*service)(nil) type service struct { - db storage.Database + db storage.Database addr string } diff --git a/pkg/convert/number.go b/pkg/convert/number.go index 9656912..0dbc4ca 100644 --- a/pkg/convert/number.go +++ b/pkg/convert/number.go @@ -34,3 +34,11 @@ func Uint32ToBytes(u uint32) []byte { func BytesToUint64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } + +func IntToInt64(numbers ...int) []int64 { + var arr []int64 + for i := 0; i < len(numbers); i++ { + arr = append(arr, int64(numbers[i])) + } + return arr +} \ No newline at end of file
