This is an automated email from the ASF dual-hosted git repository. liujiapeng pushed a commit to branch add-jvm-pipe in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit ce094addf8e87c77984152bb9dbe43a1735bc0ad Author: Evan <[email protected]> AuthorDate: Tue Jun 29 16:20:36 2021 +0800 add jvm pipe --- configs/satellite_config.yaml | 24 +++++- .../plugins/forwarder_nativejvm-grpc-forwarder.md | 5 ++ docs/en/setup/plugins/plugin-list.md | 2 + .../plugins/receiver_grpc-nativejvm-receiver.md | 5 ++ docs/menu.yml | 4 + plugins/forwarder/forwarder_repository.go | 2 + plugins/forwarder/grpc/nativejvm/sync_forwarder.go | 85 ++++++++++++++++++++ .../receiver/grpc/nativejvm/jvm_report_service.go | 49 ++++++++++++ plugins/receiver/grpc/nativejvm/receiver.go | 61 ++++++++++++++ plugins/receiver/grpc/nativejvm/receiver_test.go | 92 ++++++++++++++++++++++ plugins/receiver/receiver_repository.go | 2 + 11 files changed, 330 insertions(+), 1 deletion(-) diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml index 7b53e35..d93892c 100644 --- a/configs/satellite_config.yaml +++ b/configs/satellite_config.yaml @@ -180,4 +180,26 @@ pipes: plugin_name: none-fallbacker client_name: grpc-client forwarders: - - plugin_name: nativecds-grpc-forwarder \ No newline at end of file + - plugin_name: nativecds-grpc-forwarder + - common_config: + pipe_name: jvmpipe + gatherer: + server_name: "grpc-server" + receiver: + plugin_name: "grpc-nativejvm-receiver" + queue: + plugin_name: "memory-queue" + processor: + filters: + sender: + fallbacker: + plugin_name: none-fallbacker + # The time interval between two flush operations. And the time unit is millisecond. + flush_time: ${SATELLITE_TRACINGPIPE_SENDER_FLUSH_TIME:1000} + # The maximum buffer elements. + max_buffer_size: ${SATELLITE_TRACINGPIPE_SENDER_MAX_BUFFER_SIZE:200} + # The minimum flush elements. 
+ min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1} + client_name: grpc-client + forwarders: + - plugin_name: nativejvm-grpc-forwarder \ No newline at end of file diff --git a/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md new file mode 100755 index 0000000..5999157 --- /dev/null +++ b/docs/en/setup/plugins/forwarder_nativejvm-grpc-forwarder.md @@ -0,0 +1,5 @@ +# Forwarder/nativejvm-grpc-forwarder +## Description +This is a synchronization grpc forwarder with the SkyWalking native JVM metric protocol. +## DefaultConfig +```yaml``` diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md index b2f6bc7..a3d7833 100755 --- a/docs/en/setup/plugins/plugin-list.md +++ b/docs/en/setup/plugins/plugin-list.md @@ -11,6 +11,7 @@ - Forwarder - [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md) - [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md) + - [nativejvm-grpc-forwarder](./forwarder_nativejvm-grpc-forwarder.md) - [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md) - [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md) - [nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md) @@ -23,6 +24,7 @@ - [none-queue](./queue_none-queue.md) - Receiver - [grpc-nativecds-receiver](./receiver_grpc-nativecds-receiver.md) + - [grpc-nativejvm-receiver](./receiver_grpc-nativejvm-receiver.md) - [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md) - [grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md) - [grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md) diff --git a/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md new file mode 100755 index 0000000..a5a8434 --- /dev/null +++ b/docs/en/setup/plugins/receiver_grpc-nativejvm-receiver.md @@ 
-0,0 +1,5 @@ +# Receiver/grpc-nativejvm-receiver +## Description +This is a receiver for SkyWalking native jvm format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto. +## DefaultConfig +```yaml``` diff --git a/docs/menu.yml b/docs/menu.yml index 9f7d2e9..d0f6854 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -73,6 +73,8 @@ catalog: path: /en/setup/plugins/forwarder_meter-grpc-forwarder - name: nativecds-grpc-forwarder path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder + - name: nativejvm-grpc-forwarder + path: /en/setup/plugins/forwarder_nativejvm-grpc-forwarder - name: nativelog-grpc-forwarder path: /en/setup/plugins/forwarder_nativelog-grpc-forwarder - name: nativelog-kafka-forwarder @@ -95,6 +97,8 @@ catalog: catalog: - name: grpc-nativecds-receiver path: /en/setup/plugins/receiver_grpc-nativecds-receiver + - name: grpc-nativejvm-receiver + path: /en/setup/plugins/receiver_grpc-nativejvm-receiver - name: grpc-nativelog-receiver path: /en/setup/plugins/receiver_grpc-nativelog-receiver - name: grpc-nativemanagement-receiver diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go index 3be46ba..bc6b181 100644 --- a/plugins/forwarder/forwarder_repository.go +++ b/plugins/forwarder/forwarder_repository.go @@ -22,6 +22,7 @@ import ( grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter" grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds" + grpc_nativejvm "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm" grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog" grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement" grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile" @@ -44,6 +45,7 @@ func RegisterForwarderPlugins() { 
new(grpc_nativetracing.Forwarder), new(grpc_nativeprofile.Forwarder), new(grpc_nativecds.Forwarder), + new(grpc_nativejvm.Forwarder), } for _, forwarder := range forwarders { plugin.RegisterPlugin(forwarder) diff --git a/plugins/forwarder/grpc/nativejvm/sync_forwarder.go b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go new file mode 100644 index 0000000..1869dcb --- /dev/null +++ b/plugins/forwarder/grpc/nativejvm/sync_forwarder.go @@ -0,0 +1,85 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package nativecds + +import ( + "context" + "fmt" + "reflect" + + "google.golang.org/grpc" + + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + "github.com/apache/skywalking-satellite/internal/satellite/event" +) + +const Name = "nativejvm-grpc-forwarder" + +type Forwarder struct { + config.CommonFields + + client agent.JVMMetricReportServiceClient +} + +func (f *Forwarder) Name() string { + return Name +} + +func (f *Forwarder) Description() string { + return "This is a synchronization grpc forwarder with the SkyWalking native Configuration Discovery Service protocol." +} + +func (f *Forwarder) DefaultConfig() string { + return `` +} + +func (f *Forwarder) Prepare(connection interface{}) error { + client, ok := connection.(*grpc.ClientConn) + if !ok { + return fmt.Errorf("the %s is only accepet the grpc client, but receive a %s", + f.Name(), reflect.TypeOf(connection).String()) + } + f.client = agent.NewJVMMetricReportServiceClient(client) + return nil +} + +func (f *Forwarder) Forward(batch event.BatchEvents) error { + for _, e := range batch { + jvm := e.GetJvm() + _, err := f.client.Collect(context.Background(), jvm) + if err != nil { + return err + } + } + return nil +} + +func (f *Forwarder) ForwardType() v1.SniffType { + return v1.SniffType_JVMMetricType +} + +func (f *Forwarder) SyncForward(_ *v1.SniffData) (*v1.SniffData, error) { + return nil, fmt.Errorf("unsupport sync forward") +} + +func (f *Forwarder) SupportedSyncInvoke() bool { + return false +} diff --git a/plugins/receiver/grpc/nativejvm/jvm_report_service.go b/plugins/receiver/grpc/nativejvm/jvm_report_service.go new file mode 100644 index 0000000..22e56ad --- /dev/null +++ b/plugins/receiver/grpc/nativejvm/jvm_report_service.go @@ -0,0 +1,49 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativelog + +import ( + "context" + "time" + + common "skywalking.apache.org/repo/goapi/collect/common/v3" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +const eventName = "grpc-jvm-event" + +type JVMReportService struct { + receiveChannel chan *v1.SniffData + agent.UnimplementedJVMMetricReportServiceServer +} + +func (j *JVMReportService) Collect(_ context.Context, jvm *agent.JVMMetricCollection) (*common.Commands, error) { + e := &v1.SniffData{ + Name: eventName, + Timestamp: time.Now().UnixNano() / 1e6, + Meta: nil, + Type: v1.SniffType_JVMMetricType, + Remote: true, + Data: &v1.SniffData_Jvm{ + Jvm: jvm, + }, + } + j.receiveChannel <- e + return &common.Commands{}, nil +} diff --git a/plugins/receiver/grpc/nativejvm/receiver.go b/plugins/receiver/grpc/nativejvm/receiver.go new file mode 100644 index 0000000..6a6fb61 --- /dev/null +++ b/plugins/receiver/grpc/nativejvm/receiver.go @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativelog + +import ( + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + module "github.com/apache/skywalking-satellite/internal/satellite/module/api" + grpcreceiver "github.com/apache/skywalking-satellite/plugins/receiver/grpc" +) + +const Name = "grpc-nativejvm-receiver" + +type Receiver struct { + config.CommonFields + grpcreceiver.CommonGRPCReceiverFields + service *JVMReportService // The gRPC request handler for jvm data. +} + +func (r *Receiver) Name() string { + return Name +} + +func (r *Receiver) Description() string { + return "This is a receiver for SkyWalking native jvm format, " + + "which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/JVMMetric.proto." 
+} + +func (r *Receiver) DefaultConfig() string { + return "" +} + +func (r *Receiver) RegisterHandler(server interface{}) { + r.CommonGRPCReceiverFields = *grpcreceiver.InitCommonGRPCReceiverFields(server) + r.service = &JVMReportService{receiveChannel: r.OutputChannel} + agent.RegisterJVMMetricReportServiceServer(r.Server, r.service) +} + +func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) { +} + +func (r *Receiver) Channel() <-chan *v1.SniffData { + return r.OutputChannel +} diff --git a/plugins/receiver/grpc/nativejvm/receiver_test.go b/plugins/receiver/grpc/nativejvm/receiver_test.go new file mode 100644 index 0000000..25bc318 --- /dev/null +++ b/plugins/receiver/grpc/nativejvm/receiver_test.go @@ -0,0 +1,92 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package nativelog + +import ( + "context" + "testing" + "time" + + "google.golang.org/grpc" + + common "skywalking.apache.org/repo/goapi/collect/common/v3" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" + + _ "github.com/apache/skywalking-satellite/internal/satellite/test" + receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc" +) + +func TestReceiver_RegisterHandler(t *testing.T) { + receiver_grpc.TestReceiver(new(Receiver), func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string { + client := agent.NewJVMMetricReportServiceClient(conn) + data := initData() + _, err := client.Collect(ctx, data) + if err != nil { + t.Fatalf("cannot send data: %v", err) + } + return data.String() + }, func(data *v1.SniffData) string { + return data.GetJvm().String() + }, t) +} + +func initData() *agent.JVMMetricCollection { + return &agent.JVMMetricCollection{ + Service: "demo-service", + ServiceInstance: "demo-instance", + Metrics: []*agent.JVMMetric{ + { + Time: time.Now().Unix() / 1e6, + Cpu: &common.CPU{ + UsagePercent: 99.9, + }, + Memory: []*agent.Memory{ + { + IsHeap: true, + Init: 1, + Max: 2, + Used: 3, + Committed: 4, + }, + }, + MemoryPool: []*agent.MemoryPool{ + { + Type: 0, + Init: 1, + Max: 2, + Used: 3, + Committed: 4, + }, + }, + Gc: []*agent.GC{ + { + Phrase: 2, + Count: 3, + Time: 202106111010, + }, + }, + Thread: &agent.Thread{ + LiveCount: 1, + PeakCount: 2, + DaemonCount: 3, + }, + }, + }, + } +} diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go index d6ed000..aa6e7a4 100644 --- a/plugins/receiver/receiver_repository.go +++ b/plugins/receiver/receiver_repository.go @@ -23,6 +23,7 @@ import ( "github.com/apache/skywalking-satellite/internal/pkg/plugin" "github.com/apache/skywalking-satellite/plugins/receiver/api" grpcnativecds 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativecds" + grpcnativejvm "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativejvm" grpcnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog" grpcnativemanagement "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement" grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile" @@ -41,6 +42,7 @@ func RegisterReceiverPlugins() { new(grpcnativeprofile.Receiver), new(grpcnativecds.Receiver), new(httpnavtivelog.Receiver), + new(grpcnativejvm.Receiver), } for _, receiver := range receivers { plugin.RegisterPlugin(receiver)
