This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e9d57afb Polish indexScan plan for stream (#382)
e9d57afb is described below
commit e9d57afbef9fc01c162ad25f666efd43cd59085d
Author: Huang Youliang <[email protected]>
AuthorDate: Thu Jan 25 14:37:15 2024 +0800
Polish indexScan plan for stream (#382)
---
.../logical/stream/stream_plan_indexscan_local.go | 109 +++++++++------------
1 file changed, 48 insertions(+), 61 deletions(-)
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index cf553a55..ad62fad0 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -71,6 +71,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
}
}
ec := executor.FromStreamExecutionContext(ctx)
+
if i.order != nil && i.order.Index != nil {
ssr, err := ec.Sort(ctx, pbv1.StreamSortOptions{
Name: i.metadata.GetName(),
@@ -88,27 +89,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
return elements, nil
}
r := ssr.Pull()
- for i := range r.Timestamps {
- e := &streamv1.Element{
- Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])),
- ElementId: r.ElementIDs[i],
- }
-
- for _, tf := range r.TagFamilies[i] {
- tagFamily := &modelv1.TagFamily{
- Name: tf.Name,
- }
- e.TagFamilies = append(e.TagFamilies, tagFamily)
- for _, t := range tf.Tags {
- tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
- Key: t.Name,
- Value: t.Values[0],
- })
- }
- }
- elements = append(elements, e)
- }
- return elements, nil
+ return buildElementsFromColumnResult(r), nil
}
if i.filter != nil && i.filter != logical.Enode {
@@ -128,27 +109,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
return elements, nil
}
r := sfr.Pull()
- for i := range r.Timestamps {
- e := &streamv1.Element{
- Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])),
- ElementId: r.ElementIDs[i],
- }
-
- for _, tf := range r.TagFamilies[i] {
- tagFamily := &modelv1.TagFamily{
- Name: tf.Name,
- }
- e.TagFamilies = append(e.TagFamilies, tagFamily)
- for _, t := range tf.Tags {
- tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
- Key: t.Name,
- Value: t.Values[0],
- })
- }
- }
- elements = append(elements, e)
- }
- return elements, nil
+ return buildElementsFromColumnResult(r), nil
}
var results []pbv1.StreamQueryResult
@@ -164,10 +125,53 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
if err != nil {
return nil, fmt.Errorf("failed to query stream: %w", err)
}
-
results = append(results, result)
}
+ return buildElementsFromQueryResults(results), nil
+}
+func (i *localIndexScan) String() string {
+ return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d",
+ i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(),
+ i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize)
+}
+
+func (i *localIndexScan) Children() []logical.Plan {
+ return []logical.Plan{}
+}
+
+func (i *localIndexScan) Schema() logical.Schema {
+ if i.projectionTagRefs == nil || len(i.projectionTagRefs) == 0 {
+ return i.schema
+ }
+ return i.schema.ProjTags(i.projectionTagRefs...)
+}
+
+func buildElementsFromColumnResult(r *pbv1.StreamColumnResult) (elements []*streamv1.Element) {
+ for i := range r.Timestamps {
+ e := &streamv1.Element{
+ Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])),
+ ElementId: r.ElementIDs[i],
+ }
+
+ for _, tf := range r.TagFamilies[i] {
+ tagFamily := &modelv1.TagFamily{
+ Name: tf.Name,
+ }
+ e.TagFamilies = append(e.TagFamilies, tagFamily)
+ for _, t := range tf.Tags {
+ tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
+ Key: t.Name,
+ Value: t.Values[0],
+ })
+ }
+ }
+ elements = append(elements, e)
+ }
+ return
+}
+
+func buildElementsFromQueryResults(results []pbv1.StreamQueryResult) (elements []*streamv1.Element) {
for _, result := range results {
r := result.Pull()
if r == nil {
@@ -194,22 +198,5 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
elements = append(elements, e)
}
}
- return elements, nil
-}
-
-func (i *localIndexScan) String() string {
- return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d",
- i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(),
- i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize)
-}
-
-func (i *localIndexScan) Children() []logical.Plan {
- return []logical.Plan{}
-}
-
-func (i *localIndexScan) Schema() logical.Schema {
- if i.projectionTagRefs == nil || len(i.projectionTagRefs) == 0 {
- return i.schema
- }
- return i.schema.ProjTags(i.projectionTagRefs...)
+ return
}