hanahmily commented on code in PR #467:
URL: 
https://github.com/apache/skywalking-banyandb/pull/467#discussion_r1636497703


##########
pkg/meter/native/instruments.go:
##########
@@ -18,10 +18,126 @@
 // Package native provides a simple meter system for metrics. The metrics are 
aggregated by the meter provider.
 package native
 
-type nativeInstrument struct{}
+import (
+       "context"
+       "sync"
+       "time"
 
-func (nativeInstrument) Inc(_ float64, _ ...string)     {}
-func (nativeInstrument) Set(_ float64, _ ...string)     {}
-func (nativeInstrument) Add(_ float64, _ ...string)     {}
-func (nativeInstrument) Observe(_ float64, _ ...string) {}
-func (nativeInstrument) Delete(_ ...string) bool        { return false }
+       "github.com/robfig/cron/v3"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+       writeTimeout = 5 * time.Second
+)
+
+type nativeInstrument struct {
+       scheduler     *timestamp.Scheduler
+       pipeline      queue.Client
+       scope         meter.Scope
+       measureName   string
+       messageBuffer []bus.Message
+       mutex         sync.Mutex
+}
+
+func NewNativeInstrument(measureName string, pipeline queue.Client, scope 
meter.Scope) *nativeInstrument {
+       clock, _ := timestamp.GetClock(context.TODO())
+       n := &nativeInstrument{
+               measureName: measureName,
+               pipeline:    pipeline,
+               scope:       scope,
+               scheduler:   timestamp.NewScheduler(log, clock),
+       }
+       err := n.scheduler.Register("flush messages", cron.Descriptor, "@every 
5s", func(_ time.Time, _ *logger.Logger) bool {
+               n.flushMessages()
+               return true
+       })
+       if err != nil {
+               log.Fatal().Err(err).Msg("Failed to register flushMessages")
+       }
+       return n
+}
+
+// Counter Only Methods.
+func (n *nativeInstrument) Inc(_ float64, _ ...string) {}
+
+// Gauge Only Methods.
+func (n *nativeInstrument) Set(value float64, labelValues ...string) {
+       n.mutex.Lock()
+       defer n.mutex.Unlock()
+       message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", 
n.buildIWR(value, labelValues...))
+       n.messageBuffer = append(n.messageBuffer, message)
+}
+
+func (n *nativeInstrument) Add(_ float64, _ ...string) {}
+
+// Histogram Only Methods.
+func (n *nativeInstrument) Observe(_ float64, _ ...string) {}
+
+// Shared Methods.
+func (n *nativeInstrument) Delete(_ ...string) bool { return false }
+
+func (n *nativeInstrument) buildIWR(value float64, labelValues ...string) 
*measurev1.InternalWriteRequest {
+       tagValues := buildTagValues(n.scope, labelValues...)
+       entities, err := pbv1.EntityValues(tagValues).ToEntity()
+       if err != nil {
+               log.Error().Err(err).Msg("Failed to convert tagValues to 
Entity")
+       }
+       writeRequest := &measurev1.WriteRequest{
+               MessageId: uint64(time.Now().UnixNano()),
+               Metadata: &commonv1.Metadata{
+                       Group: NativeObservabilityGroupName,
+                       Name:  n.measureName,
+               },
+               DataPoint: &measurev1.DataPointValue{
+                       Timestamp: 
timestamppb.New(time.Now().Truncate(time.Millisecond)),

Review Comment:
   Second-level precision is sufficient for this timestamp; we don't require 
millisecond precision, so truncating to `time.Second` would be enough.



##########
pkg/meter/native/instruments.go:
##########
@@ -18,10 +18,126 @@
 // Package native provides a simple meter system for metrics. The metrics are 
aggregated by the meter provider.
 package native
 
-type nativeInstrument struct{}
+import (
+       "context"
+       "sync"
+       "time"
 
-func (nativeInstrument) Inc(_ float64, _ ...string)     {}
-func (nativeInstrument) Set(_ float64, _ ...string)     {}
-func (nativeInstrument) Add(_ float64, _ ...string)     {}
-func (nativeInstrument) Observe(_ float64, _ ...string) {}
-func (nativeInstrument) Delete(_ ...string) bool        { return false }
+       "github.com/robfig/cron/v3"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+       writeTimeout = 5 * time.Second
+)
+
+type nativeInstrument struct {
+       scheduler     *timestamp.Scheduler
+       pipeline      queue.Client
+       scope         meter.Scope
+       measureName   string
+       messageBuffer []bus.Message
+       mutex         sync.Mutex
+}
+
+func NewNativeInstrument(measureName string, pipeline queue.Client, scope 
meter.Scope) *nativeInstrument {
+       clock, _ := timestamp.GetClock(context.TODO())
+       n := &nativeInstrument{
+               measureName: measureName,
+               pipeline:    pipeline,
+               scope:       scope,
+               scheduler:   timestamp.NewScheduler(log, clock),
+       }
+       err := n.scheduler.Register("flush messages", cron.Descriptor, "@every 
5s", func(_ time.Time, _ *logger.Logger) bool {
+               n.flushMessages()
+               return true
+       })
+       if err != nil {
+               log.Fatal().Err(err).Msg("Failed to register flushMessages")
+       }
+       return n
+}
+
+// Counter Only Methods.
+func (n *nativeInstrument) Inc(_ float64, _ ...string) {}
+
+// Gauge Only Methods.
+func (n *nativeInstrument) Set(value float64, labelValues ...string) {
+       n.mutex.Lock()
+       defer n.mutex.Unlock()
+       message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", 
n.buildIWR(value, labelValues...))

Review Comment:
   It would be better to store the value in an internal field of 
`nativeInstrument` instead of building a bus message on every `Set` call. 
The periodic flushing process can then generate the message from the stored 
value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to