This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 88f5efb  Introduce gorilla encoding for integer values (#60)
88f5efb is described below

commit 88f5efb2e045f3b7d4d567906d03a62ca96efd2e
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Nov 20 13:15:24 2021 +0800

    Introduce gorilla encoding for integer values (#60)
---
 banyand/kv/badger.go                       |  31 +++++
 banyand/kv/kv.go                           |  16 ++-
 banyand/stream/stream.go                   |   8 +-
 banyand/tsdb/block.go                      |   2 +-
 banyand/tsdb/tsdb.go                       |   6 +-
 banyand/tsdb/tsdb_test.go                  |   8 +-
 go.mod                                     |   2 +-
 go.sum                                     |   4 +-
 pkg/bit/reader.go                          | 104 +++++++++++++++
 pkg/{bytes/bytes.go => bit/reader_test.go} |  41 ++++--
 pkg/bit/writer.go                          |  92 ++++++++++++++
 pkg/{bytes/bytes.go => bit/writer_test.go} |  50 ++++++--
 pkg/buffer/writer.go                       |  72 +++++++++++
 pkg/encoding/encoding.go                   |  14 ++-
 pkg/encoding/int.go                        | 195 +++++++++++++++++++++++++++++
 pkg/encoding/{stream_chunk.go => plain.go} | 175 +++++++++++++++-----------
 pkg/encoding/xor.go                        | 177 ++++++++++++++++++++++++++
 pkg/encoding/xor_test.go                   |  60 +++++++++
 18 files changed, 932 insertions(+), 125 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 24820f0..728a298 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -259,6 +259,37 @@ func (l *badgerLog) Debugf(f string, v ...interface{}) {
        l.delegated.Debug().Msgf(f, v...)
 }
 
+// encoderPoolDelegate adapts an encoding.SeriesEncoderPool so that badger can
+// use it as a bydb.TSetEncoderPool.
+type encoderPoolDelegate struct {
+       encoding.SeriesEncoderPool
+}
+
+var _ bydb.TSetEncoderPool = (*encoderPoolDelegate)(nil)
+
+// Get borrows an encoder for the series described by metadata.
+func (d *encoderPoolDelegate) Get(metadata []byte) bydb.TSetEncoder {
+       enc := d.SeriesEncoderPool.Get(metadata)
+       return enc
+}
+
+// Put hands an encoder back to the wrapped pool.
+func (d *encoderPoolDelegate) Put(encoder bydb.TSetEncoder) {
+       d.SeriesEncoderPool.Put(encoder)
+}
+
+var _ bydb.TSetDecoderPool = (*decoderPoolDelegate)(nil)
+
+// decoderPoolDelegate adapts an encoding.SeriesDecoderPool so that badger can
+// use it as a bydb.TSetDecoderPool. Decoders handed out by Get are wrapped in
+// a decoderDelegate; Put unwraps them before returning them to the pool.
+type decoderPoolDelegate struct {
+       encoding.SeriesDecoderPool
+}
+
+// Get borrows a decoder for the series described by metadata and wraps it so
+// that it satisfies bydb.TSetDecoder.
+func (e *decoderPoolDelegate) Get(metadata []byte) bydb.TSetDecoder {
+       return &decoderDelegate{
+               e.SeriesDecoderPool.Get(metadata),
+       }
+}
+
+// Put returns a decoder obtained from Get to the wrapped pool. A decoder of
+// any other concrete type (or a nil one) cannot be unwrapped; it is ignored
+// instead of panicking on the type assertion.
+func (e *decoderPoolDelegate) Put(decoder bydb.TSetDecoder) {
+       dd, ok := decoder.(*decoderDelegate)
+       if !ok || dd == nil {
+               return
+       }
+       e.SeriesDecoderPool.Put(dd.SeriesDecoder)
+}
+
 var _ bydb.TSetDecoder = (*decoderDelegate)(nil)
 
 type decoderDelegate struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index ea92d1c..2def4dc 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -23,7 +23,6 @@ import (
        "math"
 
        "github.com/dgraph-io/badger/v3"
-       "github.com/dgraph-io/badger/v3/bydb"
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -103,16 +102,15 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
        }
 }
 
+// TSSWithEncoding sets the encoder and decoder pools handed to the store's
+// external compactor. It only applies to the badger-backed store
+// (*badgerTSS); for any other TimeSeriesStore the option does nothing.
-func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, 
decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions {
+func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool 
encoding.SeriesDecoderPool) TimeSeriesOptions {
        return func(store TimeSeriesStore) {
                if btss, ok := store.(*badgerTSS); ok {
-                       btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() 
bydb.TSetEncoder {
-                               return encoderFactory()
-                       }, func() bydb.TSetDecoder {
-                               return &decoderDelegate{
-                                       SeriesDecoder: decoderFactory(),
-                               }
-                       })
+                       // Wrap the pools so they satisfy badger's bydb pool interfaces.
+                       btss.dbOpts = btss.dbOpts.WithExternalCompactor(
+                               &encoderPoolDelegate{
+                                       encoderPool,
+                               }, &decoderPoolDelegate{
+                                       decoderPool,
+                               })
                }
        }
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 760687d..444a483 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -107,12 +107,8 @@ func openStream(root string, spec streamSpec, l 
*logger.Logger) (*stream, error)
                        ShardNum:   sm.schema.GetOpts().GetShardNum(),
                        IndexRules: spec.indexRules,
                        EncodingMethod: tsdb.EncodingMethod{
-                               EncoderFactory: func() encoding.SeriesEncoder {
-                                       return 
encoding.NewStreamChunkEncoder(chunkSize)
-                               },
-                               DecoderFactory: func() encoding.SeriesDecoder {
-                                       return 
encoding.NewStreamChunkDecoder(chunkSize)
-                               },
+                               EncoderPool: 
encoding.NewPlainEncoderPool(chunkSize),
+                               DecoderPool: 
encoding.NewPlainDecoderPool(chunkSize),
                        },
                })
        if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 2f38308..956bf13 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,7 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, 
err error) {
        if b.store, err = kv.OpenTimeSeriesStore(
                0,
                b.path+"/store",
-               kv.TSSWithEncoding(encodingMethod.EncoderFactory, 
encodingMethod.DecoderFactory),
+               kv.TSSWithEncoding(encodingMethod.EncoderPool, 
encodingMethod.DecoderPool),
                kv.TSSWithLogger(b.l),
        ); err != nil {
                return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8df233b..e9ed19a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -82,8 +82,8 @@ type DatabaseOpts struct {
 }
 
+// EncodingMethod selects how series data is encoded and decoded. Both pools
+// are required: OpenDatabase fails with ErrEncodingMethodAbsent if either is nil.
 type EncodingMethod struct {
-       EncoderFactory encoding.SeriesEncoderFactory
-       DecoderFactory encoding.SeriesDecoderFactory
+       // EncoderPool is passed to kv.TSSWithEncoding when a block's store is opened.
+       EncoderPool encoding.SeriesEncoderPool
+       // DecoderPool is passed to kv.TSSWithEncoding when a block's store is opened.
+       DecoderPool encoding.SeriesDecoderPool
 }
 
 type database struct {
@@ -124,7 +124,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) 
(Database, error) {
                        db.logger = pl.Named("tsdb")
                }
        }
-       if opts.EncodingMethod.EncoderFactory == nil || 
opts.EncodingMethod.DecoderFactory == nil {
+       if opts.EncodingMethod.EncoderPool == nil || 
opts.EncodingMethod.DecoderPool == nil {
                return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to 
open database")
        }
        if _, err := mkdir(opts.Location); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 89c56d6..c570fdf 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -58,12 +58,8 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc 
func(), db Database
                        Location: tempDir,
                        ShardNum: 1,
                        EncodingMethod: EncodingMethod{
-                               EncoderFactory: func() encoding.SeriesEncoder {
-                                       return nil
-                               },
-                               DecoderFactory: func() encoding.SeriesDecoder {
-                                       return nil
-                               },
+                               EncoderPool: encoding.NewPlainEncoderPool(0),
+                               DecoderPool: encoding.NewPlainDecoderPool(0),
                        },
                })
        t.NoError(err)
diff --git a/go.mod b/go.mod
index 24e4ce0..b2815cb 100644
--- a/go.mod
+++ b/go.mod
@@ -101,4 +101,4 @@ require (
        sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => 
github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => 
github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
diff --git a/go.sum b/go.sum
index 957109b..afb3bff 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod 
h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets 
v0.0.0-20190726190000-eb7c87156f76/go.mod 
h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 
h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod 
h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 
h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA=
-github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod 
h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a 
h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
+github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod 
h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/bit/reader.go b/pkg/bit/reader.go
new file mode 100644
index 0000000..22cb929
--- /dev/null
+++ b/pkg/bit/reader.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+       "io"
+)
+
+// Reader reads bits, most significant bit first, from an io.ByteReader.
+type Reader struct {
+       in    io.ByteReader
+       cache byte // unread bits of the current byte, left-aligned
+       len   byte // number of unread bits held in cache
+}
+
+// NewReader creates a bit reader that consumes bytes from in.
+func NewReader(in io.ByteReader) *Reader {
+       return &Reader{
+               in: in,
+       }
+}
+
+// ReadBool reads one bit: 1 returns true and 0 returns false. When the
+// underlying reader is exhausted it returns that reader's error.
+func (r *Reader) ReadBool() (bool, error) {
+       if r.len == 0 {
+               b, err := r.in.ReadByte()
+               if err != nil {
+                       return false, err
+               }
+               r.cache = b
+               r.len = 8
+       }
+       r.len--
+       b := r.cache & 0x80
+       r.cache <<= 1
+       return b != 0, nil
+}
+
+// ReadBits reads numBits bits and returns them right-aligned, the first bit
+// read being the most significant. For numBits > 64 only the last 64 bits
+// read are kept; numBits <= 0 reads nothing and returns 0.
+func (r *Reader) ReadBits(numBits int) (uint64, error) {
+       var result uint64
+
+       // Take whole bytes first, then the remaining bits one at a time.
+       for ; numBits >= 8; numBits -= 8 {
+               b, err := r.ReadByte()
+               if err != nil {
+                       return 0, err
+               }
+
+               result = (result << 8) | uint64(b)
+       }
+
+       for ; numBits > 0; numBits-- {
+               set, err := r.ReadBool()
+               if err != nil {
+                       return 0, err
+               }
+               result <<= 1
+               if set {
+                       result |= 1
+               }
+       }
+
+       return result, nil
+}
+
+// ReadByte reads the next 8 bits as a byte, whether or not the reader is
+// currently byte-aligned. It implements io.ByteReader.
+func (r *Reader) ReadByte() (byte, error) {
+       b, err := r.in.ReadByte()
+       if err != nil {
+               return b, err
+       }
+       if r.len == 0 {
+               // Byte-aligned: no pending bits, so the next byte is the result.
+               // cache is never consulted while len == 0, so it is left alone.
+               return b, nil
+       }
+       // Join the pending high bits with the top (8-len) bits of b, and keep the
+       // low len bits of b, left-aligned, as the new pending bits.
+       result := r.cache | b>>r.len
+       r.cache = b << (8 - r.len)
+       return result, nil
+}
+
+// Reset discards any partially read byte, so the next read starts at a byte
+// boundary of the underlying reader. The underlying reader itself is not
+// changed or rewound.
+func (r *Reader) Reset() {
+       r.len = 0
+       r.cache = 0
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/reader_test.go
similarity index 56%
copy from pkg/bytes/bytes.go
copy to pkg/bit/reader_test.go
index b4d051c..6f30bbe 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/reader_test.go
@@ -15,17 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package bytes
+package bit
 
-func Join(s ...[]byte) []byte {
-       n := 0
-       for _, v := range s {
-               n += len(v)
-       }
+import (
+       "bytes"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestReader checks byte, multi-bit and single-bit reads, including reads
+// that straddle byte boundaries.
+func TestReader(t *testing.T) {
+       data := []byte{3, 255, 0xcc, 0x1a, 0xbc, 0xde, 0x80}
+
+       r := NewReader(bytes.NewBuffer(data))
+       a := assert.New(t)
+
+       eq(a, byte(3))(r.ReadByte())
+       eq(a, uint64(255))(r.ReadBits(8))
+
+       // High nibble of 0xcc.
+       eq(a, uint64(0xc))(r.ReadBits(4))
+
+       // Unaligned: low nibble of 0xcc followed by the high nibble of 0x1a.
+       eq(a, uint64(0xc1))(r.ReadBits(8))
+
+       // Low nibble of 0x1a, then all of 0xbc and 0xde.
+       eq(a, uint64(0xabcde))(r.ReadBits(20))
+
+       // 0x80: a set bit followed by a clear one.
+       eq(a, true)(r.ReadBool())
+       eq(a, false)(r.ReadBool())
+
+}
 
-       b, i := make([]byte, n), 0
-       for _, v := range s {
-               i += copy(b[i:], v)
+// eq returns a checker for a (value, error) pair: it asserts that err is nil
+// and that the value equals expected.
+func eq(a *assert.Assertions, expected interface{}) func(interface{}, error) {
+       return func(actual interface{}, err error) {
+               a.NoError(err)
+               a.Equal(expected, actual)
        }
-       return b
 }
diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go
new file mode 100644
index 0000000..6302f9e
--- /dev/null
+++ b/pkg/bit/writer.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bit
+
+import (
+       "bytes"
+)
+
+// Writer writes bits, most significant bit first, into a *bytes.Buffer.
+// Bits of an incomplete byte are held in cache until the byte is complete
+// or Flush is called.
+type Writer struct {
+       out       *bytes.Buffer
+       cache     byte // pending bits, left-aligned
+       available byte // free bit positions left in cache (8 means empty)
+}
+
+// NewWriter creates a bit writer that appends to buffer.
+func NewWriter(buffer *bytes.Buffer) *Writer {
+       var bw Writer
+       bw.Reset(buffer)
+       return &bw
+}
+
+// Reset drops any pending bits and directs subsequent writes to buffer.
+// It does not modify buffer's existing contents.
+func (w *Writer) Reset(buffer *bytes.Buffer) {
+       w.out = buffer
+       w.cache = 0
+       w.available = 8
+}
+
+// WriteBool writes a single bit: true as 1, false as 0.
+func (w *Writer) WriteBool(b bool) {
+       if b {
+               w.cache |= 1 << (w.available - 1)
+       }
+
+       w.available--
+
+       if w.available == 0 {
+               // bytes.Buffer.WriteByte always returns a nil error.
+               _ = w.out.WriteByte(w.cache)
+               w.cache = 0
+               w.available = 8
+       }
+}
+
+// WriteBits writes the low numBits bits of u, most significant first.
+// numBits is expected to be in [0, 64].
+func (w *Writer) WriteBits(u uint64, numBits int) {
+       // Left-align the bits to write so they can be taken from the top.
+       u <<= 64 - uint(numBits)
+
+       for ; numBits >= 8; numBits -= 8 {
+               _ = w.WriteByte(byte(u >> 56))
+               u <<= 8
+       }
+
+       remainder := byte(u >> 56)
+       for ; numBits > 0; numBits-- {
+               w.WriteBool((remainder & 0x80) != 0)
+               remainder <<= 1
+       }
+}
+
+// WriteByte writes 8 bits, whether or not the writer is byte-aligned.
+// It returns an error so that its signature matches io.ByteWriter, which
+// go vet's stdmethods check requires of a method with this name. With a
+// bytes.Buffer as the destination the error is always nil.
+func (w *Writer) WriteByte(b byte) error {
+       // Emit the pending bits topped up with the high bits of b, then keep the
+       // low bits of b as the new pending bits. When aligned (available == 8)
+       // this writes b itself and leaves cache empty.
+       if err := w.out.WriteByte(w.cache | (b >> (8 - w.available))); err != nil {
+               return err
+       }
+       w.cache = b << w.available
+       return nil
+}
+
+// Flush writes the pending partial byte, padded with zero bits, and
+// returns the writer to a byte boundary on the same buffer.
+func (w *Writer) Flush() {
+       if w.available != 8 {
+               _ = w.out.WriteByte(w.cache)
+       }
+       w.Reset(w.out)
+}
diff --git a/pkg/bytes/bytes.go b/pkg/bit/writer_test.go
similarity index 57%
rename from pkg/bytes/bytes.go
rename to pkg/bit/writer_test.go
index b4d051c..7624ff8 100644
--- a/pkg/bytes/bytes.go
+++ b/pkg/bit/writer_test.go
@@ -15,17 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package bytes
-
-func Join(s ...[]byte) []byte {
-       n := 0
-       for _, v := range s {
-               n += len(v)
-       }
-
-       b, i := make([]byte, n), 0
-       for _, v := range s {
-               i += copy(b[i:], v)
-       }
-       return b
+package bit
+
+import (
+       "bytes"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestWriter checks aligned and unaligned byte writes, multi-bit writes and
+// the zero padding added by Flush.
+func TestWriter(t *testing.T) {
+       out := &bytes.Buffer{}
+       w := NewWriter(out)
+
+       a := assert.New(t)
+
+       w.WriteByte(0xc1)
+       // 0 + 111111 + 1 completes the byte 0x7f.
+       w.WriteBool(false)
+       w.WriteBits(0x3f, 6)
+       w.WriteBool(true)
+       w.WriteByte(0xac)
+       // 1 followed by the 20 bits of 0x1248f yields 0x89, 0x24 and 5 pending bits.
+       w.WriteBits(0x01, 1)
+       w.WriteBits(0x1248f, 20)
+       // The pending 01111 is zero-padded to 0x78.
+       w.Flush()
+
+       w.WriteByte(0x01)
+       w.WriteByte(0x02)
+
+       // From here on WriteByte calls are unaligned by 4 bits.
+       w.WriteBits(0x0f, 4)
+
+       w.WriteByte(0x80)
+       w.WriteByte(0x8f)
+       w.Flush()
+
+       w.WriteBits(0x01, 1)
+       w.WriteByte(0xff)
+       w.Flush()
+       a.Equal([]byte{0xc1, 0x7f, 0xac, 0x89, 0x24, 0x78, 0x01, 0x02, 0xf8, 
0x08, 0xf0, 0xff, 0x80}, out.Bytes())
 }
diff --git a/pkg/buffer/writer.go b/pkg/buffer/writer.go
new file mode 100644
index 0000000..e109d25
--- /dev/null
+++ b/pkg/buffer/writer.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package buffer
+
+import (
+       "bytes"
+       "encoding/binary"
+)
+
+// Writer appends raw bytes and little-endian fixed-size integers to a
+// bytes.Buffer. Buffer errors are ignored: bytes.Buffer's Write always
+// returns a nil error (it panics with bytes.ErrTooLarge instead).
+type Writer struct {
+       buf *bytes.Buffer
+
+       // scratch is reused to encode integers without allocating.
+       scratch [binary.MaxVarintLen64]byte
+}
+
+// NewBufferWriter wraps buf in a Writer.
+func NewBufferWriter(buf *bytes.Buffer) *Writer {
+       w := new(Writer)
+       w.buf = buf
+       return w
+}
+
+// Write appends p.
+func (w *Writer) Write(p []byte) {
+       _, _ = w.buf.Write(p)
+}
+
+// WriteTo moves all bytes buffered in w into other, leaving w empty, and
+// returns how many bytes were moved.
+// NOTE(review): this signature differs from io.WriterTo, which go vet's
+// stdmethods check reports; it is kept as is for existing callers.
+func (w *Writer) WriteTo(other *Writer) (n int64) {
+       moved, _ := w.buf.WriteTo(other.buf)
+       return moved
+}
+
+// PutUint16 appends v as 2 little-endian bytes.
+func (w *Writer) PutUint16(v uint16) {
+       out := w.scratch[:2]
+       binary.LittleEndian.PutUint16(out, v)
+       _, _ = w.buf.Write(out)
+}
+
+// PutUint32 appends v as 4 little-endian bytes.
+func (w *Writer) PutUint32(v uint32) {
+       out := w.scratch[:4]
+       binary.LittleEndian.PutUint32(out, v)
+       _, _ = w.buf.Write(out)
+}
+
+// PutUint64 appends v as 8 little-endian bytes.
+func (w *Writer) PutUint64(v uint64) {
+       out := w.scratch[:8]
+       binary.LittleEndian.PutUint64(out, v)
+       _, _ = w.buf.Write(out)
+}
+
+// Reset empties the underlying buffer.
+func (w *Writer) Reset() {
+       w.buf.Reset()
+}
+
+// Len reports how many bytes the underlying buffer currently holds.
+func (w *Writer) Len() int {
+       return w.buf.Len()
+}
+
+// Bytes returns the buffered bytes. The slice aliases the buffer's storage
+// and is only valid until the buffer is next modified.
+func (w *Writer) Bytes() []byte {
+       return w.buf.Bytes()
+}
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 5e04e72..05a1f14 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -21,7 +21,10 @@ import "github.com/pkg/errors"
 
 var ErrEncodeEmpty = errors.New("encode an empty value")
 
-type SeriesEncoderFactory func() SeriesEncoder
+// SeriesEncoderPool hands out reusable SeriesEncoders.
+type SeriesEncoderPool interface {
+       // Get returns an encoder prepared for the series described by metadata.
+       // How metadata is interpreted is up to the implementation.
+       Get(metadata []byte) SeriesEncoder
+       // Put returns an encoder obtained from Get so that it can be reused.
+       Put(encoder SeriesEncoder)
+}
 
 // SeriesEncoder encodes time series data point
 type SeriesEncoder interface {
@@ -30,19 +33,22 @@ type SeriesEncoder interface {
        // IsFull returns whether the encoded data reached its capacity
        IsFull() bool
        // Reset the underlying buffer
-       Reset()
+       Reset(key []byte)
        // Encode the time series data point to a binary
        Encode() ([]byte, error)
        // StartTime indicates the first entry's time
        StartTime() uint64
 }
 
-type SeriesDecoderFactory func() SeriesDecoder
+// SeriesDecoderPool hands out reusable SeriesDecoders.
+type SeriesDecoderPool interface {
+       // Get returns a decoder for the series described by metadata.
+       // How metadata is interpreted is up to the implementation.
+       Get(metadata []byte) SeriesDecoder
+       // Put returns a decoder obtained from Get so that it can be reused.
+       Put(decoder SeriesDecoder)
+}
 
 // SeriesDecoder decodes encoded time series data
 type SeriesDecoder interface {
        // Decode the time series data
-       Decode(data []byte) error
+       Decode(key, data []byte) error
        // Len denotes the size of iterator
        Len() int
        // IsFull returns whether the encoded data reached its capacity
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
new file mode 100644
index 0000000..9571ec4
--- /dev/null
+++ b/pkg/encoding/int.go
@@ -0,0 +1,195 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+       "bytes"
+       "encoding/binary"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/bit"
+       "github.com/apache/skywalking-banyandb/pkg/buffer"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+       _ SeriesEncoder = (*intEncoder)(nil)
+)
+
+// ParseInterval derives the sampling interval of a series from its key.
+type ParseInterval = func(key []byte) time.Duration
+
+// intEncoder encodes an integer series sampled at a fixed interval. The
+// series is a run of slots at startTime + k*interval. Each slot is stored as
+// one presence bit, and a present slot is followed by its value as written by
+// the XOREncoder. Encode appends a trailer of startTime (uint64) and the slot
+// count (uint16), both little-endian.
+//
+// Reset(key) must be called before Append so that the interval is known.
+type intEncoder struct {
+       buff      *bytes.Buffer
+       bw        *bit.Writer
+       values    *XOREncoder
+       fn        ParseInterval
+       interval  time.Duration
+       startTime uint64
+       num       int // slots written so far, including empty ones
+       size      int // slot capacity reported through IsFull
+}
+
+// NewIntEncoder returns an encoder that reports full after size slots; fn maps
+// a series key to its sampling interval.
+func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder {
+       buff := &bytes.Buffer{}
+       bw := bit.NewWriter(buff)
+       return &intEncoder{
+               buff:   buff,
+               bw:     bw,
+               values: NewXOREncoder(bw),
+               fn:     fn,
+               size:   size,
+       }
+}
+
+// Append records value at ts. value is read as a little-endian integer of at
+// most 8 bytes; shorter values are zero-extended.
+//
+// A point is dropped when:
+//   - value is longer than 8 bytes;
+//   - no positive interval is configured;
+//   - ts precedes startTime;
+//   - its slot is at or before the last written slot (out-of-order or
+//     duplicate data).
+//
+// NOTE(review): assumes ts and interval are expressed in the same unit — confirm.
+func (ie *intEncoder) Append(ts uint64, value []byte) {
+       if len(value) > 8 || ie.interval <= 0 {
+               return
+       }
+       if ie.startTime == 0 {
+               ie.startTime = ts
+       }
+       gap := int(ts) - int(ie.startTime)
+       if gap < 0 {
+               return
+       }
+       slot := gap / int(ie.interval)
+       if slot < ie.num {
+               return
+       }
+       // Every slot between the last written one and this one is missing.
+       for ; ie.num < slot; ie.num++ {
+               ie.bw.WriteBool(false)
+       }
+       // binary.LittleEndian.Uint64 needs exactly 8 bytes; zero-extend short input.
+       var raw [8]byte
+       copy(raw[:], value)
+       ie.bw.WriteBool(true)
+       ie.values.Write(binary.LittleEndian.Uint64(raw[:]))
+       ie.num++
+}
+
+// IsFull reports whether the configured slot capacity has been reached.
+func (ie *intEncoder) IsFull() bool {
+       return ie.num >= ie.size
+}
+
+// Reset prepares the encoder for a new series identified by key. It clears
+// the output buffer and counters, restarts the XOR value stream and
+// re-derives the sampling interval from key.
+func (ie *intEncoder) Reset(key []byte) {
+       ie.buff.Reset()
+       // Point the bit writer back at buff; resetting it to nil would make the
+       // next completed byte dereference a nil buffer.
+       ie.bw.Reset(ie.buff)
+       // The XOR encoder presumably keeps state from previous values; start a
+       // fresh one so the new series does not depend on the old one.
+       ie.values = NewXOREncoder(ie.bw)
+       ie.startTime = 0
+       ie.num = 0
+       ie.interval = 0
+       if ie.fn != nil {
+               ie.interval = ie.fn(key)
+       }
+}
+
+// Encode flushes pending bits and appends the trailer: startTime, then the
+// slot count. It returns ErrEncodeEmpty when nothing has been appended. The
+// returned slice aliases the encoder's buffer and is valid until the next Reset.
+// NOTE(review): more than 65535 slots would overflow the uint16 slot count.
+func (ie *intEncoder) Encode() ([]byte, error) {
+       if ie.num == 0 {
+               return nil, ErrEncodeEmpty
+       }
+       ie.bw.Flush()
+       buffWriter := buffer.NewBufferWriter(ie.buff)
+       buffWriter.PutUint64(ie.startTime)
+       // The decoder reads this field as the number of slots to iterate, so it
+       // must be the slot count, not the capacity.
+       buffWriter.PutUint16(uint16(ie.num))
+       return ie.buff.Bytes(), nil
+}
+
+// StartTime returns the timestamp of the first slot.
+func (ie *intEncoder) StartTime() uint64 {
+       return ie.startTime
+}
+
+var _ SeriesDecoder = (*intDecoder)(nil)
+
+// intDecoder decodes blocks produced by intEncoder. Its methods use pointer
+// receivers: with value receivers Decode would only fill in a copy, and later
+// calls would never see the decoded block.
+type intDecoder struct {
+       fn        ParseInterval
+       size      int
+       interval  time.Duration
+       startTime uint64
+       num       int
+       area      []byte
+}
+
+// NewIntDecoder returns a decoder whose IsFull threshold is size slots; fn
+// maps a series key to its sampling interval.
+func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder {
+       return &intDecoder{
+               fn:   fn,
+               size: size,
+       }
+}
+
+// Decode parses data produced by intEncoder.Encode for the series key. It
+// returns ErrInvalidValue when data is too short to hold the trailer. The
+// decoder keeps a reference to data rather than copying it.
+func (i *intDecoder) Decode(key, data []byte) error {
+       // Trailer: startTime (uint64) followed by the slot count (uint16).
+       const trailerSize = 8 + 2
+       if len(data) < trailerSize {
+               return ErrInvalidValue
+       }
+       i.interval = 0
+       if i.fn != nil {
+               i.interval = i.fn(key)
+       }
+       trailer := data[len(data)-trailerSize:]
+       i.startTime = binary.LittleEndian.Uint64(trailer[:8])
+       i.num = int(binary.LittleEndian.Uint16(trailer[8:]))
+       i.area = data[:len(data)-trailerSize]
+       return nil
+}
+
+// Len returns the number of slots in the decoded block, including empty ones.
+func (i *intDecoder) Len() int {
+       return i.num
+}
+
+// IsFull reports whether the decoded block reached the configured capacity.
+func (i *intDecoder) IsFull() bool {
+       return i.num >= i.size
+}
+
+// Get returns the value recorded at ts, or nil if that slot is empty or out
+// of range. A decoding error hit while scanning is returned as the error.
+func (i *intDecoder) Get(ts uint64) ([]byte, error) {
+       iter := i.newIterator()
+       for iter.Next() {
+               if iter.Time() == ts {
+                       return iter.Val(), nil
+               }
+       }
+       return nil, iter.Error()
+}
+
+// Iterator returns an iterator over the slots that hold a value.
+func (i *intDecoder) Iterator() SeriesIterator {
+       return i.newIterator()
+}
+
+// newIterator builds a concrete iterator over the decoded block.
+func (i *intDecoder) newIterator() *intIterator {
+       br := bit.NewReader(bytes.NewReader(i.area))
+       return &intIterator{
+               startTime: i.startTime,
+               interval:  int(i.interval),
+               num:       i.num,
+               br:        br,
+               values:    NewXORDecoder(br),
+       }
+}
+
+var _ SeriesIterator = (*intIterator)(nil)
+
+// intIterator walks the slots of a decoded int block and yields only the
+// slots that hold a value.
+type intIterator struct {
+       startTime uint64
+       interval  int
+       num       int // total slots in the block, from the trailer
+       br        *bit.Reader
+       values    *XORDecoder
+
+       currVal  uint64
+       currTime uint64
+       index    int // next slot to read
+       err      error
+}
+
+// Next advances to the next slot that holds a value, skipping empty slots.
+// It stops after num slots, so the zero padding added by the bit writer's
+// Flush is never read as data. It returns false at the end of the block or
+// on error; use Error to tell the two apart.
+func (i *intIterator) Next() bool {
+       for i.err == nil && i.index < i.num {
+               present, err := i.br.ReadBool()
+               if err != nil {
+                       i.err = err
+                       return false
+               }
+               slot := i.index
+               i.index++
+               if !present {
+                       continue
+               }
+               if !i.values.Next() {
+                       // A slot marked present must be followed by a value.
+                       i.err = ErrInvalidValue
+                       return false
+               }
+               i.currVal = i.values.Value()
+               i.currTime = i.startTime + uint64(i.interval*slot)
+               return true
+       }
+       return false
+}
+
+// Val returns the current value encoded by convert.Uint64ToBytes.
+// NOTE(review): Append reads values as little-endian; confirm that
+// convert.Uint64ToBytes uses the same byte order so values round-trip.
+func (i *intIterator) Val() []byte {
+       return convert.Uint64ToBytes(i.currVal)
+}
+
+// Time returns the timestamp of the current slot.
+func (i *intIterator) Time() uint64 {
+       return i.currTime
+}
+
+// Error returns the error that stopped iteration, if any.
+func (i *intIterator) Error() error {
+       return i.err
+}
diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/plain.go
similarity index 54%
rename from pkg/encoding/stream_chunk.go
rename to pkg/encoding/plain.go
index 7ff5094..0b3fb8c 100644
--- a/pkg/encoding/stream_chunk.go
+++ b/pkg/encoding/plain.go
@@ -22,106 +22,147 @@ import (
        "encoding/binary"
        "fmt"
        "sort"
+       "sync"
 
        "github.com/klauspost/compress/zstd"
        "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/pkg/buffer"
 )
 
 var (
-       decoder, _               = zstd.NewReader(nil)
-       encoder, _               = zstd.NewWriter(nil, 
zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
-       _          SeriesEncoder = (*streamChunkEncoder)(nil)
-       _          SeriesDecoder = (*StreamChunkDecoder)(nil)
+       // encoderPool recycles *plainEncoder values. Every plainEncoderPool
+       // shares it, so per-pool settings are re-applied on each Get.
+       encoderPool = sync.Pool{
+               New: newPlainEncoder,
+       }
+       // decoderPool recycles *plainDecoder values and is shared by every
+       // plainDecoderPool.
+       decoderPool = sync.Pool{
+               New: func() interface{} {
+                       return &plainDecoder{}
+               },
+       }
+)
+
+// plainEncoderPool hands out plain encoders configured with a fixed value
+// capacity. All instances share the package-level encoderPool, so Get
+// re-applies size on every call.
+type plainEncoderPool struct {
+       pool *sync.Pool
+       size int
+}
+
+// NewPlainEncoderPool returns a SeriesEncoderPool whose encoders report full
+// once their value section reaches size bytes.
+func NewPlainEncoderPool(size int) SeriesEncoderPool {
+       return &plainEncoderPool{
+               pool: &encoderPool,
+               size: size,
+       }
+}
+
+// Get returns a reset encoder. metadata is forwarded to Reset, which the
+// plain encoder ignores.
+func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder {
+       encoder := b.pool.Get().(*plainEncoder)
+       encoder.Reset(metadata)
+       encoder.valueSize = b.size
+       return encoder
+}
+
+// Put returns encoder to the shared pool. Only non-nil *plainEncoder values
+// are accepted; anything else is dropped so that a later Get cannot panic on
+// its type assertion.
+func (b *plainEncoderPool) Put(encoder SeriesEncoder) {
+       if pe, ok := encoder.(*plainEncoder); ok && pe != nil {
+               b.pool.Put(pe)
+       }
+}
+
+// plainDecoderPool hands out plain decoders configured with a fixed value
+// capacity. All instances share the package-level decoderPool, so Get
+// re-applies size on every call.
+type plainDecoderPool struct {
+       pool *sync.Pool
+       size int
+}
+
+// NewPlainDecoderPool returns a SeriesDecoderPool whose decoders use size as
+// their IsFull threshold.
+func NewPlainDecoderPool(size int) SeriesDecoderPool {
+       return &plainDecoderPool{
+               pool: &decoderPool,
+               size: size,
+       }
+}
+
+// Get returns a pooled decoder. The metadata argument is not used.
+func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder {
+       decoder := b.pool.Get().(*plainDecoder)
+       decoder.valueSize = b.size
+       return decoder
+}
+
+// Put returns decoder to the shared pool. Only non-nil *plainDecoder values
+// are accepted; anything else is dropped so that a later Get cannot panic on
+// its type assertion.
+func (b *plainDecoderPool) Put(decoder SeriesDecoder) {
+       if pd, ok := decoder.(*plainDecoder); ok && pd != nil {
+               b.pool.Put(pd)
+       }
+}
+
+// zstd codecs shared by every plain encoder and decoder.
+// NOTE(review): construction errors are discarded here; confirm these calls
+// cannot fail with the options used, or the codecs would be unusable.
+var (
+       zstdDecoder, _               = zstd.NewReader(nil)
+       zstdEncoder, _               = zstd.NewWriter(nil, 
zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
+       // Compile-time checks that the plain codec implements the series interfaces.
+       _              SeriesEncoder = (*plainEncoder)(nil)
+       _              SeriesDecoder = (*plainDecoder)(nil)
 )
 
-//streamChunkEncoder backport to reduced value
-type streamChunkEncoder struct {
-       tsBuff    bytes.Buffer
-       valBuff   bytes.Buffer
-       scratch   [binary.MaxVarintLen64]byte
+// plainEncoder buffers (timestamp, value) pairs and zstd-compresses them on
+// Encode. valBuff holds the values, each prefixed by its uint32 length;
+// tsBuff holds one (timestamp uint64, value offset uint32) slot per entry.
+type plainEncoder struct {
+       tsBuff    *buffer.Writer
+       valBuff   *buffer.Writer
+       // len is the byte length of the value section, computed by Encode;
+       // num counts appended entries; startTime is the smallest appended
+       // timestamp; valueSize is the IsFull threshold in bytes.
        len       uint32
        num       uint32
        startTime uint64
        valueSize int
 }
 
+// newPlainEncoder is the sync.Pool constructor for *plainEncoder; the pool's
+// Get sets valueSize before handing the encoder out.
-func NewStreamChunkEncoder(size int) SeriesEncoder {
-       return &streamChunkEncoder{
-               valueSize: size,
+func newPlainEncoder() interface{} {
+       return &plainEncoder{
+               tsBuff:  buffer.NewBufferWriter(&bytes.Buffer{}),
+               valBuff: buffer.NewBufferWriter(&bytes.Buffer{}),
        }
 }
 
+// Append records value at ts and keeps startTime at the smallest timestamp
+// seen (0 is treated as "unset").
-func (t *streamChunkEncoder) Append(ts uint64, value []byte) {
+func (t *plainEncoder) Append(ts uint64, value []byte) {
        if t.startTime == 0 {
                t.startTime = ts
        } else if t.startTime > ts {
                t.startTime = ts
        }
        vLen := len(value)
-       offset := uint32(len(t.valBuff.Bytes()))
-       t.valBuff.Write(t.putUint32(uint32(vLen)))
+       // Store the value length-prefixed and remember where it starts.
+       offset := uint32(t.valBuff.Len())
+       t.valBuff.PutUint32(uint32(vLen))
        t.valBuff.Write(value)
-       t.tsBuff.Write(t.putUint64(ts))
-       t.tsBuff.Write(t.putUint32(offset))
-       t.num = t.num + 1
+       t.tsBuff.PutUint64(ts)
+       t.tsBuff.PutUint32(offset)
+       t.num++
 }
 
+// IsFull reports whether the value section has reached valueSize bytes.
-func (t *streamChunkEncoder) IsFull() bool {
+func (t *plainEncoder) IsFull() bool {
        return t.valBuff.Len() >= t.valueSize
 }
 
+// Reset clears buffered entries so the encoder can be reused. The key
+// argument is ignored and valueSize is kept.
-func (t *streamChunkEncoder) Reset() {
+func (t *plainEncoder) Reset(_ []byte) {
        t.tsBuff.Reset()
        t.valBuff.Reset()
        t.num = 0
        t.startTime = 0
 }
 
-func (t *streamChunkEncoder) Encode() ([]byte, error) {
+func (t *plainEncoder) Encode() ([]byte, error) {
        if t.tsBuff.Len() < 1 {
                return nil, ErrEncodeEmpty
        }
        val := t.valBuff.Bytes()
        t.len = uint32(len(val))
-       _, err := t.tsBuff.WriteTo(&t.valBuff)
-       if err != nil {
-               return nil, err
-       }
-       t.valBuff.Write(t.putUint32(t.num))
-       t.valBuff.Write(t.putUint32(t.len))
+       t.tsBuff.WriteTo(t.valBuff)
+       t.valBuff.PutUint32(t.num)
+       t.valBuff.PutUint32(t.len)
        data := t.valBuff.Bytes()
        l := len(data)
        dst := make([]byte, 0, compressBound(l))
-       dst = encoder.EncodeAll(data, dst)
-       result := make([]byte, len(dst)+2)
-       copy(result, dst)
-       copy(result[len(dst):], t.putUint16(uint16(l)))
-       return result, nil
+       dst = zstdEncoder.EncodeAll(data, dst)
+       result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, 
len(dst)+2)))
+       result.Write(dst)
+       result.PutUint16(uint16(l))
+       return result.Bytes(), nil
 }
 
+// compressBound returns the capacity reserved for the compressed output
+// buffer: srcSize plus 1/256 of it.
 func compressBound(srcSize int) int {
        return srcSize + (srcSize >> 8)
 }
 
+// StartTime returns the smallest timestamp appended since the last Reset.
-func (t *streamChunkEncoder) StartTime() uint64 {
+func (t *plainEncoder) StartTime() uint64 {
        return t.startTime
 }
 
-func (t *streamChunkEncoder) putUint16(v uint16) []byte {
-       binary.LittleEndian.PutUint16(t.scratch[:], v)
-       return t.scratch[:2]
-}
-
-func (t *streamChunkEncoder) putUint32(v uint32) []byte {
-       binary.LittleEndian.PutUint32(t.scratch[:], v)
-       return t.scratch[:4]
-}
-
-func (t *streamChunkEncoder) putUint64(v uint64) []byte {
-       binary.LittleEndian.PutUint64(t.scratch[:], v)
-       return t.scratch[:8]
-}
-
 const (
        // TsLen equals ts(uint64) + data_offset(uint32)
        TsLen = 8 + 4
@@ -129,8 +170,8 @@ const (
 
+// ErrInvalidValue signals an invalid encoded value.
 var ErrInvalidValue = errors.New("invalid encoded value")
 
-//StreamChunkDecoder decodes encoded time index
-type StreamChunkDecoder struct {
+//plainDecoder decodes encoded time index
+type plainDecoder struct {
        ts        []byte
        val       []byte
        len       uint32
@@ -138,20 +179,14 @@ type StreamChunkDecoder struct {
        valueSize int
 }
 
-func NewStreamChunkDecoder(size int) SeriesDecoder {
-       return &StreamChunkDecoder{
-               valueSize: size,
-       }
-}
-
-func (t *StreamChunkDecoder) Len() int {
+// Len returns the number of index slots (t.num) in the decoded block.
+func (t *plainDecoder) Len() int {
        return int(t.num)
 }
 
-func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) {
+func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
        var data []byte
        size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
-       if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 
0, size)); err != nil {
+       if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], 
make([]byte, 0, size)); err != nil {
                return err
        }
        l := uint32(len(data))
@@ -170,11 +205,11 @@ func (t *StreamChunkDecoder) Decode(rawData []byte) (err 
error) {
        return nil
 }
 
-func (t *StreamChunkDecoder) IsFull() bool {
+// IsFull reports whether t.len (presumably the value-section length read by
+// Decode — confirm) has reached the configured valueSize.
+func (t *plainDecoder) IsFull() bool {
        return int(t.len) >= t.valueSize
 }
 
-func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) {
+func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
        i := sort.Search(int(t.num), func(i int) bool {
                slot := getTSSlot(t.ts, i)
                return parseTS(slot) <= ts
@@ -189,7 +224,7 @@ func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) 
{
        return getVal(t.val, parseOffset(slot))
 }
 
-func (t *StreamChunkDecoder) Iterator() SeriesIterator {
+// Iterator returns a SeriesIterator over the decoder's entries.
+func (t *plainDecoder) Iterator() SeriesIterator {
        return newBlockItemIterator(t)
 }
 
@@ -213,17 +248,17 @@ func parseOffset(tsSlot []byte) uint32 {
        return binary.LittleEndian.Uint32(tsSlot[8:])
 }
 
-var _ SeriesIterator = (*chunkIterator)(nil)
+var _ SeriesIterator = (*plainIterator)(nil)
 
-type chunkIterator struct {
+// plainIterator walks a plainDecoder's entries in slot order. index holds the
+// timestamp/offset slots, data holds the values those offsets point into, idx
+// is the current slot (starting at -1 so the first Next lands on slot 0) and
+// num is the slot count.
+type plainIterator struct {
        index []byte
        data  []byte
        idx   int
        num   int
 }
 
-func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator {
-       return &chunkIterator{
+func newBlockItemIterator(decoder *plainDecoder) SeriesIterator {
+       return &plainIterator{
                idx:   -1,
                index: decoder.ts,
                data:  decoder.val,
@@ -231,20 +266,20 @@ func newBlockItemIterator(decoder *StreamChunkDecoder) 
SeriesIterator {
        }
 }
 
-func (b *chunkIterator) Next() bool {
+// Next advances to the following slot and reports whether it is in range.
+func (b *plainIterator) Next() bool {
        b.idx++
        return b.idx >= 0 && b.idx < b.num
 }
 
-func (b *chunkIterator) Val() []byte {
+// Val returns the value stored at the current slot's offset.
+// NOTE(review): the error from getVal is discarded, so a malformed block is
+// not reported here or through Error — consider surfacing it.
+func (b *plainIterator) Val() []byte {
        v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx)))
        return v
 }
 
-func (b *chunkIterator) Time() uint64 {
+// Time returns the timestamp parsed from the current slot.
+func (b *plainIterator) Time() uint64 {
        return parseTS(getTSSlot(b.index, b.idx))
 }
 
-func (b *chunkIterator) Error() error {
+// Error always returns nil.
+func (b *plainIterator) Error() error {
        return nil
 }
diff --git a/pkg/encoding/xor.go b/pkg/encoding/xor.go
new file mode 100644
index 0000000..43203ca
--- /dev/null
+++ b/pkg/encoding/xor.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+       "math/bits"
+
+       "github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+// Two-bit control codes written before a non-zero XOR: '10' reuses the
+// previous leading/trailing-zero window, '11' introduces a new window.
+const (
+       ctrlBitsNoContainMeaningful = 0x2
+       ctrlBitsContainMeaningful   = 0x3
+)
+
+// XOREncoder compresses a sequence of uint64 values with the XOR scheme from
+// the Gorilla paper: https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
+// preVal is the last value written, leading/trailing describe the current
+// meaningful-bit window, and first stays true until the first value is written.
+type XOREncoder struct {
+       bw       *bit.Writer
+       preVal   uint64
+       leading  int
+       trailing int
+
+       first bool
+}
+
+// NewXOREncoder creates an XOREncoder that writes compressed uint64 values to bw.
+func NewXOREncoder(bw *bit.Writer) *XOREncoder {
+       return &XOREncoder{
+               bw:    bw,
+               first: true,
+               // No window exists yet. A leading-zero count above any real one
+               // (at most 63 for a non-zero XOR) makes Write's reuse check fail
+               // for the first non-zero XOR, so it opens a tight '11' window
+               // instead of reusing a full 64-bit '10' window for every value.
+               leading: 0xff,
+       }
+}
+
+// Write appends val to the bit stream. The first value is stored verbatim in
+// 64 bits. Every later value is stored as its XOR with the previous value: a
+// single '0' bit when the XOR is zero; otherwise '10' plus the bits inside the
+// current leading/trailing-zero window when they fit, or '11', 6 bits of
+// leading zeros, 6 bits of (meaningful length - 1) and the meaningful bits,
+// which then become the current window.
+//
+// NOTE(review): e.leading/e.trailing only change in the '11' branch, so if both
+// start at 0 the '10' branch accepts every non-zero XOR with a full 64-bit
+// window and nothing is compressed; the initial window must make the first
+// non-zero XOR take the '11' branch.
+func (e *XOREncoder) Write(val uint64) {
+       if e.first {
+               e.first = false
+               e.preVal = val
+               e.bw.WriteBits(val, 64)
+               return
+       }
+
+       delta := val ^ e.preVal
+       e.preVal = val
+       if delta == 0 {
+               e.bw.WriteBool(false)
+               return
+       }
+
+       leading := bits.LeadingZeros64(delta)
+       trailing := bits.TrailingZeros64(delta)
+       if leading >= e.leading && trailing >= e.trailing {
+               // write control '10': the XOR fits the current window
+               e.bw.WriteBits(ctrlBitsNoContainMeaningful, 2)
+               e.bw.WriteBits(delta>>uint(e.trailing), 64-e.leading-e.trailing)
+       } else {
+               // write control '11' followed by a new window
+               e.bw.WriteBits(ctrlBitsContainMeaningful, 2)
+               meaningfulLen := 64 - leading - trailing
+               e.bw.WriteBits(uint64(leading), 6)
+               // delta != 0, so meaningfulLen is in [1, 64] and meaningfulLen-1
+               // fits in 6 bits
+               e.bw.WriteBits(uint64(meaningfulLen-1), 6)
+               e.bw.WriteBits(delta>>uint(trailing), meaningfulLen)
+
+               e.leading = leading
+               e.trailing = trailing
+       }
+}
+
+// XORDecoder reads uint64 values written by XOREncoder from a bit.Reader.
+// val is the most recently decoded value, leading/trailing describe the current
+// meaningful-bit window, first stays true until the first value has been read,
+// and err holds the error from the most recent Next call.
+type XORDecoder struct {
+       val uint64
+
+       br *bit.Reader
+
+       leading  uint64
+       trailing uint64
+
+       first bool
+       err   error
+}
+
+// NewXORDecoder returns a decoder that reads XOR-compressed uint64 values from
+// br. The first call to Next reads a verbatim 64-bit value.
+func NewXORDecoder(br *bit.Reader) *XORDecoder {
+       return &XORDecoder{first: true, br: br}
+}
+
+// Reset clears the decoder's state, including any error left by a previous
+// Next call, so the next Next reads a verbatim 64-bit first value again.
+// It does not touch the underlying bit.Reader; callers must reposition or
+// replace it themselves.
+func (d *XORDecoder) Reset() {
+       d.first = true
+       d.leading = 0
+       d.trailing = 0
+       d.val = 0
+       d.err = nil
+}
+
+// Next decodes the next value and reports whether one was read. It returns
+// false when the underlying reader fails (for example at end of input) or the
+// stream is malformed; Err then returns the cause. The bit layout mirrors
+// XOREncoder.Write: a verbatim 64-bit first value, then per value a '0' bit
+// (repeat), '10' plus bits in the current window, or '11' plus a new window.
+func (d *XORDecoder) Next() bool {
+       if d.first {
+               // read first value
+               d.first = false
+               d.val, d.err = d.br.ReadBits(64)
+               return d.err == nil
+       }
+
+       var b bool
+       // read delta control bit: '0' means the value is unchanged
+       b, d.err = d.br.ReadBool()
+       if d.err != nil {
+               return false
+       }
+       if !b {
+               return true
+       }
+       ctrlBits := ctrlBitsNoContainMeaningful
+       // read the second control bit: '10' reuses the window, '11' sends a new one
+       b, d.err = d.br.ReadBool()
+       if d.err != nil {
+               return false
+       }
+       if b {
+               ctrlBits |= 1
+       }
+       var blockSize uint64
+       if ctrlBits == ctrlBitsNoContainMeaningful {
+               blockSize = 64 - d.leading - d.trailing
+       } else {
+               // read leading zeros and meaningful length, because the window
+               // differs from the previous one
+               d.leading, d.err = d.br.ReadBits(6)
+               if d.err != nil {
+                       return false
+               }
+               blockSize, d.err = d.br.ReadBits(6)
+               if d.err != nil {
+                       return false
+               }
+               blockSize++
+               // A corrupt header can describe a window wider than 64 bits;
+               // reject it instead of letting the unsigned trailing count wrap.
+               if d.leading+blockSize > 64 {
+                       d.err = ErrInvalidValue
+                       return false
+               }
+               d.trailing = 64 - d.leading - blockSize
+       }
+       delta, err := d.br.ReadBits(int(blockSize))
+       if err != nil {
+               d.err = err
+               return false
+       }
+       val := delta << d.trailing
+       d.val ^= val
+       return true
+}
+
+// Value returns the current decoded value; it is meaningful only after Next
+// has returned true.
+func (d *XORDecoder) Value() uint64 {
+       return d.val
+}
+
+// Err returns the error from the most recent Next call, or nil if it succeeded.
+func (d *XORDecoder) Err() error {
+       return d.err
+}
diff --git a/pkg/encoding/xor_test.go b/pkg/encoding/xor_test.go
new file mode 100644
index 0000000..282ab57
--- /dev/null
+++ b/pkg/encoding/xor_test.go
@@ -0,0 +1,60 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package encoding
+
+import (
+       "bytes"
+       "io"
+       "testing"
+
+       "github.com/pkg/errors"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/pkg/bit"
+)
+
+// TestXOR round-trips a short sequence, including a repeated value, through
+// XOREncoder and XORDecoder.
+func TestXOR(t *testing.T) {
+       values := []uint64{76, 50, 50, 999999999, 100}
+
+       var buf bytes.Buffer
+       bw := bit.NewWriter(&buf)
+       enc := NewXOREncoder(bw)
+       for _, v := range values {
+               enc.Write(v)
+       }
+       bw.Flush()
+
+       dec := NewXORDecoder(bit.NewReader(bytes.NewReader(buf.Bytes())))
+       a := assert.New(t)
+       for _, v := range values {
+               verify(dec, a, v)
+       }
+}
+
+// verify advances d by one value and asserts that it decodes to expected.
+// Any decode error other than io.EOF is reported with its message.
+func verify(d *XORDecoder, a *assert.Assertions, expected uint64) {
+       a.True(d.Next())
+       if err := d.Err(); err != nil && !errors.Is(err, io.EOF) {
+               a.Failf("unexpected decode error", "%v", err)
+       }
+       a.Equal(expected, d.Value())
+}

Reply via email to