This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/time-series by this push:
     new 30a74a6  Add stream module
30a74a6 is described below

commit 30a74a6eb39e9ad046f7a921322380cf2c9d0e64
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 2 00:23:29 2021 +0800

    Add stream module
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 .../model/v2/write.proto => data/stream.go}        |  36 +++--
 api/proto/banyandb/model/v2/write.pb.go            |  38 +++--
 api/proto/banyandb/model/v2/write.proto            |   1 +
 api/proto/banyandb/stream/v2/write.pb.go           | 169 ++++++++++++++-------
 api/proto/banyandb/stream/v2/write.proto           |  14 +-
 banyand/metadata/metadata.go                       |  21 ++-
 banyand/stream/service.go                          | 102 +++++++++++++
 banyand/stream/stream.go                           |  95 ++++++++++++
 banyand/stream/stream_write.go                     | 130 ++++++++++++++++
 banyand/stream/stream_write_test.go                | 153 +++++++++++++++++++
 banyand/stream/testdata/shard0.json                |  18 +++
 banyand/tsdb/block.go                              |  57 ++++++-
 banyand/tsdb/series.go                             | 168 +++++++++++++++++++-
 banyand/tsdb/seriesdb.go                           | 113 ++++++++------
 banyand/tsdb/seriesdb_test.go                      |  56 ++++---
 banyand/tsdb/shard.go                              |   2 +-
 banyand/tsdb/tsdb.go                               |  32 ++--
 go.mod                                             |   1 +
 18 files changed, 1022 insertions(+), 184 deletions(-)
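
For reviewers who want to exercise the new write path, below is a minimal, hypothetical sketch of how an element is assembled for the stream module added by this commit. It only uses messages visible in the diff that follows (ElementValue, ElementValue.TagFamily, Tag, WriteRequest); the two-family layout (a binary_data family followed by a family of searchable tags) mirrors the added test in banyand/stream/stream_write_test.go, and the "sw"/"default" stream schema it targets is assumed, since the schema itself is not part of this patch.

package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/timestamppb"

	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
)

func main() {
	// First tag family: the raw payload, carried by the new binary_data oneof case of Tag.
	binaryFamily := &streamv2.ElementValue_TagFamily{
		Tags: []*modelv2.Tag{
			{ValueType: &modelv2.Tag_BinaryData{BinaryData: []byte("raw segment bytes")}},
		},
	}
	// Second tag family: searchable tags. Tags referenced by the schema's entity
	// must be Str or Int, otherwise stream.buildEntity rejects the element.
	searchableFamily := &streamv2.ElementValue_TagFamily{
		Tags: []*modelv2.Tag{
			{ValueType: &modelv2.Tag_Str{Str: &modelv2.Str{Value: "webapp_id"}}},
			{ValueType: &modelv2.Tag_Int{Int: &modelv2.Int{Value: 300}}},
		},
	}
	req := &streamv2.WriteRequest{
		// Metadata names the target stream; formatStreamID joins name and group
		// to pick the matching *stream instance in the write callback.
		Metadata: &commonv2.Metadata{Name: "sw", Group: "default"},
		Element: &streamv2.ElementValue{
			ElementId:   "segment-1",
			Timestamp:   timestamppb.Now(),
			TagFamilies: []*streamv2.ElementValue_TagFamily{binaryFamily, searchableFamily},
		},
	}
	fmt.Println(req.GetElement().GetElementId(), len(req.GetElement().GetTagFamilies()))
}

Such a request reaches the stream service wrapped in data.StreamWriteData on the TopicStreamWriteEvent bus topic (see api/data/stream.go below); the write callback then persists each tag family through the new tsdb WriterBuilder.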

diff --git a/api/proto/banyandb/model/v2/write.proto b/api/data/stream.go
similarity index 56%
copy from api/proto/banyandb/model/v2/write.proto
copy to api/data/stream.go
index 836423a..65fc2ef 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/data/stream.go
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-syntax = "proto3";
+package data
 
-option java_package = "org.apache.skywalking.banyandb.model.v2";
-option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2";
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+)
 
-package banyandb.model.v2;
+var StreamKindVersion = common.KindVersion{Version: "v2", Kind: "data-stream"}
 
-import "google/protobuf/struct.proto";
-import "banyandb/model/v2/common.proto";
+var StreamWriteEventKindVersion = common.KindVersion{
+       Version: "v2",
+       Kind:    "stream-write",
+}
+
+var TopicStreamWriteEvent = bus.UniTopic(StreamWriteEventKindVersion.String())
+
+type Stream struct {
+       common.KindVersion
+       Entities []Entity
+}
 
-message Tag {
-    oneof value_type {
-        google.protobuf.NullValue null = 1;
-        Str str = 2;
-        StrArray str_array = 3;
-        Int int = 4;
-        IntArray int_array = 5;
-    }
+type StreamWriteData struct {
+       ShardID      uint
+       SeriesID     uint64
+       WriteRequest *streamv2.WriteRequest
 }
diff --git a/api/proto/banyandb/model/v2/write.pb.go b/api/proto/banyandb/model/v2/write.pb.go
index 0d36e62..1640f11 100644
--- a/api/proto/banyandb/model/v2/write.pb.go
+++ b/api/proto/banyandb/model/v2/write.pb.go
@@ -50,6 +50,7 @@ type Tag struct {
        //      *Tag_StrArray
        //      *Tag_Int
        //      *Tag_IntArray
+       //      *Tag_BinaryData
        ValueType isTag_ValueType `protobuf_oneof:"value_type"`
 }
 
@@ -127,6 +128,13 @@ func (x *Tag) GetIntArray() *IntArray {
        return nil
 }
 
+func (x *Tag) GetBinaryData() []byte {
+       if x, ok := x.GetValueType().(*Tag_BinaryData); ok {
+               return x.BinaryData
+       }
+       return nil
+}
+
 type isTag_ValueType interface {
        isTag_ValueType()
 }
@@ -151,6 +159,10 @@ type Tag_IntArray struct {
        IntArray *IntArray `protobuf:"bytes,5,opt,name=int_array,json=intArray,proto3,oneof"`
 }
 
+type Tag_BinaryData struct {
+       BinaryData []byte `protobuf:"bytes,6,opt,name=binary_data,json=binaryData,proto3,oneof"`
+}
+
 func (*Tag_Null) isTag_ValueType() {}
 
 func (*Tag_Str) isTag_ValueType() {}
@@ -161,6 +173,8 @@ func (*Tag_Int) isTag_ValueType() {}
 
 func (*Tag_IntArray) isTag_ValueType() {}
 
+func (*Tag_BinaryData) isTag_ValueType() {}
+
 var File_banyandb_model_v2_write_proto protoreflect.FileDescriptor
 
 var file_banyandb_model_v2_write_proto_rawDesc = []byte{
@@ -171,7 +185,7 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
        0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 
0x72, 0x6f, 0x74, 0x6f,
        0x1a, 0x1e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 
0x6f, 0x64, 0x65, 0x6c,
        0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 
0x72, 0x6f, 0x74, 0x6f,
-       0x22, 0x95, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 
0x6e, 0x75, 0x6c, 0x6c,
+       0x22, 0xb8, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 
0x6e, 0x75, 0x6c, 0x6c,
        0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 
0x67, 0x6c, 0x65, 0x2e,
        0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4e, 0x75, 0x6c, 
0x6c, 0x56, 0x61, 0x6c,
        0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x6e, 0x75, 0x6c, 0x6c, 0x12, 0x2a, 
0x0a, 0x03, 0x73, 0x74,
@@ -187,15 +201,18 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
        0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x61, 0x72, 0x72, 0x61, 0x79, 0x18, 
0x05, 0x20, 0x01, 0x28,
        0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 
0x2e, 0x6d, 0x6f, 0x64,
        0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x74, 0x41, 0x72, 0x72, 
0x61, 0x79, 0x48, 0x00,
-       0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x42, 0x0c, 
0x0a, 0x0a, 0x76, 0x61,
-       0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 
0x6f, 0x72, 0x67, 0x2e,
-       0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 
0x6c, 0x6b, 0x69, 0x6e,
-       0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 
0x6f, 0x64, 0x65, 0x6c,
-       0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 
0x63, 0x6f, 0x6d, 0x2f,
-       0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 
0x6c, 0x6b, 0x69, 0x6e,
-       0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 
0x70, 0x69, 0x2f, 0x70,
-       0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 
0x62, 0x2f, 0x6d, 0x6f,
-       0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 
0x6f, 0x33,
+       0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x12, 0x21, 
0x0a, 0x0b, 0x62, 0x69,
+       0x6e, 0x61, 0x72, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 
0x01, 0x28, 0x0c, 0x48,
+       0x00, 0x52, 0x0a, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, 
0x61, 0x42, 0x0c, 0x0a,
+       0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 
0x6c, 0x0a, 0x27, 0x6f,
+       0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 
0x79, 0x77, 0x61, 0x6c,
+       0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 
0x62, 0x2e, 0x6d, 0x6f,
+       0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 
0x75, 0x62, 0x2e, 0x63,
+       0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 
0x79, 0x77, 0x61, 0x6c,
+       0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 
0x62, 0x2f, 0x61, 0x70,
+       0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 
0x61, 0x6e, 0x64, 0x62,
+       0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 
0x72, 0x6f, 0x74, 0x6f,
+       0x33,
 }
 
 var (
@@ -258,6 +275,7 @@ func file_banyandb_model_v2_write_proto_init() {
                (*Tag_StrArray)(nil),
                (*Tag_Int)(nil),
                (*Tag_IntArray)(nil),
+               (*Tag_BinaryData)(nil),
        }
        type x struct{}
        out := protoimpl.TypeBuilder{
diff --git a/api/proto/banyandb/model/v2/write.proto b/api/proto/banyandb/model/v2/write.proto
index 836423a..4636bd3 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/proto/banyandb/model/v2/write.proto
@@ -32,5 +32,6 @@ message Tag {
         StrArray str_array = 3;
         Int int = 4;
         IntArray int_array = 5;
+        bytes binary_data = 6;
     }
 }
diff --git a/api/proto/banyandb/stream/v2/write.pb.go b/api/proto/banyandb/stream/v2/write.pb.go
index 664078b..ecc7655 100644
--- a/api/proto/banyandb/stream/v2/write.pb.go
+++ b/api/proto/banyandb/stream/v2/write.pb.go
@@ -31,8 +31,8 @@ import (
        protoimpl "google.golang.org/protobuf/runtime/protoimpl"
        timestamppb "google.golang.org/protobuf/types/known/timestamppb"
 
-       v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
-       v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+       v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+       v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 )
 
 const (
@@ -53,13 +53,8 @@ type ElementValue struct {
        // 1) either the start time of a Span/Segment,
        // 2) or the timestamp of a log
        Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
-       // binary representation of segments, including tags, spans...
-       DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
-       // support all of indexed tags in the tags.
-       // Tag only has value, as the value of value_type match with the key
-       // by the index rules and index rule bindings of Metadata group.
-       // indexed tags of multiple entities are compression in the tags.
-       Tags []*v2.Tag `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
+       // the order of tag_families' items match the stream schema
+       TagFamilies []*ElementValue_TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
 }
 
 func (x *ElementValue) Reset() {
@@ -108,16 +103,9 @@ func (x *ElementValue) GetTimestamp() *timestamppb.Timestamp {
        return nil
 }
 
-func (x *ElementValue) GetDataBinary() []byte {
+func (x *ElementValue) GetTagFamilies() []*ElementValue_TagFamily {
        if x != nil {
-               return x.DataBinary
-       }
-       return nil
-}
-
-func (x *ElementValue) GetTags() []*v2.Tag {
-       if x != nil {
-               return x.Tags
+               return x.TagFamilies
        }
        return nil
 }
@@ -128,7 +116,7 @@ type WriteRequest struct {
        unknownFields protoimpl.UnknownFields
 
        // the metadata is only required in the first write.
-       Metadata *v21.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+       Metadata *v2.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
        // the element is required.
        Element *ElementValue `protobuf:"bytes,2,opt,name=element,proto3" json:"element,omitempty"`
 }
@@ -165,7 +153,7 @@ func (*WriteRequest) Descriptor() ([]byte, []int) {
        return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{1}
 }
 
-func (x *WriteRequest) GetMetadata() *v21.Metadata {
+func (x *WriteRequest) GetMetadata() *v2.Metadata {
        if x != nil {
                return x.Metadata
        }
@@ -217,6 +205,53 @@ func (*WriteResponse) Descriptor() ([]byte, []int) {
        return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{2}
 }
 
+type ElementValue_TagFamily struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Tags []*v21.Tag `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty"`
+}
+
+func (x *ElementValue_TagFamily) Reset() {
+       *x = ElementValue_TagFamily{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *ElementValue_TagFamily) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ElementValue_TagFamily) ProtoMessage() {}
+
+func (x *ElementValue_TagFamily) ProtoReflect() protoreflect.Message {
+       mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use ElementValue_TagFamily.ProtoReflect.Descriptor instead.
+func (*ElementValue_TagFamily) Descriptor() ([]byte, []int) {
+       return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{0, 0}
+}
+
+func (x *ElementValue_TagFamily) GetTags() []*v21.Tag {
+       if x != nil {
+               return x.Tags
+       }
+       return nil
+}
+
 var File_banyandb_stream_v2_write_proto protoreflect.FileDescriptor
 
 var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
@@ -229,35 +264,39 @@ var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
        0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 
0x6d, 0x6d, 0x6f, 0x6e,
        0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1d, 0x62, 0x61, 0x6e, 0x79, 
0x61, 0x6e, 0x64, 0x62,
        0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x2f, 0x77, 0x72, 
0x69, 0x74, 0x65, 0x2e,
-       0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb4, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 
0x65, 0x6d, 0x65, 0x6e,
+       0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xef, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 
0x65, 0x6d, 0x65, 0x6e,
        0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6c, 
0x65, 0x6d, 0x65, 0x6e,
        0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 
0x65, 0x6c, 0x65, 0x6d,
        0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 
0x65, 0x73, 0x74, 0x61,
        0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 
0x6f, 0x6f, 0x67, 0x6c,
        0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 
0x69, 0x6d, 0x65, 0x73,
        0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 
0x61, 0x6d, 0x70, 0x12,
-       0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x69, 0x6e, 0x61, 
0x72, 0x79, 0x18, 0x03,
-       0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x42, 0x69, 
0x6e, 0x61, 0x72, 0x79,
-       0x12, 0x2a, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 
0x28, 0x0b, 0x32, 0x16,
-       0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 
0x64, 0x65, 0x6c, 0x2e,
-       0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 
0x22, 0x84, 0x01, 0x0a,
-       0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 
0x74, 0x12, 0x38, 0x0a,
-       0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x0b, 0x32,
-       0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 
0x6f, 0x6d, 0x6d, 0x6f,
-       0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 
0x61, 0x52, 0x08, 0x6d,
-       0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 
0x6c, 0x65, 0x6d, 0x65,
-       0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 
0x61, 0x6e, 0x79, 0x61,
-       0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 
0x32, 0x2e, 0x45, 0x6c,
-       0x65, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 
0x65, 0x6c, 0x65, 0x6d,
-       0x65, 0x6e, 0x74, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 
0x52, 0x65, 0x73, 0x70,
-       0x6f, 0x6e, 0x73, 0x65, 0x42, 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
-       0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 
0x67, 0x2e, 0x62, 0x61,
-       0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 
0x6d, 0x2e, 0x76, 0x32,
-       0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 
0x2f, 0x61, 0x70, 0x61,
-       0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 
0x6e, 0x67, 0x2d, 0x62,
-       0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 
0x70, 0x72, 0x6f, 0x74,
-       0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 
0x74, 0x72, 0x65, 0x61,
-       0x6d, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+       0x4d, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 
0x69, 0x65, 0x73, 0x18,
+       0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x62, 0x61, 0x6e, 0x79, 
0x61, 0x6e, 0x64, 0x62,
+       0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 
0x6c, 0x65, 0x6d, 0x65,
+       0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x54, 0x61, 0x67, 0x46, 
0x61, 0x6d, 0x69, 0x6c,
+       0x79, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 
0x65, 0x73, 0x1a, 0x37,
+       0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 
0x2a, 0x0a, 0x04, 0x74,
+       0x61, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 
0x62, 0x61, 0x6e, 0x79,
+       0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 
0x32, 0x2e, 0x54, 0x61,
+       0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a, 0x0c, 
0x57, 0x72, 0x69, 0x74,
+       0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x08, 
0x6d, 0x65, 0x74, 0x61,
+       0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 
0x2e, 0x62, 0x61, 0x6e,
+       0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 
0x2e, 0x76, 0x32, 0x2e,
+       0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 
0x74, 0x61, 0x64, 0x61,
+       0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 
0x74, 0x18, 0x02, 0x20,
+       0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 
0x64, 0x62, 0x2e, 0x73,
+       0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 
0x6d, 0x65, 0x6e, 0x74,
+       0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 
0x6e, 0x74, 0x22, 0x0f,
+       0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 
0x6e, 0x73, 0x65, 0x42,
+       0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 
0x65, 0x2e, 0x73, 0x6b,
+       0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 
0x79, 0x61, 0x6e, 0x64,
+       0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x5a, 
0x42, 0x67, 0x69, 0x74,
+       0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 
0x68, 0x65, 0x2f, 0x73,
+       0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 
0x6e, 0x79, 0x61, 0x6e,
+       0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 
0x2f, 0x62, 0x61, 0x6e,
+       0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 
0x2f, 0x76, 0x32, 0x62,
+       0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -272,25 +311,27 @@ func file_banyandb_stream_v2_write_proto_rawDescGZIP() []byte {
        return file_banyandb_stream_v2_write_proto_rawDescData
 }
 
-var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
 var file_banyandb_stream_v2_write_proto_goTypes = []interface{}{
-       (*ElementValue)(nil),          // 0: banyandb.stream.v2.ElementValue
-       (*WriteRequest)(nil),          // 1: banyandb.stream.v2.WriteRequest
-       (*WriteResponse)(nil),         // 2: banyandb.stream.v2.WriteResponse
-       (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
-       (*v2.Tag)(nil),                // 4: banyandb.model.v2.Tag
-       (*v21.Metadata)(nil),          // 5: banyandb.common.v2.Metadata
+       (*ElementValue)(nil),           // 0: banyandb.stream.v2.ElementValue
+       (*WriteRequest)(nil),           // 1: banyandb.stream.v2.WriteRequest
+       (*WriteResponse)(nil),          // 2: banyandb.stream.v2.WriteResponse
+       (*ElementValue_TagFamily)(nil), // 3: banyandb.stream.v2.ElementValue.TagFamily
+       (*timestamppb.Timestamp)(nil),  // 4: google.protobuf.Timestamp
+       (*v2.Metadata)(nil),            // 5: banyandb.common.v2.Metadata
+       (*v21.Tag)(nil),                // 6: banyandb.model.v2.Tag
 }
 var file_banyandb_stream_v2_write_proto_depIdxs = []int32{
-       3, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
-       4, // 1: banyandb.stream.v2.ElementValue.tags:type_name -> banyandb.model.v2.Tag
+       4, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
+       3, // 1: banyandb.stream.v2.ElementValue.tag_families:type_name -> banyandb.stream.v2.ElementValue.TagFamily
        5, // 2: banyandb.stream.v2.WriteRequest.metadata:type_name -> banyandb.common.v2.Metadata
        0, // 3: banyandb.stream.v2.WriteRequest.element:type_name -> banyandb.stream.v2.ElementValue
-       4, // [4:4] is the sub-list for method output_type
-       4, // [4:4] is the sub-list for method input_type
-       4, // [4:4] is the sub-list for extension type_name
-       4, // [4:4] is the sub-list for extension extendee
-       0, // [0:4] is the sub-list for field type_name
+       6, // 4: banyandb.stream.v2.ElementValue.TagFamily.tags:type_name -> banyandb.model.v2.Tag
+       5, // [5:5] is the sub-list for method output_type
+       5, // [5:5] is the sub-list for method input_type
+       5, // [5:5] is the sub-list for extension type_name
+       5, // [5:5] is the sub-list for extension extendee
+       0, // [0:5] is the sub-list for field type_name
 }
 
 func init() { file_banyandb_stream_v2_write_proto_init() }
@@ -335,6 +376,18 @@ func file_banyandb_stream_v2_write_proto_init() {
                                return nil
                        }
                }
+               file_banyandb_stream_v2_write_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+                       switch v := v.(*ElementValue_TagFamily); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
        }
        type x struct{}
        out := protoimpl.TypeBuilder{
@@ -342,7 +395,7 @@ func file_banyandb_stream_v2_write_proto_init() {
                        GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
                        RawDescriptor: file_banyandb_stream_v2_write_proto_rawDesc,
                        NumEnums:      0,
-                       NumMessages:   3,
+                       NumMessages:   4,
                        NumExtensions: 0,
                        NumServices:   0,
                },
diff --git a/api/proto/banyandb/stream/v2/write.proto b/api/proto/banyandb/stream/v2/write.proto
index 59b0eef..39ef649 100644
--- a/api/proto/banyandb/stream/v2/write.proto
+++ b/api/proto/banyandb/stream/v2/write.proto
@@ -26,6 +26,8 @@ import "google/protobuf/timestamp.proto";
 import "banyandb/common/v2/common.proto";
 import "banyandb/model/v2/write.proto";
 
+
+
 message ElementValue {
  // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
@@ -33,13 +35,11 @@ message ElementValue {
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
-  // binary representation of segments, including tags, spans...
-  bytes data_binary = 3;
-  // support all of indexed tags in the tags.
-  // Tag only has value, as the value of value_type match with the key
-  // by the index rules and index rule bindings of Metadata group.
-  // indexed tags of multiple entities are compression in the tags.
-  repeated model.v2.Tag tags = 4;
+  message TagFamily {
+    repeated model.v2.Tag tags = 1;
+  }
+  // the order of tag_families' items match the stream schema
+  repeated TagFamily tag_families = 3;
 }
 
 message WriteRequest {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 57e1fd9..7bf3fc8 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -35,8 +35,13 @@ type IndexFilter interface {
        IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error)
 }
 
-type Metadata interface {
+type Repo interface {
        IndexFilter
+       Stream() schema.Stream
+}
+
+type Service interface {
+       Repo
        run.Unit
 }
 
@@ -46,11 +51,7 @@ type service struct {
        indexRuleBinding schema.IndexRuleBinding
 }
 
-func (s *service) Name() string {
-       return "metadata"
-}
-
-func NewService(_ context.Context) (Metadata, error) {
+func NewService(_ context.Context) (Repo, error) {
        stream, err := schema.NewStream()
        if err != nil {
                return nil, err
@@ -70,6 +71,14 @@ func NewService(_ context.Context) (Metadata, error) {
        }, nil
 }
 
+func (s *service) Stream() schema.Stream {
+       return s.stream
+}
+
+func (s *service) Name() string {
+       return "metadata"
+}
+
 func (s *service) IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error) {
        bindings, err := s.indexRuleBinding.List(ctx, schema.ListOpt{Group: subject.Group})
        if err != nil {
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
new file mode 100644
index 0000000..a187f29
--- /dev/null
+++ b/banyand/stream/service.go
@@ -0,0 +1,102 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "context"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+var ErrEmptyRootPath = errors.New("root path is empty")
+
+type Service interface {
+       run.PreRunner
+       run.Config
+       run.Service
+}
+
+var _ Service = (*service)(nil)
+
+type service struct {
+       schemaMap     map[string]*stream
+       writeListener *writeCallback
+       l             *logger.Logger
+       metadata      metadata.Repo
+       root          string
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+       flagS := run.NewFlagSet("storage")
+       flagS.StringVar(&s.root, "root-path", "/tmp", "the root path of database")
+       return flagS
+}
+
+func (s *service) Validate() error {
+       if s.root == "" {
+               return ErrEmptyRootPath
+       }
+       return nil
+}
+
+func (s *service) Name() string {
+       return "stream"
+}
+
+func (s *service) PreRun() error {
+       schemas, err := s.metadata.Stream().List(context.Background(), schema.ListOpt{})
+       if err != nil {
+               return err
+       }
+
+       s.schemaMap = make(map[string]*stream, len(schemas))
+       s.l = logger.GetLogger(s.Name())
+       s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+       for _, sa := range schemas {
+               sm, errTS := openStream(s.root, sa, s.l)
+               if errTS != nil {
+                       return errTS
+               }
+               id := formatStreamID(sm.name, sm.group)
+               s.schemaMap[id] = sm
+               s.writeListener.schemaMap[id] = sm
+               s.l.Info().Str("id", id).Msg("initialize stream")
+       }
+       return err
+}
+
+func (s *service) Serve() error {
+       panic("implement me")
+}
+
+func (s *service) GracefulStop() {
+       panic("implement me")
+}
+
+//NewService returns a new service
+func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Queue) (Service, error) {
+       return &service{
+               metadata: metadata,
+       }, nil
+}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
new file mode 100644
index 0000000..bb3e5f3
--- /dev/null
+++ b/banyand/stream/stream.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "context"
+
+       "github.com/pkg/errors"
+
+       databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+       ErrMalformedElement = errors.New("element is malformed")
+)
+
+type stream struct {
+       name            string
+       group           string
+       l               *logger.Logger
+       schema          *databasev2.Stream
+       db              tsdb.Database
+       familyNameIndex []string
+       entityIndex     []struct {
+               family int
+               tag    int
+       }
+}
+
+func (s *stream) Close() error {
+       return s.db.Close()
+}
+
+func (s *stream) parseSchema() {
+       sm := s.schema
+       meta := sm.GetMetadata()
+       s.name, s.group = meta.GetName(), meta.GetGroup()
+       for _, family := range sm.GetTagFamilies() {
+               s.familyNameIndex = append(s.familyNameIndex, family.GetName())
+       }
+       for _, tagInEntity := range sm.Entity.GetTagNames() {
+       nextEntityTag:
+               for fi, family := range sm.GetTagFamilies() {
+                       for ti, tag := range family.Tags {
+                               if tagInEntity == tag.GetName() {
+                                       s.entityIndex = append(s.entityIndex, struct {
+                                               family int
+                                               tag    int
+                                       }{family: fi, tag: ti})
+                                       break nextEntityTag
+                               }
+                       }
+               }
+       }
+}
+
+func openStream(root string, schema *databasev2.Stream, l *logger.Logger) (*stream, error) {
+       sm := &stream{
+               schema: schema,
+               l:      l,
+       }
+       sm.parseSchema()
+       db, err := tsdb.OpenDatabase(
+               context.WithValue(context.Background(), logger.ContextKey, l),
+               tsdb.DatabaseOpts{
+                       Location: root,
+                       ShardNum: uint(schema.GetShardNum()),
+               })
+       if err != nil {
+               return nil, err
+       }
+       sm.db = db
+       return sm, nil
+}
+
+func formatStreamID(name, group string) string {
+       return name + ":" + group
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
new file mode 100644
index 0000000..42d79ab
--- /dev/null
+++ b/banyand/stream/stream_write.go
@@ -0,0 +1,130 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+       streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var ErrUnsupportedTagTypeAsEntry = errors.New("the tag type can not be as an entry in an entity")
+
+func (s *stream) Write(shardID uint, value *streamv2.ElementValue) error {
+       shard, err := s.db.Shard(shardID)
+       if err != nil {
+               return err
+       }
+       entity, err := s.buildEntity(value)
+       if err != nil {
+               return err
+       }
+       series, err := shard.Series().Get(entity)
+       if err != nil {
+               return err
+       }
+       t := value.GetTimestamp().AsTime()
+       wp, err := series.Span(tsdb.TimeRange{
+               Start:    t,
+               Duration: 0,
+       })
+       if err != nil {
+               return err
+       }
+       defer func() {
+               _ = wp.Close()
+       }()
+       builder := wp.WriterBuilder().Time(t)
+       for i, family := range value.GetTagFamilies() {
+               bb, errMarshal := proto.Marshal(family)
+               if errMarshal != nil {
+                       return errMarshal
+               }
+               builder.Family(s.familyNameIndex[i], bb)
+       }
+       writer, err := builder.Build()
+       if err != nil {
+               return err
+       }
+       _, err = writer.Write()
+       return err
+}
+
+func (s *stream) buildEntity(value *streamv2.ElementValue) (entity tsdb.Entity, err error) {
+       for _, index := range s.entityIndex {
+               if index.family >= len(value.GetTagFamilies()) {
+                       return nil, ErrMalformedElement
+               }
+               family := value.GetTagFamilies()[index.family]
+               if index.tag >= len(family.GetTags()) {
+                       return nil, ErrMalformedElement
+               }
+               entry, err := tagConvEntry(family.GetTags()[index.tag])
+               if err != nil {
+                       return nil, err
+               }
+               entity = append(entity, entry)
+       }
+       return entity, nil
+}
+
+func tagConvEntry(tag *modelv2.Tag) (tsdb.Entry, error) {
+       switch tag.GetValueType().(type) {
+       case *modelv2.Tag_Str:
+               return tsdb.Entry(tag.GetStr().GetValue()), nil
+       case *modelv2.Tag_Int:
+               return convert.Int64ToBytes(tag.GetInt().GetValue()), nil
+       default:
+               return nil, ErrUnsupportedTagTypeAsEntry
+       }
+}
+
+type writeCallback struct {
+       l         *logger.Logger
+       schemaMap map[string]*stream
+}
+
+func setUpWriteCallback(l *logger.Logger, schemaMap map[string]*stream) *writeCallback {
+       wcb := &writeCallback{
+               l:         l,
+               schemaMap: schemaMap,
+       }
+       return wcb
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+       writeEvent, ok := message.Data().(data.StreamWriteData)
+       if !ok {
+               w.l.Warn().Msg("invalid event data type")
+               return
+       }
+       sm := writeEvent.WriteRequest.GetMetadata()
+       id := formatStreamID(sm.GetName(), sm.GetGroup())
+       err := w.schemaMap[id].Write(writeEvent.ShardID, writeEvent.WriteRequest.GetElement())
+       if err != nil {
+               w.l.Debug().Err(err)
+       }
+       return
+}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
new file mode 100644
index 0000000..e458396
--- /dev/null
+++ b/banyand/stream/stream_write_test.go
@@ -0,0 +1,153 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "context"
+       "encoding/base64"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+       modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+       streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func Test_Stream_Write(t *testing.T) {
+       tester := assert.New(t)
+       s, deferFunc := setup(tester)
+       defer deferFunc()
+
+       type args struct {
+               shardID uint
+               ele     *streamv2.ElementValue
+       }
+       tests := []struct {
+               name    string
+               args    args
+               wantErr bool
+       }{
+               {
+                       name: "golden path",
+                       args: args{
+                               shardID: 0,
+                               ele: getEle(
+                                       "trace_id-xxfff.111323",
+                                       0,
+                                       "webapp_id",
+                                       "10.0.0.1_id",
+                                       "/home_id",
+                                       300,
+                                       1622933202000000000,
+                               ),
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       err := s.Write(tt.args.shardID, tt.args.ele)
+                       if tt.wantErr {
+                               tester.Error(err)
+                               return
+                       }
+                       tester.NoError(err)
+               })
+       }
+
+}
+
+func setup(t *assert.Assertions) (*stream, func()) {
+       t.NoError(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "info",
+       }))
+       tempDir, deferFunc := test.Space(t)
+       streamRepo, err := schema.NewStream()
+       t.NoError(err)
+       streamSpec, err := streamRepo.Get(context.TODO(), &commonv2.Metadata{
+               Name:  "sw",
+               Group: "default",
+       })
+       t.NoError(err)
+       s, err := openStream(tempDir, streamSpec, logger.GetLogger("test"))
+       t.NoError(err)
+       return s, func() {
+               _ = s.Close()
+               deferFunc()
+       }
+}
+
+func getEle(tags ...interface{}) *streamv2.ElementValue {
+       searchableTags := make([]*modelv2.Tag, 0)
+       for _, tag := range tags {
+               searchableTags = append(searchableTags, getTag(tag))
+       }
+       bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+       e := &streamv2.ElementValue{
+               ElementId: "1231.dfd.123123ssf",
+               Timestamp: timestamppb.Now(),
+               TagFamilies: []*streamv2.ElementValue_TagFamily{
+                       {
+                               Tags: []*modelv2.Tag{
+                                       {
+                                               ValueType: &modelv2.Tag_BinaryData{
+                                                       BinaryData: bb,
+                                               },
+                                       },
+                               },
+                       },
+                       {
+                               Tags: searchableTags,
+                       },
+               },
+       }
+       return e
+}
+
+func getTag(tag interface{}) *modelv2.Tag {
+       if tag == nil {
+               return &modelv2.Tag{
+                       ValueType: &modelv2.Tag_Null{},
+               }
+       }
+       switch t := tag.(type) {
+       case int:
+               return &modelv2.Tag{
+                       ValueType: &modelv2.Tag_Int{
+                               Int: &modelv2.Int{
+                                       Value: int64(t),
+                               },
+                       },
+               }
+       case string:
+               return &modelv2.Tag{
+                       ValueType: &modelv2.Tag_Str{
+                               Str: &modelv2.Str{
+                                       Value: t,
+                               },
+                       },
+               }
+       }
+       return nil
+}
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
new file mode 100644
index 0000000..28de2f3
--- /dev/null
+++ b/banyand/stream/testdata/shard0.json
@@ -0,0 +1,18 @@
+[
+  {
+    "element_id": "1",
+    "timestamp": "2021-04-15T01:30:15.01Z",
+    "tag_families": [
+      {
+        "tags": [
+          {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
+        ]
+      },
+      {
+        "tags": [
+          {"str": ""}
+        ]
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index dcab966..6505582 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -20,6 +20,9 @@ package tsdb
 import (
        "context"
        "io"
+       "time"
+
+       "github.com/dgraph-io/ristretto/z"
 
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -28,10 +31,14 @@ import (
 type block struct {
        path string
        l    *logger.Logger
+       ref  *z.Closer
 
        store       kv.TimeSeriesStore
        treeIndex   kv.Store
        closableLst []io.Closer
+       endTime     time.Time
+       startTime   time.Time
+
        //revertedIndex kv.Store
 }
 
@@ -43,7 +50,9 @@ type blockOpts struct {
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
        b = &block{
-               path: opts.path,
+               path:      opts.path,
+               ref:       z.NewCloser(1),
+               startTime: time.Now(),
        }
        parentLogger := ctx.Value(logger.ContextKey)
        if parentLogger != nil {
@@ -62,8 +71,54 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
        return b, nil
 }
 
+func (b *block) delegate() blockDelegate {
+       b.incRef()
+       return &bDelegate{
+               delegate: b,
+       }
+}
+
+func (b *block) dscRef() {
+       b.ref.Done()
+}
+
+func (b *block) incRef() {
+       b.ref.AddRunning(1)
+}
+
 func (b *block) close() {
+       b.dscRef()
+       b.ref.SignalAndWait()
        for _, closer := range b.closableLst {
                _ = closer.Close()
        }
 }
+
+type blockDelegate interface {
+       io.Closer
+       contains(ts time.Time) bool
+       write(key []byte, val []byte, ts time.Time) error
+}
+
+var _ blockDelegate = (*bDelegate)(nil)
+
+type bDelegate struct {
+       delegate *block
+}
+
+func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
+       return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+}
+
+func (d *bDelegate) contains(ts time.Time) bool {
+       greaterAndEqualStart := d.delegate.startTime.Equal(ts) || d.delegate.startTime.Before(ts)
+       if d.delegate.endTime.IsZero() {
+               return greaterAndEqualStart
+       }
+       return greaterAndEqualStart && d.delegate.endTime.After(ts)
+}
+
+func (d *bDelegate) Close() error {
+       d.delegate.dscRef()
+       return nil
+}
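
Side note on the reference counting introduced above: block.delegate() takes an extra reference via incRef() before handing out a blockDelegate, the delegate's Close() releases it via dscRef(), and block.close() drops the block's own reference and then waits in SignalAndWait() until every outstanding delegate is done. A tiny standalone sketch of that z.Closer pattern (a hypothetical toy program, not part of this patch):

package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

func main() {
	// newBlock(): the block holds one reference of its own.
	ref := z.NewCloser(1)

	// block.delegate(): take an extra reference while a writer uses the block.
	ref.AddRunning(1)
	go func() {
		defer ref.Done() // blockDelegate.Close() -> block.dscRef()
		time.Sleep(10 * time.Millisecond)
		fmt.Println("delegate released")
	}()

	// block.close(): drop the block's own reference, then wait for the delegates.
	ref.Done()
	ref.SignalAndWait()
	fmt.Println("block closed after every delegate finished")
}

This is also why callers must close the SeriesSpan (and with it each delegate); an unreleased delegate would make block.close() wait forever.
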
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 810aae0..ee8ce57 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -18,12 +18,20 @@
 package tsdb
 
 import (
+       "bytes"
+       "io"
        "time"
 
+       "github.com/pkg/errors"
+       "go.uber.org/multierr"
+
        "github.com/apache/skywalking-banyandb/api/common"
        modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
+var ErrEmptySeriesSpan = errors.New("there is no data in such time range")
+
 type Iterator interface {
        Next() bool
        Val() Item
@@ -43,6 +51,10 @@ type ConditionValue struct {
 type Condition map[string][]ConditionValue
 
 type ItemID struct {
+       //shardID int
+       //segID   []byte
+       //blockID []byte
+       id []byte
 }
 
 type TimeRange struct {
@@ -57,21 +69,23 @@ type Series interface {
 }
 
 type SeriesSpan interface {
+       io.Closer
        WriterBuilder() WriterBuilder
        Iterator() Iterator
        SeekerBuilder() SeekerBuilder
 }
 
 type WriterBuilder interface {
-       Family(name string) WriterBuilder
+       Family(name string, val []byte) WriterBuilder
        Time(ts time.Time) WriterBuilder
        Val(val []byte) WriterBuilder
-       OrderBy(order modelv2.QueryOrder_Sort) WriterBuilder
-       Build() Writer
+       Build() (Writer, error)
 }
 
 type Writer interface {
-       Write() ItemID
+       Write() (ItemID, error)
+       WriteLSMIndex(name string, val []byte) error
+       WriteInvertedIndex(name string, val []byte) error
 }
 
 type SeekerBuilder interface {
@@ -88,12 +102,14 @@ type Seeker interface {
 var _ Series = (*series)(nil)
 
 type series struct {
-       id common.SeriesID
+       id      common.SeriesID
+       blockDB blockDatabase
 }
 
-func newSeries(id common.SeriesID) *series {
+func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
        return &series{
-               id: id,
+               id:      id,
+               blockDB: blockDB,
        }
 }
 
@@ -102,9 +118,145 @@ func (s *series) ID() common.SeriesID {
 }
 
 func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
-       panic("implement me")
+       blocks := s.blockDB.span(timeRange)
+       if len(blocks) < 1 {
+               return nil, ErrEmptySeriesSpan
+       }
+       return newSeriesSpan(blocks, s.id), nil
 }
 
 func (s *series) Get(id ItemID) (Item, error) {
+       panic("not implemented")
+}
+
+var _ SeriesSpan = (*seriesSpan)(nil)
+
+type seriesSpan struct {
+       blocks   []blockDelegate
+       seriesID common.SeriesID
+}
+
+func (s *seriesSpan) Close() (err error) {
+       for _, delegate := range s.blocks {
+               err = multierr.Append(err, delegate.Close())
+       }
+       return err
+}
+
+func (s *seriesSpan) WriterBuilder() WriterBuilder {
+       return newWriterBuilder(s)
+}
+
+func (s *seriesSpan) Iterator() Iterator {
        panic("implement me")
 }
+
+func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
+       panic("implement me")
+}
+
+func newSeriesSpan(blocks []blockDelegate, id common.SeriesID) *seriesSpan {
+       return &seriesSpan{
+               blocks:   blocks,
+               seriesID: id,
+       }
+}
+
+var _ WriterBuilder = (*writerBuilder)(nil)
+
+type writerBuilder struct {
+       series *seriesSpan
+       block  blockDelegate
+       values []struct {
+               family []byte
+               val    []byte
+       }
+       ts            time.Time
+       seriesIDBytes []byte
+}
+
+func (w *writerBuilder) Family(name string, val []byte) WriterBuilder {
+       w.values = append(w.values, struct {
+               family []byte
+               val    []byte
+       }{family: bytes.Join([][]byte{w.seriesIDBytes, hash([]byte(name))}, nil), val: val})
+       return w
+}
+
+func (w *writerBuilder) Time(ts time.Time) WriterBuilder {
+       w.ts = ts
+       for _, b := range w.series.blocks {
+               if b.contains(ts) {
+                       w.block = b
+                       break
+               }
+       }
+       return w
+}
+
+func (w *writerBuilder) Val(val []byte) WriterBuilder {
+       w.values = append(w.values, struct {
+               family []byte
+               val    []byte
+       }{val: val})
+       return w
+}
+
+var ErrNoTime = errors.New("no time specified")
+var ErrNoVal = errors.New("no value specified")
+
+func (w *writerBuilder) Build() (Writer, error) {
+       if w.block == nil {
+               return nil, ErrNoTime
+       }
+       if len(w.values) < 1 {
+               return nil, ErrNoVal
+       }
+       wt := &writer{
+               block:    w.block,
+               ts:       w.ts,
+               seriesID: w.seriesIDBytes,
+               itemID:   bytes.Join([][]byte{w.seriesIDBytes, convert.Int64ToBytes(w.ts.UnixNano())}, nil),
+               columns:  w.values,
+       }
+       return wt, nil
+}
+
+func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
+       return &writerBuilder{
+               series:        seriesSpan,
+               seriesIDBytes: convert.Uint64ToBytes(uint64(seriesSpan.seriesID)),
+       }
+}
+
+var _ Writer = (*writer)(nil)
+
+type writer struct {
+       block    blockDelegate
+       ts       time.Time
+       seriesID []byte
+       columns  []struct {
+               family []byte
+               val    []byte
+       }
+       itemID []byte
+}
+
+func (w *writer) WriteLSMIndex(name string, val []byte) error {
+       panic("implement me")
+}
+
+func (w *writer) WriteInvertedIndex(name string, val []byte) error {
+       panic("implement me")
+}
+
+func (w *writer) Write() (id ItemID, err error) {
+       for _, c := range w.columns {
+               err = w.block.write(c.family, c.val, w.ts)
+               if err != nil {
+                       return id, err
+               }
+       }
+       id.id = w.itemID
+       return id, nil
+}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index b38c30f..aa9ec22 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -80,11 +80,16 @@ func NewPath(entries []Entry) Path {
 
 type SeriesDatabase interface {
        io.Closer
-       Create(entity Entity) error
+       Get(entity Entity) (Series, error)
        List(path Path) (SeriesList, error)
 }
 
+type blockDatabase interface {
+       span(timeRange TimeRange) []blockDelegate
+}
+
 var _ SeriesDatabase = (*seriesDB)(nil)
+var _ blockDatabase = (*seriesDB)(nil)
 
 type seriesDB struct {
        sync.Mutex
@@ -94,59 +99,23 @@ type seriesDB struct {
        seriesMetadata kv.Store
 }
 
-func (s *seriesDB) Close() error {
-       for _, seg := range s.lst {
-               seg.close()
-       }
-       return s.seriesMetadata.Close()
-}
-
-func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
-       sdb := &seriesDB{}
-       parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger == nil {
-               return nil, logger.ErrNoLoggerInContext
-       }
-       if pl, ok := parentLogger.(*logger.Logger); ok {
-               sdb.l = pl.Named("series")
-       }
-       var err error
-       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
-       if err != nil {
-               return nil, err
-       }
-       segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
-       if err != nil {
-               return nil, err
-       }
-       seg, err := newSegment(ctx, segPath)
-       if err != nil {
-               return nil, err
-       }
-       {
-               sdb.Lock()
-               defer sdb.Unlock()
-               sdb.lst = append(sdb.lst, seg)
-       }
-       return sdb, nil
-}
-
-func (s *seriesDB) Create(entity Entity) error {
+func (s *seriesDB) Get(entity Entity) (Series, error) {
        key := hashEntity(entity)
-       _, err := s.seriesMetadata.Get(key)
+       seriesID, err := s.seriesMetadata.Get(key)
        if err != nil && err != kv.ErrKeyNotFound {
-               return err
+               return nil, err
        }
        if err == nil {
-               return nil
+               return newSeries(bytesConvSeriesID(seriesID), s), nil
        }
        s.Lock()
        defer s.Unlock()
-       err = s.seriesMetadata.Put(key, hash(key))
+       seriesID = hash(key)
+       err = s.seriesMetadata.Put(key, seriesID)
        if err != nil {
-               return err
+               return nil, err
        }
-       return nil
+       return newSeries(bytesConvSeriesID(seriesID), s), nil
 }
 
 func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -156,7 +125,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
                        return nil, err
                }
                if err == nil {
-                       return []Series{newSeries(common.SeriesID(convert.BytesToUint64(id)))}, nil
+                       return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
                }
                return nil, nil
        }
@@ -173,7 +142,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
                                err = multierr.Append(err, errGetVal)
                                return nil
                        }
-                       result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id))))
+                       result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
                }
                return nil
        })
@@ -183,6 +152,52 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
        return result, err
 }
 
+func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+       //TODO: return correct blocks
+       result := make([]blockDelegate, 0, len(s.lst[0].lst))
+       for _, b := range s.lst[0].lst {
+               result = append(result, b.delegate())
+       }
+       return result
+}
+
+func (s *seriesDB) Close() error {
+       for _, seg := range s.lst {
+               seg.close()
+       }
+       return s.seriesMetadata.Close()
+}
+
+func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
+       sdb := &seriesDB{}
+       parentLogger := ctx.Value(logger.ContextKey)
+       if parentLogger == nil {
+               return nil, logger.ErrNoLoggerInContext
+       }
+       if pl, ok := parentLogger.(*logger.Logger); ok {
+               sdb.l = pl.Named("series")
+       }
+       var err error
+       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
+       if err != nil {
+               return nil, err
+       }
+       segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
+       if err != nil {
+               return nil, err
+       }
+       seg, err := newSegment(ctx, segPath)
+       if err != nil {
+               return nil, err
+       }
+       {
+               sdb.Lock()
+               defer sdb.Unlock()
+               sdb.lst = append(sdb.lst, seg)
+       }
+       return sdb, nil
+}
+
 func hashEntity(entity Entity) []byte {
        result := make(Entry, 0, len(entity)*8)
        for _, entry := range entity {
@@ -195,6 +210,10 @@ func hash(entry []byte) []byte {
        return convert.Uint64ToBytes(convert.Hash(entry))
 }
 
+func bytesConvSeriesID(data []byte) common.SeriesID {
+       return common.SeriesID(convert.BytesToUint64(data))
+}
+
 type SeriesList []Series
 
 func (a SeriesList) Len() int {
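
A note on the Get change above: the series database now derives a stable SeriesID by hashing the entity key and only writes the mapping when the key is missing, so Get doubles as get-or-create. Below is a minimal, self-contained sketch of that pattern; memSeriesDB, the FNV hashing, and the in-memory map are illustrative stand-ins for the kv.Store-backed code in seriesdb.go, not the actual implementation.

package main

import (
	"fmt"
	"hash/fnv"
)

// Hypothetical stand-ins for the tsdb types; not the banyand/tsdb code.
type Entry []byte
type Entity []Entry
type SeriesID uint64

// memSeriesDB keeps the entity-key -> SeriesID mapping in memory, playing the
// role of the kv.Store used by seriesDB.
type memSeriesDB struct {
	metadata map[string]SeriesID
}

// hashEntity flattens an entity into a single key by hashing each entry and
// concatenating the digests, mirroring the idea of hashEntity in seriesdb.go.
func hashEntity(e Entity) []byte {
	key := make([]byte, 0, len(e)*8)
	for _, entry := range e {
		h := fnv.New64a()
		_, _ = h.Write(entry)
		key = h.Sum(key)
	}
	return key
}

// Get returns the existing SeriesID for the entity or allocates one by hashing
// the entity key on first access (the get-or-create behaviour of the new Get).
func (db *memSeriesDB) Get(e Entity) SeriesID {
	key := hashEntity(e)
	if id, ok := db.metadata[string(key)]; ok {
		return id
	}
	h := fnv.New64a()
	_, _ = h.Write(key)
	id := SeriesID(h.Sum64())
	db.metadata[string(key)] = id
	return id
}

func main() {
	db := &memSeriesDB{metadata: map[string]SeriesID{}}
	entity := Entity{Entry("productpage"), Entry("10.0.0.1")}
	first := db.Get(entity)      // allocates a new SeriesID
	second := db.Get(entity)     // returns the stored one
	fmt.Println(first == second) // true: the same entity maps to a stable ID
}
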
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 098648a..83b3008 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -141,7 +141,7 @@ func TestNewPath(t *testing.T) {
        }
 }
 
-func Test_SeriesDatabase_Create(t *testing.T) {
+func Test_SeriesDatabase_Get(t *testing.T) {
 
        tests := []struct {
                name     string
@@ -152,7 +152,7 @@ func Test_SeriesDatabase_Create(t *testing.T) {
                        entities: []Entity{{
                                Entry("productpage"),
                                Entry("10.0.0.1"),
-                               Entry(convert.Uint64ToBytes(0)),
+                               convert.Uint64ToBytes(0),
                        }},
                },
                {
@@ -161,12 +161,12 @@ func Test_SeriesDatabase_Create(t *testing.T) {
                                {
                                        Entry("productpage"),
                                        Entry("10.0.0.1"),
-                                       Entry(convert.Uint64ToBytes(0)),
+                                       convert.Uint64ToBytes(0),
                                },
                                {
                                        Entry("productpage"),
                                        Entry("10.0.0.1"),
-                                       Entry(convert.Uint64ToBytes(0)),
+                                       convert.Uint64ToBytes(0),
                                },
                        },
                },
@@ -183,7 +183,9 @@ func Test_SeriesDatabase_Create(t *testing.T) {
                        s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
                        tester.NoError(err)
                        for _, entity := range tt.entities {
-                               tester.NoError(s.Create(entity))
+                               series, err := s.Get(entity)
+                               tester.NoError(err)
+                               tester.Equal(bytesConvSeriesID(hash(hashEntity(entity))), series.ID())
                        }
                })
        }
@@ -214,7 +216,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
                                convert.Uint64ToBytes(0),
                        }),
                        want: SeriesList{
-                               newSeries(data[0].id),
+                               newMockSeries(data[0].id),
                        },
                },
                {
@@ -225,8 +227,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
                                AnyEntry,
                        }),
                        want: SeriesList{
-                               newSeries(data[1].id),
-                               newSeries(data[2].id),
+                               newMockSeries(data[1].id),
+                               newMockSeries(data[2].id),
                        },
                },
                {
@@ -237,10 +239,10 @@ func Test_SeriesDatabase_List(t *testing.T) {
                                AnyEntry,
                        }),
                        want: SeriesList{
-                               newSeries(data[0].id),
-                               newSeries(data[1].id),
-                               newSeries(data[2].id),
-                               newSeries(data[3].id),
+                               newMockSeries(data[0].id),
+                               newMockSeries(data[1].id),
+                               newMockSeries(data[2].id),
+                               newMockSeries(data[3].id),
                        },
                },
                {
@@ -251,9 +253,9 @@ func Test_SeriesDatabase_List(t *testing.T) {
                                convert.Uint64ToBytes(0),
                        }),
                        want: SeriesList{
-                               newSeries(data[0].id),
-                               newSeries(data[1].id),
-                               newSeries(data[3].id),
+                               newMockSeries(data[0].id),
+                               newMockSeries(data[1].id),
+                               newMockSeries(data[3].id),
                        },
                },
                {
@@ -264,8 +266,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
                                convert.Uint64ToBytes(1),
                        }),
                        want: SeriesList{
-                               newSeries(data[2].id),
-                               newSeries(data[4].id),
+                               newMockSeries(data[2].id),
+                               newMockSeries(data[4].id),
                        },
                },
        }
@@ -278,9 +280,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
                                return
                        }
                        tester.NoError(err)
-                       sort.Sort(tt.want)
-                       sort.Sort(series)
-                       tester.Equal(tt.want, series)
+                       tester.Equal(transform(tt.want), transform(series))
                })
        }
 }
@@ -330,7 +330,21 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
        }
        for _, d := range data {
                d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
-               t.NoError(db.Create(d.entity))
+               series, err := db.Get(d.entity)
+               t.NoError(err)
+               t.Equal(d.id, series.ID())
        }
        return data
 }
+
+func newMockSeries(id common.SeriesID) *series {
+       return newSeries(id, nil)
+}
+
+func transform(list SeriesList) (seriesIDs []common.SeriesID) {
+       sort.Sort(list)
+       for _, s := range list {
+               seriesIDs = append(seriesIDs, s.ID())
+       }
+       return seriesIDs
+}
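
The List assertions above switch from comparing Series values to comparing sorted ID slices because newSeries now carries a reference to the owning database, so two otherwise identical series differ by that pointer. A small stand-alone illustration of the idea, using hypothetical fakeSeries/toIDs names rather than the real tsdb types:

package main

import (
	"fmt"
	"reflect"
	"sort"
)

// Hypothetical stand-ins; the real test works with tsdb's series and SeriesList.
type SeriesID uint64

type fakeSeries struct {
	id SeriesID
	db *struct{} // plays the role of the blockDatabase reference held by series
}

// toIDs projects a list onto its sorted IDs, the same idea as the transform
// helper in seriesdb_test.go.
func toIDs(list []*fakeSeries) []SeriesID {
	ids := make([]SeriesID, 0, len(list))
	for _, s := range list {
		ids = append(ids, s.id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	db := &struct{}{}
	got := []*fakeSeries{{id: 2, db: db}, {id: 1, db: db}}    // e.g. returned by List
	want := []*fakeSeries{{id: 1, db: nil}, {id: 2, db: nil}} // e.g. built without a database
	fmt.Println(reflect.DeepEqual(got, want))               // false: order and db pointer differ
	fmt.Println(reflect.DeepEqual(toIDs(got), toIDs(want))) // true: the IDs match
}
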
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 050444e..94cb90a 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -31,7 +31,7 @@ type shard struct {
 }
 
 func (s *shard) Series() SeriesDatabase {
-       panic("implement me")
+       return s.seriesDatabase
 }
 
 func (s *shard) Index() IndexDatabase {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 503b0e8..ded75a2 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -44,9 +44,12 @@ const (
        dirPerm = 0700
 )
 
+var ErrInvalidShardID = errors.New("invalid shard id")
+
 type Database interface {
        io.Closer
        Shards() []Shard
+       Shard(id uint) (Shard, error)
 }
 
 type Shard interface {
@@ -71,6 +74,24 @@ type database struct {
        sync.Mutex
 }
 
+func (d *database) Shards() []Shard {
+       return d.sLst
+}
+
+func (d *database) Shard(id uint) (Shard, error) {
+       if int(id) >= len(d.sLst) {
+               return nil, ErrInvalidShardID
+       }
+       return d.sLst[id], nil
+}
+
+func (d *database) Close() error {
+       for _, s := range d.sLst {
+               _ = s.Close()
+       }
+       return nil
+}
+
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
        db := &database{
                location: opts.Location,
@@ -98,17 +119,6 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
        return createDatabase(thisContext, db)
 }
 
-func (d *database) Close() error {
-       for _, s := range d.sLst {
-               _ = s.Close()
-       }
-       return nil
-}
-
-func (d *database) Shards() []Shard {
-       return d.sLst
-}
-
 func createDatabase(ctx context.Context, db *database) (Database, error) {
        var err error
        db.Lock()
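
The new Shard(id) accessor above is a plain bounds-checked lookup into the shard list that returns ErrInvalidShardID for out-of-range ids. A tiny sketch of the same pattern, with hypothetical registry/shard types standing in for the tsdb ones:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins; the real code lives on tsdb's database/shard types.
var errInvalidShardID = errors.New("invalid shard id")

type shard struct{ id uint }

type registry struct {
	shards []*shard
}

// Shard returns the shard with the given id, or an error when the id falls
// outside the known shard list, mirroring database.Shard above.
func (r *registry) Shard(id uint) (*shard, error) {
	if int(id) >= len(r.shards) {
		return nil, errInvalidShardID
	}
	return r.shards[id], nil
}

func main() {
	r := &registry{shards: []*shard{{id: 0}, {id: 1}}}
	if s, err := r.Shard(1); err == nil {
		fmt.Println("got shard", s.id)
	}
	if _, err := r.Shard(5); errors.Is(err, errInvalidShardID) {
		fmt.Println("out-of-range id rejected")
	}
}
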
diff --git a/go.mod b/go.mod
index 65674a5..55e7aa9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
        github.com/RoaringBitmap/roaring v0.9.1
        github.com/cespare/xxhash v1.1.0
        github.com/dgraph-io/badger/v3 v3.2011.1
+       github.com/dgraph-io/ristretto v0.1.0
        github.com/golang/mock v1.5.0
        github.com/golang/protobuf v1.5.2
        github.com/google/go-cmp v0.5.6
