This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new a4b2c32 polish plugins for e2e tests (#19)
a4b2c32 is described below
commit a4b2c323b8e579c59a93b8ecb918d1790bffbc46
Author: Evan <[email protected]>
AuthorDate: Sun Jan 24 12:36:15 2021 +0800
polish plugins for e2e tests (#19)
* add polish plugins
* rename configuration to distinguish the concept
* fix unit test
Co-authored-by: Evan <[email protected]>
---
configs/satellite_config.yaml | 33 ++++++-------
docs/plugin-description.md | 55 +++++++++++++++++++++-
internal/satellite/config/loader_test.go | 53 ++++++++++-----------
plugins/client/grpc/README.md | 1 -
plugins/client/kafka/client.go | 4 +-
plugins/fallbacker/fallbacker_repository.go | 4 +-
.../none/none_fallbacker.go} | 38 +++++++++------
plugins/filter/sampling/README.md | 1 -
plugins/forwarder/forwarder_repository.go | 4 +-
.../kafka/{log => nativelog}/sync_forwarder.go | 4 +-
plugins/forwarder/segment/README.md | 1 -
plugins/parser/gork/README.md | 1 -
plugins/receiver/http/{ => nativcelog}/receiver.go | 28 ++++++-----
.../http/{ => nativcelog}/receiver_test.go | 6 +--
plugins/receiver/receiver_repository.go | 4 ++
plugins/server/http/server.go | 2 +-
plugins/server/prometheus/prometheus.go | 2 +-
plugins/server/server_repository.go | 4 ++
18 files changed, 158 insertions(+), 87 deletions(-)
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 5a1fbac..fc48810 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -21,7 +21,7 @@ logger:
level: "info"
telemetry:
- namespace: namespaces
+ cluster: cluster1
service: service1
instance: instance1
@@ -29,33 +29,34 @@ sharing:
common_config:
pipe_name: sharing
clients:
- - plugin_name: "grpc-client"
- k: v
+ - plugin_name: "kafka-client"
+ brokers: 127.0.0.1:9092
+ version: 2.1.1
servers:
- plugin_name: "grpc-server"
- k: v
+ - plugin_name: "prometheus-server"
+ address: ":8090"
pipes:
- common_config:
- pipe_name: namespace1
+ pipe_name: pipe1
gatherer:
server_name: "grpc-server"
receiver:
- plugin_name: segment-receiver
- k: v
+ plugin_name: "grpc-nativelog-receiver"
queue:
- plugin_name: mmap-queue
- key: value
+ plugin_name: "mmap-queue"
+ segment_size: 524288
+ max_in_mem_segments: 6
+ queue_dir: "pipe1-log-grpc-receiver-queue"
processor:
filters:
- - plugin_name: filtertype1
- key: value
sender:
fallbacker:
plugin_name: none-fallbacker
flush_time: 1000
- max_buffer_size: 100
- min_flush_events: 30
- client_name: grpc-client
+ max_buffer_size: 200
+ min_flush_events: 100
+ client_name: kafka-client
forwarders:
- - plugin_name: segment-forwarder
- key: value
+ - plugin_name: nativelog-kafka-forwarder
+ topic: log-topic
\ No newline at end of file
diff --git a/docs/plugin-description.md b/docs/plugin-description.md
index ca87cf5..0b7506a 100755
--- a/docs/plugin-description.md
+++ b/docs/plugin-description.md
@@ -47,12 +47,18 @@ client_id: Satellite
compression_codec: 0
# How frequently to refresh the cluster metadata in the background. Defaults
to 10 minutes. The unit is minute.
-# refresh_period: 10
+refresh_period: 10
# InsecureSkipVerify controls whether a client verifies the server's
certificate chain and host name.
insecure_skip_verify: true
```
# api.Fallbacker
+## none-fallbacker
+### description
+this is a nothing to do fallbacker.
+### defaultConfig
+```yaml```
+# api.Fallbacker
## timer-fallbacker
### description
this is a timer fallback trigger when forward fails.
@@ -62,7 +68,7 @@ max_times: 3
latency_factor: 2000
```
# api.Forwarder
-## log-kafka-forwarder
+## nativelog-kafka-forwarder
### description
this is a synchronization Kafka log forwarder.
### defaultConfig
@@ -91,6 +97,51 @@ queue_dir: satellite-mmap-queue
# The max size of the input event. Default value is 20k.
max_event_size: 20480
```
+# api.Receiver
+## grpc-nativelog-receiver
+### description
+This is a receiver for SkyWalking native logging format, which is defined at
https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto.
+### defaultConfig
+```yaml```
+# api.Receiver
+## http-log-receiver
+### description
+This is a receiver for SkyWalking http logging format, which is defined at
https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto.
+### defaultConfig
+```yaml
+# The native log request URI.
+uri: "/logging"
+# The request timeout seconds.
+timeout: 5
+```
+# api.Server
+## grpc-server
+### description
+this is a grpc server
+### defaultConfig
+```yaml
+# The address of grpc server. Default value is :8000
+address: :8000
+# The network of grpc. Default value is :tcp
+network: tcp
+# The max size of receiving log. Default value is 2M. The unit is Byte.
+max_recv_msg_size: 2097152
+# The max concurrent stream channels.
+max_concurrent_streams: 32
+# The TLS cert file path.
+tls_cert_file:
+# The TLS key file path.
+tls_key_file:
+```
+# api.Server
+## http-server
+### description
+this is a http server.
+### defaultConfig
+```yaml
+# The http server address.
+address: ":8080"
+```
# api.Server
## prometheus-server
### description
diff --git a/internal/satellite/config/loader_test.go
b/internal/satellite/config/loader_test.go
index 7016168..1249e20 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -71,7 +71,7 @@ func params() *SatelliteConfig {
Level: "info",
},
Telemetry: &telemetry.Config{
- Cluster: "namesspace",
+ Cluster: "cluster1",
Service: "service1",
Instance: "instance1",
},
@@ -81,15 +81,20 @@ func params() *SatelliteConfig {
},
Clients: []plugin.Config{
{
- "plugin_name": "grpc-client",
- "k": "v",
+ "plugin_name":
"kafka-client",
+ "brokers":
"127.0.0.1:9092",
+ "version": "2.1.1",
"commonfields_pipe_name": "sharing",
},
},
Servers: []plugin.Config{
{
"plugin_name": "grpc-server",
- "k": "v",
+ "commonfields_pipe_name": "sharing",
+ },
+ {
+ "plugin_name":
"prometheus-server",
+ "address": ":8090",
"commonfields_pipe_name": "sharing",
},
},
@@ -97,54 +102,48 @@ func params() *SatelliteConfig {
Pipes: []*PipeConfig{
{
PipeCommonConfig: config.CommonFields{
- PipeName: "namespace1",
+ PipeName: "pipe1",
},
Gatherer: &gatherer.GathererConfig{
ServerName: "grpc-server",
CommonFields: config.CommonFields{
- PipeName: "namespace1",
+ PipeName: "pipe1",
},
ReceiverConfig: plugin.Config{
- "plugin_name":
"segment-receiver",
- "k": "v",
- "commonfields_pipe_name":
"namespace1",
+ "plugin_name":
"grpc-nativelog-receiver",
+ "commonfields_pipe_name":
"pipe1",
},
QueueConfig: plugin.Config{
"plugin_name":
"mmap-queue",
- "key":
"value",
- "commonfields_pipe_name":
"namespace1",
+ "segment_size":
524288,
+ "max_in_mem_segments": 6,
+ "queue_dir":
"pipe1-log-grpc-receiver-queue",
+ "commonfields_pipe_name":
"pipe1",
},
},
Processor: &processor.ProcessorConfig{
CommonFields: config.CommonFields{
- PipeName: "namespace1",
- },
- FilterConfig: []plugin.Config{
- {
- "plugin_name":
"filtertype1",
- "key":
"value",
-
"commonfields_pipe_name": "namespace1",
- },
+ PipeName: "pipe1",
},
},
Sender: &sender.SenderConfig{
CommonFields: config.CommonFields{
- PipeName: "namespace1",
+ PipeName: "pipe1",
},
FallbackerConfig: plugin.Config{
- "commonfields_pipe_name":
"namespace1",
+ "commonfields_pipe_name":
"pipe1",
"plugin_name":
"none-fallbacker",
},
- MaxBufferSize: 100,
- MinFlushEvents: 30,
FlushTime: 1000,
- ClientName: "grpc-client",
+ MaxBufferSize: 200,
+ MinFlushEvents: 100,
+ ClientName: "kafka-client",
ForwardersConfig: []plugin.Config{
{
- "plugin_name":
"segment-forwarder",
- "key":
"value",
-
"commonfields_pipe_name": "namespace1",
+ "plugin_name":
"nativelog-kafka-forwarder",
+ "topic":
"log-topic",
+
"commonfields_pipe_name": "pipe1",
},
},
},
diff --git a/plugins/client/grpc/README.md b/plugins/client/grpc/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/client/grpc/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/client/kafka/client.go b/plugins/client/kafka/client.go
index 5fcb280..c898442 100644
--- a/plugins/client/kafka/client.go
+++ b/plugins/client/kafka/client.go
@@ -25,6 +25,7 @@ import (
"github.com/Shopify/sarama"
"github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/plugins/client/api"
)
@@ -111,7 +112,7 @@ client_id: Satellite
compression_codec: 0
# How frequently to refresh the cluster metadata in the background. Defaults
to 10 minutes. The unit is minute.
-# refresh_period: 10
+refresh_period: 10
# InsecureSkipVerify controls whether a client verifies the server's
certificate chain and host name.
insecure_skip_verify: true
@@ -123,6 +124,7 @@ func (c *Client) Prepare() error {
if err != nil {
return fmt.Errorf("cannot init the kafka producer: %v", err)
}
+ sarama.Logger = log.Logger
client, err := sarama.NewClient(strings.Split(c.Brokers, ","), cfg)
if err != nil {
return fmt.Errorf("cannot init the kafka client: %v", err)
diff --git a/plugins/fallbacker/fallbacker_repository.go
b/plugins/fallbacker/fallbacker_repository.go
index 92e7109..9ff9911 100644
--- a/plugins/fallbacker/fallbacker_repository.go
+++ b/plugins/fallbacker/fallbacker_repository.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+ "github.com/apache/skywalking-satellite/plugins/fallbacker/none"
"github.com/apache/skywalking-satellite/plugins/fallbacker/timer"
)
@@ -30,7 +31,8 @@ func RegisterFallbackerPlugins() {
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Fallbacker)(nil)).Elem())
fallbackers := []api.Fallbacker{
// Please register the fallbacker plugins at here.
- &timer.Fallbacker{},
+ new(none.Fallbacker),
+ new(timer.Fallbacker),
}
for _, fallbacker := range fallbackers {
plugin.RegisterPlugin(fallbacker)
diff --git a/plugins/forwarder/forwarder_repository.go
b/plugins/fallbacker/none/none_fallbacker.go
similarity index 63%
copy from plugins/forwarder/forwarder_repository.go
copy to plugins/fallbacker/none/none_fallbacker.go
index 74fc31a..fdfd444 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/fallbacker/none/none_fallbacker.go
@@ -15,24 +15,32 @@
// specific language governing permissions and limitations
// under the License.
-package forwarder
+package none
import (
- "reflect"
-
- "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
"github.com/apache/skywalking-satellite/plugins/forwarder/api"
- "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/log"
)
-// RegisterForwarderPlugins register the used filter plugins.
-func RegisterForwarderPlugins() {
-
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Forwarder)(nil)).Elem())
- forwarders := []api.Forwarder{
- // Please register the forwarder plugins at here.
- new(log.Forwarder),
- }
- for _, forwarder := range forwarders {
- plugin.RegisterPlugin(forwarder)
- }
+const Name = "none-fallbacker"
+
+type Fallbacker struct {
+ config.CommonFields
+}
+
+func (f *Fallbacker) Name() string {
+ return Name
+}
+
+func (f *Fallbacker) Description() string {
+ return "this is a nothing to do fallbacker."
+}
+
+func (f *Fallbacker) DefaultConfig() string {
+ return ""
+}
+
+func (f *Fallbacker) FallBack(batch event.BatchEvents, forward
api.ForwardFunc) bool {
+ return true
}
diff --git a/plugins/filter/sampling/README.md
b/plugins/filter/sampling/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/filter/sampling/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/forwarder/forwarder_repository.go
b/plugins/forwarder/forwarder_repository.go
index 74fc31a..f69acdd 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -22,7 +22,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/forwarder/api"
- "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/log"
+
"github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
)
// RegisterForwarderPlugins register the used filter plugins.
@@ -30,7 +30,7 @@ func RegisterForwarderPlugins() {
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Forwarder)(nil)).Elem())
forwarders := []api.Forwarder{
// Please register the forwarder plugins at here.
- new(log.Forwarder),
+ new(nativelog.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/kafka/log/sync_forwarder.go
b/plugins/forwarder/kafka/nativelog/sync_forwarder.go
similarity index 97%
rename from plugins/forwarder/kafka/log/sync_forwarder.go
rename to plugins/forwarder/kafka/nativelog/sync_forwarder.go
index 171121e..98979cd 100644
--- a/plugins/forwarder/kafka/log/sync_forwarder.go
+++ b/plugins/forwarder/kafka/nativelog/sync_forwarder.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package log
+package nativelog
import (
"fmt"
@@ -31,7 +31,7 @@ import (
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
)
-const Name = "log-kafka-forwarder"
+const Name = "nativelog-kafka-forwarder"
type Forwarder struct {
config.CommonFields
diff --git a/plugins/forwarder/segment/README.md
b/plugins/forwarder/segment/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/forwarder/segment/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/parser/gork/README.md b/plugins/parser/gork/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/parser/gork/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/receiver/http/receiver.go
b/plugins/receiver/http/nativcelog/receiver.go
similarity index 82%
rename from plugins/receiver/http/receiver.go
rename to plugins/receiver/http/nativcelog/receiver.go
index 20fbc22..7d78e51 100644
--- a/plugins/receiver/http/receiver.go
+++ b/plugins/receiver/http/nativcelog/receiver.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package http
+package nativcelog
import (
"fmt"
@@ -38,16 +38,18 @@ import (
const (
Name = "http-log-receiver"
eventName = "http-log-event"
- timeout = 5 * time.Second
- Success = "success"
- Failing = "failing"
+ success = "success"
+ failing = "failing"
)
type Receiver struct {
config.CommonFields
+ // config
+ URI string `mapstructure:"uri"`
+ Timeout int `mapstructure:"timeout"`
+ // components
Server *http_server.Server
OutputChannel chan *protocol.Event
- URI string `mapstructure:"uri"`
}
type Response struct {
@@ -66,15 +68,17 @@ func (r *Receiver) Description() string {
func (r *Receiver) DefaultConfig() string {
return `
-# The http server uri .
+# The native log request URI.
uri: "/logging"
+# The request timeout seconds.
+timeout: 5
`
}
func (r *Receiver) RegisterHandler(server interface{}) {
r.Server = server.(*http_server.Server)
r.OutputChannel = make(chan *protocol.Event)
- r.Server.Server.Handle(r.URI, httpHandler(r))
+ r.Server.Server.Handle(r.URI, r.httpHandler())
}
func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) {
@@ -82,20 +86,20 @@ func ResponseWithJSON(rsp http.ResponseWriter, response
*Response, code int) {
_ = json.NewEncoder(rsp).Encode(response)
}
-func httpHandler(r *Receiver) http.Handler {
+func (r *Receiver) httpHandler() http.Handler {
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
rsp.Header().Set("Content-Type", "application/json")
b, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Logger.Errorf("get http body error: %v", err)
- response := &Response{Status: Failing, Msg: err.Error()}
+ response := &Response{Status: failing, Msg: err.Error()}
ResponseWithJSON(rsp, response, http.StatusBadRequest)
return
}
var data logging.LogData
err = proto.Unmarshal(b, &data)
if err != nil {
- response := &Response{Status: Failing, Msg: err.Error()}
+ response := &Response{Status: failing, Msg: err.Error()}
ResponseWithJSON(rsp, response,
http.StatusInternalServerError)
return
}
@@ -110,10 +114,10 @@ func httpHandler(r *Receiver) http.Handler {
},
}
r.OutputChannel <- e
- response := &Response{Status: Success, Msg: Success}
+ response := &Response{Status: success, Msg: success}
ResponseWithJSON(rsp, response, http.StatusOK)
})
- return http.TimeoutHandler(h, timeout, fmt.Sprintf("Exceeded configured
timeout of %v \n", timeout))
+ return http.TimeoutHandler(h, time.Duration(r.Timeout)*time.Second,
fmt.Sprintf("Exceeded configured timeout of %d seconds", r.Timeout))
}
func (r *Receiver) Channel() <-chan *protocol.Event {
diff --git a/plugins/receiver/http/receiver_test.go
b/plugins/receiver/http/nativcelog/receiver_test.go
similarity index 98%
rename from plugins/receiver/http/receiver_test.go
rename to plugins/receiver/http/nativcelog/receiver_test.go
index a2ece41..d601a55 100644
--- a/plugins/receiver/http/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package http
+package nativcelog
import (
"bytes"
@@ -77,7 +77,7 @@ func TestReceiver_http_RegisterHandler(t *testing.T) {
}
var response Response
_ = json.Unmarshal(result, &response)
- if !cmp.Equal(response.Status, Success) {
+ if !cmp.Equal(response.Status, success) {
panic("the response should be success, but
failing")
}
}()
@@ -122,7 +122,7 @@ func TestReceiver_http_RegisterHandler_failed(t *testing.T)
{
}
var response Response
_ = json.Unmarshal(result, &response)
- if !cmp.Equal(response.Status, Failing) {
+ if !cmp.Equal(response.Status, failing) {
panic("the response should be failing, but success")
}
}
diff --git a/plugins/receiver/receiver_repository.go
b/plugins/receiver/receiver_repository.go
index a178011..bbbfcaf 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -22,6 +22,8 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/receiver/api"
+ grpcnavtivelog
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog"
+ httpnavtivelog
"github.com/apache/skywalking-satellite/plugins/receiver/http/nativcelog"
)
// RegisterReceiverPlugins register the used receiver plugins.
@@ -29,6 +31,8 @@ func RegisterReceiverPlugins() {
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Receiver)(nil)).Elem())
receivers := []api.Receiver{
// Please register the receiver plugins at here.
+ new(grpcnavtivelog.Receiver),
+ new(httpnavtivelog.Receiver),
}
for _, receiver := range receivers {
plugin.RegisterPlugin(receiver)
diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go
index 7d93382..627a023 100644
--- a/plugins/server/http/server.go
+++ b/plugins/server/http/server.go
@@ -37,7 +37,7 @@ func (s *Server) Name() string {
}
func (s *Server) Description() string {
- return "this is a http server for receive logs."
+ return "this is a http server."
}
func (s *Server) DefaultConfig() string {
diff --git a/plugins/server/prometheus/prometheus.go
b/plugins/server/prometheus/prometheus.go
index 5d25063..768fd1b 100644
--- a/plugins/server/prometheus/prometheus.go
+++ b/plugins/server/prometheus/prometheus.go
@@ -65,7 +65,7 @@ func (s *Server) Start() error {
telemetry.Registerer.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
telemetry.Registerer.MustRegister(prometheus.NewGoCollector())
// register prometheus metrics exporter handler.
- s.server.Handle(s.Endpoint, promhttp.HandlerFor(telemetry.Gatherer,
promhttp.HandlerOpts{}))
+ s.server.Handle(s.Endpoint, promhttp.HandlerFor(telemetry.Gatherer,
promhttp.HandlerOpts{ErrorLog: log.Logger}))
go func() {
err := http.ListenAndServe(s.Address, s.server)
if err != nil {
diff --git a/plugins/server/server_repository.go
b/plugins/server/server_repository.go
index 0ac9958..f14be19 100644
--- a/plugins/server/server_repository.go
+++ b/plugins/server/server_repository.go
@@ -22,6 +22,8 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/server/api"
+ "github.com/apache/skywalking-satellite/plugins/server/grpc"
+ "github.com/apache/skywalking-satellite/plugins/server/http"
"github.com/apache/skywalking-satellite/plugins/server/prometheus"
)
@@ -31,6 +33,8 @@ func RegisterServerPlugins() {
servers := []api.Server{
// Please register the server plugins at here.
new(prometheus.Server),
+ new(grpc.Server),
+ new(http.Server),
}
for _, server := range servers {
plugin.RegisterPlugin(server)