This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 7854f8f  add jvm pipe (#55)
7854f8f is described below

commit 7854f8f486c7d48af998b91b8b4b4a364122ca56
Author: Evan <[email protected]>
AuthorDate: Thu Jul 1 08:55:05 2021 +0800

    add jvm pipe (#55)
---
 configs/satellite_config.yaml                      | 24 +++++-
 .../plugins/forwarder_nativejvm-grpc-forwarder.md  |  5 ++
 docs/en/setup/plugins/plugin-list.md               |  2 +
 .../plugins/receiver_grpc-nativejvm-receiver.md    |  5 ++
 docs/menu.yml                                      |  4 +
 plugins/forwarder/forwarder_repository.go          |  2 +
 plugins/forwarder/grpc/nativejvm/sync_forwarder.go | 85 ++++++++++++++++++++
 .../receiver/grpc/nativejvm/jvm_report_service.go  | 49 ++++++++++++
 plugins/receiver/grpc/nativejvm/receiver.go        | 61 ++++++++++++++
 plugins/receiver/grpc/nativejvm/receiver_test.go   | 92 ++++++++++++++++++++++
 plugins/receiver/receiver_repository.go            |  2 +
 11 files changed, 330 insertions(+), 1 deletion(-)

diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 4981fdf..20cacc5 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -202,4 +202,26 @@ pipes:
       min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1}
       client_name: grpc-client
       forwarders:
-        - plugin_name: nativeevent-grpc-forwarder
\ No newline at end of file
+        - plugin_name: nativeevent-grpc-forwarder
+  # The pipe that receives native JVM metrics over gRPC and forwards them with the shared gRPC client.
+  - common_config:
+      pipe_name: jvmpipe
+    gatherer:
+      server_name: "grpc-server"
+      receiver:
+        plugin_name: "grpc-nativejvm-receiver"
+      queue:
+        plugin_name: "memory-queue"
+    processor:
+      filters:
+    sender:
+      fallbacker:
+        plugin_name: none-fallbacker
+      # The time interval between two flush operations. And the time unit is millisecond.
+      flush_time: ${SATELLITE_JVMPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
+      max_buffer_size: ${SATELLITE_JVMPIPE_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
+      min_flush_events: ${SATELLITE_JVMPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+      client_name: grpc-client
+      forwarders:
+        - plugin_name: nativejvm-grpc-forwarder
\ No newline at end of file
diff --git a/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md 
b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md
new file mode 100755
index 0000000..5999157
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/nativejvm-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the SkyWalking native JVM metric report protocol.
+## DefaultConfig
+```yaml```
diff --git a/docs/en/setup/plugins/plugin-list.md 
b/docs/en/setup/plugins/plugin-list.md
index dc1f67f..6dca7e3 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -12,6 +12,7 @@
        - [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md)
        - [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md)
        - 
[nativeevent-grpc-forwarder](./forwarder_nativeevent-grpc-forwarder.md)
+       - [nativejvm-grpc-forwarder](./forwarder_nativejvm-grpc-forwarder.md)
        - [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
        - [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
        - 
[nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md)
@@ -25,6 +26,7 @@
 - Receiver
        - [grpc-nativecds-receiver](./receiver_grpc-nativecds-receiver.md)
        - [grpc-nativeevent-receiver](./receiver_grpc-nativeevent-receiver.md)
+       - [grpc-nativejvm-receiver](./receiver_grpc-nativejvm-receiver.md)
        - [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md)
        - 
[grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md)
        - 
[grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md)
diff --git a/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md 
b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md
new file mode 100755
index 0000000..a5a8434
--- /dev/null
+++ b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md
@@ -0,0 +1,5 @@
+# Receiver/grpc-nativejvm-receiver
+## Description
+This is a receiver for SkyWalking native jvm format, which is defined at 
https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto.
+## DefaultConfig
+```yaml```
diff --git a/docs/menu.yml b/docs/menu.yml
index fa8f23a..0f20958 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -75,6 +75,8 @@ catalog:
                   path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder
                 - name: nativeevent-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativeevent-grpc-forwarder
+                - name: nativejvm-grpc-forwarder
+                  path: /en/setup/plugins/forwarder_nativejvm-grpc-forwarder
                 - name: nativelog-grpc-forwarder
                   path: /en/setup/plugins/forwarder_nativelog-grpc-forwarder
                 - name: nativelog-kafka-forwarder
@@ -99,6 +101,8 @@ catalog:
                   path: /en/setup/plugins/receiver_grpc-nativecds-receiver
                 - name: grpc-nativeevent-receiver
                   path: /en/setup/plugins/receiver_grpc-nativeevent-receiver
+                - name: grpc-nativejvm-receiver
+                  path: /en/setup/plugins/receiver_grpc-nativejvm-receiver
                 - name: grpc-nativelog-receiver
                   path: /en/setup/plugins/receiver_grpc-nativelog-receiver
                 - name: grpc-nativemanagement-receiver
diff --git a/plugins/forwarder/forwarder_repository.go 
b/plugins/forwarder/forwarder_repository.go
index 7aa7469..f9419f9 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -23,6 +23,7 @@ import (
        grpc_meter 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter"
        grpc_nativecds 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds"
        grpc_nativeevent 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeevent"
+       grpc_nativejvm 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm"
        grpc_nativelog 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
        grpc_nativemanagement 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement"
        grpc_nativeprofile 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile"
@@ -46,6 +47,7 @@ func RegisterForwarderPlugins() {
                new(grpc_nativeprofile.Forwarder),
                new(grpc_nativecds.Forwarder),
                new(grpc_nativeevent.Forwarder),
+               new(grpc_nativejvm.Forwarder),
        }
        for _, forwarder := range forwarders {
                plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/nativejvm/sync_forwarder.go 
b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go
new file mode 100644
index 0000000..1869dcb
--- /dev/null
+++ b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativecds
+
+import (
+       "context"
+       "fmt"
+       "reflect"
+
+       "google.golang.org/grpc"
+
+       agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+// Name is the plugin name returned by Forwarder.Name.
+const Name = "nativejvm-grpc-forwarder"
+
+// Forwarder forwards JVM metric events through a JVMMetricReportService gRPC client.
+type Forwarder struct {
+       config.CommonFields
+
+       // client is the gRPC stub created in Prepare and used by Forward.
+       client agent.JVMMetricReportServiceClient
+}
+
+// Name returns the plugin name constant of this forwarder.
+func (f *Forwarder) Name() string {
+       return Name
+}
+
+func (f *Forwarder) Description() string {
+       return "This is a synchronization grpc forwarder with the SkyWalking 
native Configuration Discovery Service protocol."
+}
+
+// DefaultConfig returns the default configuration of this plugin, which is empty.
+func (f *Forwarder) DefaultConfig() string {
+       return ""
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+       client, ok := connection.(*grpc.ClientConn)
+       if !ok {
+               return fmt.Errorf("the %s is only accepet the grpc client, but 
receive a %s",
+                       f.Name(), reflect.TypeOf(connection).String())
+       }
+       f.client = agent.NewJVMMetricReportServiceClient(client)
+       return nil
+}
+
+// Forward sends the JVM metric collection of every event in batch, one Collect call per event.
+// It stops at the first failing call and returns that error unchanged. Events that carry no
+// JVM payload are skipped.
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+       for _, e := range batch {
+               jvm := e.GetJvm()
+               if jvm == nil {
+                       // Nothing to report for this event; do not push a nil collection to the backend.
+                       // NOTE(review): relies on the generated getter returning nil for non-JVM data.
+                       continue
+               }
+               if _, err := f.client.Collect(context.Background(), jvm); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// ForwardType reports that this forwarder handles events of type SniffType_JVMMetricType.
+func (f *Forwarder) ForwardType() v1.SniffType {
+       return v1.SniffType_JVMMetricType
+}
+
+// SyncForward always fails: it ignores its argument and returns a nil result with an error,
+// in line with SupportedSyncInvoke returning false.
+func (f *Forwarder) SyncForward(_ *v1.SniffData) (*v1.SniffData, error) {
+       return nil, fmt.Errorf("unsupport sync forward")
+}
+
+// SupportedSyncInvoke reports that this forwarder does not support synchronous invocation.
+func (f *Forwarder) SupportedSyncInvoke() bool {
+       return false
+}
diff --git a/plugins/receiver/grpc/nativejvm/jvm_report_service.go 
b/plugins/receiver/grpc/nativejvm/jvm_report_service.go
new file mode 100644
index 0000000..22e56ad
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/jvm_report_service.go
@@ -0,0 +1,49 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+       "context"
+       "time"
+
+       common "skywalking.apache.org/repo/goapi/collect/common/v3"
+       agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+// eventName is the Name set on every SniffData event built by Collect.
+const eventName = "grpc-jvm-event"
+
+// JVMReportService is the gRPC handler that turns incoming JVM metric collections into
+// SniffData events.
+type JVMReportService struct {
+       // receiveChannel is where Collect sends each event it builds.
+       receiveChannel chan *v1.SniffData
+       agent.UnimplementedJVMMetricReportServiceServer
+}
+
+// Collect wraps the received JVM metric collection in a SniffData event, sends the event to
+// the receive channel, and answers the caller with an empty command list and a nil error.
+func (j *JVMReportService) Collect(_ context.Context, jvm *agent.JVMMetricCollection) (*common.Commands, error) {
+       // Nanoseconds divided by 1e6: the receive time in milliseconds.
+       receivedAt := time.Now().UnixNano() / 1e6
+       payload := &v1.SniffData_Jvm{Jvm: jvm}
+       // Meta is left at its zero value (nil).
+       j.receiveChannel <- &v1.SniffData{
+               Name:      eventName,
+               Timestamp: receivedAt,
+               Type:      v1.SniffType_JVMMetricType,
+               Remote:    true,
+               Data:      payload,
+       }
+       return &common.Commands{}, nil
+}
diff --git a/plugins/receiver/grpc/nativejvm/receiver.go 
b/plugins/receiver/grpc/nativejvm/receiver.go
new file mode 100644
index 0000000..6a6fb61
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/receiver.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+       agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       module 
"github.com/apache/skywalking-satellite/internal/satellite/module/api"
+       grpcreceiver 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+// Name is the plugin name returned by Receiver.Name.
+const Name = "grpc-nativejvm-receiver"
+
+// Receiver is the gRPC receiver plugin for JVM metric data. RegisterHandler creates its
+// JVMReportService and registers it on the gRPC server.
+type Receiver struct {
+       config.CommonFields
+       grpcreceiver.CommonGRPCReceiverFields
+       service *JVMReportService // The gRPC request handler for jvm data.
+}
+
+// Name returns the plugin name constant of this receiver.
+func (r *Receiver) Name() string {
+       return Name
+}
+
+func (r *Receiver) Description() string {
+       return "This is a receiver for SkyWalking native jvm format, " +
+               "which is defined at 
https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto.";
+}
+
+// DefaultConfig returns the default configuration of this plugin, which is empty.
+func (r *Receiver) DefaultConfig() string {
+       return ""
+}
+
+// RegisterHandler initialises the common gRPC receiver fields from server, creates the JVM
+// report service bound to the output channel, and registers that service on the gRPC server.
+func (r *Receiver) RegisterHandler(server interface{}) {
+       fields := grpcreceiver.InitCommonGRPCReceiverFields(server)
+       r.CommonGRPCReceiverFields = *fields
+       // The service writes into the same channel that Channel hands out.
+       handler := &JVMReportService{receiveChannel: r.OutputChannel}
+       r.service = handler
+       agent.RegisterJVMMetricReportServiceServer(r.Server, handler)
+}
+
+// RegisterSyncInvoker is a no-op: this receiver ignores the supplied invoker.
+func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {
+}
+
+// Channel exposes OutputChannel as a receive-only channel of SniffData events.
+func (r *Receiver) Channel() <-chan *v1.SniffData {
+       return r.OutputChannel
+}
diff --git a/plugins/receiver/grpc/nativejvm/receiver_test.go 
b/plugins/receiver/grpc/nativejvm/receiver_test.go
new file mode 100644
index 0000000..25bc318
--- /dev/null
+++ b/plugins/receiver/grpc/nativejvm/receiver_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "google.golang.org/grpc"
+
+       common "skywalking.apache.org/repo/goapi/collect/common/v3"
+       agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+       _ "github.com/apache/skywalking-satellite/internal/satellite/test"
+       receiver_grpc 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+// TestReceiver_RegisterHandler runs the shared gRPC receiver test helper against Receiver.
+// The first callback sends one JVM metric collection and returns its string form; the second
+// returns the string form of the JVM payload of a received event.
+func TestReceiver_RegisterHandler(t *testing.T) {
+       send := func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string {
+               payload := initData()
+               if _, err := agent.NewJVMMetricReportServiceClient(conn).Collect(ctx, payload); err != nil {
+                       t.Fatalf("cannot send data: %v", err)
+               }
+               return payload.String()
+       }
+       received := func(data *v1.SniffData) string {
+               return data.GetJvm().String()
+       }
+       receiver_grpc.TestReceiver(new(Receiver), send, received, t)
+}
+
+// initData builds the JVM metric collection used as the request payload in the receiver test:
+// one metric with CPU, memory, memory pool, GC and thread sections filled in.
+func initData() *agent.JVMMetricCollection {
+       return &agent.JVMMetricCollection{
+               Service:         "demo-service",
+               ServiceInstance: "demo-instance",
+               Metrics: []*agent.JVMMetric{
+                       {
+                               // Current time in milliseconds (nanoseconds / 1e6). Dividing Unix()
+                               // seconds by 1e6, as before, truncated the value to a few thousand.
+                               Time: time.Now().UnixNano() / 1e6,
+                               Cpu: &common.CPU{
+                                       UsagePercent: 99.9,
+                               },
+                               Memory: []*agent.Memory{
+                                       {
+                                               IsHeap:    true,
+                                               Init:      1,
+                                               Max:       2,
+                                               Used:      3,
+                                               Committed: 4,
+                                       },
+                               },
+                               MemoryPool: []*agent.MemoryPool{
+                                       {
+                                               Type:      0,
+                                               Init:      1,
+                                               Max:       2,
+                                               Used:      3,
+                                               Committed: 4,
+                                       },
+                               },
+                               Gc: []*agent.GC{
+                                       {
+                                               Phrase: 2,
+                                               Count:  3,
+                                               Time:   202106111010,
+                                       },
+                               },
+                               Thread: &agent.Thread{
+                                       LiveCount:   1,
+                                       PeakCount:   2,
+                                       DaemonCount: 3,
+                               },
+                       },
+               },
+       }
+}
diff --git a/plugins/receiver/receiver_repository.go 
b/plugins/receiver/receiver_repository.go
index 386193e..e294fd3 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/skywalking-satellite/plugins/receiver/api"
        grpcnativecds 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds"
        grpcnativeevent 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeevent"
+       grpcnativejvm 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativejvm"
        grpcnavtivelog 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog"
        grpcnativemanagement 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement"
        grpcnativeprofile 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile"
@@ -42,6 +43,7 @@ func RegisterReceiverPlugins() {
                new(grpcnativeprofile.Receiver),
                new(grpcnativecds.Receiver),
                new(httpnavtivelog.Receiver),
+               new(grpcnativejvm.Receiver),
                new(grpcnativeevent.Receiver),
        }
        for _, receiver := range receivers {

Reply via email to