This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e9d57afb Polish indexScan plan for stream (#382)
e9d57afb is described below

commit e9d57afbef9fc01c162ad25f666efd43cd59085d
Author: Huang Youliang <[email protected]>
AuthorDate: Thu Jan 25 14:37:15 2024 +0800

    Polish indexScan plan for stream (#382)
---
 .../logical/stream/stream_plan_indexscan_local.go  | 109 +++++++++------------
 1 file changed, 48 insertions(+), 61 deletions(-)

diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index cf553a55..ad62fad0 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -71,6 +71,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
                }
        }
        ec := executor.FromStreamExecutionContext(ctx)
+
        if i.order != nil && i.order.Index != nil {
                ssr, err := ec.Sort(ctx, pbv1.StreamSortOptions{
                        Name:           i.metadata.GetName(),
@@ -88,27 +89,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
                        return elements, nil
                }
                r := ssr.Pull()
-               for i := range r.Timestamps {
-                       e := &streamv1.Element{
-                               Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])),
-                               ElementId: r.ElementIDs[i],
-                       }
-
-                       for _, tf := range r.TagFamilies[i] {
-                               tagFamily := &modelv1.TagFamily{
-                                       Name: tf.Name,
-                               }
-                               e.TagFamilies = append(e.TagFamilies, tagFamily)
-                               for _, t := range tf.Tags {
-                                       tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
-                                               Key:   t.Name,
-                                               Value: t.Values[0],
-                                       })
-                               }
-                       }
-                       elements = append(elements, e)
-               }
-               return elements, nil
+               return buildElementsFromColumnResult(r), nil
        }
 
        if i.filter != nil && i.filter != logical.Enode {
@@ -128,27 +109,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
                        return elements, nil
                }
                r := sfr.Pull()
-               for i := range r.Timestamps {
-                       e := &streamv1.Element{
-                               Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])),
-                               ElementId: r.ElementIDs[i],
-                       }
-
-                       for _, tf := range r.TagFamilies[i] {
-                               tagFamily := &modelv1.TagFamily{
-                                       Name: tf.Name,
-                               }
-                               e.TagFamilies = append(e.TagFamilies, tagFamily)
-                               for _, t := range tf.Tags {
-                                       tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
-                                               Key:   t.Name,
-                                               Value: t.Values[0],
-                                       })
-                               }
-                       }
-                       elements = append(elements, e)
-               }
-               return elements, nil
+               return buildElementsFromColumnResult(r), nil
        }
 
        var results []pbv1.StreamQueryResult
@@ -164,10 +125,53 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
                if err != nil {
                        return nil, fmt.Errorf("failed to query stream: %w", err)
                }
-
                results = append(results, result)
        }
+       return buildElementsFromQueryResults(results), nil
+}
 
+func (i *localIndexScan) String() string {
+       return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d",
+               i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(),
+               i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize)
+}
+
+func (i *localIndexScan) Children() []logical.Plan {
+       return []logical.Plan{}
+}
+
+func (i *localIndexScan) Schema() logical.Schema {
+       if i.projectionTagRefs == nil || len(i.projectionTagRefs) == 0 {
+               return i.schema
+       }
+       return i.schema.ProjTags(i.projectionTagRefs...)
+}
+
+func buildElementsFromColumnResult(r *pbv1.StreamColumnResult) (elements []*streamv1.Element) {
+       for i := range r.Timestamps {
+               e := &streamv1.Element{
+                       Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])),
+                       ElementId: r.ElementIDs[i],
+               }
+
+               for _, tf := range r.TagFamilies[i] {
+                       tagFamily := &modelv1.TagFamily{
+                               Name: tf.Name,
+                       }
+                       e.TagFamilies = append(e.TagFamilies, tagFamily)
+                       for _, t := range tf.Tags {
+                               tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{
+                                       Key:   t.Name,
+                                       Value: t.Values[0],
+                               })
+                       }
+               }
+               elements = append(elements, e)
+       }
+       return
+}
+
+func buildElementsFromQueryResults(results []pbv1.StreamQueryResult) (elements []*streamv1.Element) {
        for _, result := range results {
                r := result.Pull()
                if r == nil {
@@ -194,22 +198,5 @@ func (i *localIndexScan) Execute(ctx context.Context) (elements []*streamv1.Elem
                        elements = append(elements, e)
                }
        }
-       return elements, nil
-}
-
-func (i *localIndexScan) String() string {
-       return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; orderBy=%s; limit=%d",
-               i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(),
-               i.filter, logical.FormatTagRefs(", ", i.projectionTagRefs...), i.order, i.maxElementSize)
-}
-
-func (i *localIndexScan) Children() []logical.Plan {
-       return []logical.Plan{}
-}
-
-func (i *localIndexScan) Schema() logical.Schema {
-       if i.projectionTagRefs == nil || len(i.projectionTagRefs) == 0 {
-               return i.schema
-       }
-       return i.schema.ProjTags(i.projectionTagRefs...)
+       return
 }

Reply via email to