This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/liasion in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5e377c973615c8489f7076b33296b7185c56b2c2 Author: Qiuxia Fan <[email protected]> AuthorDate: Mon Jun 21 11:07:59 2021 +0800 read trace --- banyand/liaison/grpc/grpc.go | 55 +++++++++++++---------- banyand/liaison/{liaison.go => grpc/grpc_test.go} | 19 +------- banyand/liaison/liaison.go | 1 - 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go index 9f8b108..92dcc48 100644 --- a/banyand/liaison/grpc/grpc.go +++ b/banyand/liaison/grpc/grpc.go @@ -19,6 +19,7 @@ package grpc import ( "context" + "fmt" "net" "strconv" "time" @@ -76,8 +77,8 @@ func (s *Server) GracefulStop() { s.ser.GracefulStop() } -func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) (*flatbuffers.Builder, error) { - s.log.Info("WriteTraces called...") +func WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) []byte { + fmt.Println("Write called...") builder := flatbuffers.NewBuilder(0) // Serialize MetaData group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name())) @@ -90,7 +91,7 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) var fieldList []flatbuffers.UOffsetT for i := 0; i < entityValue.FieldsLength(); i++ { var f v1.Field - var s string + var str string if ok := entityValue.Fields(&f, i); ok { unionValueType := new(flatbuffers.Table) if f.Value(unionValueType) { @@ -99,7 +100,7 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) unionStr := new(v1.String) unionStr.Init(unionValueType.Bytes, unionValueType.Pos) v1.FieldAddValueType(builder, v1.ValueTypeString) - s = string(unionStr.Value()) + str = string(unionStr.Value()) } else if valueType == v1.ValueTypeInt { unionInt := new(v1.Int) unionInt.Init(unionValueType.Bytes, unionValueType.Pos) @@ -110,32 +111,32 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) } else if valueType == v1.ValueTypeStringArray { unionStrArray := new(v1.StringArray) unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos) - len := unionStrArray.ValueLength() - if len == 1 { - s += string(unionStrArray.Value(0)) + l := unionStrArray.ValueLength() + if l == 1 { + str += string(unionStrArray.Value(0)) } else { - s += "[" - for i := 0; i < len; i++ { - s += string(unionStrArray.Value(i)) + str += "[" + for j := 0; j < l; j++ { + str += string(unionStrArray.Value(j)) } - s += "]" + str += "]" } } else if valueType == v1.ValueTypeIntArray { unionIntArray := new(v1.IntArray) unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos) l := unionIntArray.ValueLength() if l == 1 { - s += strconv.FormatInt(unionIntArray.Value(0), 10) + str += strconv.FormatInt(unionIntArray.Value(0), 10) } else if l > 1{ - s += "[" - for i := 0; i < l; i++ { - s += strconv.FormatInt(unionIntArray.Value(i), 10) + str += "[" + for j := 0; j < l; j++ { + str += strconv.FormatInt(unionIntArray.Value(j), 10) } - s += "]" + str += "]" } } if valueType == v1.ValueTypeIntArray || valueType == v1.ValueTypeStringArray || valueType == v1.ValueTypeString { - field := builder.CreateString(s) + field := builder.CreateString(str) v1.FieldAddValue(builder, field) fieldList = append(fieldList, field) } @@ -167,14 +168,20 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) builder.Finish(trace) - return builder, nil + return builder.Bytes[builder.Head():] } -func (s *Server) FetchTraces(in *v1.EntityCriteria) (*flatbuffers.Builder, error) { - s.log.Info("Fetch called...") - builder := flatbuffers.NewBuilder(0) - //buf := builder.FinishedBytes() - //entityCriteria := v1.GetRootAsEntityCriteria(buf, 0) +type ComponentBuilderFunc func(*flatbuffers.Builder) + +func ReadTraces(funcs ...ComponentBuilderFunc) *v1.Entity { + b := flatbuffers.NewBuilder(1024) + v1.EntityStart(b) + for _, fun := range funcs { + fun(b) + } + entityOffset := v1.EntityEnd(b) + b.Finish(entityOffset) - return builder, nil + buf := b.Bytes[b.Head():] + return v1.GetRootAsEntity(buf, 0) } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/grpc/grpc_test.go similarity index 68% copy from banyand/liaison/liaison.go copy to banyand/liaison/grpc/grpc_test.go index 117b361..761af64 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/grpc/grpc_test.go @@ -15,21 +15,4 @@ // specific language governing permissions and limitations // under the License. -package liaison - -import ( - "context" - - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/run" -) - -type Endpoint interface { - run.Config - run.Service -} - -func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) { - return grpc.NewServer(ctx, pipeline), nil -} +package grpc_test diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go index 117b361..76e8d14 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/liaison.go @@ -19,7 +19,6 @@ package liaison import ( "context" - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/run"
