This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e2627b7  Introduce index module with memtable only (#26)
e2627b7 is described below

commit e2627b7a6a53f8485aa051b38536e566c9a1eb39
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Aug 4 15:38:32 2021 +0800

    Introduce index module with memtable only (#26)
    
    * Add series event
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Use build method to build topics
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Introduce posting list
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Add name to IndexObject
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Add mem table
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Index memtable
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Test search
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Fix some nits
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/common/metadata.go                             |  17 +
 api/event/discovery.go                             |   9 +-
 api/proto/banyandb/v1/schema.pb.go                 | 246 ++++++++++++--
 api/proto/banyandb/v1/schema.proto                 |  13 +-
 banyand/index/index.go                             | 203 +++++++++++-
 banyand/index/index_test.go                        | 185 +++++++++++
 banyand/index/search.go                            | 356 +++++++++++++++++++++
 banyand/index/search_test.go                       | 255 +++++++++++++++
 .../posting.go => banyand/index/tsdb/field_map.go  |  67 ++--
 banyand/index/tsdb/mem.go                          |  91 ++++++
 banyand/index/tsdb/mem_test.go                     | 261 +++++++++++++++
 banyand/index/tsdb/term_map.go                     | 143 +++++++++
 banyand/index/tsdb/tsdb.go                         |  65 ++++
 banyand/internal/cmd/standalone.go                 |   4 +-
 banyand/query/processor_test.go                    |  11 +-
 banyand/series/trace/common_test.go                |   7 +-
 banyand/series/trace/service.go                    | 147 +++++++++
 banyand/series/trace/trace.go                      |  93 +-----
 banyand/series/trace/write.go                      |  45 +++
 pkg/convert/number.go                              |   4 +
 pkg/posting/posting.go                             |   2 +
 pkg/posting/roaring/roaring.go                     |  12 +
 22 files changed, 2064 insertions(+), 172 deletions(-)

diff --git a/api/common/metadata.go b/api/common/metadata.go
index 93ea657..8706aaf 100644
--- a/api/common/metadata.go
+++ b/api/common/metadata.go
@@ -33,3 +33,20 @@ func (md Metadata) Equal(other Metadata) bool {
                md.Spec.Group == other.Spec.Group &&
                md.Spec.Name == other.Spec.Name
 }
+
+func NewMetadata(spec *v1.Metadata) *Metadata {
+       return &Metadata{
+               KindVersion: MetadataKindVersion,
+               Spec:        spec,
+       }
+}
+
+func NewMetadataByNameAndGroup(name, group string) *Metadata {
+       return &Metadata{
+               KindVersion: MetadataKindVersion,
+               Spec: &v1.Metadata{
+                       Name:  name,
+                       Group: group,
+               },
+       }
+}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index a38298c..10476b8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -33,7 +33,9 @@ var (
                Version: "v1",
                Kind:    "event-series",
        }
-       TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+       TopicSeriesEvent     = bus.UniTopic(SeriesEventKindVersion.String())
+       IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: 
"index-rule"}
+       TopicIndexRule       = bus.UniTopic(IndexRuleKindVersion.String())
 )
 
 type Shard struct {
@@ -45,3 +47,8 @@ type Series struct {
        common.KindVersion
        Payload v1.SeriesEvent
 }
+
+type IndexRule struct {
+       common.KindVersion
+       Payload *v1.IndexRuleEvent
+}
diff --git a/api/proto/banyandb/v1/schema.pb.go 
b/api/proto/banyandb/v1/schema.pb.go
index 69f0218..56f6144 100644
--- a/api/proto/banyandb/v1/schema.pb.go
+++ b/api/proto/banyandb/v1/schema.pb.go
@@ -978,6 +978,132 @@ func (x *IndexRuleBinding) GetUpdatedAt() 
*timestamppb.Timestamp {
        return nil
 }
 
+type IndexRuleEvent struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Series *Metadata                          
`protobuf:"bytes,1,opt,name=series,proto3" json:"series,omitempty"`
+       Rules  []*IndexRuleEvent_ShardedIndexRule 
`protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+       Action Action                             
`protobuf:"varint,4,opt,name=action,proto3,enum=banyandb.v1.Action" 
json:"action,omitempty"`
+       Time   *timestamppb.Timestamp             
`protobuf:"bytes,5,opt,name=time,proto3" json:"time,omitempty"`
+}
+
+func (x *IndexRuleEvent) Reset() {
+       *x = IndexRuleEvent{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *IndexRuleEvent) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent) ProtoMessage() {}
+
+func (x *IndexRuleEvent) ProtoReflect() protoreflect.Message {
+       mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent) Descriptor() ([]byte, []int) {
+       return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10}
+}
+
+func (x *IndexRuleEvent) GetSeries() *Metadata {
+       if x != nil {
+               return x.Series
+       }
+       return nil
+}
+
+func (x *IndexRuleEvent) GetRules() []*IndexRuleEvent_ShardedIndexRule {
+       if x != nil {
+               return x.Rules
+       }
+       return nil
+}
+
+func (x *IndexRuleEvent) GetAction() Action {
+       if x != nil {
+               return x.Action
+       }
+       return Action_ACTION_UNSPECIFIED
+}
+
+func (x *IndexRuleEvent) GetTime() *timestamppb.Timestamp {
+       if x != nil {
+               return x.Time
+       }
+       return nil
+}
+
+type IndexRuleEvent_ShardedIndexRule struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       ShardId uint64       
`protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" 
json:"shard_id,omitempty"`
+       Rules   []*IndexRule `protobuf:"bytes,2,rep,name=rules,proto3" 
json:"rules,omitempty"`
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) Reset() {
+       *x = IndexRuleEvent_ShardedIndexRule{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent_ShardedIndexRule) ProtoMessage() {}
+
+func (x *IndexRuleEvent_ShardedIndexRule) ProtoReflect() protoreflect.Message {
+       mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent_ShardedIndexRule.ProtoReflect.Descriptor 
instead.
+func (*IndexRuleEvent_ShardedIndexRule) Descriptor() ([]byte, []int) {
+       return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10, 0}
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetShardId() uint64 {
+       if x != nil {
+               return x.ShardId
+       }
+       return 0
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetRules() []*IndexRule {
+       if x != nil {
+               return x.Rules
+       }
+       return nil
+}
+
 var File_banyandb_v1_schema_proto protoreflect.FileDescriptor
 
 var file_banyandb_v1_schema_proto_rawDesc = []byte{
@@ -1119,7 +1245,27 @@ var file_banyandb_v1_schema_proto_rawDesc = []byte{
        0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 
0x1a, 0x2e, 0x67, 0x6f,
        0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 
0x66, 0x2e, 0x54, 0x69,
        0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 
0x61, 0x74, 0x65, 0x64,
-       0x41, 0x74, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
+       0x41, 0x74, 0x22, 0xbd, 0x02, 0x0a, 0x0e, 0x49, 0x6e, 0x64, 0x65, 0x78, 
0x52, 0x75, 0x6c, 0x65,
+       0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2d, 0x0a, 0x06, 0x73, 0x65, 0x72, 
0x69, 0x65, 0x73, 0x18,
+       0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x62, 0x61, 0x6e, 0x79, 
0x61, 0x6e, 0x64, 0x62,
+       0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 
0x52, 0x06, 0x73, 0x65,
+       0x72, 0x69, 0x65, 0x73, 0x12, 0x42, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 
0x73, 0x18, 0x02, 0x20,
+       0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 
0x64, 0x62, 0x2e, 0x76,
+       0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x45, 
0x76, 0x65, 0x6e, 0x74,
+       0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64, 0x49, 0x6e, 0x64, 0x65, 
0x78, 0x52, 0x75, 0x6c,
+       0x65, 0x52, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x2b, 0x0a, 0x06, 
0x61, 0x63, 0x74, 0x69,
+       0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x62, 
0x61, 0x6e, 0x79, 0x61,
+       0x6e, 0x64, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 
0x6e, 0x52, 0x06, 0x61,
+       0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 
0x65, 0x18, 0x05, 0x20,
+       0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 
0x2e, 0x70, 0x72, 0x6f,
+       0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 
0x61, 0x6d, 0x70, 0x52,
+       0x04, 0x74, 0x69, 0x6d, 0x65, 0x1a, 0x5b, 0x0a, 0x10, 0x53, 0x68, 0x61, 
0x72, 0x64, 0x65, 0x64,
+       0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x19, 0x0a, 
0x08, 0x73, 0x68, 0x61,
+       0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 
0x07, 0x73, 0x68, 0x61,
+       0x72, 0x64, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 
0x73, 0x18, 0x02, 0x20,
+       0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 
0x64, 0x62, 0x2e, 0x76,
+       0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x52, 
0x05, 0x72, 0x75, 0x6c,
+       0x65, 0x73, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
        0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 
0x62, 0x61, 0x6e, 0x79,
        0x61, 0x6e, 0x64, 0x62, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 
0x2e, 0x63, 0x6f, 0x6d,
        0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 
0x61, 0x6c, 0x6b, 0x69,
@@ -1141,52 +1287,60 @@ func file_banyandb_v1_schema_proto_rawDescGZIP() []byte 
{
 }
 
 var file_banyandb_v1_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
-var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
 var file_banyandb_v1_schema_proto_goTypes = []interface{}{
-       (Duration_Duration)(0),        // 0: banyandb.v1.Duration.Duration
-       (FieldSpec_FieldType)(0),      // 1: banyandb.v1.FieldSpec.FieldType
-       (IndexObject_IndexType)(0),    // 2: banyandb.v1.IndexObject.IndexType
-       (Series_Catalog)(0),           // 3: banyandb.v1.Series.Catalog
-       (*ShardInfo)(nil),             // 4: banyandb.v1.ShardInfo
-       (*Duration)(nil),              // 5: banyandb.v1.Duration
-       (*FieldSpec)(nil),             // 6: banyandb.v1.FieldSpec
-       (*TraceStateMap)(nil),         // 7: banyandb.v1.TraceStateMap
-       (*TraceFieldMap)(nil),         // 8: banyandb.v1.TraceFieldMap
-       (*TraceSeries)(nil),           // 9: banyandb.v1.TraceSeries
-       (*IndexObject)(nil),           // 10: banyandb.v1.IndexObject
-       (*IndexRule)(nil),             // 11: banyandb.v1.IndexRule
-       (*Series)(nil),                // 12: banyandb.v1.Series
-       (*IndexRuleBinding)(nil),      // 13: banyandb.v1.IndexRuleBinding
-       (*Metadata)(nil),              // 14: banyandb.v1.Metadata
-       (*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp
+       (Duration_Duration)(0),                  // 0: 
banyandb.v1.Duration.Duration
+       (FieldSpec_FieldType)(0),                // 1: 
banyandb.v1.FieldSpec.FieldType
+       (IndexObject_IndexType)(0),              // 2: 
banyandb.v1.IndexObject.IndexType
+       (Series_Catalog)(0),                     // 3: 
banyandb.v1.Series.Catalog
+       (*ShardInfo)(nil),                       // 4: banyandb.v1.ShardInfo
+       (*Duration)(nil),                        // 5: banyandb.v1.Duration
+       (*FieldSpec)(nil),                       // 6: banyandb.v1.FieldSpec
+       (*TraceStateMap)(nil),                   // 7: banyandb.v1.TraceStateMap
+       (*TraceFieldMap)(nil),                   // 8: banyandb.v1.TraceFieldMap
+       (*TraceSeries)(nil),                     // 9: banyandb.v1.TraceSeries
+       (*IndexObject)(nil),                     // 10: banyandb.v1.IndexObject
+       (*IndexRule)(nil),                       // 11: banyandb.v1.IndexRule
+       (*Series)(nil),                          // 12: banyandb.v1.Series
+       (*IndexRuleBinding)(nil),                // 13: 
banyandb.v1.IndexRuleBinding
+       (*IndexRuleEvent)(nil),                  // 14: 
banyandb.v1.IndexRuleEvent
+       (*IndexRuleEvent_ShardedIndexRule)(nil), // 15: 
banyandb.v1.IndexRuleEvent.ShardedIndexRule
+       (*Metadata)(nil),                        // 16: banyandb.v1.Metadata
+       (*timestamppb.Timestamp)(nil),           // 17: 
google.protobuf.Timestamp
+       (Action)(0),                             // 18: banyandb.v1.Action
 }
 var file_banyandb_v1_schema_proto_depIdxs = []int32{
        0,  // 0: banyandb.v1.Duration.unit:type_name -> 
banyandb.v1.Duration.Duration
        1,  // 1: banyandb.v1.FieldSpec.type:type_name -> 
banyandb.v1.FieldSpec.FieldType
        7,  // 2: banyandb.v1.TraceFieldMap.state:type_name -> 
banyandb.v1.TraceStateMap
-       14, // 3: banyandb.v1.TraceSeries.metadata:type_name -> 
banyandb.v1.Metadata
+       16, // 3: banyandb.v1.TraceSeries.metadata:type_name -> 
banyandb.v1.Metadata
        6,  // 4: banyandb.v1.TraceSeries.fields:type_name -> 
banyandb.v1.FieldSpec
        8,  // 5: banyandb.v1.TraceSeries.reserved_fields_map:type_name -> 
banyandb.v1.TraceFieldMap
        4,  // 6: banyandb.v1.TraceSeries.shard:type_name -> 
banyandb.v1.ShardInfo
        5,  // 7: banyandb.v1.TraceSeries.duration:type_name -> 
banyandb.v1.Duration
-       15, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> 
google.protobuf.Timestamp
+       17, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> 
google.protobuf.Timestamp
        2,  // 9: banyandb.v1.IndexObject.type:type_name -> 
banyandb.v1.IndexObject.IndexType
-       14, // 10: banyandb.v1.IndexRule.metadata:type_name -> 
banyandb.v1.Metadata
+       16, // 10: banyandb.v1.IndexRule.metadata:type_name -> 
banyandb.v1.Metadata
        10, // 11: banyandb.v1.IndexRule.objects:type_name -> 
banyandb.v1.IndexObject
-       15, // 12: banyandb.v1.IndexRule.updated_at:type_name -> 
google.protobuf.Timestamp
+       17, // 12: banyandb.v1.IndexRule.updated_at:type_name -> 
google.protobuf.Timestamp
        3,  // 13: banyandb.v1.Series.catalog:type_name -> 
banyandb.v1.Series.Catalog
-       14, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
-       14, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> 
banyandb.v1.Metadata
-       14, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> 
banyandb.v1.Metadata
+       16, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
+       16, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> 
banyandb.v1.Metadata
+       16, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> 
banyandb.v1.Metadata
        12, // 17: banyandb.v1.IndexRuleBinding.subjects:type_name -> 
banyandb.v1.Series
-       15, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> 
google.protobuf.Timestamp
-       15, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> 
google.protobuf.Timestamp
-       15, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> 
google.protobuf.Timestamp
-       21, // [21:21] is the sub-list for method output_type
-       21, // [21:21] is the sub-list for method input_type
-       21, // [21:21] is the sub-list for extension type_name
-       21, // [21:21] is the sub-list for extension extendee
-       0,  // [0:21] is the sub-list for field type_name
+       17, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> 
google.protobuf.Timestamp
+       17, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> 
google.protobuf.Timestamp
+       17, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> 
google.protobuf.Timestamp
+       16, // 21: banyandb.v1.IndexRuleEvent.series:type_name -> 
banyandb.v1.Metadata
+       15, // 22: banyandb.v1.IndexRuleEvent.rules:type_name -> 
banyandb.v1.IndexRuleEvent.ShardedIndexRule
+       18, // 23: banyandb.v1.IndexRuleEvent.action:type_name -> 
banyandb.v1.Action
+       17, // 24: banyandb.v1.IndexRuleEvent.time:type_name -> 
google.protobuf.Timestamp
+       11, // 25: banyandb.v1.IndexRuleEvent.ShardedIndexRule.rules:type_name 
-> banyandb.v1.IndexRule
+       26, // [26:26] is the sub-list for method output_type
+       26, // [26:26] is the sub-list for method input_type
+       26, // [26:26] is the sub-list for extension type_name
+       26, // [26:26] is the sub-list for extension extendee
+       0,  // [0:26] is the sub-list for field type_name
 }
 
 func init() { file_banyandb_v1_schema_proto_init() }
@@ -1316,6 +1470,30 @@ func file_banyandb_v1_schema_proto_init() {
                                return nil
                        }
                }
+               file_banyandb_v1_schema_proto_msgTypes[10].Exporter = func(v 
interface{}, i int) interface{} {
+                       switch v := v.(*IndexRuleEvent); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_banyandb_v1_schema_proto_msgTypes[11].Exporter = func(v 
interface{}, i int) interface{} {
+                       switch v := v.(*IndexRuleEvent_ShardedIndexRule); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
        }
        type x struct{}
        out := protoimpl.TypeBuilder{
@@ -1323,7 +1501,7 @@ func file_banyandb_v1_schema_proto_init() {
                        GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
                        RawDescriptor: file_banyandb_v1_schema_proto_rawDesc,
                        NumEnums:      4,
-                       NumMessages:   10,
+                       NumMessages:   12,
                        NumExtensions: 0,
                        NumServices:   0,
                },
diff --git a/api/proto/banyandb/v1/schema.proto 
b/api/proto/banyandb/v1/schema.proto
index 273c155..aeb8e5a 100644
--- a/api/proto/banyandb/v1/schema.proto
+++ b/api/proto/banyandb/v1/schema.proto
@@ -184,4 +184,15 @@ message IndexRuleBinding {
   google.protobuf.Timestamp expire_at = 5;
   // updated_at_nanoseconds indicates when the IndexRuleBinding is updated
   google.protobuf.Timestamp updated_at = 6;
-}
\ No newline at end of file
+}
+
+message IndexRuleEvent {
+  Metadata series = 1;
+  message ShardedIndexRule {
+    uint64 shard_id = 1;
+    repeated IndexRule rules  = 2;
+  }
+  repeated ShardedIndexRule rules = 2;
+  Action action = 4;
+  google.protobuf.Timestamp time = 5;
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 6ab47f2..89122e9 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,29 +19,49 @@ package index
 
 import (
        "context"
+       "sync"
+
+       "github.com/pkg/errors"
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/event"
        apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/posting"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+var (
+       ErrShardNotFound       = errors.New("series doesn't exist")
+       ErrTraceSeriesNotFound = errors.New("trace series not found")
+       ErrUnknownField        = errors.New("the field is unknown")
+)
+
 type Condition struct {
        Key    string
        Values [][]byte
        Op     apiv1.PairQuery_BinaryOp
 }
 
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
+type Field struct {
+       ChunkID common.ChunkID
+       Name    string
+       Value   []byte
+}
+
+//go:generate mockgen -destination=./index_mock.go -package=index . Service
 type Repo interface {
        Search(seriesMeta common.Metadata, shardID uint, startTime, endTime 
uint64, indexObjectName string, conditions []Condition) (posting.List, error)
+       Insert(seriesMeta common.Metadata, shardID uint, fields *Field) error
 }
 
 type Builder interface {
-       run.Config
        run.PreRunner
+       run.Service
 }
 
 type Service interface {
@@ -49,6 +69,179 @@ type Service interface {
        Builder
 }
 
-func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline 
queue.Queue) (Service, error) {
-       return nil, nil
+type series struct {
+       repo map[uint]*shard
+}
+
+type shard struct {
+       meta  map[string][]*apiv1.IndexObject
+       store tsdb.GlobalStore
+}
+
+type service struct {
+       meta              *indexMeta
+       log               *logger.Logger
+       repo              discovery.ServiceRepo
+       stopCh            chan struct{}
+       indexRuleListener *indexRuleListener
+}
+
+func NewService(_ context.Context, repo discovery.ServiceRepo) (Service, 
error) {
+       svc := &service{
+               repo:              repo,
+               indexRuleListener: &indexRuleListener{},
+       }
+       svc.meta = &indexMeta{
+               meta: make(map[string]*series),
+       }
+       svc.indexRuleListener.indexMeta = svc.meta
+       svc.indexRuleListener.closeFunc = func() {
+               svc.stopCh <- struct{}{}
+       }
+       return svc, nil
+}
+
+func (s *service) Insert(series common.Metadata, shardID uint, field *Field) 
error {
+       sd, err := s.getShard(series, shardID)
+       if err != nil {
+               return err
+       }
+       objects, ok := sd.meta[field.Name]
+       if !ok {
+               return ErrUnknownField
+       }
+       for _, object := range objects {
+               err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
+                       Name:  []byte(compositeFieldID(object.GetName(), 
field.Name)),
+                       Value: field.Value,
+               }, field.ChunkID))
+       }
+       return err
+}
+
+func (s *service) getShard(series common.Metadata, shardID uint) (*shard, 
error) {
+       ss := s.meta.get(series.Spec)
+       if ss == nil {
+               return nil, errors.Wrapf(ErrTraceSeriesNotFound, "identify:%s", 
compositeSeriesID(series.Spec))
+       }
+       sd, existSearcher := ss.repo[shardID]
+       if !existSearcher {
+               return nil, errors.Wrapf(ErrShardNotFound, "shardID:%d", 
shardID)
+       }
+       return sd, nil
+}
+
+func (s *service) Name() string {
+       return "index"
+}
+
+func (s *service) PreRun() error {
+       //TODO: listen to written data
+       s.log = logger.GetLogger("index")
+       s.indexRuleListener.log = s.log
+       return s.repo.Subscribe(event.TopicIndexRule, s.indexRuleListener)
+}
+
+func (s *service) Serve() error {
+       s.stopCh = make(chan struct{})
+       <-s.stopCh
+       return nil
+}
+
+func (s *service) GracefulStop() {
+       if s.stopCh != nil {
+               close(s.stopCh)
+       }
+}
+
+type indexMeta struct {
+       meta map[string]*series
+       sync.RWMutex
+}
+
+func (i *indexMeta) get(series *apiv1.Metadata) *series {
+       i.RWMutex.RLock()
+       defer i.RWMutex.RUnlock()
+       s, ok := i.meta[compositeSeriesID(series)]
+       if ok {
+               return s
+       }
+       return nil
+}
+
+type indexRuleListener struct {
+       log       *logger.Logger
+       indexMeta *indexMeta
+       closeFunc func()
+}
+
+func (i *indexRuleListener) Rev(message bus.Message) (resp bus.Message) {
+       indexRuleEvent, ok := message.Data().(*apiv1.IndexRuleEvent)
+       if !ok {
+               i.log.Warn().Msg("invalid event data type")
+               return
+       }
+       i.log.Info().
+               Str("action", apiv1.Action_name[int32(indexRuleEvent.Action)]).
+               Str("series-name", indexRuleEvent.Series.Name).
+               Str("series-group", indexRuleEvent.Series.Group).
+               Msg("received an index rule")
+       i.indexMeta.Lock()
+       defer i.indexMeta.Unlock()
+       switch indexRuleEvent.Action {
+       case apiv1.Action_ACTION_PUT:
+               seriesID := compositeSeriesID(indexRuleEvent.Series)
+               newSeries := &series{
+                       repo: make(map[uint]*shard),
+               }
+               for _, rule := range indexRuleEvent.Rules {
+                       store := tsdb.NewStore(indexRuleEvent.Series.Name, 
indexRuleEvent.Series.Group, uint(rule.ShardId))
+                       fields := make([]tsdb.FieldSpec, 0, len(rule.Rules))
+                       meta := make(map[string][]*apiv1.IndexObject)
+                       for _, indexRule := range rule.GetRules() {
+                               for _, object := range indexRule.Objects {
+                                       fieldsSize := len(object.Fields)
+                                       if fieldsSize > 1 {
+                                               //TODO: to support composited 
index
+                                               i.log.Error().Str("name", 
object.Name).
+                                                       Msg("index module 
doesn't support composited index object")
+                                               i.closeFunc()
+                                       } else if fieldsSize < 1 {
+                                               continue
+                                       }
+                                       field := object.Fields[0]
+                                       fieldSpec := tsdb.FieldSpec{
+                                               Name: 
compositeFieldID(object.Name, field),
+                                       }
+                                       fields = append(fields, fieldSpec)
+                                       objects, existed := meta[field]
+                                       if !existed {
+                                               objects = 
make([]*apiv1.IndexObject, 0, 1)
+                                       }
+                                       objects = append(objects, object)
+                                       meta[field] = objects
+                               }
+                       }
+                       err := store.Initialize(fields)
+                       if err != nil {
+                               i.log.Warn().Err(err).Msg("failed to initialize 
index getShard")
+                       }
+                       newSeries.repo[uint(rule.ShardId)] = &shard{
+                               store: store,
+                               meta:  meta,
+                       }
+               }
+               i.indexMeta.meta[seriesID] = newSeries
+       default:
+               i.log.Warn().Msg("unsupported action")
+       }
+       return
+}
+
+func compositeFieldID(indexObjectName, field string) string {
+       return indexObjectName + ":" + field
+}
+
+func compositeSeriesID(series *apiv1.Metadata) string {
+       return series.Name + "-" + series.Group
 }
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
new file mode 100644
index 0000000..44a89af
--- /dev/null
+++ b/banyand/index/index_test.go
@@ -0,0 +1,185 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+       "context"
+       "math"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/event"
+       apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func Test_service_Insert(t *testing.T) {
+       tester := assert.New(t)
+       type args struct {
+               series  common.Metadata
+               shardID uint
+               field   *Field
+       }
+       tests := []struct {
+               name    string
+               args    args
+               wantErr bool
+       }{
+               {
+                       name: "str field",
+                       args: args{
+                               series:  
*common.NewMetadataByNameAndGroup("sw", "default"),
+                               shardID: 0,
+                               field: &Field{
+                                       ChunkID: common.ChunkID(1),
+                                       Name:    "endpoint",
+                                       Value:   []byte("/test"),
+                               },
+                       },
+               },
+               {
+                       name: "int field",
+                       args: args{
+                               series:  
*common.NewMetadataByNameAndGroup("sw", "default"),
+                               shardID: 1,
+                               field: &Field{
+                                       ChunkID: common.ChunkID(2),
+                                       Name:    "duration",
+                                       Value:   convert.Int64ToBytes(500),
+                               },
+                       },
+               },
+               {
+                       name: "unknown series",
+                       args: args{
+                               series:  
*common.NewMetadataByNameAndGroup("unknown", "default"),
+                               shardID: 0,
+                               field: &Field{
+                                       ChunkID: common.ChunkID(2),
+                                       Name:    "duration",
+                                       Value:   convert.Int64ToBytes(500),
+                               },
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "unknown shard",
+                       args: args{
+                               series:  
*common.NewMetadataByNameAndGroup("sw", "default"),
+                               shardID: math.MaxInt64,
+                               field: &Field{
+                                       ChunkID: common.ChunkID(2),
+                                       Name:    "duration",
+                                       Value:   convert.Int64ToBytes(500),
+                               },
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "unknown field",
+                       args: args{
+                               series:  
*common.NewMetadataByNameAndGroup("sw", "default"),
+                               shardID: 0,
+                               field: &Field{
+                                       ChunkID: common.ChunkID(2),
+                                       Name:    "unknown",
+                                       Value:   convert.Int64ToBytes(500),
+                               },
+                       },
+                       wantErr: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       s := setUpModules(tester)
+                       if err := s.Insert(tt.args.series, tt.args.shardID, 
tt.args.field); (err != nil) != tt.wantErr {
+                               t.Errorf("Insert() error = %v, wantErr %v", 
err, tt.wantErr)
+                       }
+               })
+       }
+}
+
+func Test_service_Init(t *testing.T) {
+       tester := assert.New(t)
+       s := setUpModules(tester)
+       tester.Equal(1, len(s.meta.meta))
+       tester.Equal(2, len(s.meta.meta["sw-default"].repo))
+}
+
+func setUpModules(tester *assert.Assertions) *service {
+       _ = logger.Bootstrap()
+       repo, err := discovery.NewServiceRepo(context.TODO())
+       tester.NoError(err)
+       svc, err := NewService(context.TODO(), repo)
+       tester.NoError(err)
+       tester.NoError(svc.PreRun())
+
+       rules := []*apiv1.IndexRule{
+               {
+                       Objects: []*apiv1.IndexObject{
+                               {
+                                       Name:   "endpoint",
+                                       Fields: []string{"endpoint"},
+                               },
+                               {
+                                       Name:   "duration",
+                                       Fields: []string{"duration"},
+                               },
+                       },
+               },
+       }
+       seriesID := &apiv1.Metadata{
+               Name:  "sw",
+               Group: "default",
+       }
+       _, err = repo.Publish(event.TopicIndexRule, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &apiv1.IndexRuleEvent{
+               Series: seriesID,
+               Rules: []*apiv1.IndexRuleEvent_ShardedIndexRule{
+                       {
+                               ShardId: 0,
+                               Rules:   rules,
+                       },
+                       {
+                               ShardId: 1,
+                               Rules:   rules,
+                       },
+               },
+               Action: apiv1.Action_ACTION_PUT,
+               Time:   timestamppb.Now(),
+       }))
+       tester.NoError(err)
+       s, ok := svc.(*service)
+       tester.True(ok)
+       deadline := time.Now().Add(10 * time.Second)
+       for {
+               if s.meta.get(seriesID) != nil {
+                       break
+               }
+               if time.Now().After(deadline) {
+                       tester.Fail("timeout")
+                       break
+               }
+               time.Sleep(10 * time.Millisecond)
+       }
+       return s
+}
diff --git a/banyand/index/search.go b/banyand/index/search.go
new file mode 100644
index 0000000..97da458
--- /dev/null
+++ b/banyand/index/search.go
@@ -0,0 +1,356 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+       "encoding/base64"
+       "encoding/json"
+       "strings"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+var ErrNotRangeOperation = errors.New("this is not a range operation")
+
+type executable interface {
+       execute() (posting.List, error)
+}
+
+type searchTree interface {
+       executable
+}
+
+func (s *service) Search(series common.Metadata, shardID uint, startTime, 
endTime uint64, indexObjectName string, conditions []Condition) (posting.List, 
error) {
+       sd, err := s.getShard(series, shardID)
+       if err != nil {
+               return nil, err
+       }
+       store := sd.store
+       searcher, hasData := store.Window(startTime, endTime)
+       if !hasData {
+               return roaring.EmptyPostingList, nil
+       }
+       tree, errBuild := buildSearchTree(searcher, indexObjectName, conditions)
+       if errBuild != nil {
+               return nil, errBuild
+       }
+       s.log.Debug().Interface("search-tree", tree).Msg("build search tree")
+
+       result, err := tree.execute()
+       if result == nil {
+               return roaring.EmptyPostingList, err
+       }
+       return result, err
+}
+
+func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions 
[]Condition) (searchTree, error) {
+       condMap := toMap(indexObject, conditions)
+       root := &andNode{
+               node: &node{
+                       SubNodes: make([]executable, 0),
+                       searcher: searcher,
+               },
+       }
+       for key, conds := range condMap {
+               var rangeLeaf *rangeOp
+               for _, cond := range conds {
+                       if rangeLeaf != nil && !rangeOP(cond.Op) {
+                               return nil, errors.Wrapf(ErrNotRangeOperation, 
"op:%s", cond.Op.String())
+                       }
+                       if rangeOP(cond.Op) {
+                               if rangeLeaf == nil {
+                                       rangeLeaf = root.addRangeLeaf(key)
+                               }
+                               opts := rangeLeaf.Opts
+                               switch cond.Op {
+                               case apiv1.PairQuery_BINARY_OP_GT:
+                                       opts.Lower = bytes.Join(cond.Values...)
+                               case apiv1.PairQuery_BINARY_OP_GE:
+                                       opts.Lower = bytes.Join(cond.Values...)
+                                       opts.IncludesLower = true
+                               case apiv1.PairQuery_BINARY_OP_LT:
+                                       opts.Upper = bytes.Join(cond.Values...)
+                               case apiv1.PairQuery_BINARY_OP_LE:
+                                       opts.Upper = bytes.Join(cond.Values...)
+                                       opts.IncludesUpper = true
+                               }
+                               continue
+                       }
+                       switch cond.Op {
+                       case apiv1.PairQuery_BINARY_OP_EQ:
+                               root.addEq(key, cond.Values)
+                       case apiv1.PairQuery_BINARY_OP_NE:
+                               root.addNot(key, root.newEq(key, cond.Values))
+                       case apiv1.PairQuery_BINARY_OP_HAVING:
+                               n := root.addOrNode(len(cond.Values))
+                               for _, v := range cond.Values {
+                                       n.addEq(key, [][]byte{v})
+                               }
+                       case apiv1.PairQuery_BINARY_OP_NOT_HAVING:
+                               n := root.newOrNode(len(cond.Values))
+                               for _, v := range cond.Values {
+                                       n.addEq(key, [][]byte{v})
+                               }
+                               root.addNot(key, n)
+                       }
+               }
+       }
+       return root, nil
+}
+
+func rangeOP(op apiv1.PairQuery_BinaryOp) bool {
+       switch op {
+       case apiv1.PairQuery_BINARY_OP_GT,
+               apiv1.PairQuery_BINARY_OP_GE,
+               apiv1.PairQuery_BINARY_OP_LT,
+               apiv1.PairQuery_BINARY_OP_LE:
+               return true
+       }
+       return false
+}
+
+func toMap(indexObject string, condition []Condition) map[string][]Condition {
+       result := make(map[string][]Condition)
+       for _, c := range condition {
+               key := compositeFieldID(indexObject, c.Key)
+               l, ok := result[key]
+               if ok {
+                       l = append(l, c)
+                       result[key] = l
+                       continue
+               }
+               result[key] = []Condition{c}
+       }
+       return result
+}
+
+type logicalOP interface {
+       executable
+       merge(posting.List) error
+}
+
+type node struct {
+       searcher tsdb.Searcher
+       value    posting.List
+       SubNodes []executable `json:"sub_nodes,omitempty"`
+}
+
+func (n *node) newEq(key string, values [][]byte) *eq {
+       return &eq{
+               leaf: &leaf{
+                       Key:      key,
+                       Values:   values,
+                       searcher: n.searcher,
+               },
+       }
+}
+
+func (n *node) addEq(key string, values [][]byte) {
+       n.SubNodes = append(n.SubNodes, n.newEq(key, values))
+}
+
+func (n *node) addNot(key string, inner executable) {
+       n.SubNodes = append(n.SubNodes, &not{
+               Key:      key,
+               searcher: n.searcher,
+               Inner:    inner,
+       })
+}
+
+func (n *node) addRangeLeaf(key string) *rangeOp {
+       r := &rangeOp{
+               leaf: &leaf{
+                       Key:      key,
+                       searcher: n.searcher,
+               },
+               Opts: &tsdb.RangeOpts{},
+       }
+       n.SubNodes = append(n.SubNodes, r)
+       return r
+}
+
+func (n *node) newOrNode(size int) *orNode {
+       return &orNode{
+               node: &node{
+                       searcher: n.searcher,
+                       SubNodes: make([]executable, 0, size),
+               },
+       }
+}
+
+func (n *node) addOrNode(size int) *orNode {
+       on := n.newOrNode(size)
+       n.SubNodes = append(n.SubNodes, on)
+       return on
+}
+
+func (n *node) pop() (executable, bool) {
+       if len(n.SubNodes) < 1 {
+               return nil, false
+       }
+       sn := n.SubNodes[0]
+       n.SubNodes = n.SubNodes[1:]
+       return sn, true
+}
+
+func execute(n *node, lp logicalOP) (posting.List, error) {
+       ex, hasNext := n.pop()
+       if !hasNext {
+               return n.value, nil
+       }
+       r, err := ex.execute()
+       if err != nil {
+               return nil, err
+       }
+       if n.value == nil {
+               n.value = r
+               return lp.execute()
+       }
+       err = lp.merge(r)
+       if err != nil {
+               return nil, err
+       }
+       if n.value.IsEmpty() {
+               return n.value, nil
+       }
+       return lp.execute()
+}
+
+type andNode struct {
+       *node
+}
+
+func (an *andNode) merge(list posting.List) error {
+       return an.value.Intersect(list)
+}
+
+func (an *andNode) execute() (posting.List, error) {
+       return execute(an.node, an)
+}
+
+func (an *andNode) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["and"] = an.node.SubNodes
+       return json.Marshal(data)
+}
+
+type orNode struct {
+       *node
+}
+
+func (on *orNode) merge(list posting.List) error {
+       return on.value.Union(list)
+}
+
+func (on *orNode) execute() (posting.List, error) {
+       return execute(on.node, on)
+}
+
+func (on *orNode) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["or"] = on.node.SubNodes
+       return json.Marshal(data)
+}
+
+type leaf struct {
+       executable
+       Key      string
+       Values   [][]byte
+       searcher tsdb.Searcher
+}
+
+type not struct {
+       executable
+       Key      string
+       searcher tsdb.Searcher
+       Inner    executable
+}
+
+func (n *not) execute() (posting.List, error) {
+       all := n.searcher.MatchField([]byte(n.Key))
+       list, err := n.Inner.execute()
+       if err != nil {
+               return nil, err
+       }
+       err = all.Difference(list)
+       return all, err
+}
+
+func (n *not) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["not"] = n.Inner
+       return json.Marshal(data)
+}
+
+type eq struct {
+       *leaf
+}
+
+func (eq *eq) execute() (posting.List, error) {
+       return eq.searcher.MatchTerms(&tsdb.Field{
+               Name:  []byte(eq.Key),
+               Value: bytes.Join(eq.Values...),
+       }), nil
+}
+
+func (eq *eq) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["eq"] = eq.leaf
+       return json.Marshal(data)
+}
+
+type rangeOp struct {
+       *leaf
+       Opts *tsdb.RangeOpts
+}
+
+func (r *rangeOp) execute() (posting.List, error) {
+       return r.searcher.Range([]byte(r.Key), r.Opts), nil
+}
+
+func (r *rangeOp) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       var builder strings.Builder
+       if r.Opts.Lower != nil {
+               if r.Opts.IncludesLower {
+                       builder.WriteString("[")
+               } else {
+                       builder.WriteString("(")
+               }
+       }
+       builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Lower))
+       builder.WriteString(",")
+       builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Upper))
+       if r.Opts.Upper != nil {
+               if r.Opts.IncludesUpper {
+                       builder.WriteString("]")
+               } else {
+                       builder.WriteString(")")
+               }
+       }
+       data["key"] = r.Key
+       data["range"] = builder.String()
+       return json.Marshal(data)
+}
diff --git a/banyand/index/search_test.go b/banyand/index/search_test.go
new file mode 100644
index 0000000..d9eb8d9
--- /dev/null
+++ b/banyand/index/search_test.go
@@ -0,0 +1,255 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+       "math"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+func Test_service_Search(t *testing.T) {
+       tester := assert.New(t)
+       type args struct {
+               indexObjectName string
+               conditions      []Condition
+       }
+       tests := []struct {
+               name    string
+               args    args
+               want    posting.List
+               wantErr bool
+       }{
+               {
+                       name: "str equal",
+                       args: args{
+                               indexObjectName: "endpoint",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "endpoint",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_EQ,
+                                               Values: 
[][]byte{[]byte("/product")},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(1),
+               },
+               {
+                       name: "str not equal",
+                       args: args{
+                               indexObjectName: "endpoint",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "endpoint",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_NE,
+                                               Values: 
[][]byte{[]byte("/product")},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(2, 3),
+               },
+               {
+                       name: "str having",
+                       args: args{
+                               indexObjectName: "endpoint",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "endpoint",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_HAVING,
+                                               Values: 
[][]byte{[]byte("/product"), []byte("/sales")},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(1, 3),
+               },
+               {
+                       name: "str not having",
+                       args: args{
+                               indexObjectName: "endpoint",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "endpoint",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+                                               Values: 
[][]byte{[]byte("/product"), []byte("/sales")},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(2),
+               },
+               {
+                       name: "int equal",
+                       args: args{
+                               indexObjectName: "duration",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_EQ,
+                                               Values: 
[][]byte{convert.Int64ToBytes(500)},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(12),
+               },
+               {
+                       name: "int not equal",
+                       args: args{
+                               indexObjectName: "duration",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_NE,
+                                               Values: 
[][]byte{convert.Int64ToBytes(500)},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(11, 13, 14),
+               },
+               {
+                       name: "int having",
+                       args: args{
+                               indexObjectName: "duration",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_HAVING,
+                                               Values: 
[][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(11, 12),
+               },
+               {
+                       name: "int not having",
+                       args: args{
+                               indexObjectName: "duration",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+                                               Values: 
[][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(13, 14),
+               },
+               {
+                       name: "int in range",
+                       args: args{
+                               indexObjectName: "duration",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_GT,
+                                               Values: 
[][]byte{convert.Int64ToBytes(50)},
+                                       },
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_LT,
+                                               Values: 
[][]byte{convert.Int64ToBytes(5000)},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(13, 12),
+               },
+               {
+                       name: "int includes edges",
+                       args: args{
+                               indexObjectName: "duration",
+                               conditions: []Condition{
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_GE,
+                                               Values: 
[][]byte{convert.Int64ToBytes(50)},
+                                       },
+                                       {
+                                               Key:    "duration",
+                                               Op:     
apiv1.PairQuery_BINARY_OP_LE,
+                                               Values: 
[][]byte{convert.Int64ToBytes(5000)},
+                                       },
+                               },
+                       },
+                       want: roaring.NewPostingListWithInitialData(13, 12, 11, 
14),
+               },
+       }
+       s := setUpModules(tester)
+       setupData(tester, s)
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got, err := 
s.Search(*common.NewMetadataByNameAndGroup("sw", "default"), 0, 0, 
math.MaxInt64, tt.args.indexObjectName, tt.args.conditions)
+                       if (err != nil) != tt.wantErr {
+                               t.Errorf("Search() error = %v, wantErr %v", 
err, tt.wantErr)
+                               return
+                       }
+                       if !got.Equal(tt.want) {
+                               t.Errorf("Search() got = %v, want %v", 
got.ToSlice(), tt.want.ToSlice())
+                       }
+               })
+       }
+}
+
+func setupData(tester *assert.Assertions, s *service) {
+       fields := []*Field{
+               {
+                       ChunkID: common.ChunkID(1),
+                       Name:    "endpoint",
+                       Value:   []byte("/product"),
+               },
+               {
+                       ChunkID: common.ChunkID(2),
+                       Name:    "endpoint",
+                       Value:   []byte("/home"),
+               },
+               {
+                       ChunkID: common.ChunkID(3),
+                       Name:    "endpoint",
+                       Value:   []byte("/sales"),
+               },
+               {
+                       ChunkID: common.ChunkID(11),
+                       Name:    "duration",
+                       Value:   convert.Int64ToBytes(50),
+               },
+               {
+                       ChunkID: common.ChunkID(12),
+                       Name:    "duration",
+                       Value:   convert.Int64ToBytes(500),
+               },
+               {
+                       ChunkID: common.ChunkID(13),
+                       Name:    "duration",
+                       Value:   convert.Int64ToBytes(100),
+               },
+               {
+                       ChunkID: common.ChunkID(14),
+                       Name:    "duration",
+                       Value:   convert.Int64ToBytes(5000),
+               },
+       }
+       for _, field := range fields {
+               if err := s.Insert(*common.NewMetadataByNameAndGroup("sw", 
"default"), 0, field); err != nil {
+                       tester.NoError(err)
+               }
+       }
+}
diff --git a/pkg/posting/posting.go b/banyand/index/tsdb/field_map.go
similarity index 51%
copy from pkg/posting/posting.go
copy to banyand/index/tsdb/field_map.go
index 3812162..4d89713 100644
--- a/pkg/posting/posting.go
+++ b/banyand/index/tsdb/field_map.go
@@ -15,55 +15,50 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package posting
+package tsdb
 
 import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
-var ErrListEmpty = errors.New("postings list is empty")
+var ErrFieldAbsent = errors.New("field doesn't exist")
 
-// List is a collection of common.ChunkID.
-type List interface {
-       Contains(id common.ChunkID) bool
+type fieldHashID uint64
 
-       IsEmpty() bool
-
-       Max() (common.ChunkID, error)
-
-       Len() int
-
-       Iterator() Iterator
-
-       Clone() List
-
-       Equal(other List) bool
-
-       Insert(i common.ChunkID)
-
-       Intersect(other List) error
-
-       Difference(other List) error
-
-       Union(other List) error
-
-       UnionMany(others []List) error
-
-       AddIterator(iter Iterator) error
-
-       AddRange(min, max common.ChunkID) error
+type fieldMap struct {
+       repo map[fieldHashID]*fieldValue
+}
 
-       RemoveRange(min, max common.ChunkID) error
+func newFieldMap(initialSize int) *fieldMap {
+       return &fieldMap{
+               repo: make(map[fieldHashID]*fieldValue, initialSize),
+       }
+}
 
-       Reset()
+func (fm *fieldMap) createKey(key []byte) {
+       fm.repo[fieldHashID(convert.Hash(key))] = &fieldValue{
+               key:   key,
+               value: newPostingMap(),
+       }
 }
 
-type Iterator interface {
-       Next() bool
+func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+       v, ok := fm.repo[fieldHashID(convert.Hash(key))]
+       return v, ok
+}
 
-       Current() common.ChunkID
+func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
+       pm, ok := fm.get(fv.Name)
+       if !ok {
+               return errors.Wrapf(ErrFieldAbsent, "filed Name:%s", fv.Name)
+       }
+       return pm.value.put(fv.Value, id)
+}
 
-       Close() error
+type fieldValue struct {
+       key   []byte
+       value *postingMap
 }
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
new file mode 100644
index 0000000..2e2ea9b
--- /dev/null
+++ b/banyand/index/tsdb/mem.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
// ErrFieldsAbsent is returned by Initialize when no field specs are supplied.
var ErrFieldsAbsent = errors.New("fields are absent")

// MemTable is an in-memory inverted index for one shard of a series:
// it maps field name -> term value -> posting list of chunk IDs.
type MemTable struct {
	terms   *fieldMap // set by Initialize; nil until then
	name    string
	group   string
	shardID uint
}

// NewMemTable creates a MemTable bound to the given series name, group
// and shard. Initialize must be called before Insert/Match*/Range,
// since the constructor does not allocate the terms map.
func NewMemTable(name, group string, shardID uint) *MemTable {
	return &MemTable{
		name:    name,
		group:   group,
		shardID: shardID,
	}
}

// Field is one indexed field instance: a field name and a single
// raw term value.
type Field struct {
	Name  []byte
	Value []byte
}

// FieldSpec declares an indexed field by name.
type FieldSpec struct {
	Name string
}
+
+func (m *MemTable) Initialize(fields []FieldSpec) error {
+       if len(fields) < 1 {
+               return ErrFieldsAbsent
+       }
+       m.terms = newFieldMap(len(fields))
+       for _, f := range fields {
+               m.terms.createKey([]byte(f.Name))
+       }
+       return nil
+}
+
// Insert records the chunk ID under the field's name/value pair.
// The name must be one declared through Initialize.
func (m *MemTable) Insert(field *Field, chunkID common.ChunkID) error {
	return m.terms.put(field, chunkID)
}
+
+func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
+       fieldsValues, ok := m.terms.get(fieldName)
+       if !ok {
+               return roaring.EmptyPostingList
+       }
+       return fieldsValues.value.allValues()
+}
+
+func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
+       fieldsValues, ok := m.terms.get(field.Name)
+       if !ok {
+               return roaring.EmptyPostingList
+       }
+       return fieldsValues.value.get(field.Value).Clone()
+}
+
+func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list 
posting.List) {
+       fieldsValues, ok := m.terms.get(fieldName)
+       if !ok {
+               return roaring.EmptyPostingList
+       }
+       return fieldsValues.value.getRange(opts)
+}
diff --git a/banyand/index/tsdb/mem_test.go b/banyand/index/tsdb/mem_test.go
new file mode 100644
index 0000000..fed7655
--- /dev/null
+++ b/banyand/index/tsdb/mem_test.go
@@ -0,0 +1,261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "reflect"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
// TestMemTable_Initialize covers the two Initialize outcomes: a
// populated spec list allocates one terms entry per field, and an
// empty list fails with an error.
func TestMemTable_Initialize(t *testing.T) {
	type args struct {
		fields []FieldSpec
	}
	tests := []struct {
		name    string
		args    args
		wantErr bool
	}{
		{
			name: "golden path",
			args: args{
				fields: []FieldSpec{
					{
						Name: "service_name",
					},
					{
						Name: "duration",
					},
				},
			},
		},
		{
			// nil fields slice triggers ErrFieldsAbsent
			name:    "fields absent",
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			m := NewMemTable("sw", "group", 0)
			var err error
			if err = m.Initialize(tt.args.fields); (err != nil) != tt.wantErr {
				t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
			}
			if err != nil {
				return
			}
			// one terms entry per declared field
			assert.Equal(t, len(m.terms.repo), len(tt.args.fields))
		})
	}
}
+
// TestMemTable_Range drives Range over the "duration" field populated
// by setUp (term values 50, 200 and 1000) and checks that the
// IncludesLower/IncludesUpper flags control boundary inclusion.
// Expected lists are built from MatchTerms/union over the same table.
func TestMemTable_Range(t *testing.T) {
	type args struct {
		fieldName []byte
		opts      *RangeOpts
	}
	m := NewMemTable("sw", "group", 0)
	setUp(t, m)
	tests := []struct {
		name     string
		args     args
		wantList posting.List
	}{
		{
			// strict interior: only 200 lies between 100 and 500
			name: "in range",
			args: args{
				fieldName: []byte("duration"),
				opts: &RangeOpts{
					Lower: convert.Uint16ToBytes(100),
					Upper: convert.Uint16ToBytes(500),
				},
			},
			wantList: m.MatchTerms(&Field{
				Name:  []byte("duration"),
				Value: convert.Uint16ToBytes(200),
			}),
		},
		{
			// both bounds exclusive: 50 and 1000 themselves are dropped
			name: "excludes edge",
			args: args{
				fieldName: []byte("duration"),
				opts: &RangeOpts{
					Lower: convert.Uint16ToBytes(50),
					Upper: convert.Uint16ToBytes(1000),
				},
			},
			wantList: union(m,
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(200),
				},
			),
		},
		{
			name: "includes lower",
			args: args{
				fieldName: []byte("duration"),
				opts: &RangeOpts{
					Lower:         convert.Uint16ToBytes(50),
					Upper:         convert.Uint16ToBytes(1000),
					IncludesLower: true,
				},
			},
			wantList: union(m,
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(50),
				},
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(200),
				},
			),
		},
		{
			name: "includes upper",
			args: args{
				fieldName: []byte("duration"),
				opts: &RangeOpts{
					Lower:         convert.Uint16ToBytes(50),
					Upper:         convert.Uint16ToBytes(1000),
					IncludesUpper: true,
				},
			},
			wantList: union(m,
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(200),
				},
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(1000),
				},
			),
		},
		{
			name: "includes edges",
			args: args{
				fieldName: []byte("duration"),
				opts: &RangeOpts{
					Lower:         convert.Uint16ToBytes(50),
					Upper:         convert.Uint16ToBytes(1000),
					IncludesUpper: true,
					IncludesLower: true,
				},
			},
			wantList: union(m,
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(50),
				},
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(200),
				},
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(1000),
				},
			),
		},
		{
			// degenerate interval Lower == Upper with both edges included
			name: "match one",
			args: args{
				fieldName: []byte("duration"),
				opts: &RangeOpts{
					Lower:         convert.Uint16ToBytes(200),
					Upper:         convert.Uint16ToBytes(200),
					IncludesUpper: true,
					IncludesLower: true,
				},
			},
			wantList: union(m,
				&Field{
					Name:  []byte("duration"),
					Value: convert.Uint16ToBytes(200),
				},
			),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
				t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len())
			}
		})
	}
}
+
+func union(memTable *MemTable, fields ...*Field) posting.List {
+       result := roaring.NewPostingList()
+       for _, f := range fields {
+               _ = result.Union(memTable.MatchTerms(f))
+       }
+       return result
+}
+
+func setUp(t *testing.T, mt *MemTable) {
+       assert.NoError(t, mt.Initialize([]FieldSpec{
+               {
+                       Name: "service_name",
+               },
+               {
+                       Name: "duration",
+               },
+       }))
+       for i := 0; i < 100; i++ {
+               if i%2 == 0 {
+                       assert.NoError(t, mt.Insert(&Field{
+                               Name:  []byte("service_name"),
+                               Value: []byte("gateway"),
+                       }, common.ChunkID(i)))
+               } else {
+                       assert.NoError(t, mt.Insert(&Field{
+                               Name:  []byte("service_name"),
+                               Value: []byte("webpage"),
+                       }, common.ChunkID(i)))
+               }
+       }
+       for i := 100; i < 200; i++ {
+               switch {
+               case i%3 == 0:
+                       assert.NoError(t, mt.Insert(&Field{
+                               Name:  []byte("duration"),
+                               Value: convert.Uint16ToBytes(50),
+                       }, common.ChunkID(i)))
+               case i%3 == 1:
+                       assert.NoError(t, mt.Insert(&Field{
+                               Name:  []byte("duration"),
+                               Value: convert.Uint16ToBytes(200),
+                       }, common.ChunkID(i)))
+               case i%3 == 2:
+                       assert.NoError(t, mt.Insert(&Field{
+                               Name:  []byte("duration"),
+                               Value: convert.Uint16ToBytes(1000),
+                       }, common.ChunkID(i)))
+               }
+       }
+}
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
new file mode 100644
index 0000000..20bc492
--- /dev/null
+++ b/banyand/index/tsdb/term_map.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "bytes"
+       "sort"
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+       "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
// termHashID is the 64-bit hash of a raw term value, used as the map key.
type termHashID uint64

// postingMap maps term values (by hash) to the posting lists of chunk
// IDs containing them; mutex guards repo against concurrent access.
type postingMap struct {
	repo  map[termHashID]*postingValue
	mutex sync.RWMutex
}

// newPostingMap creates an empty postingMap.
func newPostingMap() *postingMap {
	return &postingMap{
		repo: make(map[termHashID]*postingValue),
	}
}
+
+func (p *postingMap) put(key []byte, id common.ChunkID) error {
+       list := p.getOrCreate(key)
+       list.Insert(id)
+       return nil
+}
+
+func (p *postingMap) getOrCreate(key []byte) posting.List {
+       list := p.get(key)
+       if list != roaring.EmptyPostingList {
+               return list
+       }
+       p.mutex.Lock()
+       defer p.mutex.Unlock()
+       hashedKey := termHashID(convert.Hash(key))
+       v := &postingValue{
+               key:   key,
+               value: roaring.NewPostingList(),
+       }
+       p.repo[hashedKey] = v
+       return v.value
+}
+
+func (p *postingMap) get(key []byte) posting.List {
+       p.mutex.RLock()
+       defer p.mutex.RUnlock()
+       hashedKey := termHashID(convert.Hash(key))
+       v, ok := p.repo[hashedKey]
+       if !ok {
+               return roaring.EmptyPostingList
+       }
+       return v.value
+}
+
+func (p *postingMap) allValues() posting.List {
+       result := roaring.NewPostingList()
+       for _, value := range p.repo {
+               _ = result.Union(value.value)
+       }
+       return result
+}
+
+func (p *postingMap) getRange(opts *RangeOpts) posting.List {
+       switch bytes.Compare(opts.Upper, opts.Lower) {
+       case -1:
+               return roaring.EmptyPostingList
+       case 0:
+               if opts.IncludesUpper && opts.IncludesLower {
+                       return p.get(opts.Upper)
+               }
+               return roaring.EmptyPostingList
+       }
+       p.mutex.RLock()
+       defer p.mutex.RUnlock()
+       keys := make(Asc, 0, len(p.repo))
+       for _, v := range p.repo {
+               keys = append(keys, v.key)
+       }
+       sort.Sort(keys)
+       index := sort.Search(len(keys), func(i int) bool {
+               return bytes.Compare(keys[i], opts.Lower) >= 0
+       })
+       result := roaring.NewPostingList()
+       for i := index; i < len(keys); i++ {
+               k := keys[i]
+               switch {
+               case bytes.Equal(k, opts.Lower):
+                       if opts.IncludesLower {
+                               _ = 
result.Union(p.repo[termHashID(convert.Hash(k))].value)
+                       }
+               case bytes.Compare(k, opts.Upper) > 0:
+                       break
+               case bytes.Equal(k, opts.Upper):
+                       if opts.IncludesUpper {
+                               _ = 
result.Union(p.repo[termHashID(convert.Hash(k))].value)
+                       }
+               default:
+                       _ = 
result.Union(p.repo[termHashID(convert.Hash(k))].value)
+               }
+       }
+       return result
+}
+
+type Asc [][]byte
+
+func (a Asc) Len() int {
+       return len(a)
+}
+
+func (a Asc) Less(i, j int) bool {
+       return bytes.Compare(a[i], a[j]) < 0
+}
+
+func (a Asc) Swap(i, j int) {
+       a[i], a[j] = a[j], a[i]
+}
+
// postingValue pairs the original (unhashed) term value with its
// posting list, so range scans can compare raw keys.
type postingValue struct {
	key   []byte
	value posting.List
}
diff --git a/banyand/index/tsdb/tsdb.go b/banyand/index/tsdb/tsdb.go
new file mode 100644
index 0000000..91f114d
--- /dev/null
+++ b/banyand/index/tsdb/tsdb.go
@@ -0,0 +1,65 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/posting"
+)
+
// RangeOpts bounds a term-value range query. Upper and Lower are raw
// term bytes compared lexicographically; IncludesUpper/IncludesLower
// select whether the boundary terms themselves match.
type RangeOpts struct {
	Upper         []byte
	Lower         []byte
	IncludesUpper bool
	IncludesLower bool
}

// GlobalStore is the per-series index store. Window selects a Searcher
// covering the given time span; Initialize declares the indexed fields
// and Insert indexes a single field value for a chunk.
type GlobalStore interface {
	Window(startTime, endTime uint64) (Searcher, bool)
	Initialize(fields []FieldSpec) error
	Insert(field *Field, chunkID common.ChunkID) error
}

// Searcher answers index queries against one time window.
type Searcher interface {
	MatchField(fieldNames []byte) (list posting.List)
	MatchTerms(field *Field) (list posting.List)
	Range(fieldName []byte, opts *RangeOpts) (list posting.List)
}

// store backs GlobalStore with a single in-memory table for now.
type store struct {
	memTable *MemTable
	//TODO: add data tables
}
+
// Window ignores the time bounds for now: the sole mem table covers
// every window, so it is always returned with ok = true.
func (s *store) Window(_, _ uint64) (Searcher, bool) {
	return s.memTable, true
}

// Initialize declares the indexed fields on the underlying mem table.
func (s *store) Initialize(fields []FieldSpec) error {
	return s.memTable.Initialize(fields)
}

// Insert indexes one field value for the chunk in the mem table.
func (s *store) Insert(field *Field, chunkID common.ChunkID) error {
	return s.memTable.Insert(field, chunkID)
}

// NewStore creates a memtable-only GlobalStore for one series shard.
func NewStore(name, group string, shardID uint) GlobalStore {
	return &store{
		memTable: NewMemTable(name, group, shardID),
	}
}
diff --git a/banyand/internal/cmd/standalone.go 
b/banyand/internal/cmd/standalone.go
index b024cbe..d694895 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -57,11 +57,11 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate database")
        }
-       idx, err := index.NewService(ctx, repo, pipeline)
+       idx, err := index.NewService(ctx, repo)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate index builder")
        }
-       traceSeries, err := trace.NewService(ctx, db, repo)
+       traceSeries, err := trace.NewService(ctx, db, repo, idx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate trace series")
        }
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 7ebe12f..7136e0e 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -24,6 +24,7 @@ import (
        "testing"
        "time"
 
+       "github.com/golang/mock/gomock"
        googleUUID "github.com/google/uuid"
        "github.com/stretchr/testify/require"
 
@@ -31,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/event"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/index"
        "github.com/apache/skywalking-banyandb/banyand/series"
        "github.com/apache/skywalking-banyandb/banyand/series/trace"
        "github.com/apache/skywalking-banyandb/banyand/storage"
@@ -49,7 +51,7 @@ type entityValue struct {
        items      []interface{}
 }
 
-func setupServices(tester *require.Assertions) (discovery.ServiceRepo, 
series.Service, func()) {
+func setupServices(t *testing.T, tester *require.Assertions) 
(discovery.ServiceRepo, series.Service, func()) {
        // Bootstrap logger system
        tester.NoError(logger.Init(logger.Logging{
                Env:   "dev",
@@ -70,7 +72,10 @@ func setupServices(tester *require.Assertions) 
(discovery.ServiceRepo, series.Se
        tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
 
        // Init `Trace` module
-       traceSvc, err := trace.NewService(context.TODO(), db, repo)
+       ctrl := gomock.NewController(t)
+       mockIndex := index.NewMockService(ctrl)
+       mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), 
gomock.Any()).AnyTimes()
+       traceSvc, err := trace.NewService(context.TODO(), db, repo, mockIndex)
        tester.NoError(err)
 
        // Init `Query` module
@@ -257,7 +262,7 @@ func TestQueryProcessor(t *testing.T) {
        tester := require.New(t)
 
        // setup services
-       repo, traceSvc, gracefulStop := setupServices(tester)
+       repo, traceSvc, gracefulStop := setupServices(t, tester)
        defer gracefulStop()
 
        baseTs := time.Now()
diff --git a/banyand/series/trace/common_test.go 
b/banyand/series/trace/common_test.go
index 51f1603..f8b40ae 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -26,12 +26,14 @@ import (
        "testing"
        "time"
 
+       "github.com/golang/mock/gomock"
        googleUUID "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/banyand/index"
        "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -63,7 +65,10 @@ func setup(t *testing.T) (*traceSeries, func()) {
        assert.NoError(t, err)
        rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
        assert.NoError(t, db.FlagSet().Parse([]string{"--root-path=" + 
rootPath}))
-       svc, err := NewService(context.TODO(), db, nil)
+       ctrl := gomock.NewController(t)
+       mockIndex := index.NewMockService(ctrl)
+       mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), 
gomock.Any()).AnyTimes()
+       svc, err := NewService(context.TODO(), db, nil, mockIndex)
        assert.NoError(t, err)
        assert.NoError(t, svc.PreRun())
        assert.NoError(t, db.PreRun())
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
new file mode 100644
index 0000000..7049a9d
--- /dev/null
+++ b/banyand/series/trace/service.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/event"
+       v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/index"
+       "github.com/apache/skywalking-banyandb/banyand/series"
+       "github.com/apache/skywalking-banyandb/banyand/series/schema"
+       "github.com/apache/skywalking-banyandb/banyand/storage"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
var _ series.Service = (*service)(nil)

// service implements series.Service for trace series: it loads trace
// schemas, registers them with the storage layer and announces series,
// shard and index-rule events to the cluster on startup.
type service struct {
	db        storage.Database
	schemaMap map[string]*traceSeries // keyed by formatTraceSeriesID(name, group)
	l         *logger.Logger
	repo      discovery.ServiceRepo
	stopCh    chan struct{} // created by Serve; closed by GracefulStop
	idx       index.Service
}

// NewService returns a new trace-series service backed by the given
// database, discovery repo and index service. No I/O happens here;
// schemas are loaded in PreRun.
func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service) (series.Service, error) {
	return &service{
		db:   db,
		repo: repo,
		idx:  idx,
	}, nil
}

// Name identifies this module in logs and the module registry.
func (s *service) Name() string {
	return "trace-series"
}
+
+func (s *service) PreRun() error {
+       schemas, err := s.TraceSeries().List(context.Background(), 
schema.ListOpt{})
+       if err != nil {
+               return err
+       }
+       s.schemaMap = make(map[string]*traceSeries, len(schemas))
+       s.l = logger.GetLogger(s.Name())
+       for _, sa := range schemas {
+               ts, errTS := newTraceSeries(sa, s.l, s.idx)
+               if errTS != nil {
+                       return errTS
+               }
+               s.db.Register(ts)
+               id := formatTraceSeriesID(ts.name, ts.group)
+               s.schemaMap[id] = ts
+               s.l.Info().Str("id", id).Msg("initialize Trace series")
+       }
+       return err
+}
+
// Serve announces this node's series to the cluster: for each loaded
// schema it publishes one series event, one shard event per shard, and
// a single index-rule event carrying the rules of all shards. It then
// blocks until GracefulStop closes stopCh.
func (s *service) Serve() error {
	now := time.Now().UnixNano()
	for _, sMeta := range s.schemaMap {
		// Announce the series itself.
		e := pb.NewSeriesEventBuilder().
			SeriesMetadata(sMeta.group, sMeta.name).
			FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
			Time(time.Now()).
			Action(v1.Action_ACTION_PUT).
			Build()
		_, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
		if err != nil {
			return err
		}
		seriesObj := &v1.Series{
			Series: &v1.Metadata{
				Name:  sMeta.name,
				Group: sMeta.group,
			},
		}
		rules, errGetRules := s.IndexRules(context.Background(), seriesObj, nil)
		if errGetRules != nil {
			return errGetRules
		}
		// Publish one shard event per shard, accumulating the per-shard
		// index rules for the combined index-rule event below.
		shardedRuleIndex := make([]*v1.IndexRuleEvent_ShardedIndexRule, 0, len(rules)*int(sMeta.shardNum))
		for i := 0; i < int(sMeta.shardNum); i++ {
			t := time.Now()
			e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
				Shard(
					pb.NewShardBuilder().
						ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
						Node(pb.NewNodeBuilder().
							ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
							Build()).
						Build()).
				Build()
			_, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
			if errShard != nil {
				return errShard
			}
			shardedRuleIndex = append(shardedRuleIndex, &v1.IndexRuleEvent_ShardedIndexRule{
				ShardId: uint64(i),
				Rules:   rules,
			})
		}

		// One index-rule event covers every shard of this series.
		indexRule := &v1.IndexRuleEvent{
			Series: seriesObj.Series,
			Rules:  shardedRuleIndex,
			Action: v1.Action_ACTION_PUT,
			Time:   timestamppb.New(time.Now()),
		}
		_, errPublishRules := s.repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(now), indexRule))
		if errPublishRules != nil {
			return errPublishRules
		}
	}
	// Block until GracefulStop is invoked.
	s.stopCh = make(chan struct{})
	<-s.stopCh
	return nil
}
+
// GracefulStop unblocks Serve by closing stopCh. The nil check makes
// it safe to call even when Serve was never started.
func (s *service) GracefulStop() {
	if s.stopCh != nil {
		close(s.stopCh)
	}
}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 2551813..1c5f639 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -18,7 +18,6 @@
 package trace
 
 import (
-       "context"
        "strconv"
        "time"
 
@@ -26,14 +25,11 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
-       "github.com/apache/skywalking-banyandb/api/event"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
        apischema "github.com/apache/skywalking-banyandb/api/schema"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/index"
        "github.com/apache/skywalking-banyandb/banyand/series"
-       "github.com/apache/skywalking-banyandb/banyand/series/schema"
        "github.com/apache/skywalking-banyandb/banyand/storage"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
@@ -76,89 +72,6 @@ const (
        StateError   = 1
 )
 
-var _ series.Service = (*service)(nil)
-
-type service struct {
-       db        storage.Database
-       schemaMap map[string]*traceSeries
-       l         *logger.Logger
-       repo      discovery.ServiceRepo
-       stopCh    chan struct{}
-}
-
-//NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo 
discovery.ServiceRepo) (series.Service, error) {
-       return &service{
-               db:   db,
-               repo: repo,
-       }, nil
-}
-
-func (s *service) Name() string {
-       return "trace-series"
-}
-
-func (s *service) PreRun() error {
-       schemas, err := s.TraceSeries().List(context.Background(), 
schema.ListOpt{})
-       if err != nil {
-               return err
-       }
-       s.schemaMap = make(map[string]*traceSeries, len(schemas))
-       s.l = logger.GetLogger(s.Name())
-       for _, sa := range schemas {
-               ts, errTS := newTraceSeries(sa, s.l)
-               if errTS != nil {
-                       return errTS
-               }
-               s.db.Register(ts)
-               id := formatTraceSeriesID(ts.name, ts.group)
-               s.schemaMap[id] = ts
-               s.l.Info().Str("id", id).Msg("initialize Trace series")
-       }
-       return err
-}
-
-func (s *service) Serve() error {
-       now := time.Now().UnixNano()
-       for _, sMeta := range s.schemaMap {
-               e := pb.NewSeriesEventBuilder().
-                       SeriesMetadata(sMeta.group, sMeta.name).
-                       FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
-                       Time(time.Now()).
-                       Action(v1.Action_ACTION_PUT).
-                       Build()
-               _, err := s.repo.Publish(event.TopicSeriesEvent, 
bus.NewMessage(bus.MessageID(now), e))
-               if err != nil {
-                       return err
-               }
-               for i := 0; i < int(sMeta.shardNum); i++ {
-                       t := time.Now()
-                       e := 
pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
-                               Shard(
-                                       pb.NewShardBuilder().
-                                               
ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, 
sMeta.name).UpdatedAt(t).CreatedAt(t).
-                                               Node(pb.NewNodeBuilder().
-                                                       
ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
-                                                       Build()).
-                                               Build()).
-                               Build()
-                       _, errShard := s.repo.Publish(event.TopicShardEvent, 
bus.NewMessage(bus.MessageID(now), e))
-                       if errShard != nil {
-                               return errShard
-                       }
-               }
-       }
-       s.stopCh = make(chan struct{})
-       <-s.stopCh
-       return nil
-}
-
-func (s *service) GracefulStop() {
-       if s.stopCh != nil {
-               close(s.stopCh)
-       }
-}
-
 func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt 
series.ScanOptions) (data.Trace, error) {
        ts, err := s.getSeries(traceSeries)
        if err != nil {
@@ -227,6 +140,7 @@ type traceSeries struct {
        schema                       apischema.TraceSeries
        reader                       storage.StoreRepo
        writePoint                   storage.GetWritePoint
+       idx                          index.Service
        shardNum                     uint32
        fieldIndex                   map[string]int
        traceIDIndex                 int
@@ -241,11 +155,12 @@ type traceSeries struct {
        fieldsNamesCompositeSeriesID []string
 }
 
-func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger) 
(*traceSeries, error) {
+func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger, idx 
index.Service) (*traceSeries, error) {
        t := &traceSeries{
                schema: schema,
                idGen:  series.NewIDGen(),
                l:      l,
+               idx:    idx,
        }
        meta := t.schema.Spec.GetMetadata()
        shardInfo := t.schema.Spec.GetShard()
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 2b4466f..60ed91a 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -22,10 +22,12 @@ import (
 
        "github.com/golang/protobuf/proto"
        "github.com/pkg/errors"
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+       "github.com/apache/skywalking-banyandb/banyand/index"
        bydb_bytes "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/partition"
@@ -92,9 +94,52 @@ func (t *traceSeries) Write(seriesID common.SeriesID, 
shardID uint, entity data.
                Uint("trace_shard_id", traceIDShardID).
                Uint("shard_id", shardID).
                Msg("written to Trace series")
+       id := common.ChunkID(chunkID)
+       for i, field := range entityVal.GetFields() {
+               fieldSpec := t.schema.Spec.GetFields()[i]
+               fieldName := fieldSpec.GetName()
+               switch x := field.ValueType.(type) {
+               case *v1.Field_Str:
+                       err = multierr.Append(err, t.writeStrToIndex(shardID, 
id, fieldName, x.Str.GetValue()))
+               case *v1.Field_Int:
+                       err = multierr.Append(err, t.writeIntToIndex(shardID, 
id, fieldName, x.Int.GetValue()))
+               case *v1.Field_StrArray:
+                       for _, s := range x.StrArray.GetValue() {
+                               err = multierr.Append(err, 
t.writeStrToIndex(shardID, id, fieldName, s))
+                       }
+               case *v1.Field_IntArray:
+                       for _, integer := range x.IntArray.GetValue() {
+                               err = multierr.Append(err, 
t.writeIntToIndex(shardID, id, fieldName, integer))
+                       }
+               default:
+                       continue
+               }
+       }
        return common.ChunkID(chunkID), err
 }
 
// writeIntToIndex indexes an int64 field value for the given chunk, encoding
// the value as its 8-byte representation via convert.Int64ToBytes so it can be
// stored alongside string field values.
func (t *traceSeries) writeIntToIndex(shardID uint, id common.ChunkID, name string, value int64) error {
	return t.writeIndex(shardID, id, name, convert.Int64ToBytes(value))
}
+
// writeStrToIndex indexes a string field value for the given chunk, using the
// raw UTF-8 bytes of the string as the index term.
func (t *traceSeries) writeStrToIndex(shardID uint, id common.ChunkID, name string, value string) error {
	return t.writeIndex(shardID, id, name, []byte(value))
}
+
+func (t *traceSeries) writeIndex(shardID uint, id common.ChunkID, name string, 
value []byte) error {
+       return t.idx.Insert(*common.NewMetadata(&v1.Metadata{
+               Name:  t.name,
+               Group: t.group,
+       }),
+               shardID,
+               &index.Field{
+                       ChunkID: id,
+                       Name:    name,
+                       Value:   value,
+               },
+       )
+}
+
 // copyEntityValueWithoutDataBinary copies all fields without DataBinary
 func copyEntityValueWithoutDataBinary(ev *v1.EntityValue) *v1.EntityValue {
        return &v1.EntityValue{
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index c06d7a4..fa2d688 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -43,6 +43,10 @@ func Uint32ToBytes(u uint32) []byte {
        return bs
 }
 
// BytesToInt64 decodes the first 8 bytes of b as a big-endian int64,
// the inverse of Int64ToBytes. It panics if len(b) < 8 (the same contract
// as the sibling BytesToUint64).
func BytesToInt64(b []byte) int64 {
	u := binary.BigEndian.Uint64(b)
	return int64(u)
}
+
 func BytesToUint64(b []byte) uint64 {
        return binary.BigEndian.Uint64(b)
 }
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 3812162..2d7ad16 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -58,6 +58,8 @@ type List interface {
        RemoveRange(min, max common.ChunkID) error
 
        Reset()
+
+       ToSlice() []common.ChunkID
 }
 
 type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 7c778d0..0b1c9b6 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -26,6 +26,8 @@ import (
 )
 
 var (
+       EmptyPostingList = NewPostingList()
+
        ErrIntersectRoaringOnly  = errors.New("Intersect only supported between 
roaringDocId sets")
        ErrUnionRoaringOnly      = errors.New("Union only supported between 
roaringDocId sets")
        ErrDifferenceRoaringOnly = errors.New("Difference only supported 
between roaringDocId sets")
@@ -192,3 +194,13 @@ func (it *roaringIterator) Close() error {
        it.closed = true
        return nil
 }
+
+func (p *postingsList) ToSlice() []common.ChunkID {
+       iter := p.Iterator()
+       defer iter.Close()
+       s := make([]common.ChunkID, 0, p.Len())
+       for iter.Next() {
+               s = append(s, iter.Current())
+       }
+       return s
+}

Reply via email to