hanahmily commented on code in PR #467:
URL:
https://github.com/apache/skywalking-banyandb/pull/467#discussion_r1636497703
##########
pkg/meter/native/instruments.go:
##########
@@ -18,10 +18,126 @@
// Package native provides a simple meter system for metrics. The metrics are
aggregated by the meter provider.
package native
-type nativeInstrument struct{}
+import (
+ "context"
+ "sync"
+ "time"
-func (nativeInstrument) Inc(_ float64, _ ...string) {}
-func (nativeInstrument) Set(_ float64, _ ...string) {}
-func (nativeInstrument) Add(_ float64, _ ...string) {}
-func (nativeInstrument) Observe(_ float64, _ ...string) {}
-func (nativeInstrument) Delete(_ ...string) bool { return false }
+ "github.com/robfig/cron/v3"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+ writeTimeout = 5 * time.Second
+)
+
+type nativeInstrument struct {
+ scheduler *timestamp.Scheduler
+ pipeline queue.Client
+ scope meter.Scope
+ measureName string
+ messageBuffer []bus.Message
+ mutex sync.Mutex
+}
+
+func NewNativeInstrument(measureName string, pipeline queue.Client, scope
meter.Scope) *nativeInstrument {
+ clock, _ := timestamp.GetClock(context.TODO())
+ n := &nativeInstrument{
+ measureName: measureName,
+ pipeline: pipeline,
+ scope: scope,
+ scheduler: timestamp.NewScheduler(log, clock),
+ }
+ err := n.scheduler.Register("flush messages", cron.Descriptor, "@every
5s", func(_ time.Time, _ *logger.Logger) bool {
+ n.flushMessages()
+ return true
+ })
+ if err != nil {
+ log.Fatal().Err(err).Msg("Failed to register flushMessages")
+ }
+ return n
+}
+
+// Counter Only Methods.
+func (n *nativeInstrument) Inc(_ float64, _ ...string) {}
+
+// Gauge Only Methods.
+func (n *nativeInstrument) Set(value float64, labelValues ...string) {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+ message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "",
n.buildIWR(value, labelValues...))
+ n.messageBuffer = append(n.messageBuffer, message)
+}
+
+func (n *nativeInstrument) Add(_ float64, _ ...string) {}
+
+// Histogram Only Methods.
+func (n *nativeInstrument) Observe(_ float64, _ ...string) {}
+
+// Shared Methods.
+func (n *nativeInstrument) Delete(_ ...string) bool { return false }
+
+func (n *nativeInstrument) buildIWR(value float64, labelValues ...string)
*measurev1.InternalWriteRequest {
+ tagValues := buildTagValues(n.scope, labelValues...)
+ entities, err := pbv1.EntityValues(tagValues).ToEntity()
+ if err != nil {
+ log.Error().Err(err).Msg("Failed to convert tagValues to
Entity")
+ }
+ writeRequest := &measurev1.WriteRequest{
+ MessageId: uint64(time.Now().UnixNano()),
+ Metadata: &commonv1.Metadata{
+ Group: NativeObservabilityGroupName,
+ Name: n.measureName,
+ },
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp:
timestamppb.New(time.Now().Truncate(time.Millisecond)),
Review Comment:
The "second" unit is sufficient; we don't require millisecond precision.
##########
pkg/meter/native/instruments.go:
##########
@@ -18,10 +18,126 @@
// Package native provides a simple meter system for metrics. The metrics are
aggregated by the meter provider.
package native
-type nativeInstrument struct{}
+import (
+ "context"
+ "sync"
+ "time"
-func (nativeInstrument) Inc(_ float64, _ ...string) {}
-func (nativeInstrument) Set(_ float64, _ ...string) {}
-func (nativeInstrument) Add(_ float64, _ ...string) {}
-func (nativeInstrument) Observe(_ float64, _ ...string) {}
-func (nativeInstrument) Delete(_ ...string) bool { return false }
+ "github.com/robfig/cron/v3"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+ writeTimeout = 5 * time.Second
+)
+
+type nativeInstrument struct {
+ scheduler *timestamp.Scheduler
+ pipeline queue.Client
+ scope meter.Scope
+ measureName string
+ messageBuffer []bus.Message
+ mutex sync.Mutex
+}
+
+func NewNativeInstrument(measureName string, pipeline queue.Client, scope
meter.Scope) *nativeInstrument {
+ clock, _ := timestamp.GetClock(context.TODO())
+ n := &nativeInstrument{
+ measureName: measureName,
+ pipeline: pipeline,
+ scope: scope,
+ scheduler: timestamp.NewScheduler(log, clock),
+ }
+ err := n.scheduler.Register("flush messages", cron.Descriptor, "@every
5s", func(_ time.Time, _ *logger.Logger) bool {
+ n.flushMessages()
+ return true
+ })
+ if err != nil {
+ log.Fatal().Err(err).Msg("Failed to register flushMessages")
+ }
+ return n
+}
+
+// Counter Only Methods.
+func (n *nativeInstrument) Inc(_ float64, _ ...string) {}
+
+// Gauge Only Methods.
+func (n *nativeInstrument) Set(value float64, labelValues ...string) {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+ message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "",
n.buildIWR(value, labelValues...))
Review Comment:
It would be better to store the value in an internal field of
'nativeInstrument' instead. The flushing process will then generate such
messages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]