This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new df4abcf Other data models operation (#190)
df4abcf is described below
commit df4abcfaaa313252ebe4b2262cafc54320389352
Author: sacloud <[email protected]>
AuthorDate: Sat Oct 15 21:43:20 2022 +0800
Other data models operation (#190)
* add indexRule and property commands
* Add check grpc server
Signed-off-by: Gao Hongtao <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/liaison/grpc/registry_test.go | 3 +
banyand/measure/measure_topn.go | 2 +-
bydbctl/internal/cmd/group.go | 5 +-
bydbctl/internal/cmd/{measure.go => index_rule.go} | 73 +++----
.../cmd/{measure.go => index_rule_binding.go} | 73 +++----
bydbctl/internal/cmd/index_rule_binding_test.go | 165 +++++++++++++++
bydbctl/internal/cmd/index_rule_test.go | 154 ++++++++++++++
bydbctl/internal/cmd/measure.go | 8 +-
bydbctl/internal/cmd/measure_test.go | 61 +++++-
bydbctl/internal/cmd/property.go | 116 +++++++++++
bydbctl/internal/cmd/property_test.go | 230 +++++++++++++++++++++
bydbctl/internal/cmd/rest.go | 102 ++++++++-
bydbctl/internal/cmd/root.go | 12 +-
bydbctl/internal/cmd/stream.go | 8 +-
bydbctl/internal/cmd/stream_test.go | 5 +-
test/cases/measure/data/data.go | 2 +-
test/integration/cold_query/query_suite_test.go | 1 +
test/integration/query/query_suite_test.go | 1 +
18 files changed, 906 insertions(+), 115 deletions(-)
diff --git a/banyand/liaison/grpc/registry_test.go
b/banyand/liaison/grpc/registry_test.go
index e2fc8c6..1b45410 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -19,6 +19,7 @@ package grpc_test
import (
"context"
+ "time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -34,6 +35,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
)
@@ -185,6 +187,7 @@ func setupForRegistry() func() {
preloadStreamSvc,
tcp,
)
+ Eventually(helpers.HealthCheck("localhost:17912", 10*time.Second,
10*time.Second), 20*time.Second).Should(Succeed())
return func() {
deferFunc()
metaDeferFunc()
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index d8cb66d..8a964b1 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -137,7 +137,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record
flow.StreamRecord) err
eventTime := t.downSampleTimeBucket(record.TimestampMillis())
timeBucket := eventTime.Format(timeBucketFormat)
var err error
- t.l.Warn().
+ t.l.Debug().
Str("TopN", t.topNSchema.GetMetadata().GetName()).
Int("rankNums", len(tuples)).
Msg("Write a tuple")
diff --git a/bydbctl/internal/cmd/group.go b/bydbctl/internal/cmd/group.go
index ba4b0e4..60d1f1d 100644
--- a/bydbctl/internal/cmd/group.go
+++ b/bydbctl/internal/cmd/group.go
@@ -18,7 +18,6 @@
package cmd
import (
- "encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
@@ -52,7 +51,7 @@ func newGroupCmd() *cobra.Command {
cr :=
&database_v1.GroupRegistryServiceCreateRequest{
Group: g,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
@@ -81,7 +80,7 @@ func newGroupCmd() *cobra.Command {
cr :=
&database_v1.GroupRegistryServiceUpdateRequest{
Group: g,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
diff --git a/bydbctl/internal/cmd/measure.go
b/bydbctl/internal/cmd/index_rule.go
similarity index 64%
copy from bydbctl/internal/cmd/measure.go
copy to bydbctl/internal/cmd/index_rule.go
index 9a66323..aea2088 100644
--- a/bydbctl/internal/cmd/measure.go
+++ b/bydbctl/internal/cmd/index_rule.go
@@ -18,7 +18,6 @@
package cmd
import (
- "encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
@@ -29,40 +28,40 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const measureSchemaPath = "/api/v1/measure/schema"
+const indexRuleSchemaPath = "/api/v1/index-rule/schema"
-var measureSchemaPathWithParams = measureSchemaPath + "/{group}/{name}"
+var indexRuleSchemaPathWithParams = indexRuleSchemaPath + "/{group}/{name}"
-func newMeasureCmd() *cobra.Command {
- measureCmd := &cobra.Command{
- Use: "measure",
+func newIndexRuleCmd() *cobra.Command {
+ indexRuleCmd := &cobra.Command{
+ Use: "indexRule",
Version: version.Build(),
- Short: "Measure operation",
+ Short: "IndexRule operation",
}
createCmd := &cobra.Command{
Use: "create -f [file|dir|-]",
Version: version.Build(),
- Short: "Create measures from files",
+ Short: "Create indexRules from files",
RunE: func(cmd *cobra.Command, _ []string) error {
return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- s := new(database_v1.Measure)
+ s := new(database_v1.IndexRule)
err :=
protojson.Unmarshal(request.data, s)
if err != nil {
return nil, err
}
- cr :=
&database_v1.MeasureRegistryServiceCreateRequest{
- Measure: s,
+ cr :=
&database_v1.IndexRuleRegistryServiceCreateRequest{
+ IndexRule: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
- return
request.req.SetBody(b).Post(getPath(measureSchemaPath))
+ return
request.req.SetBody(b).Post(getPath(indexRuleSchemaPath))
},
func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("measure %s.%s is created",
reqBody.group, reqBody.name)
+ fmt.Printf("indexRule %s.%s is
created", reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -72,28 +71,28 @@ func newMeasureCmd() *cobra.Command {
updateCmd := &cobra.Command{
Use: "update -f [file|dir|-]",
Version: version.Build(),
- Short: "Update measures from files",
+ Short: "Update indexRules from files",
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- s := new(database_v1.Measure)
+ s := new(database_v1.IndexRule)
err :=
protojson.Unmarshal(request.data, s)
if err != nil {
return nil, err
}
- cr :=
&database_v1.MeasureRegistryServiceUpdateRequest{
- Measure: s,
+ cr :=
&database_v1.IndexRuleRegistryServiceUpdateRequest{
+ IndexRule: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
return request.req.SetBody(b).
SetPathParam("name",
request.name).SetPathParam("group", request.group).
-
Put(getPath(measureSchemaPathWithParams))
+
Put(getPath(indexRuleSchemaPathWithParams))
},
func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("measure %s.%s is updated",
reqBody.group, reqBody.name)
+ fmt.Printf("indexRule %s.%s is
updated", reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -103,10 +102,10 @@ func newMeasureCmd() *cobra.Command {
getCmd := &cobra.Command{
Use: "get [-g group] -n name",
Version: version.Build(),
- Short: "Get a measure",
+ Short: "Get a indexRule",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(measureSchemaPathWithParams))
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(indexRuleSchemaPathWithParams))
}, yamlPrinter)
},
}
@@ -114,12 +113,12 @@ func newMeasureCmd() *cobra.Command {
deleteCmd := &cobra.Command{
Use: "delete [-g group] -n name",
Version: version.Build(),
- Short: "Delete a measure",
+ Short: "Delete a indexRule",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(measureSchemaPathWithParams))
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(indexRuleSchemaPathWithParams))
}, func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("measure %s.%s is deleted",
reqBody.group, reqBody.name)
+ fmt.Printf("indexRule %s.%s is deleted",
reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -130,28 +129,16 @@ func newMeasureCmd() *cobra.Command {
listCmd := &cobra.Command{
Use: "list [-g group]",
Version: version.Build(),
- Short: "List measures",
+ Short: "List indexRules",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/measure/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/index-rule/schema/lists/{group}"))
}, yamlPrinter)
},
}
- queryCmd := &cobra.Command{
- Use: "query -f [file|dir|-]",
- Version: version.Build(),
- Short: "Query data in a measure",
- RunE: func(cmd *cobra.Command, _ []string) (err error) {
- return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
- func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/data"))
- }, yamlPrinter)
- },
- }
- bindFileFlag(createCmd, updateCmd, queryCmd)
- bindTimeRangeFlag(queryCmd)
+ bindFileFlag(createCmd, updateCmd)
- measureCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
- return measureCmd
+ indexRuleCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd,
listCmd)
+ return indexRuleCmd
}
diff --git a/bydbctl/internal/cmd/measure.go
b/bydbctl/internal/cmd/index_rule_binding.go
similarity index 64%
copy from bydbctl/internal/cmd/measure.go
copy to bydbctl/internal/cmd/index_rule_binding.go
index 9a66323..5c7bcb5 100644
--- a/bydbctl/internal/cmd/measure.go
+++ b/bydbctl/internal/cmd/index_rule_binding.go
@@ -18,7 +18,6 @@
package cmd
import (
- "encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
@@ -29,40 +28,40 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const measureSchemaPath = "/api/v1/measure/schema"
+const indexRuleBindingSchemaPath = "/api/v1/index-rule-binding/schema"
-var measureSchemaPathWithParams = measureSchemaPath + "/{group}/{name}"
+var indexRuleBindingSchemaPathWithParams = indexRuleBindingSchemaPath +
"/{group}/{name}"
-func newMeasureCmd() *cobra.Command {
- measureCmd := &cobra.Command{
- Use: "measure",
+func newIndexRuleBindingCmd() *cobra.Command {
+ indexRuleBindingCmd := &cobra.Command{
+ Use: "indexRuleBinding",
Version: version.Build(),
- Short: "Measure operation",
+ Short: "IndexRuleBinding operation",
}
createCmd := &cobra.Command{
Use: "create -f [file|dir|-]",
Version: version.Build(),
- Short: "Create measures from files",
+ Short: "Create indexRuleBindings from files",
RunE: func(cmd *cobra.Command, _ []string) error {
return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- s := new(database_v1.Measure)
+ s := new(database_v1.IndexRuleBinding)
err :=
protojson.Unmarshal(request.data, s)
if err != nil {
return nil, err
}
- cr :=
&database_v1.MeasureRegistryServiceCreateRequest{
- Measure: s,
+ cr :=
&database_v1.IndexRuleBindingRegistryServiceCreateRequest{
+ IndexRuleBinding: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
- return
request.req.SetBody(b).Post(getPath(measureSchemaPath))
+ return
request.req.SetBody(b).Post(getPath(indexRuleBindingSchemaPath))
},
func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("measure %s.%s is created",
reqBody.group, reqBody.name)
+ fmt.Printf("indexRuleBinding %s.%s is
created", reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -72,28 +71,28 @@ func newMeasureCmd() *cobra.Command {
updateCmd := &cobra.Command{
Use: "update -f [file|dir|-]",
Version: version.Build(),
- Short: "Update measures from files",
+ Short: "Update indexRuleBindings from files",
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- s := new(database_v1.Measure)
+ s := new(database_v1.IndexRuleBinding)
err :=
protojson.Unmarshal(request.data, s)
if err != nil {
return nil, err
}
- cr :=
&database_v1.MeasureRegistryServiceUpdateRequest{
- Measure: s,
+ cr :=
&database_v1.IndexRuleBindingRegistryServiceUpdateRequest{
+ IndexRuleBinding: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
return request.req.SetBody(b).
SetPathParam("name",
request.name).SetPathParam("group", request.group).
-
Put(getPath(measureSchemaPathWithParams))
+
Put(getPath(indexRuleBindingSchemaPathWithParams))
},
func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("measure %s.%s is updated",
reqBody.group, reqBody.name)
+ fmt.Printf("indexRuleBinding %s.%s is
updated", reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -103,10 +102,10 @@ func newMeasureCmd() *cobra.Command {
getCmd := &cobra.Command{
Use: "get [-g group] -n name",
Version: version.Build(),
- Short: "Get a measure",
+ Short: "Get a indexRuleBinding",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(measureSchemaPathWithParams))
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(indexRuleBindingSchemaPathWithParams))
}, yamlPrinter)
},
}
@@ -114,12 +113,12 @@ func newMeasureCmd() *cobra.Command {
deleteCmd := &cobra.Command{
Use: "delete [-g group] -n name",
Version: version.Build(),
- Short: "Delete a measure",
+ Short: "Delete a indexRuleBinding",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(measureSchemaPathWithParams))
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(indexRuleBindingSchemaPathWithParams))
}, func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("measure %s.%s is deleted",
reqBody.group, reqBody.name)
+ fmt.Printf("indexRuleBinding %s.%s is deleted",
reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -130,28 +129,16 @@ func newMeasureCmd() *cobra.Command {
listCmd := &cobra.Command{
Use: "list [-g group]",
Version: version.Build(),
- Short: "List measures",
+ Short: "List indexRuleBindings",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/measure/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/index-rule-binding/schema/lists/{group}"))
}, yamlPrinter)
},
}
- queryCmd := &cobra.Command{
- Use: "query -f [file|dir|-]",
- Version: version.Build(),
- Short: "Query data in a measure",
- RunE: func(cmd *cobra.Command, _ []string) (err error) {
- return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
- func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/data"))
- }, yamlPrinter)
- },
- }
- bindFileFlag(createCmd, updateCmd, queryCmd)
- bindTimeRangeFlag(queryCmd)
+ bindFileFlag(createCmd, updateCmd)
- measureCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
- return measureCmd
+ indexRuleBindingCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd,
listCmd)
+ return indexRuleBindingCmd
}
diff --git a/bydbctl/internal/cmd/index_rule_binding_test.go
b/bydbctl/internal/cmd/index_rule_binding_test.go
new file mode 100644
index 0000000..ac3e328
--- /dev/null
+++ b/bydbctl/internal/cmd/index_rule_binding_test.go
@@ -0,0 +1,165 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+var _ = Describe("IndexRuleBindingSchema Operation", func() {
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ _, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ // extracting the operation of creating indexRuleBinding schema
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1
+catalog: CATALOG_STREAM
+resource_opts:
+ shard_num: 2
+ block_interval:
+ unit: UNIT_HOUR
+ num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is created"))
+ rootCmd.SetArgs([]string{"indexRuleBinding", "create", "-a",
addr, "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1
+subject:
+ catalog: CATALOG_STREAM
+ name: stream1`))
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRuleBinding group1.name1
is created"))
+ })
+
+ It("get indexRuleBinding schema", func() {
+ rootCmd.SetArgs([]string{"indexRuleBinding", "get", "-g",
"group1", "-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ GinkgoWriter.Println(out)
+ resp :=
new(database_v1.IndexRuleBindingRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.IndexRuleBinding.Metadata.Group).To(Equal("group1"))
+ Expect(resp.IndexRuleBinding.Metadata.Name).To(Equal("name1"))
+ Expect(resp.IndexRuleBinding.Subject.Name).To(Equal("stream1"))
+ })
+
+ It("update indexRuleBinding schema", func() {
+ rootCmd.SetArgs([]string{"indexRuleBinding", "update", "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1
+subject:
+ catalog: CATALOG_STREAM
+ name: stream2`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRuleBinding group1.name1
is updated"))
+ rootCmd.SetArgs([]string{"indexRuleBinding", "get", "-g",
"group1", "-n", "name1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp :=
new(database_v1.IndexRuleBindingRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.IndexRuleBinding.Metadata.Group).To(Equal("group1"))
+ Expect(resp.IndexRuleBinding.Metadata.Name).To(Equal("name1"))
+ Expect(resp.IndexRuleBinding.Subject.Name).To(Equal("stream2"))
+ })
+
+ It("delete indexRuleBinding schema", func() {
+ // delete
+ rootCmd.SetArgs([]string{"indexRuleBinding", "delete", "-g",
"group1", "-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRuleBinding group1.name1
is deleted"))
+ // get again
+ rootCmd.SetArgs([]string{"indexRuleBinding", "get", "-g",
"group1", "-n", "name1"})
+ err := rootCmd.Execute()
+ Expect(err).To(MatchError("rpc error: code = NotFound desc =
banyandb: resource not found"))
+ })
+
+ It("list indexRuleBinding schema", func() {
+ // create another indexRuleBinding schema for list operation
+ rootCmd.SetArgs([]string{"indexRuleBinding", "create", "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name2
+ group: group1
+subject:
+ catalog: CATALOG_STREAM
+ name: stream2`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRuleBinding group1.name2
is created"))
+ // list
+ rootCmd.SetArgs([]string{"indexRuleBinding", "list", "-g",
"group1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp :=
new(database_v1.IndexRuleBindingRegistryServiceListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.IndexRuleBinding).To(HaveLen(2))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/bydbctl/internal/cmd/index_rule_test.go
b/bydbctl/internal/cmd/index_rule_test.go
new file mode 100644
index 0000000..f09ff1e
--- /dev/null
+++ b/bydbctl/internal/cmd/index_rule_test.go
@@ -0,0 +1,154 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+var _ = Describe("IndexRuleSchema Operation", func() {
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ _, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ // extracting the operation of creating indexRule schema
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1
+catalog: CATALOG_STREAM
+resource_opts:
+ shard_num: 2
+ block_interval:
+ unit: UNIT_HOUR
+ num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is created"))
+ rootCmd.SetArgs([]string{"indexRule", "create", "-a", addr,
"-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1`))
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRule group1.name1 is
created"))
+ })
+
+ It("get indexRule schema", func() {
+ rootCmd.SetArgs([]string{"indexRule", "get", "-g", "group1",
"-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ GinkgoWriter.Println(out)
+ resp := new(database_v1.IndexRuleRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.IndexRule.Metadata.Group).To(Equal("group1"))
+ Expect(resp.IndexRule.Metadata.Name).To(Equal("name1"))
+ })
+
+ It("update indexRule schema", func() {
+ rootCmd.SetArgs([]string{"indexRule", "update", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRule group1.name1 is
updated"))
+ rootCmd.SetArgs([]string{"indexRule", "get", "-g", "group1",
"-n", "name1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.IndexRuleRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.IndexRule.Metadata.Group).To(Equal("group1"))
+ Expect(resp.IndexRule.Metadata.Name).To(Equal("name1"))
+ })
+
+ It("delete indexRule schema", func() {
+ // delete
+ rootCmd.SetArgs([]string{"indexRule", "delete", "-g", "group1",
"-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRule group1.name1 is
deleted"))
+ // get again
+ rootCmd.SetArgs([]string{"indexRule", "get", "-g", "group1",
"-n", "name1"})
+ err := rootCmd.Execute()
+ Expect(err).To(MatchError("rpc error: code = NotFound desc =
banyandb: resource not found"))
+ })
+
+ It("list indexRule schema", func() {
+ // create another indexRule schema for list operation
+ rootCmd.SetArgs([]string{"indexRule", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name2
+ group: group1`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("indexRule group1.name2 is
created"))
+ // list
+ rootCmd.SetArgs([]string{"indexRule", "list", "-g", "group1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.IndexRuleRegistryServiceListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.IndexRule).To(HaveLen(2))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/bydbctl/internal/cmd/measure.go b/bydbctl/internal/cmd/measure.go
index 9a66323..178780c 100644
--- a/bydbctl/internal/cmd/measure.go
+++ b/bydbctl/internal/cmd/measure.go
@@ -18,7 +18,6 @@
package cmd
import (
- "encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
@@ -55,7 +54,7 @@ func newMeasureCmd() *cobra.Command {
cr :=
&database_v1.MeasureRegistryServiceCreateRequest{
Measure: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
@@ -84,7 +83,7 @@ func newMeasureCmd() *cobra.Command {
cr :=
&database_v1.MeasureRegistryServiceUpdateRequest{
Measure: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
@@ -139,9 +138,10 @@ func newMeasureCmd() *cobra.Command {
}
queryCmd := &cobra.Command{
- Use: "query -f [file|dir|-]",
+ Use: "query [-s start_time] [-e end_time] -f [file|dir|-]",
Version: version.Build(),
Short: "Query data in a measure",
+ Long: timeRangeUsage,
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
diff --git a/bydbctl/internal/cmd/measure_test.go
b/bydbctl/internal/cmd/measure_test.go
index b7e79c3..bf7dcb6 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -165,24 +165,27 @@ var _ = Describe("Measure Data Query", func() {
var addr, grpcAddr string
var deferFunc func()
var rootCmd *cobra.Command
+ var now time.Time
+ var nowStr, endStr string
+ var interval time.Duration
BeforeEach(func() {
+ now = timestamp.NowMilli()
+ nowStr = now.Format(RFC3339)
+ interval = 1 * time.Millisecond
+ endStr = now.Add(1 * time.Hour).Format(RFC3339)
grpcAddr, addr, deferFunc = setup.SetUp()
Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
addr = "http://" + addr
- time.Sleep(1 * time.Second)
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
})
- It("query measure data", func() {
+ It("query all measure data", func() {
conn, err := grpclib.Dial(
grpcAddr,
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
Expect(err).NotTo(HaveOccurred())
- now := timestamp.NowMilli()
- interval := 500 * time.Millisecond
- end := now.Add(1 * time.Hour)
cases_measure_data.Write(conn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", now, interval)
rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f",
"-"})
issue := func() string {
@@ -197,7 +200,7 @@ tagProjection:
tagFamilies:
- name: default
tags:
- - id`, now.Format(RFC3339), end.Format(RFC3339))))
+ - id`, nowStr, endStr)))
return capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
@@ -214,6 +217,52 @@ tagProjection:
}).Should(Equal(6))
})
+ DescribeTable("query measure data with time range flags", func(timeArgs
...string) {
+ conn, err := grpclib.Dial(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ now := timestamp.NowMilli()
+ interval := -time.Minute
+ cases_measure_data.Write(conn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", now, interval)
+ args := []string{"measure", "query", "-a", addr}
+ args = append(args, timeArgs...)
+ args = append(args, "-f", "-")
+ rootCmd.SetArgs(args)
+ issue := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: service_cpm_minute
+ group: sw_metric
+tagProjection:
+ tagFamilies:
+ - name: default
+ tags:
+ - id`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ resp := new(measure_v1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.DataPoints)
+ }).Should(Equal(6))
+ },
+ Entry("relative start", "--start", "-30m"),
+ Entry("relative end", "--end", "0m"),
+ Entry("absolute start", "--start", nowStr),
+ Entry("absolute end", "--end", endStr),
+ Entry("default"),
+ Entry("all relative", "--start", "-30m", "--end", "0m"),
+ Entry("all absolute", "--start", nowStr, "--end", endStr),
+ )
+
AfterEach(func() {
deferFunc()
})
diff --git a/bydbctl/internal/cmd/property.go b/bydbctl/internal/cmd/property.go
new file mode 100644
index 0000000..b47b823
--- /dev/null
+++ b/bydbctl/internal/cmd/property.go
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+ "github.com/go-resty/resty/v2"
+ "github.com/spf13/cobra"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ property_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/pkg/version"
+)
+
+const propertySchemaPath = "/api/v1/property"
+
+var (
+ id string
+ tags []string
+ ids []string
+ propertySchemaPathWithoutTagParams = propertySchemaPath +
"/{group}/{name}/{id}"
+ propertySchemaPathWithTagParams = propertySchemaPath +
"/{group}/{name}/{id}/{tag}"
+ propertyListSchemaPathWithTagParams = propertySchemaPath +
"/lists/{group}/{name}/{ids}/{tags}"
+)
+
+func newPropertyCmd() *cobra.Command {
+ propertyCmd := &cobra.Command{
+ Use: "property",
+ Version: version.Build(),
+ Short: "Property operation",
+ }
+
+ applyCmd := &cobra.Command{
+ Use: "apply -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Apply(Create or Update) properties from files",
+ RunE: func(cmd *cobra.Command, _ []string) error {
+ return rest(func() ([]reqBody, error) { return
parseFromYAMLForProperty(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ s := new(property_v1.Property)
+ err :=
protojson.Unmarshal(request.data, s)
+ if err != nil {
+ return nil, err
+ }
+ cr := &property_v1.ApplyRequest{
+ Property: s,
+ }
+ b, err := protojson.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return
request.req.SetPathParam("group", request.group).SetPathParam("name",
request.name).
+ SetPathParam("id",
request.id).SetBody(b).Put(getPath(propertySchemaPathWithoutTagParams))
+ }, yamlPrinter)
+ },
+ }
+ bindFileFlag(applyCmd)
+
+ getCmd := &cobra.Command{
+ Use: "get [-g group] -n name -i id [-t tags]",
+ Version: version.Build(),
+ Short: "Get a property",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group", request.group).
+ SetPathParam("id",
request.id).SetPathParam("tag", request.tags()).
+
Get(getPath(propertySchemaPathWithTagParams))
+ }, yamlPrinter)
+ },
+ }
+
+ deleteCmd := &cobra.Command{
+ Use: "delete [-g group] -n name -i id -t [tags]",
+ Version: version.Build(),
+ Short: "Delete a property",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group", request.group).
+ SetPathParam("id",
request.id).SetPathParam("tag",
request.tags()).Delete(getPath(propertySchemaPathWithTagParams))
+ }, yamlPrinter)
+ },
+ }
+ bindNameAndIDAndTagsFlag(getCmd, deleteCmd)
+
+ listCmd := &cobra.Command{
+ Use: "list [-g group] -n name",
+ Version: version.Build(),
+ Short: "List properties",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group", request.group).
+ SetPathParam("ids",
request.ids()).SetPathParam("tags",
request.tags()).Get(getPath(propertyListSchemaPathWithTagParams))
+ }, yamlPrinter)
+ },
+ }
+ bindNameFlag(listCmd)
+ listCmd.Flags().StringArrayVarP(&ids, "ids", "", nil, "id selector")
+ listCmd.Flags().StringArrayVarP(&tags, "tags", "t", nil, "tag selector")
+
+ propertyCmd.AddCommand(getCmd, applyCmd, deleteCmd, listCmd)
+ return propertyCmd
+}
diff --git a/bydbctl/internal/cmd/property_test.go
b/bydbctl/internal/cmd/property_test.go
new file mode 100644
index 0000000..acbbe71
--- /dev/null
+++ b/bydbctl/internal/cmd/property_test.go
@@ -0,0 +1,230 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "strings"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+ "google.golang.org/protobuf/testing/protocmp"
+
+ property_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+var _ = Describe("Property Operation", func() {
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ p1YAML := `
+metadata:
+ container:
+ group: ui-template
+ name: service
+ id: kubernetes
+tags:
+ - key: content
+ value:
+ str:
+ value: foo
+ - key: state
+ value:
+ int:
+ value: 1
+`
+ p2YAML := `
+metadata:
+ container:
+ group: ui-template
+ name: service
+ id: kubernetes
+tags:
+ - key: content
+ value:
+ str:
+ value: foo
+ - key: state
+ value:
+ int:
+ value: 3
+`
+ p1Proto := new(property_v1.Property)
+ helpers.UnmarshalYAML([]byte(p1YAML), p1Proto)
+ p2Proto := new(property_v1.Property)
+ helpers.UnmarshalYAML([]byte(p2YAML), p2Proto)
+ BeforeEach(func() {
+ _, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ // extracting the operation of creating property schema
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"property", "apply", "-a", addr, "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(p1YAML))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ GinkgoWriter.Println(out)
+ Expect(out).To(ContainSubstring("created: true"))
+ Expect(out).To(ContainSubstring("tagsNum: 2"))
+ })
+
+ It("gets property", func() {
+ rootCmd.SetArgs([]string{"property", "get", "-g",
"ui-template", "-n", "service", "-i", "kubernetes"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ GinkgoWriter.Println(out)
+ resp := new(property_v1.GetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(cmp.Equal(resp.Property, p1Proto,
+ protocmp.IgnoreUnknown(),
+ protocmp.Transform())).To(BeTrue())
+ })
+
+ It("gets a tag", func() {
+ rootCmd.SetArgs([]string{
+ "property", "get", "-g", "ui-template", "-n",
+ "service", "-i", "kubernetes", "-t", "state",
+ })
+
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ GinkgoWriter.Println(out)
+ resp := new(property_v1.GetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Property.Tags).To(HaveLen(1))
+ Expect(resp.Property.Tags[0].Key).To(Equal("state"))
+ })
+
+ It("gets tags", func() {
+ rootCmd.SetArgs([]string{
+ "property", "get", "-g", "ui-template", "-n",
+ "service", "-i", "kubernetes", "-t", "content,state",
+ })
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ GinkgoWriter.Println(out)
+ resp := new(property_v1.GetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(cmp.Equal(resp.Property, p1Proto,
+ protocmp.IgnoreUnknown(),
+ protocmp.Transform())).To(BeTrue())
+ })
+
+ It("update property", func() {
+ rootCmd.SetArgs([]string{"property", "apply", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ container:
+ group: ui-template
+ name: service
+ id: kubernetes
+tags:
+- key: state
+ value:
+ int:
+ value: 3
+`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("created: false"))
+ Expect(out).To(ContainSubstring("tagsNum: 1"))
+ rootCmd.SetArgs([]string{"property", "get", "-g",
"ui-template", "-n", "service", "-i", "kubernetes"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(property_v1.GetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+
+ Expect(cmp.Equal(resp.Property, p2Proto,
+ protocmp.IgnoreUnknown(),
+ protocmp.Transform())).To(BeTrue())
+ })
+
+ It("delete property", func() {
+ // delete
+ rootCmd.SetArgs([]string{"property", "delete", "-g",
"ui-template", "-n", "service", "-i", "kubernetes"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("deleted: true"))
+ Expect(out).To(ContainSubstring("tagsNum: 0"))
+ // get again
+ rootCmd.SetArgs([]string{"property", "get", "-g",
"ui-template", "-n", "service", "-i", "kubernetes"})
+ err := rootCmd.Execute()
+ Expect(err).To(MatchError("rpc error: code = NotFound desc =
banyandb: resource not found"))
+ })
+
+ It("list property", func() {
+ // create another property for list operation
+ rootCmd.SetArgs([]string{"property", "apply", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ container:
+ group: ui-template
+ name: service
+ id: spring
+tags:
+ - key: content
+ value:
+ str:
+ value: bar
+ - key: state
+ value:
+ int:
+ value: 1
+`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("created: true"))
+ Expect(out).To(ContainSubstring("tagsNum: 2"))
+ // list
+ rootCmd.SetArgs([]string{"property", "list", "-g",
"ui-template", "-n", "service"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(property_v1.ListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Property).To(HaveLen(2))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index 7a791c9..b42e50f 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io"
+ "strings"
"time"
"github.com/go-resty/resty/v2"
@@ -38,8 +39,18 @@ import (
const (
// RFC3339 refers to https://www.rfc-editor.org/rfc/rfc3339
- RFC3339 = "2006-01-02T15:04:05Z07:00"
- timeRange = 30 * time.Minute
+ RFC3339 = "2006-01-02T15:04:05Z07:00"
+ timeRange = 30 * time.Minute
+ timeRangeUsage = `"start" and "end" specify a time range during which
the query is preformed,
+ they can be absolute time like
"2006-01-02T15:04:05Z07:00"(https://www.rfc-editor.org/rfc/rfc3339),
+ or relative time (to the current time) like "-30m", "30m".
+ They are both optional and their default values follow the
rules below:
+ 1. when "start" and "end" are both absent, "start = now - 30
minutes" and "end = now",
+ namely past 30 minutes;
+ 2. when "start" is absent and "end" is present, this command
calculates "start" (minus 30 units),
+ e.g. "end = 2022-11-09T12:34:00Z", so "start = end - 30 minutes
= 2022-11-09T12:04:00Z";
+ 3. when "start" is present and "end" is absent, this command
calculates "end" (plus 30 units),
+ e.g. "start = 2022-11-09T12:04:00Z", so "end = start + 30
minutes = 2022-11-09T12:34:00Z".`
)
var errMalformedInput = errors.New("malformed input")
@@ -47,6 +58,9 @@ var errMalformedInput = errors.New("malformed input")
type reqBody struct {
name string
group string
+ id string
+ ids []string
+ tags []string
parsedData map[string]interface{}
data []byte
}
@@ -56,6 +70,20 @@ type request struct {
reqBody
}
+func (r request) ids() string {
+ if len(r.reqBody.ids) == 0 {
+ return "*"
+ }
+ return strings.Join(r.reqBody.ids, ",")
+}
+
+func (r request) tags() string {
+ if len(r.reqBody.tags) == 0 {
+ return "*"
+ }
+ return strings.Join(r.reqBody.tags, ",")
+}
+
type reqFn func(request request) (*resty.Response, error)
type paramsFn func() ([]reqBody, error)
@@ -118,6 +146,9 @@ func parseFromFlags() (requests []reqBody, err error) {
return nil, err
}
requests[0].name = name
+ requests[0].id = id
+ requests[0].ids = ids
+ requests[0].tags = tags
return requests, nil
}
@@ -138,7 +169,7 @@ func parseTimeRangeFromFlagAndYAML(reader io.Reader)
(requests []reqBody, err er
if startTS, err = parseTime(start); err != nil {
return nil, err
}
- if endTS, err = parseTime(start); err != nil {
+ if endTS, err = parseTime(end); err != nil {
return nil, err
}
} else if start != "" {
@@ -154,26 +185,29 @@ func parseTimeRangeFromFlagAndYAML(reader io.Reader)
(requests []reqBody, err er
}
s := startTS.Format(RFC3339)
e := endTS.Format(RFC3339)
- if requests, err = parseNameAndGroupFromYAML(reader); err != nil {
+ var rawRequests []reqBody
+ if rawRequests, err = parseNameAndGroupFromYAML(reader); err != nil {
return nil, err
}
- for _, rb := range requests {
+ for _, rb := range rawRequests {
if rb.parsedData["timeRange"] != nil {
+ requests = append(requests, rb)
continue
}
timeRange := make(map[string]interface{})
- timeRange["start"] = s
+ timeRange["begin"] = s
timeRange["end"] = e
rb.parsedData["timeRange"] = timeRange
rb.data, err = json.Marshal(rb.parsedData)
if err != nil {
return nil, err
}
+ requests = append(requests, rb)
}
return requests, nil
}
-func parseTime(timestamp string) (time.Time, error) { // if timeStamp is
duration(relative time), return (true, the uint of timestamp, nil).
+func parseTime(timestamp string) (time.Time, error) {
if len(timestamp) < 1 {
return time.Time{}, errors.New("time is empty")
}
@@ -188,6 +222,60 @@ func parseTime(timestamp string) (time.Time, error) { //
if timeStamp is duratio
return time.Now().Add(duration), nil
}
+func parseFromYAMLForProperty(reader io.Reader) (requests []reqBody, err
error) {
+ contents, err := file.Read(filePath, reader)
+ if err != nil {
+ return nil, err
+ }
+ for _, c := range contents {
+ j, err := yaml.YAMLToJSON(c)
+ if err != nil {
+ return nil, err
+ }
+ var data map[string]interface{}
+ err = json.Unmarshal(j, &data)
+ if err != nil {
+ return nil, err
+ }
+ metadata, ok := data["metadata"].(map[string]interface{})
+ if !ok {
+ return nil, errors.WithMessage(errMalformedInput,
"absent node: metadata")
+ }
+ container, ok := metadata["container"].(map[string]interface{})
+ if !ok {
+ return nil, errors.WithMessage(errMalformedInput,
"absent node: container")
+ }
+ group, ok := container["group"].(string)
+ if !ok {
+ group = viper.GetString("group")
+ if group == "" {
+ return nil, errors.New("please specify a group
through the input json or the config file")
+ }
+ metadata["group"] = group
+ }
+ name, ok = container["name"].(string)
+ if !ok {
+ return nil, errors.WithMessage(errMalformedInput,
"absent node: name in metadata")
+ }
+ id, ok = metadata["id"].(string)
+ if !ok {
+ return nil, errors.WithMessage(errMalformedInput,
"absent node: id")
+ }
+ j, err = json.Marshal(data)
+ if err != nil {
+ return nil, err
+ }
+ requests = append(requests, reqBody{
+ name: name,
+ group: group,
+ id: id,
+ data: j,
+ parsedData: data,
+ })
+ }
+ return requests, nil
+}
+
type printer func(index int, reqBody reqBody, body []byte) error
func yamlPrinter(index int, _ reqBody, body []byte) error {
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index 285afa1..c9ea5c7 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -55,7 +55,7 @@ func RootCmdFlags(command *cobra.Command) {
_ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
viper.SetDefault("addr", "http://localhost:17913")
- command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(),
newMeasureCmd())
+ command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(),
newMeasureCmd(), newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd())
}
func init() {
@@ -117,3 +117,13 @@ func bindTimeRangeFlag(commands ...*cobra.Command) {
c.Flags().StringVarP(&end, "end", "e", "", "End time of the
time range during which the query is preformed")
}
}
+
+func bindNameAndIDAndTagsFlag(commands ...*cobra.Command) {
+ bindNameFlag(commands...)
+ for _, c := range commands {
+ c.Flags().StringVarP(&id, "id", "i", "", "the property's id")
+ c.Flags().StringArrayVarP(&tags, "tags", "t", nil, "the
property's tags")
+ _ = c.MarkFlagRequired("name")
+ _ = c.MarkFlagRequired("id")
+ }
+}
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
index 6db1a94..75d894f 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/stream.go
@@ -18,7 +18,6 @@
package cmd
import (
- "encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
@@ -60,7 +59,7 @@ func newStreamCmd() *cobra.Command {
cr :=
&database_v1.StreamRegistryServiceCreateRequest{
Stream: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
@@ -89,7 +88,7 @@ func newStreamCmd() *cobra.Command {
cr :=
&database_v1.StreamRegistryServiceUpdateRequest{
Stream: s,
}
- b, err := json.Marshal(cr)
+ b, err := protojson.Marshal(cr)
if err != nil {
return nil, err
}
@@ -144,9 +143,10 @@ func newStreamCmd() *cobra.Command {
}
queryCmd := &cobra.Command{
- Use: "query -f [file|dir|-]",
+ Use: "query [-s start_time] [-e end_time] -f [file|dir|-]",
Version: version.Build(),
Short: "Query data in a stream",
+ Long: timeRangeUsage,
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
diff --git a/bydbctl/internal/cmd/stream_test.go
b/bydbctl/internal/cmd/stream_test.go
index 0d18c51..187c8c2 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -190,7 +190,7 @@ var _ = Describe("Stream Data Query", func() {
cmd.RootCmdFlags(rootCmd)
})
- It("query stream data", func() {
+ It("query stream all data", func() {
conn, err := grpclib.Dial(
grpcAddr,
grpclib.WithTransportCredentials(insecure.NewCredentials()),
@@ -226,6 +226,7 @@ projection:
return len(resp.Elements)
}).Should(Equal(5))
})
+
DescribeTable("query stream data with time range flags", func(timeArgs
...string) {
conn, err := grpclib.Dial(
grpcAddr,
@@ -233,7 +234,7 @@ projection:
)
Expect(err).NotTo(HaveOccurred())
now := timestamp.NowMilli()
- interval := 500 * time.Millisecond
+ interval := -1 * time.Millisecond
cases_stream_data.Write(conn, "data.json", now, interval)
args := []string{"stream", "query", "-a", addr}
args = append(args, timeArgs...)
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 967d02d..21a35d7 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -103,7 +103,7 @@ func loadData(md *commonv1.Metadata, measure
measurev1.MeasureService_WriteClien
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
dataPointValue := &measurev1.DataPointValue{}
gm.Expect(protojson.Unmarshal(rawDataPointValue,
dataPointValue)).ShouldNot(gm.HaveOccurred())
- dataPointValue.Timestamp =
timestamppb.New(baseTime.Add(time.Duration(i) * time.Minute))
+ dataPointValue.Timestamp =
timestamppb.New(baseTime.Add(time.Duration(i) * interval))
gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md,
DataPoint: dataPointValue})).
Should(gm.Succeed())
}
diff --git a/test/integration/cold_query/query_suite_test.go
b/test/integration/cold_query/query_suite_test.go
index 0f8f2fd..649cc5f 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -63,6 +63,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
now = timestamp.NowMilli().Add(-time.Hour * 24)
interval := 500 * time.Millisecond
casesStreamData.Write(conn, "data.json", now, interval)
+ interval = time.Minute
casesMeasureData.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
diff --git a/test/integration/query/query_suite_test.go
b/test/integration/query/query_suite_test.go
index 4b659b0..3b2bb9f 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -65,6 +65,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
// stream
cases_stream_data.Write(conn, "data.json", now, interval)
// measure
+ interval = time.Minute
cases_measure_data.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)