This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c98656e  Refactor index startup and remove index field check (#29)
c98656e is described below

commit c98656e5971d4c5d12131c0fab5f56686308895d
Author: Jiajing LU <[email protected]>
AuthorDate: Thu Aug 5 15:34:31 2021 +0800

    Refactor index startup and remove index field check (#29)
    
    * refactor index startup and remove index field check
    
    Signed-off-by: Megrez Lu <[email protected]>
    
    * add log and break iteration asap
    
    Signed-off-by: Megrez Lu <[email protected]>
    
    * Update banyand/index/index.go
    
    polish condition check
    
    Co-authored-by: Gao Hongtao <[email protected]>
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/index/index.go          | 44 +++++++++++++++++++++++++++++++++++++++--
 banyand/index/index_test.go     | 15 +++++---------
 banyand/query/processor_test.go | 11 +++++++----
 3 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/banyand/index/index.go b/banyand/index/index.go
index 89122e9..98a049a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -38,7 +38,6 @@ import (
 var (
        ErrShardNotFound       = errors.New("series doesn't exist")
        ErrTraceSeriesNotFound = errors.New("trace series not found")
-       ErrUnknownField        = errors.New("the field is unknown")
 )
 
 type Condition struct {
@@ -64,9 +63,25 @@ type Builder interface {
        run.Service
 }
 
+type ReadyOption func(map[string]*series) bool
+
+// MetaExists returns a ReadyOption that reports whether the series identified
+// by group and name is present in the metadata map handed to the option.
+func MetaExists(group, name string) ReadyOption {
+       seriesID := &apiv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+       return func(m map[string]*series) bool {
+               _, ok := m[compositeSeriesID(seriesID)]
+               return ok
+       }
+}
+
 type Service interface {
        Repo
        Builder
+       Ready(context.Context, ...ReadyOption) bool
 }
 
 type series struct {
@@ -108,7 +123,8 @@ func (s *service) Insert(series common.Metadata, shardID uint, field *Field) err
        }
        objects, ok := sd.meta[field.Name]
        if !ok {
-               return ErrUnknownField
+               s.log.Debug().Str("field", field.Name).Msg("field is not indexed")
+               return nil
        }
        for _, object := range objects {
                err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
@@ -154,6 +170,30 @@ func (s *service) GracefulStop() {
        }
 }
 
+// Ready spins until every option, plus a built-in "at least one series" check,
+// holds for the metadata map (read under its RWMutex); false once ctx is done.
+func (s *service) Ready(ctx context.Context, options ...ReadyOption) bool {
+       options = append(options, func(m map[string]*series) bool {
+               return len(m) > 0
+       })
+       allMatch := func() bool {
+               s.meta.RLock()
+               defer s.meta.RUnlock()
+               for _, opt := range options {
+                       if !opt(s.meta.meta) {
+                               return false
+                       }
+               }
+               return true
+       }
+       for ctx.Err() == nil {
+               if allMatch() {
+                       return true
+               }
+       }
+       return false
+}
+
 type indexMeta struct {
        meta map[string]*series
        sync.RWMutex
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
index 44a89af..cd09710 100644
--- a/banyand/index/index_test.go
+++ b/banyand/index/index_test.go
@@ -108,7 +108,7 @@ func Test_service_Insert(t *testing.T) {
                                        Value:   convert.Int64ToBytes(500),
                                },
                        },
-                       wantErr: true,
+                       wantErr: false,
                },
        }
        for _, tt := range tests {
@@ -172,14 +172,9 @@ func setUpModules(tester *assert.Assertions) *service {
        tester.NoError(err)
        s, ok := svc.(*service)
        tester.True(ok)
-       deadline := time.Now().Add(10 * time.Second)
-       for {
-               if s.meta.get(seriesID) != nil {
-                       break
-               }
-               if time.Now().After(deadline) {
-                       tester.Fail("timeout")
-               }
-       }
+
+       ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancelFunc()
+       tester.True(svc.Ready(ctx, MetaExists("default", "sw")))
        return s
 }
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 54d2f65..dbea2ae 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -111,6 +111,10 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
                tester.NoError(err)
        }()
 
+       ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancelFunc()
+       tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
        return repo, traceSvc, func() {
                db.GracefulStop()
                _ = os.RemoveAll(rootPath)
@@ -269,10 +273,9 @@ func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service)
        }
 
        for _, ev := range entityValues {
-               _, _ = svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
-               // TODO: every field should be indexed?
-               //tester.True(ok)
-               //tester.NoError(err)
+               ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
+               tester.True(ok)
+               tester.NoError(err)
        }
 }
 

Reply via email to