This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch encoding
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 08a9b4e815e655de3cef9172bc29f75d2b5ea6a0 Author: Gao Hongtao <[email protected]> AuthorDate: Sat Nov 20 08:31:18 2021 +0800 Introduce gorilla encoding for integer values Signed-off-by: Gao Hongtao <[email protected]> --- banyand/kv/badger.go | 31 +++++ banyand/kv/kv.go | 16 ++- banyand/stream/stream.go | 8 +- banyand/tsdb/block.go | 2 +- banyand/tsdb/tsdb.go | 6 +- banyand/tsdb/tsdb_test.go | 8 +- go.mod | 2 +- go.sum | 4 +- pkg/bit/reader.go | 104 +++++++++++++++ pkg/{bytes/bytes.go => bit/reader_test.go} | 41 ++++-- pkg/bit/writer.go | 92 ++++++++++++++ pkg/{bytes/bytes.go => bit/writer_test.go} | 50 ++++++-- pkg/buffer/writer.go | 72 +++++++++++ pkg/encoding/encoding.go | 14 ++- pkg/encoding/int.go | 195 +++++++++++++++++++++++++++++ pkg/encoding/{stream_chunk.go => plain.go} | 175 +++++++++++++++----------- pkg/encoding/xor.go | 177 ++++++++++++++++++++++++++ pkg/encoding/xor_test.go | 60 +++++++++ 18 files changed, 932 insertions(+), 125 deletions(-) diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index 24820f0..728a298 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -259,6 +259,37 @@ func (l *badgerLog) Debugf(f string, v ...interface{}) { l.delegated.Debug().Msgf(f, v...) } +var _ bydb.TSetEncoderPool = (*encoderPoolDelegate)(nil) + +type encoderPoolDelegate struct { + encoding.SeriesEncoderPool +} + +func (e *encoderPoolDelegate) Get(metadata []byte) bydb.TSetEncoder { + return e.SeriesEncoderPool.Get(metadata) +} + +func (e *encoderPoolDelegate) Put(encoder bydb.TSetEncoder) { + e.SeriesEncoderPool.Put(encoder) +} + +var _ bydb.TSetDecoderPool = (*decoderPoolDelegate)(nil) + +type decoderPoolDelegate struct { + encoding.SeriesDecoderPool +} + +func (e *decoderPoolDelegate) Get(metadata []byte) bydb.TSetDecoder { + return &decoderDelegate{ + e.SeriesDecoderPool.Get(metadata), + } +} + +func (e *decoderPoolDelegate) Put(decoder bydb.TSetDecoder) { + dd := decoder.(*decoderDelegate) + e.SeriesDecoderPool.Put(dd.SeriesDecoder) +} + var _ bydb.TSetDecoder = (*decoderDelegate)(nil) type decoderDelegate struct { diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index ea92d1c..2def4dc 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -23,7 +23,6 @@ import ( "math" "github.com/dgraph-io/badger/v3" - "github.com/dgraph-io/badger/v3/bydb" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/pkg/encoding" @@ -103,16 +102,15 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions { } } -func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions { +func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool) TimeSeriesOptions { return func(store TimeSeriesStore) { if btss, ok := store.(*badgerTSS); ok { - btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder { - return encoderFactory() - }, func() bydb.TSetDecoder { - return &decoderDelegate{ - SeriesDecoder: decoderFactory(), - } - }) + btss.dbOpts = btss.dbOpts.WithExternalCompactor( + &encoderPoolDelegate{ + encoderPool, + }, &decoderPoolDelegate{ + decoderPool, + }) } } } diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 760687d..444a483 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -107,12 +107,8 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error) ShardNum: sm.schema.GetOpts().GetShardNum(), IndexRules: spec.indexRules, EncodingMethod: tsdb.EncodingMethod{ - EncoderFactory: func() 
encoding.SeriesEncoder { - return encoding.NewStreamChunkEncoder(chunkSize) - }, - DecoderFactory: func() encoding.SeriesDecoder { - return encoding.NewStreamChunkDecoder(chunkSize) - }, + EncoderPool: encoding.NewPlainEncoderPool(chunkSize), + DecoderPool: encoding.NewPlainDecoderPool(chunkSize), }, }) if err != nil { diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 2f38308..956bf13 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -78,7 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { if b.store, err = kv.OpenTimeSeriesStore( 0, b.path+"/store", - kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory), + kv.TSSWithEncoding(encodingMethod.EncoderPool, encodingMethod.DecoderPool), kv.TSSWithLogger(b.l), ); err != nil { return nil, err diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 8df233b..e9ed19a 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -82,8 +82,8 @@ type DatabaseOpts struct { } type EncodingMethod struct { - EncoderFactory encoding.SeriesEncoderFactory - DecoderFactory encoding.SeriesDecoderFactory + EncoderPool encoding.SeriesEncoderPool + DecoderPool encoding.SeriesDecoderPool } type database struct { @@ -124,7 +124,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { db.logger = pl.Named("tsdb") } } - if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil { + if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil { return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database") } if _, err := mkdir(opts.Location); err != nil { diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index 89c56d6..c570fdf 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -58,12 +58,8 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database Location: tempDir, ShardNum: 1, EncodingMethod: EncodingMethod{ - EncoderFactory: func() encoding.SeriesEncoder { - return nil - }, - DecoderFactory: func() encoding.SeriesDecoder { - return nil - }, + EncoderPool: encoding.NewPlainEncoderPool(0), + DecoderPool: encoding.NewPlainDecoderPool(0), }, }) t.NoError(err) diff --git a/go.mod b/go.mod index 24e4ce0..b2815cb 100644 --- a/go.mod +++ b/go.mod @@ -101,4 +101,4 @@ require ( sigs.k8s.io/yaml v1.2.0 // indirect ) -replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 +replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a diff --git a/go.sum b/go.sum index 957109b..afb3bff 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE= github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM= github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc= -github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA= -github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M= +github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E= +github.com/SkyAPM/badger/v3 
v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/pkg/bit/reader.go b/pkg/bit/reader.go new file mode 100644 index 0000000..22cb929 --- /dev/null +++ b/pkg/bit/reader.go @@ -0,0 +1,104 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bit + +import ( + "io" +) + +// Reader reads bits from buffer +type Reader struct { + in io.ByteReader + cache byte + len byte +} + +// NewReader crate bit reader +func NewReader(in io.ByteReader) *Reader { + return &Reader{ + in: in, + } +} + +// ReadBool reads a bit, 1 returns true, 0 returns false +func (r *Reader) ReadBool() (bool, error) { + if r.len == 0 { + b, err := r.in.ReadByte() + if err != nil { + return false, err + } + r.cache = b + r.len = 8 + } + r.len-- + b := r.cache & 0x80 + r.cache <<= 1 + return b != 0, nil +} + +// ReadBits read number of bits +func (r *Reader) ReadBits(numBits int) (uint64, error) { + var result uint64 + + for ; numBits >= 8; numBits -= 8 { + b, err := r.ReadByte() + if err != nil { + return 0, err + } + + result = (result << 8) | uint64(b) + } + + for ; numBits > 0; numBits-- { + byt, err := r.ReadBool() + if err != nil { + return 0, err + } + result <<= 1 + if byt { + result |= 1 + } + } + + return result, nil +} + +// ReadByte reads a byte +func (r *Reader) ReadByte() (byte, error) { + if r.len == 0 { + b, err := r.in.ReadByte() + if err != nil { + return b, err + } + r.cache = b + return b, err + } + b, err := r.in.ReadByte() + if err != nil { + return b, err + } + result := r.cache | b>>r.len + r.cache = b << (8 - r.len) + return result, nil +} + +// Reset resets the reader to read from a new slice +func (r *Reader) Reset() { + r.len = 0 + r.cache = 0 +} diff --git a/pkg/bytes/bytes.go b/pkg/bit/reader_test.go similarity index 56% copy from pkg/bytes/bytes.go copy to pkg/bit/reader_test.go index b4d051c..6f30bbe 100644 --- a/pkg/bytes/bytes.go +++ b/pkg/bit/reader_test.go @@ -15,17 +15,38 @@ // specific language governing permissions and limitations // under the License. 
-package bytes +package bit -func Join(s ...[]byte) []byte { - n := 0 - for _, v := range s { - n += len(v) - } +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReader(t *testing.T) { + data := []byte{3, 255, 0xcc, 0x1a, 0xbc, 0xde, 0x80} + + r := NewReader(bytes.NewBuffer(data)) + a := assert.New(t) + + eq(a, byte(3))(r.ReadByte()) + eq(a, uint64(255))(r.ReadBits(8)) + + eq(a, uint64(0xc))(r.ReadBits(4)) + + eq(a, uint64(0xc1))(r.ReadBits(8)) + + eq(a, uint64(0xabcde))(r.ReadBits(20)) + + eq(a, true)(r.ReadBool()) + eq(a, false)(r.ReadBool()) + +} - b, i := make([]byte, n), 0 - for _, v := range s { - i += copy(b[i:], v) +func eq(a *assert.Assertions, expected interface{}) func(interface{}, error) { + return func(actual interface{}, err error) { + a.NoError(err) + a.Equal(expected, actual) } - return b } diff --git a/pkg/bit/writer.go b/pkg/bit/writer.go new file mode 100644 index 0000000..6302f9e --- /dev/null +++ b/pkg/bit/writer.go @@ -0,0 +1,92 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bit + +import ( + "bytes" +) + +// Writer writes bits to an io.BufferWriter +type Writer struct { + out *bytes.Buffer + cache byte + available byte +} + +// NewWriter create bit writer +func NewWriter(buffer *bytes.Buffer) *Writer { + var bw Writer + bw.Reset(buffer) + return &bw +} + +// Reset writes to a new writer +func (w *Writer) Reset(buffer *bytes.Buffer) { + w.out = buffer + w.cache = 0 + w.available = 8 +} + +// WriteBool writes a boolean value +// true: 1 +// false: 0 +func (w *Writer) WriteBool(b bool) { + if b { + w.cache |= 1 << (w.available - 1) + } + + w.available-- + + if w.available == 0 { + // WriteByte never returns error + _ = w.out.WriteByte(w.cache) + w.cache = 0 + w.available = 8 + } +} + +// WriteBits writes number of bits +func (w *Writer) WriteBits(u uint64, numBits int) { + u <<= 64 - uint(numBits) + + for ; numBits >= 8; numBits -= 8 { + byt := byte(u >> 56) + w.WriteByte(byt) + u <<= 8 + } + + remainder := byte(u >> 56) + for ; numBits > 0; numBits-- { + w.WriteBool((remainder & 0x80) != 0) + remainder <<= 1 + } +} + +// WriteByte write a byte +func (w *Writer) WriteByte(b byte) { + _ = w.out.WriteByte(w.cache | (b >> (8 - w.available))) + w.cache = b << w.available +} + +// Flush flushes the currently in-process byte +func (w *Writer) Flush() { + if w.available != 8 { + _ = w.out.WriteByte(w.cache) + } + w.Reset(w.out) +} diff --git a/pkg/bytes/bytes.go b/pkg/bit/writer_test.go similarity index 57% rename from pkg/bytes/bytes.go rename to pkg/bit/writer_test.go index b4d051c..7624ff8 100644 --- a/pkg/bytes/bytes.go +++ b/pkg/bit/writer_test.go @@ -15,17 +15,41 @@ // specific language governing permissions and limitations // under the License. 
-package bytes - -func Join(s ...[]byte) []byte { - n := 0 - for _, v := range s { - n += len(v) - } - - b, i := make([]byte, n), 0 - for _, v := range s { - i += copy(b[i:], v) - } - return b +package bit + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWriter(t *testing.T) { + out := &bytes.Buffer{} + w := NewWriter(out) + + a := assert.New(t) + + w.WriteByte(0xc1) + w.WriteBool(false) + w.WriteBits(0x3f, 6) + w.WriteBool(true) + w.WriteByte(0xac) + w.WriteBits(0x01, 1) + w.WriteBits(0x1248f, 20) + w.Flush() + + w.WriteByte(0x01) + w.WriteByte(0x02) + + w.WriteBits(0x0f, 4) + + w.WriteByte(0x80) + w.WriteByte(0x8f) + w.Flush() + + w.WriteBits(0x01, 1) + w.WriteByte(0xff) + w.Flush() + a.Equal([]byte{0xc1, 0x7f, 0xac, 0x89, 0x24, 0x78, 0x01, 0x02, 0xf8, 0x08, 0xf0, 0xff, 0x80}, out.Bytes()) } diff --git a/pkg/buffer/writer.go b/pkg/buffer/writer.go new file mode 100644 index 0000000..e109d25 --- /dev/null +++ b/pkg/buffer/writer.go @@ -0,0 +1,72 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package buffer + +import ( + "bytes" + "encoding/binary" +) + +// Writer writes data into a buffer +type Writer struct { + buf *bytes.Buffer + + scratch [binary.MaxVarintLen64]byte +} + +func NewBufferWriter(buf *bytes.Buffer) *Writer { + return &Writer{ + buf: buf, + } +} + +func (w *Writer) Write(p []byte) { + _, _ = w.buf.Write(p) +} + +func (w *Writer) WriteTo(other *Writer) (n int64) { + n, _ = w.buf.WriteTo(other.buf) + return n +} + +func (w *Writer) PutUint16(v uint16) { + binary.LittleEndian.PutUint16(w.scratch[:], v) + _, _ = w.buf.Write(w.scratch[:2]) +} + +func (w *Writer) PutUint32(v uint32) { + binary.LittleEndian.PutUint32(w.scratch[:], v) + _, _ = w.buf.Write(w.scratch[:4]) +} + +func (w *Writer) PutUint64(v uint64) { + binary.LittleEndian.PutUint64(w.scratch[:], v) + _, _ = w.buf.Write(w.scratch[:8]) +} + +func (w *Writer) Reset() { + w.buf.Reset() +} + +func (w *Writer) Len() int { + return w.buf.Len() +} + +func (w *Writer) Bytes() []byte { + return w.buf.Bytes() +} diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go index 5e04e72..05a1f14 100644 --- a/pkg/encoding/encoding.go +++ b/pkg/encoding/encoding.go @@ -21,7 +21,10 @@ import "github.com/pkg/errors" var ErrEncodeEmpty = errors.New("encode an empty value") -type SeriesEncoderFactory func() SeriesEncoder +type SeriesEncoderPool interface { + Get(metadata []byte) SeriesEncoder + Put(encoder SeriesEncoder) +} // SeriesEncoder encodes time series data point type SeriesEncoder interface { @@ -30,19 +33,22 @@ type SeriesEncoder interface { // IsFull returns whether the encoded data reached its capacity IsFull() bool // Reset the underlying buffer - Reset() + Reset(key []byte) // Encode the time series data point to a binary Encode() ([]byte, error) // StartTime indicates the first entry's time StartTime() uint64 } -type SeriesDecoderFactory func() SeriesDecoder +type SeriesDecoderPool interface { + Get(metadata []byte) SeriesDecoder + Put(encoder SeriesDecoder) +} // SeriesDecoder decodes encoded time series data type SeriesDecoder interface { // Decode the time series data - Decode(data []byte) error + Decode(key, data []byte) error // Len denotes the size of iterator Len() int // IsFull returns whether the encoded data reached its capacity diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go new file mode 100644 index 0000000..9571ec4 --- /dev/null +++ b/pkg/encoding/int.go @@ -0,0 +1,195 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package encoding + +import ( + "bytes" + "encoding/binary" + "time" + + "github.com/apache/skywalking-banyandb/pkg/bit" + "github.com/apache/skywalking-banyandb/pkg/buffer" + "github.com/apache/skywalking-banyandb/pkg/convert" +) + +var ( + _ SeriesEncoder = (*intEncoder)(nil) +) + +type ParseInterval = func(key []byte) time.Duration + +type intEncoder struct { + buff *bytes.Buffer + bw *bit.Writer + values *XOREncoder + fn ParseInterval + interval time.Duration + startTime uint64 + num int + size int +} + +func NewIntEncoder(size int, fn ParseInterval) SeriesEncoder { + buff := &bytes.Buffer{} + bw := bit.NewWriter(buff) + return &intEncoder{ + buff: buff, + bw: bw, + values: NewXOREncoder(bw), + fn: fn, + size: size, + } +} + +func (ie *intEncoder) Append(ts uint64, value []byte) { + if len(value) > 8 { + return + } + if ie.startTime == 0 { + ie.startTime = ts + } + gap := int(ts) - int(ie.startTime) + if gap < 0 { + return + } + zeroNum := gap/int(ie.interval) - 1 + for i := 0; i < zeroNum; i++ { + ie.bw.WriteBool(false) + ie.num++ + } + ie.bw.WriteBool(true) + ie.values.Write(binary.LittleEndian.Uint64(value)) + ie.num++ +} + +func (ie *intEncoder) IsFull() bool { + return ie.num >= ie.size +} + +func (ie *intEncoder) Reset(key []byte) { + ie.bw.Reset(nil) + ie.interval = ie.fn(key) +} + +func (ie *intEncoder) Encode() ([]byte, error) { + ie.bw.Flush() + buffWriter := buffer.NewBufferWriter(ie.buff) + buffWriter.PutUint64(ie.startTime) + buffWriter.PutUint16(uint16(ie.size)) + return ie.buff.Bytes(), nil +} + +func (ie *intEncoder) StartTime() uint64 { + return ie.startTime +} + +var _ SeriesDecoder = (*intDecoder)(nil) + +type intDecoder struct { + fn ParseInterval + size int + interval time.Duration + startTime uint64 + num int + area []byte +} + +func NewIntDecoder(size int, fn ParseInterval) SeriesDecoder { + return &intDecoder{ + fn: fn, + size: size, + } +} + +func (i intDecoder) Decode(key, data []byte) error { + i.interval = i.fn(key) + i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2]) + i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:])) + i.area = data[:len(data)-10] + return nil +} + +func (i intDecoder) Len() int { + return i.num +} + +func (i intDecoder) IsFull() bool { + return i.num >= i.size +} + +func (i intDecoder) Get(ts uint64) ([]byte, error) { + for iter := i.Iterator(); iter.Next(); { + if iter.Time() == ts { + return iter.Val(), nil + } + } + return nil, nil +} + +func (i intDecoder) Iterator() SeriesIterator { + br := bit.NewReader(bytes.NewReader(i.area)) + return &intIterator{ + startTime: i.startTime, + interval: int(i.interval), + br: br, + values: NewXORDecoder(br), + } +} + +var _ SeriesIterator = (*intIterator)(nil) + +type intIterator struct { + startTime uint64 + interval int + br *bit.Reader + values *XORDecoder + + currVal uint64 + currTime uint64 + index int + err error +} + +func (i *intIterator) Next() bool { + var b bool + b, i.err = i.br.ReadBool() + if i.err != nil { + return false + } + if b { + if i.values.Next() { + i.currVal = i.values.Value() + } + } + i.currVal = 0 + i.currTime = i.startTime + uint64(i.interval*i.index) + i.index++ + return true +} + +func (i *intIterator) Val() []byte { + return convert.Uint64ToBytes(i.currVal) +} + +func (i *intIterator) Time() uint64 { + return i.currTime +} + +func (i *intIterator) Error() error { + return i.err +} diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/plain.go similarity index 54% rename from pkg/encoding/stream_chunk.go rename to 
pkg/encoding/plain.go index 7ff5094..0b3fb8c 100644 --- a/pkg/encoding/stream_chunk.go +++ b/pkg/encoding/plain.go @@ -22,106 +22,147 @@ import ( "encoding/binary" "fmt" "sort" + "sync" "github.com/klauspost/compress/zstd" "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/pkg/buffer" ) var ( - decoder, _ = zstd.NewReader(nil) - encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) - _ SeriesEncoder = (*streamChunkEncoder)(nil) - _ SeriesDecoder = (*StreamChunkDecoder)(nil) + encoderPool = sync.Pool{ + New: newPlainEncoder, + } + decoderPool = sync.Pool{ + New: func() interface{} { + return &plainDecoder{} + }, + } +) + +type plainEncoderPool struct { + pool *sync.Pool + size int +} + +func NewPlainEncoderPool(size int) SeriesEncoderPool { + return &plainEncoderPool{ + pool: &encoderPool, + size: size, + } +} + +func (b *plainEncoderPool) Get(metadata []byte) SeriesEncoder { + encoder := b.pool.Get().(*plainEncoder) + encoder.Reset(metadata) + encoder.valueSize = b.size + return encoder +} + +func (b *plainEncoderPool) Put(encoder SeriesEncoder) { + b.pool.Put(encoder) +} + +type plainDecoderPool struct { + pool *sync.Pool + size int +} + +func NewPlainDecoderPool(size int) SeriesDecoderPool { + return &plainDecoderPool{ + pool: &decoderPool, + size: size, + } +} + +func (b *plainDecoderPool) Get(_ []byte) SeriesDecoder { + decoder := b.pool.Get().(*plainDecoder) + decoder.valueSize = b.size + return decoder +} + +func (b *plainDecoderPool) Put(decoder SeriesDecoder) { + b.pool.Put(decoder) +} + +var ( + zstdDecoder, _ = zstd.NewReader(nil) + zstdEncoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + _ SeriesEncoder = (*plainEncoder)(nil) + _ SeriesDecoder = (*plainDecoder)(nil) ) -//streamChunkEncoder backport to reduced value -type streamChunkEncoder struct { - tsBuff bytes.Buffer - valBuff bytes.Buffer - scratch [binary.MaxVarintLen64]byte +//plainEncoder backport to reduced value +type plainEncoder struct { + tsBuff *buffer.Writer + valBuff *buffer.Writer len uint32 num uint32 startTime uint64 valueSize int } -func NewStreamChunkEncoder(size int) SeriesEncoder { - return &streamChunkEncoder{ - valueSize: size, +func newPlainEncoder() interface{} { + return &plainEncoder{ + tsBuff: buffer.NewBufferWriter(&bytes.Buffer{}), + valBuff: buffer.NewBufferWriter(&bytes.Buffer{}), } } -func (t *streamChunkEncoder) Append(ts uint64, value []byte) { +func (t *plainEncoder) Append(ts uint64, value []byte) { if t.startTime == 0 { t.startTime = ts } else if t.startTime > ts { t.startTime = ts } vLen := len(value) - offset := uint32(len(t.valBuff.Bytes())) - t.valBuff.Write(t.putUint32(uint32(vLen))) + offset := uint32(t.valBuff.Len()) + t.valBuff.PutUint32(uint32(vLen)) t.valBuff.Write(value) - t.tsBuff.Write(t.putUint64(ts)) - t.tsBuff.Write(t.putUint32(offset)) - t.num = t.num + 1 + t.tsBuff.PutUint64(ts) + t.tsBuff.PutUint32(offset) + t.num++ } -func (t *streamChunkEncoder) IsFull() bool { +func (t *plainEncoder) IsFull() bool { return t.valBuff.Len() >= t.valueSize } -func (t *streamChunkEncoder) Reset() { +func (t *plainEncoder) Reset(_ []byte) { t.tsBuff.Reset() t.valBuff.Reset() t.num = 0 t.startTime = 0 } -func (t *streamChunkEncoder) Encode() ([]byte, error) { +func (t *plainEncoder) Encode() ([]byte, error) { if t.tsBuff.Len() < 1 { return nil, ErrEncodeEmpty } val := t.valBuff.Bytes() t.len = uint32(len(val)) - _, err := t.tsBuff.WriteTo(&t.valBuff) - if err != nil { - return nil, err - } - 
t.valBuff.Write(t.putUint32(t.num)) - t.valBuff.Write(t.putUint32(t.len)) + t.tsBuff.WriteTo(t.valBuff) + t.valBuff.PutUint32(t.num) + t.valBuff.PutUint32(t.len) data := t.valBuff.Bytes() l := len(data) dst := make([]byte, 0, compressBound(l)) - dst = encoder.EncodeAll(data, dst) - result := make([]byte, len(dst)+2) - copy(result, dst) - copy(result[len(dst):], t.putUint16(uint16(l))) - return result, nil + dst = zstdEncoder.EncodeAll(data, dst) + result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, len(dst)+2))) + result.Write(dst) + result.PutUint16(uint16(l)) + return result.Bytes(), nil } func compressBound(srcSize int) int { return srcSize + (srcSize >> 8) } -func (t *streamChunkEncoder) StartTime() uint64 { +func (t *plainEncoder) StartTime() uint64 { return t.startTime } -func (t *streamChunkEncoder) putUint16(v uint16) []byte { - binary.LittleEndian.PutUint16(t.scratch[:], v) - return t.scratch[:2] -} - -func (t *streamChunkEncoder) putUint32(v uint32) []byte { - binary.LittleEndian.PutUint32(t.scratch[:], v) - return t.scratch[:4] -} - -func (t *streamChunkEncoder) putUint64(v uint64) []byte { - binary.LittleEndian.PutUint64(t.scratch[:], v) - return t.scratch[:8] -} - const ( // TsLen equals ts(uint64) + data_offset(uint32) TsLen = 8 + 4 @@ -129,8 +170,8 @@ const ( var ErrInvalidValue = errors.New("invalid encoded value") -//StreamChunkDecoder decodes encoded time index -type StreamChunkDecoder struct { +//plainDecoder decodes encoded time index +type plainDecoder struct { ts []byte val []byte len uint32 @@ -138,20 +179,14 @@ type StreamChunkDecoder struct { valueSize int } -func NewStreamChunkDecoder(size int) SeriesDecoder { - return &StreamChunkDecoder{ - valueSize: size, - } -} - -func (t *StreamChunkDecoder) Len() int { +func (t *plainDecoder) Len() int { return int(t.num) } -func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) { +func (t *plainDecoder) Decode(_, rawData []byte) (err error) { var data []byte size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:]) - if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil { + if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil { return err } l := uint32(len(data)) @@ -170,11 +205,11 @@ func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) { return nil } -func (t *StreamChunkDecoder) IsFull() bool { +func (t *plainDecoder) IsFull() bool { return int(t.len) >= t.valueSize } -func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) { +func (t *plainDecoder) Get(ts uint64) ([]byte, error) { i := sort.Search(int(t.num), func(i int) bool { slot := getTSSlot(t.ts, i) return parseTS(slot) <= ts @@ -189,7 +224,7 @@ func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) { return getVal(t.val, parseOffset(slot)) } -func (t *StreamChunkDecoder) Iterator() SeriesIterator { +func (t *plainDecoder) Iterator() SeriesIterator { return newBlockItemIterator(t) } @@ -213,17 +248,17 @@ func parseOffset(tsSlot []byte) uint32 { return binary.LittleEndian.Uint32(tsSlot[8:]) } -var _ SeriesIterator = (*chunkIterator)(nil) +var _ SeriesIterator = (*plainIterator)(nil) -type chunkIterator struct { +type plainIterator struct { index []byte data []byte idx int num int } -func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator { - return &chunkIterator{ +func newBlockItemIterator(decoder *plainDecoder) SeriesIterator { + return &plainIterator{ idx: -1, index: decoder.ts, data: decoder.val, @@ -231,20 +266,20 @@ func 
newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator { } } -func (b *chunkIterator) Next() bool { +func (b *plainIterator) Next() bool { b.idx++ return b.idx >= 0 && b.idx < b.num } -func (b *chunkIterator) Val() []byte { +func (b *plainIterator) Val() []byte { v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx))) return v } -func (b *chunkIterator) Time() uint64 { +func (b *plainIterator) Time() uint64 { return parseTS(getTSSlot(b.index, b.idx)) } -func (b *chunkIterator) Error() error { +func (b *plainIterator) Error() error { return nil } diff --git a/pkg/encoding/xor.go b/pkg/encoding/xor.go new file mode 100644 index 0000000..43203ca --- /dev/null +++ b/pkg/encoding/xor.go @@ -0,0 +1,177 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package encoding + +import ( + "math/bits" + + "github.com/apache/skywalking-banyandb/pkg/bit" +) + +const ( + ctrlBitsNoContainMeaningful = 0x2 + ctrlBitsContainMeaningful = 0x3 +) + +// XOREncoder intends to compress uint64 data +// https://www.vldb.org/pvldb/vol8/p1816-teller.pdf +type XOREncoder struct { + bw *bit.Writer + preVal uint64 + leading int + trailing int + + first bool +} + +// NewXOREncoder creates xor zstdEncoder for compressing uint64 data +func NewXOREncoder(bw *bit.Writer) *XOREncoder { + return &XOREncoder{ + bw: bw, + first: true, + } +} + +func (e *XOREncoder) Write(val uint64) { + if e.first { + e.first = false + e.preVal = val + e.bw.WriteBits(val, 64) + return + } + + delta := val ^ e.preVal + e.preVal = val + if delta == 0 { + e.bw.WriteBool(false) + return + } + + leading := bits.LeadingZeros64(delta) + trailing := bits.TrailingZeros64(delta) + if leading >= e.leading && trailing >= e.trailing { + // write control '10' to reuse previous block meaningful bits + e.bw.WriteBits(ctrlBitsNoContainMeaningful, 2) + e.bw.WriteBits(delta>>uint(e.trailing), 64-e.leading-e.trailing) + } else { + // write control '11' to create a new block meaningful bits + e.bw.WriteBits(ctrlBitsContainMeaningful, 2) + meaningfulLen := 64 - leading - trailing + e.bw.WriteBits(uint64(leading), 6) + // meaningfulLen is at least 1, so we can subtract 1 from it and encode it in 6 bits + e.bw.WriteBits(uint64(meaningfulLen-1), 6) + e.bw.WriteBits(delta>>uint(trailing), meaningfulLen) + + e.leading = leading + e.trailing = trailing + } +} + +// XORDecoder decodes buffer to uint64 values using xor compress +type XORDecoder struct { + val uint64 + + br *bit.Reader + + leading uint64 + trailing uint64 + + first bool + err error +} + +// NewXORDecoder create zstdDecoder decompress buffer using xor +func NewXORDecoder(br *bit.Reader) *XORDecoder { + s := &XORDecoder{ + br: br, + first: true, + } + return s +} + +// Reset resets the underlying buffer to decode +func (d 
*XORDecoder) Reset() { + d.first = true + d.leading = 0 + d.trailing = 0 + d.val = 0 +} + +// Next return if zstdDecoder has value in buffer using xor, do uncompress logic in next method, +// data format reference zstdEncoder format +func (d *XORDecoder) Next() bool { + if d.first { + // read first value + d.first = false + d.val, d.err = d.br.ReadBits(64) + return d.err == nil + } + + var b bool + // read delta control bit + b, d.err = d.br.ReadBool() + if d.err != nil { + return false + } + if !b { + return true + } + ctrlBits := ctrlBitsNoContainMeaningful + // read control bit + b, d.err = d.br.ReadBool() + if d.err != nil { + return false + } + if b { + ctrlBits |= 1 + } + var blockSize uint64 + if ctrlBits == ctrlBitsNoContainMeaningful { + blockSize = 64 - d.leading - d.trailing + } else { + // read leading and trailing, because block is diff with previous + d.leading, d.err = d.br.ReadBits(6) + if d.err != nil { + return false + } + blockSize, d.err = d.br.ReadBits(6) + if d.err != nil { + return false + } + blockSize++ + d.trailing = 64 - d.leading - blockSize + } + delta, err := d.br.ReadBits(int(blockSize)) + if err != nil { + d.err = err + return false + } + val := delta << d.trailing + d.val ^= val + return true +} + +// Value returns uint64 from buffer +func (d *XORDecoder) Value() uint64 { + return d.val +} + +// Err returns error raised in Next() +func (d *XORDecoder) Err() error { + return d.err +} diff --git a/pkg/encoding/xor_test.go b/pkg/encoding/xor_test.go new file mode 100644 index 0000000..282ab57 --- /dev/null +++ b/pkg/encoding/xor_test.go @@ -0,0 +1,60 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package encoding + +import ( + "bytes" + "io" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + "github.com/apache/skywalking-banyandb/pkg/bit" +) + +func TestXOR(t *testing.T) { + var buf bytes.Buffer + bitWriter := bit.NewWriter(&buf) + e := NewXOREncoder(bitWriter) + e.Write(uint64(76)) + e.Write(uint64(50)) + e.Write(uint64(50)) + e.Write(uint64(999999999)) + e.Write(uint64(100)) + + bitWriter.Flush() + data := buf.Bytes() + + reader := bit.NewReader(bytes.NewReader(data)) + d := NewXORDecoder(reader) + a := assert.New(t) + verify(d, a, uint64(76)) + verify(d, a, uint64(50)) + verify(d, a, uint64(50)) + verify(d, a, uint64(999999999)) + verify(d, a, uint64(100)) +} + +func verify(d *XORDecoder, a *assert.Assertions, except uint64) { + a.True(d.Next()) + if d.Err() != nil && !errors.Is(d.Err(), io.EOF) { + a.Fail("error: %v", d.Err()) + } + a.Equal(except, d.Value()) +}
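
For reviewers, a minimal standalone sketch of the Gorilla-style XOR codec this commit adds, round-tripping a few uint64 values; it mirrors pkg/encoding/xor_test.go. The package main wrapper, the sample values, and the printing are illustrative only; the bit and encoding APIs are the ones introduced in the diff above.

package main

import (
	"bytes"
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/bit"
	"github.com/apache/skywalking-banyandb/pkg/encoding"
)

func main() {
	values := []uint64{76, 50, 50, 999999999, 100}

	// Encode: the first value is stored verbatim (64 bits); each later value
	// is XORed with its predecessor, so a repeated value costs a single
	// control bit.
	var buf bytes.Buffer
	bw := bit.NewWriter(&buf)
	enc := encoding.NewXOREncoder(bw)
	for _, v := range values {
		enc.Write(v)
	}
	bw.Flush()

	// Decode exactly len(values) entries back; bounding the loop skips the
	// zero padding bits of the flushed byte, as xor_test.go does.
	dec := encoding.NewXORDecoder(bit.NewReader(bytes.NewReader(buf.Bytes())))
	for range values {
		if !dec.Next() {
			panic(dec.Err())
		}
		fmt.Println(dec.Value())
	}
}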
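
A smaller sketch of the bit-level primitives underneath the codec, mirroring pkg/bit/writer_test.go and reader_test.go; the particular bytes and bits written here are illustrative.

package main

import (
	"bytes"
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/bit"
)

func main() {
	var buf bytes.Buffer
	w := bit.NewWriter(&buf)
	w.WriteByte(0xAB)   // a whole byte, possibly unaligned
	w.WriteBool(true)   // a single bit
	w.WriteBits(0x5, 3) // the low 3 bits of 0x5, i.e. 101
	w.Flush()           // pad the in-progress byte with zeros

	r := bit.NewReader(bytes.NewReader(buf.Bytes()))
	b, _ := r.ReadByte()
	one, _ := r.ReadBool()
	bits, _ := r.ReadBits(3)
	fmt.Printf("%#x %v %#b\n", b, one, bits) // 0xab true 0b101
}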
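
And a sketch of the little-endian block footer used by the new int codec: intEncoder.Encode appends an 8-byte start time followed by a 2-byte entry count, and intDecoder.Decode reads them back from the last 10 bytes. buffer.Writer is the helper added in pkg/buffer; the sample values are illustrative.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/buffer"
)

func main() {
	// Build the footer the way intEncoder.Encode does.
	w := buffer.NewBufferWriter(&bytes.Buffer{})
	w.PutUint64(1637366400000) // start time of the block (illustrative)
	w.PutUint16(120)           // number of slots in the block (illustrative)
	data := w.Bytes()

	// Parse it back the way intDecoder.Decode does: the footer occupies the
	// trailing 10 bytes of the encoded block.
	startTime := binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
	num := binary.LittleEndian.Uint16(data[len(data)-2:])
	fmt.Println(startTime, num) // 1637366400000 120
}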