mrproliu commented on code in PR #156:
URL:
https://github.com/apache/skywalking-satellite/pull/156#discussion_r1834034663
##########
plugins/client/grpc/client_config.go:
##########
@@ -68,14 +68,18 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
if authHeader != nil {
ctx = metadata.NewOutgoingContext(ctx, authHeader)
}
+ supportBidirectionalStream := false
+ if b := ctx.Value("BidirectionalStream"); b != nil {
Review Comment:
Please define this key as a const.
##########
plugins/forwarder/api/forwarder.go:
##########
@@ -34,7 +35,7 @@ type Forwarder interface {
// Forward the batch events to the external services, such as Kafka MQ
and SkyWalking OAP cluster.
Forward(batch event.BatchEvents) error
// SyncForward the single event to the external service with sync
forward
- SyncForward(event *v1.SniffData) (*v1.SniffData, error)
+ SyncForward(event *v1.SniffData) (*v1.SniffData, grpc.ClientStream,
error)
Review Comment:
Please add a comment explaining when the user needs to use the client stream.
##########
plugins/forwarder/grpc/nativeasyncprofiler/forwarder.go:
##########
@@ -0,0 +1,108 @@
+package nativeasyncprofiler
+
+import (
+ "context"
+ "fmt"
+ server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
+ "reflect"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
+ "google.golang.org/grpc"
+ asyncprofiler
"skywalking.apache.org/repo/goapi/collect/language/asyncprofiler/v10"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+ Name = "native-async-profiler-grpc-forwarder"
+ ShowName = "Native Async Profiler GRPC Forwarder"
+)
+
+type Forwarder struct {
+ config.CommonFields
+
+ profilingClient asyncprofiler.AsyncProfilerTaskClient
+}
+
+func (f *Forwarder) Name() string {
+ return Name
+}
+
+func (f *Forwarder) ShowName() string {
+ return ShowName
+}
+
+func (f *Forwarder) Description() string {
+ return "This is a grpc forwarder with the SkyWalking native async
profiler protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+ return ``
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+ client, ok := connection.(*grpc.ClientConn)
+ if !ok {
+ return fmt.Errorf("the %s only accepts a grpc client, but
received a %s",
+ f.Name(), reflect.TypeOf(connection).String())
+ }
+ f.profilingClient = asyncprofiler.NewAsyncProfilerTaskClient(client)
+ return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+ return fmt.Errorf("unsupport forward")
+}
+
+func (f *Forwarder) SyncForward(e *v1.SniffData) (*v1.SniffData,
grpc.ClientStream, error) {
+ switch requestData := e.GetData().(type) {
+ case *v1.SniffData_AsyncProfilerTaskCommandQuery:
+ query := requestData.AsyncProfilerTaskCommandQuery
+ commands, err :=
f.profilingClient.GetAsyncProfilerTaskCommands(context.Background(), query)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return &v1.SniffData{Data: &v1.SniffData_Commands{Commands:
commands}}, nil, nil
+ case *v1.SniffData_AsyncProfilerData:
+ // metadata
+ ctx := context.WithValue(context.Background(),
"BidirectionalStream", true)
Review Comment:
This key should reference the const that is defined in the grpc client
package.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]