This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c6314fe  To save and retrieve element_id (#50)
c6314fe is described below

commit c6314fe15b56143836e8969bd756a1310779b3b0
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Sep 22 22:15:28 2021 +0800

    To save and retrieve element_id (#50)
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/stream/stream_query.go      | 10 +++++++++-
 banyand/stream/stream_query_test.go |  5 +++++
 banyand/stream/stream_write.go      |  2 +-
 banyand/tsdb/series_seek.go         | 12 ++++++++++--
 banyand/tsdb/series_write.go        | 15 +++++++++++++++
 5 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index fd0a05c..f8b7665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -70,7 +70,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error) {
-       familyRawBytes, err := item.Val(family)
+       familyRawBytes, err := item.Family(family)
        if err != nil {
                return nil, err
        }
@@ -102,3 +102,11 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFami
                Tags: tags,
        }, err
 }
+
+func (s *stream) ParseElementID(item tsdb.Item) (string, error) {
+       rawBytes, err := item.Val()
+       if err != nil {
+               return "", err
+       }
+       return string(rawBytes), nil
+}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 21669c6..8319487 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -681,6 +681,11 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
                                                                elements = append(elements, tag.GetValue().GetStr().GetValue())
                                                        }
                                                }
+                                               eleID, errInner := s.ParseElementID(iterator.Val())
+                                               if errInner != nil {
+                                                       return nil, errInner
+                                               }
+                                               tester.NotEmpty(eleID)
                                        }
                                        _ = iterator.Close()
                                        g = append(g, shardStruct{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index dc536bf..610ef31 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -36,7 +36,6 @@ import (
 
 var (
        ErrMalformedElement            = errors.New("element is malformed")
-       ErrUnsupportedTagTypeAsEntry   = errors.New("the tag type can not be as an entry in an entity")
        ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
@@ -92,6 +91,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
                        }
                        builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
                }
+               builder.Val([]byte(value.GetElementId()))
                writer, errWrite := builder.Build()
                if errWrite != nil {
                        return nil, errWrite
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 0c8700b..0aac353 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -32,7 +32,8 @@ type Iterator interface {
 }
 
 type Item interface {
-       Val(family string) ([]byte, error)
+       Family(family string) ([]byte, error)
+       Val() ([]byte, error)
        ID() common.ItemID
        SortedField() []byte
        Time() uint64
@@ -118,7 +119,7 @@ func (i *item) SortedField() []byte {
        return i.sortedField
 }
 
-func (i *item) Val(family string) ([]byte, error) {
+func (i *item) Family(family string) ([]byte, error) {
        d := dataBucket{
                seriesID: i.seriesID,
                family:   []byte(family),
@@ -126,6 +127,13 @@ func (i *item) Val(family string) ([]byte, error) {
        return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) Val() ([]byte, error) {
+       d := dataBucket{
+               seriesID: i.seriesID,
+       }
+       return i.data.Get(d.marshal(), uint64(i.itemID))
+}
+
 func (i *item) ID() common.ItemID {
        return i.itemID
 }
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index cdec388..8232648 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -84,6 +84,8 @@ func (w *writerBuilder) Val(val []byte) WriterBuilder {
 var ErrNoTime = errors.New("no time specified")
 var ErrNoVal = errors.New("no value specified")
 
+var ErrDuplicatedFamily = errors.New("duplicated family")
+
 func (w *writerBuilder) Build() (Writer, error) {
        if w.block == nil {
                return nil, errors.WithStack(ErrNoTime)
@@ -91,6 +93,16 @@ func (w *writerBuilder) Build() (Writer, error) {
        if len(w.values) < 1 {
                return nil, errors.WithStack(ErrNoVal)
        }
+       for i, value := range w.values {
+               for j := i + 1; j < len(w.values); j = j + 1 {
+                       if value.family == nil && w.values[j].family == nil {
+                               return nil, errors.Wrap(ErrDuplicatedFamily, "default family")
+                       }
+                       if bytes.Equal(value.family, w.values[j].family) {
+                               return nil, errors.Wrapf(ErrDuplicatedFamily, "family:%s", value.family)
+                       }
+               }
+       }
        segID, blockID := w.block.identity()
        return &writer{
                block: w.block,
@@ -145,6 +157,9 @@ type dataBucket struct {
 }
 
 func (d dataBucket) marshal() []byte {
+       if d.family == nil {
+               return d.seriesID.Marshal()
+       }
        return bytes.Join([][]byte{
                d.seriesID.Marshal(),
                hash(d.family),

Reply via email to