EvanLjp commented on a change in pull request #53:
URL:
https://github.com/apache/skywalking-satellite/pull/53#discussion_r655399114
##########
File path: plugins/receiver/api/receiver.go
##########
@@ -32,6 +34,9 @@ type Receiver interface {
// RegisterHandler register a handler to the server, such as to handle
a gRPC or an HTTP request
RegisterHandler(server interface{})
+ // RegisterSyncProcessor register the sync processor, receive event and
sync process to sender
+ RegisterSyncProcessor(processor api.SyncProcessor)
Review comment:
The plugin should be a part of the module, so the sync processor API
should be defined in the receiver package,
##########
File path: internal/satellite/module/processor/processor.go
##########
@@ -83,3 +84,18 @@ func (p *Processor) Boot(ctx context.Context) {
func (p *Processor) Shutdown() {
}
+
+func (p *Processor) syncProcessor(e *v1.SniffData) (*v1.SniffData, error) {
+ // build contest
+ c := &event.OutputEventContext{
+ Offset: "",
+ Context: make(map[string]*v1.SniffData),
+ }
+ c.Put(e)
+ // processing the event with filters, that put the necessary events to
OutputEventContext.
+ for _, f := range p.runningFilters {
Review comment:
sync process means invoke remote method, so the lines[89-99] should be
removed
##########
File path: internal/satellite/module/sender/sender.go
##########
@@ -216,3 +217,12 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
return s.input
}
+
+func (s *Sender) SyncProcess(data *v1.SniffData) (*v1.SniffData, error) {
Review comment:
How about adding a new interface in forwarder API called
supportedSyncInvoke to select the sync forwarder
##########
File path: internal/satellite/module/gatherer/api/gatherer.go
##########
@@ -21,13 +21,20 @@ import (
"github.com/apache/skywalking-satellite/internal/satellite/event"
"github.com/apache/skywalking-satellite/internal/satellite/module/api"
queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)
+type SyncProcessor func(event *v1.SniffData) (*v1.SniffData, error)
Review comment:
how about renaming to SyncInvoker because of a module API called
Processor. Maybe ambiguous. Same to the similar names.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]