This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e85aeb57 Fix several bugs (#473)
e85aeb57 is described below
commit e85aeb5790a10b8a26db628fc15140a034865009
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jun 21 14:11:33 2024 +0800
Fix several bugs (#473)
---
CHANGES.md | 2 +
banyand/metadata/client.go | 2 +-
banyand/metadata/embeddedserver/server.go | 4 +-
banyand/observability/service.go | 2 +-
banyand/queue/pub/client.go | 10 ++--
banyand/stream/query.go | 9 +++-
bydbctl/internal/cmd/property.go | 4 +-
bydbctl/internal/cmd/property_test.go | 79 +++++++++++++++++++++++++++++++
bydbctl/internal/cmd/root.go | 2 +-
pkg/cmdsetup/root.go | 4 +-
pkg/index/inverted/inverted.go | 4 ++
pkg/index/inverted/sort.go | 3 ++
12 files changed, 109 insertions(+), 16 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 52332c17..13a0e2b6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,8 @@ Release Notes.
- Fix querying old data points when the data is in a newer part. A version
column is introduced to each data point and stored in the timestamp file.
- Fix the bug that duplicated data points from different data nodes are
returned.
- Fix the bug that the data node can't re-register to etcd when the connection
is lost.
+- Fix memory leak in sorting the stream by the inverted index.
+- Fix the wrong array flags parsing in command line. The array flags should be
parsed by "StringSlice" instead of "StringArray".
## 0.6.1
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index d6495738..c60ef7d0 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -78,7 +78,7 @@ func (s *clientService) SchemaRegistry() schema.Registry {
func (s *clientService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
fs.StringVar(&s.namespace, "namespace", DefaultNamespace, "The
namespace of the metadata stored in etcd")
- fs.StringArrayVar(&s.endpoints, FlagEtcdEndpointsName,
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
+ fs.StringSliceVar(&s.endpoints, FlagEtcdEndpointsName,
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
fs.StringVar(&s.etcdUsername, flagEtcdUsername, "", "A username of
etcd")
fs.StringVar(&s.etcdPassword, flagEtcdPassword, "", "A password of etcd
user")
fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted
certificate authority")
diff --git a/banyand/metadata/embeddedserver/server.go
b/banyand/metadata/embeddedserver/server.go
index cc0425a3..3fbe9f6c 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -48,8 +48,8 @@ func (s *server) Role() databasev1.Role {
func (s *server) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("metadata")
fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path
of metadata")
- fs.StringArrayVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
- fs.StringArrayVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
+ fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
+ fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
return fs
}
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index ba46f2d2..a0b8baf2 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -77,7 +77,7 @@ type metricService struct {
func (p *metricService) FlagSet() *run.FlagSet {
flagSet := run.NewFlagSet("observability")
flagSet.StringVar(&p.listenAddr, "observability-listener-addr",
":2121", "listen addr for observability")
- flagSet.StringArrayVar(&p.modes, "observability-modes",
[]string{"prometheus"}, "modes for observability")
+ flagSet.StringSliceVar(&p.modes, "observability-modes",
[]string{"prometheus"}, "modes for observability")
return flagSet
}
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 1ebeb463..7adc5978 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -115,8 +115,8 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
}
p.log.Info().Str("status", p.dump()).Str("node",
name).Msg("node is removed from active queue by the new gRPC address updated
event")
}
- p.registered[name] = node
}
+ p.registered[name] = node
if _, ok := p.active[name]; ok {
return
@@ -186,14 +186,14 @@ func (p *pub) OnDelete(md schema.Metadata) {
elapsed += backoff
p.mu.Lock()
defer p.mu.Unlock()
- if p.removeNodeIfUnhealthy(md,
node, client) {
-
p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after",
elapsed).Msg("remove node from active queue by delete event")
- return true
- }
if _, ok := p.registered[name];
ok {
// The client has been
added back to registered clients map, just return
return true
}
+ if p.removeNodeIfUnhealthy(md,
node, client) {
+
p.log.Info().Str("status", p.dump()).Stringer("node", node).Dur("after",
elapsed).Msg("remove node from active queue by delete event")
+ return true
+ }
return false
}() {
return
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 2edff01c..c614a392 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -422,7 +422,7 @@ func (s *stream) Query(ctx context.Context, sqo
pbv1.StreamQueryOptions) (sqr pb
if err != nil {
return nil, err
}
- elementRefList, sortedRefMap, err := indexSort(s, sqo, tabWrappers,
seriesList, filteredRefMap)
+ elementRefList, sortedRefMap, err := s.indexSort(sqo, tabWrappers,
seriesList, filteredRefMap)
if err != nil {
return nil, err
}
@@ -547,7 +547,7 @@ func indexSearch(ctx context.Context, sqo
pbv1.StreamQueryOptions,
return filteredRefMap, nil
}
-func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers
[]storage.TSTableWrapper[*tsTable],
+func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabWrappers
[]storage.TSTableWrapper[*tsTable],
seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64,
) ([]*elementRef, map[common.SeriesID][]int64, error) {
if sqo.Order == nil || sqo.Order.Index == nil {
@@ -559,6 +559,11 @@ func indexSort(s *stream, sqo pbv1.StreamQueryOptions,
tabWrappers []storage.TST
}
desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort ==
modelv1.Sort_SORT_DESC
itemIter := itersort.NewItemIter[*index.ItemRef](iters, desc)
+ defer func() {
+ if err := itemIter.Close(); err != nil {
+ s.l.Err(err).Msg("failed to close itemIter")
+ }
+ }()
var elementRefList []*elementRef
sortedRefMap := make(map[common.SeriesID][]int64)
diff --git a/bydbctl/internal/cmd/property.go b/bydbctl/internal/cmd/property.go
index 3f608725..e8aacbe7 100644
--- a/bydbctl/internal/cmd/property.go
+++ b/bydbctl/internal/cmd/property.go
@@ -115,8 +115,8 @@ func newPropertyCmd() *cobra.Command {
},
}
listCmd.Flags().StringVarP(&name, "name", "n", "", "the name of the
resource")
- listCmd.Flags().StringArrayVarP(&ids, "ids", "", nil, "id selector")
- listCmd.Flags().StringArrayVarP(&tags, "tags", "t", nil, "tag selector")
+ listCmd.Flags().StringSliceVarP(&ids, "ids", "", nil, "id selector")
+ listCmd.Flags().StringSliceVarP(&tags, "tags", "t", nil, "tag selector")
var leaseID int64
keepAliveCmd := &cobra.Command{
diff --git a/bydbctl/internal/cmd/property_test.go
b/bydbctl/internal/cmd/property_test.go
index 2ad70ab2..fc57c965 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -295,6 +295,85 @@ tags:
helpers.UnmarshalYAML([]byte(out), resp)
Expect(resp.Property).To(HaveLen(2))
})
+
+ It("list properties in a container by id", func() {
+ // create another property for list operation
+ rootCmd.SetArgs([]string{"property", "apply", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ container:
+ group: ui-template
+ name: service
+ id: spring
+tags:
+ - key: content
+ value:
+ str:
+ value: bar
+ - key: state
+ value:
+ int:
+ value: 1
+`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("created: true"))
+ Expect(out).To(ContainSubstring("tagsNum: 2"))
+
+ rootCmd.SetArgs([]string{"property", "list", "-g",
"ui-template", "-n", "service", "--ids", "spring"})
+ out = capturer.CaptureStdout(func() {
+ cmd.ResetFlags()
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(propertyv1.ListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Property).To(HaveLen(1))
+ Expect(resp.Property[0].Metadata.GetId()).To(Equal("spring"))
+ Expect(resp.Property[0].Tags).To(HaveLen(2))
+ })
+
+ It("list properties in a container by ids and tags", func() {
+ // create another property for list operation
+ rootCmd.SetArgs([]string{"property", "apply", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ container:
+ group: ui-template
+ name: service
+ id: spring
+tags:
+ - key: content
+ value:
+ str:
+ value: bar
+ - key: state
+ value:
+ int:
+ value: 1
+`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("created: true"))
+ Expect(out).To(ContainSubstring("tagsNum: 2"))
+
+ rootCmd.SetArgs([]string{"property", "list", "-g",
"ui-template", "-n", "service", "--ids", "spring", "--tags", "content"})
+ out = capturer.CaptureStdout(func() {
+ cmd.ResetFlags()
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(propertyv1.ListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Property).To(HaveLen(1))
+ Expect(resp.Property[0].Metadata.GetId()).To(Equal("spring"))
+ Expect(resp.Property[0].Tags).To(HaveLen(1))
+ })
+
It("keepalive not found", func() {
rootCmd.SetArgs([]string{
"property", "keepalive", "-i", "111",
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index f9645fca..5f562281 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -139,7 +139,7 @@ func bindNameAndIDAndTagsFlag(commands ...*cobra.Command) {
bindNameFlag(commands...)
for _, c := range commands {
c.Flags().StringVarP(&id, "id", "i", "", "the property's id")
- c.Flags().StringArrayVarP(&tags, "tags", "t", nil, "the
property's tags")
+ c.Flags().StringSliceVarP(&tags, "tags", "t", nil, "the
property's tags")
_ = c.MarkFlagRequired("name")
_ = c.MarkFlagRequired("id")
}
diff --git a/pkg/cmdsetup/root.go b/pkg/cmdsetup/root.go
index 1e49fbc5..c767ce20 100644
--- a/pkg/cmdsetup/root.go
+++ b/pkg/cmdsetup/root.go
@@ -62,8 +62,8 @@ BanyanDB, as an observability database, aims to ingest,
analyze and store Metric
cmd.PersistentFlags().StringVar(&common.FlagNodeHost, "node-host", "",
"the node host of the server only used when node-host-provider is \"flag\"")
cmd.PersistentFlags().StringVar(&logging.Env, "logging-env", "prod",
"the logging")
cmd.PersistentFlags().StringVar(&logging.Level, "logging-level",
"info", "the root level of logging")
- cmd.PersistentFlags().StringArrayVar(&logging.Modules,
"logging-modules", nil, "the specific module")
- cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels",
nil, "the level logging of logging")
+ cmd.PersistentFlags().StringSliceVar(&logging.Modules,
"logging-modules", nil, "the specific module")
+ cmd.PersistentFlags().StringSliceVar(&logging.Levels, "logging-levels",
nil, "the level logging of logging")
cmd.AddCommand(newStandaloneCmd(runners...))
cmd.AddCommand(newDataCmd(runners...))
cmd.AddCommand(newLiaisonCmd(runners...))
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index b1d93d73..95ec0c9c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -166,6 +166,9 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange
index.RangeOpts, ord
bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
return index.DummyFieldIterator, nil
}
+ if !s.closer.AddRunning() {
+ return nil, nil
+ }
reader, err := s.writer.Reader()
if err != nil {
@@ -211,6 +214,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange
index.RangeOpts, ord
sortedKey: sortedKey,
size: preLoadSize,
sid: fieldKey.SeriesID,
+ closer: s.closer,
}
return result, nil
}
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index 0447e255..e4d6854e 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order
modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err
error) {
@@ -72,6 +73,7 @@ type sortIterator struct {
err error
reader *bluge.Reader
current *blugeMatchIterator
+ closer *run.Closer
sortedKey string
size int
skipped int
@@ -132,6 +134,7 @@ func (si *sortIterator) Val() *index.ItemRef {
}
func (si *sortIterator) Close() error {
+ defer si.closer.Done()
if errors.Is(si.err, io.EOF) {
si.err = nil
if si.current != nil {