This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push: new e31b654 feature: add support for segmentio-kafka (#176) e31b654 is described below commit e31b654bebf92f378c99e98df3747b02b55a983c Author: Starry <codeprince2...@163.com> AuthorDate: Sun Mar 17 15:42:13 2024 +0800 feature: add support for segmentio-kafka (#176) --- .github/workflows/plugin-tests.yaml | 1 + CHANGES.md | 1 + docs/en/agent/support-plugins.md | 1 + go.work | 2 + plugins/segmentio-kafka/go.mod | 9 ++ plugins/segmentio-kafka/go.sum | 59 ++++++++ plugins/segmentio-kafka/instrument.go | 78 +++++++++++ plugins/segmentio-kafka/reader_interceptor.go | 74 +++++++++++ plugins/segmentio-kafka/writer_interceptor.go | 80 +++++++++++ .../scenarios/segmentio-kafka/bin/startup.sh | 22 +++ .../scenarios/segmentio-kafka/config/excepted.yml | 74 +++++++++++ test/plugins/scenarios/segmentio-kafka/go.mod | 9 ++ test/plugins/scenarios/segmentio-kafka/go.sum | 59 ++++++++ test/plugins/scenarios/segmentio-kafka/main.go | 148 +++++++++++++++++++++ test/plugins/scenarios/segmentio-kafka/plugin.yml | 40 ++++++ tools/go-agent/instrument/plugins/register.go | 2 + 16 files changed, 659 insertions(+) diff --git a/.github/workflows/plugin-tests.yaml b/.github/workflows/plugin-tests.yaml index 737184d..c58c498 100644 --- a/.github/workflows/plugin-tests.yaml +++ b/.github/workflows/plugin-tests.yaml @@ -99,6 +99,7 @@ jobs: - rocketmq - amqp - pulsar + - segmentio-kafka steps: - uses: actions/checkout@v2 with: diff --git a/CHANGES.md b/CHANGES.md index 467c25f..f823c49 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ Release Notes. ------------------ #### Plugins * Support [Pulsar](https://github.com/apache/pulsar-client-go) MQ. +* Support [Segmentio-Kafka](https://github.com/segmentio/kafka-go) MQ. 0.4.0 ------------------ diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md index 92502f1..3eb8dbc 100644 --- a/docs/en/agent/support-plugins.md +++ b/docs/en/agent/support-plugins.md @@ -31,6 +31,7 @@ metrics based on the tracing data. * `rocketMQ`: [rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested v2.1.2. * `amqp`: [AMQP](https://github.com/rabbitmq/amqp091-go) tested v1.9.0. * `pulsar`: [pulsar-client-go](https://github.com/apache/pulsar-client-go) tested v0.12.0. + * `segmentio-kafka`: [segmentio-kafka](https://github.com/segmentio/kafka-go) tested v0.4.47. # Metrics Plugins The meter plugin provides the advanced metrics collections. diff --git a/go.work b/go.work index fd0ce8b..29fd8a3 100644 --- a/go.work +++ b/go.work @@ -26,6 +26,7 @@ use ( ./plugins/rocketmq ./plugins/amqp ./plugins/pulsar + ./plugins/segmentio-kafka ./test/benchmark-codebase/consumer ./test/benchmark-codebase/provider @@ -58,6 +59,7 @@ use ( ./test/plugins/scenarios/rocketmq ./test/plugins/scenarios/amqp ./test/plugins/scenarios/pulsar + ./test/plugins/scenarios/segmentio-kafka ./tools/go-agent diff --git a/plugins/segmentio-kafka/go.mod b/plugins/segmentio-kafka/go.mod new file mode 100644 index 0000000..cd8cffb --- /dev/null +++ b/plugins/segmentio-kafka/go.mod @@ -0,0 +1,9 @@ +module github.com/apache/skywalking-go/plugins/segmentio-kafka + +go 1.18 + +require ( + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/segmentio/kafka-go v0.4.47 // indirect +) diff --git a/plugins/segmentio-kafka/go.sum b/plugins/segmentio-kafka/go.sum new file mode 100644 index 0000000..cf392a8 --- /dev/null +++ b/plugins/segmentio-kafka/go.sum @@ -0,0 +1,59 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/plugins/segmentio-kafka/instrument.go b/plugins/segmentio-kafka/instrument.go new file mode 100644 index 0000000..a7ff008 --- /dev/null +++ b/plugins/segmentio-kafka/instrument.go @@ -0,0 +1,78 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package segmentiokafka + +import ( + "embed" + + "github.com/apache/skywalking-go/plugins/core/instrument" +) + +//go:embed * +var fs embed.FS + +//skywalking:nocopy +type Instrument struct { +} + +func NewInstrument() *Instrument { + return &Instrument{} +} + +func (i *Instrument) Name() string { + return "segmentio_kafka" +} + +func (i *Instrument) BasePackage() string { + return "github.com/segmentio/kafka-go" +} + +func (i *Instrument) VersionChecker(version string) bool { + return true +} + +func (i *Instrument) Points() []*instrument.Point { + return []*instrument.Point{ + { + PackageName: "kafka", + At: instrument.NewMethodEnhance("*Writer", "WriteMessages", + instrument.WithArgsCount(2), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "...Message"), + instrument.WithResultCount(1), + instrument.WithResultType(0, "error"), + ), + Interceptor: "WriterInterceptor", + }, + { + PackageName: "kafka", + At: instrument.NewMethodEnhance("*Reader", "ReadMessage", + instrument.WithArgsCount(1), + instrument.WithArgType(0, "context.Context"), + instrument.WithResultCount(2), + instrument.WithResultType(0, "Message"), + instrument.WithResultType(1, "error"), + ), + Interceptor: "ReaderInterceptor", + }, + } +} + +func (i *Instrument) FS() *embed.FS { + return &fs +} diff --git a/plugins/segmentio-kafka/reader_interceptor.go b/plugins/segmentio-kafka/reader_interceptor.go new file mode 100644 index 0000000..4a876b4 --- /dev/null +++ b/plugins/segmentio-kafka/reader_interceptor.go @@ -0,0 +1,74 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package segmentiokafka + +import ( + "strings" + + "github.com/segmentio/kafka-go" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + kafkaReaderPrefix = "Kafka/" + kafkaReaderSuffix = "/Consumer" + kafkaReaderComponentID = 41 + semicolon = ";" +) + +type ReaderInterceptor struct { +} + +func (r *ReaderInterceptor) BeforeInvoke(invocation operator.Invocation) error { + return nil +} + +func (r *ReaderInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + reader := invocation.CallerInstance().(*kafka.Reader) + brokers := strings.Join(reader.Config().Brokers, semicolon) + message := result[0].(kafka.Message) + topic := message.Topic + operationName := kafkaReaderPrefix + topic + kafkaReaderSuffix + + span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) { + for _, header := range message.Headers { + if header.Key == headerKey { + return string(header.Value), nil + } + } + return "", nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(kafkaReaderComponentID), + tracing.WithTag(tracing.TagMQBroker, brokers), + tracing.WithTag(tracing.TagMQTopic, topic), + ) + if err != nil { + return err + } + + if err, ok := result[1].(error); ok { + span.Tag(tracing.TagMQStatus, err.Error()) + span.Error(err.Error()) + } + span.SetPeer(brokers) + span.End() + return nil +} diff --git a/plugins/segmentio-kafka/writer_interceptor.go b/plugins/segmentio-kafka/writer_interceptor.go new file mode 100644 index 0000000..78d93d9 --- /dev/null +++ b/plugins/segmentio-kafka/writer_interceptor.go @@ -0,0 +1,80 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package segmentiokafka + +import ( + "github.com/segmentio/kafka-go" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + kafkaWriterPrefix = "Kafka/" + kafkaWriterSuffix = "/Producer" + kafkaWriterComponentID = 40 +) + +type WriterInterceptor struct { +} + +func (w *WriterInterceptor) BeforeInvoke(invocation operator.Invocation) error { + writer := invocation.CallerInstance().(*kafka.Writer) + addr, topic := writer.Addr.String(), writer.Topic + messageList := invocation.Args()[1].([]kafka.Message) + operationName := kafkaWriterPrefix + topic + kafkaWriterSuffix + + span, err := tracing.CreateExitSpan(operationName, addr, func(headerKey, headerValue string) error { + for idx := range messageList { + if len(messageList[idx].Headers) == 0 { + messageList[idx].Headers = []kafka.Header{ + {Key: headerKey, Value: []byte(headerValue)}, + } + } else { + messageList[idx].Headers = append(messageList[idx].Headers, + kafka.Header{Key: headerKey, Value: []byte(headerValue)}) + } + } + return nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(kafkaWriterComponentID), + tracing.WithTag(tracing.TagMQBroker, addr), + tracing.WithTag(tracing.TagMQTopic, topic), + ) + if err != nil { + return err + } + + span.SetPeer(addr) + invocation.SetContext(span) + return nil +} + +func (w *WriterInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + span := invocation.GetContext().(tracing.Span) + if err, ok := result[0].(error); ok && err != nil { + span.Tag(tracing.TagMQStatus, err.Error()) + span.Error(err.Error()) + } + span.End() + return nil +} diff --git a/test/plugins/scenarios/segmentio-kafka/bin/startup.sh b/test/plugins/scenarios/segmentio-kafka/bin/startup.sh new file mode 100755 index 0000000..e758869 --- /dev/null +++ b/test/plugins/scenarios/segmentio-kafka/bin/startup.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +home="$(cd "$(dirname $0)"; pwd)" +go build ${GO_BUILD_OPTS} -o kafka + +./kafka \ No newline at end of file diff --git a/test/plugins/scenarios/segmentio-kafka/config/excepted.yml b/test/plugins/scenarios/segmentio-kafka/config/excepted.yml new file mode 100644 index 0000000..bb4f7b7 --- /dev/null +++ b/test/plugins/scenarios/segmentio-kafka/config/excepted.yml @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +segmentItems: + - serviceName: segmentio-kafka + segmentSize: 3 + segments: + - segmentId: not null + spans: + - operationName: Kafka/sw-topic/Producer + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 40 + isError: false + spanType: Exit + peer: kafka-server:9092 + skipAnalysis: false + tags: + - { key: mq.broker, value: not null } + - { key: mq.topic, value: not null } + - operationName: GET:/execute + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 5004 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - { key: http.method, value: GET } + - { key: url, value: 'service:8080/execute' } + - { key: status_code, value: '200' } + - segmentId: not null + spans: + - operationName: Kafka/sw-topic/Consumer + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 41 + isError: false + spanType: Entry + peer: kafka-server:9092 + skipAnalysis: false + tags: + - { key: mq.broker, value: not null } + - { key: mq.topic, value: not null } + refs: + - { parentEndpoint: 'GET:/execute', networkAddress: 'kafka-server:9092', + refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: segmentio-kafka, + traceId: not null } +meterItems: [ ] +logItems: [ ] \ No newline at end of file diff --git a/test/plugins/scenarios/segmentio-kafka/go.mod b/test/plugins/scenarios/segmentio-kafka/go.mod new file mode 100644 index 0000000..0b839b5 --- /dev/null +++ b/test/plugins/scenarios/segmentio-kafka/go.mod @@ -0,0 +1,9 @@ +module test/plugins/scenarios/segmentio-kafka + +go 1.18 + +require ( + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/segmentio/kafka-go v0.4.47 // indirect +) diff --git a/test/plugins/scenarios/segmentio-kafka/go.sum b/test/plugins/scenarios/segmentio-kafka/go.sum new file mode 100644 index 0000000..cf392a8 --- /dev/null +++ b/test/plugins/scenarios/segmentio-kafka/go.sum @@ -0,0 +1,59 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/test/plugins/scenarios/segmentio-kafka/main.go b/test/plugins/scenarios/segmentio-kafka/main.go new file mode 100644 index 0000000..58df214 --- /dev/null +++ b/test/plugins/scenarios/segmentio-kafka/main.go @@ -0,0 +1,148 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "fmt" + "log" + "net" + "net/http" + "strconv" + "time" + + "github.com/segmentio/kafka-go" + + _ "github.com/apache/skywalking-go" +) + +type testFunc func() error + +var ( + url = "kafka-server:9092" + topic = "sw-topic" + msg = "I love skywalking 3 thousand" + ctx = context.Background() + writer *kafka.Writer + reader *kafka.Reader +) + +func main() { + writer = newKafkaWriter(topic) + defer writer.Close() + reader = newKafkaReader() + defer reader.Close() + consumerHelper() + + route := http.NewServeMux() + route.HandleFunc("/execute", func(res http.ResponseWriter, req *http.Request) { + testProduceConsume() + _, _ = res.Write([]byte("execute success")) + }) + route.HandleFunc("/health", func(writer http.ResponseWriter, request *http.Request) { + _, _ = writer.Write([]byte("ok")) + }) + fmt.Println("start client") + err := http.ListenAndServe(":8080", route) + if err != nil { + fmt.Printf("client start error: %v \n", err) + } +} + +func testProduceConsume() { + tests := []struct { + name string + fn testFunc + }{ + {"simpleMsg", simpleMsg}, + } + for _, test := range tests { + fmt.Printf("excute test case: %s\n", test.name) + if subErr := test.fn(); subErr != nil { + fmt.Printf("test case %s failed: %v", test.name, subErr) + } + } +} + +func simpleMsg() error { + if err := writer.WriteMessages(ctx, kafka.Message{ + Value: []byte(msg), + }); err != nil { + log.Println("simpleMsg WriteMessages error") + return err + } + return nil +} + +func consumerHelper() { + go func() { + for { + if message, err := reader.ReadMessage(ctx); err != nil { + log.Fatal("consumer msg error: ", err) + } else { + fmt.Printf("consumer|topic=%s, partition=%d, offset=%d, key=%s, value=%s, header=%s\n", + message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value), message.Headers) + } + } + }() +} + +func newKafkaReader() *kafka.Reader { + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{url}, + Topic: topic, + CommitInterval: 1 * time.Second, + }) +} + +func newKafkaWriter(topic string) *kafka.Writer { + createTopic() + return &kafka.Writer{ + Addr: kafka.TCP(url), + Topic: topic, + } +} + +func createTopic() { + conn, err := kafka.Dial("tcp", url) + if err != nil { + log.Fatal(fmt.Errorf("createTopic, Dial: %w", err)) + } + defer conn.Close() + controller, err := conn.Controller() + if err != nil { + err = fmt.Errorf("createTopic, conn.Controller: %w", err) + log.Fatal(err) + } + conn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + log.Fatal("kafka.Dial error: ", err) + } + conn.SetDeadline(time.Now().Add(time.Second)) + topicConfigs := []kafka.TopicConfig{ + { + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + err = conn.CreateTopics(topicConfigs...) + if err != nil { + log.Fatal(fmt.Errorf("createTopic error: %w", err)) + } +} diff --git a/test/plugins/scenarios/segmentio-kafka/plugin.yml b/test/plugins/scenarios/segmentio-kafka/plugin.yml new file mode 100644 index 0000000..887e422 --- /dev/null +++ b/test/plugins/scenarios/segmentio-kafka/plugin.yml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute +health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health +start-script: ./bin/startup.sh +framework: github.com/segmentio/kafka-go +export-port: 8080 +support-version: + - go: 1.18 + framework: + - v0.4.47 +dependencies: + zookeeper-server: + image: zookeeper:3.9.2 + hostname: zookeeper-server + kafka-server: + image: bitnami/kafka:3.7.0 + hostname: kafka-server + ports: + - 9092 + environment: + KAFKA_ZOOKEEPER_CONNECT: "zookeeper-server:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LISTENERS: "PLAINTEXT://kafka-server:9092" + depends_on: + - zookeeper-server \ No newline at end of file diff --git a/tools/go-agent/instrument/plugins/register.go b/tools/go-agent/instrument/plugins/register.go index 7726b41..2ed4857 100644 --- a/tools/go-agent/instrument/plugins/register.go +++ b/tools/go-agent/instrument/plugins/register.go @@ -41,6 +41,7 @@ import ( "github.com/apache/skywalking-go/plugins/pulsar" "github.com/apache/skywalking-go/plugins/rocketmq" runtime_metrics "github.com/apache/skywalking-go/plugins/runtimemetrics" + segmentiokafka "github.com/apache/skywalking-go/plugins/segmentio-kafka" sql_entry "github.com/apache/skywalking-go/plugins/sql/entry" sql_mysql "github.com/apache/skywalking-go/plugins/sql/mysql" ) @@ -66,6 +67,7 @@ func init() { registerFramework(rocketmq.NewInstrument()) registerFramework(amqp.NewInstrument()) registerFramework(pulsar.NewInstrument()) + registerFramework(segmentiokafka.NewInstrument()) // fasthttp related instruments registerFramework(fasthttp_client.NewInstrument())