This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch kafka-forwarder
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit 5877a1f4430171ca4c64f3daaad0825306b66352
Author: Evan <[email protected]>
AuthorDate: Sat Jan 2 08:47:37 2021 +0800

    add kafka forwarder
---
 dist/LICENSE                                       |   1 +
 dist/licenses/LICENSE-kafka.txt                    |  20 +++
 go.mod                                             |   1 +
 go.sum                                             |  53 ++++++++
 plugins/client/client_repository.go                |   2 +
 plugins/client/kafka/client.go                     | 149 +++++++++++++++++++++
 plugins/client/kafka/client_config.go              | 120 +++++++++++++++++
 .../client_sniffer.go}                             |  39 ++++--
 plugins/client/kakka/README.md                     |   1 -
 plugins/forwarder/forwarder_repository.go          |   2 +
 plugins/forwarder/kafkalog/sync_forwarder.go       |  92 +++++++++++++
 11 files changed, 469 insertions(+), 11 deletions(-)

diff --git a/dist/LICENSE b/dist/LICENSE
index af6f460..c6ce18c 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -238,5 +238,6 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
        urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
        grandecola (mmap) v0.6.0: https://github.com/grandecola/mmap MIT
     grandecola (bigqueue) v0.4.0: https://github.com/grandecola/bigqueue MIT
+    Shopify (sarama) v1.27.2: https://github.com/Shopify/sarama  MIT
 
 
diff --git a/dist/licenses/LICENSE-kafka.txt b/dist/licenses/LICENSE-kafka.txt
new file mode 100644
index 0000000..d2bf435
--- /dev/null
+++ b/dist/licenses/LICENSE-kafka.txt
@@ -0,0 +1,20 @@
+Copyright (c) 2013 Shopify
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/go.mod b/go.mod
index 95c5280..2a7d519 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.14
 replace skywalking/network v1.0.0 => ./protocol/gen-codes/skywalking/network
 
 require (
+       github.com/Shopify/sarama v1.27.2
        github.com/golang/protobuf v1.4.3
        github.com/google/go-cmp v0.5.4
        github.com/grandecola/mmap v0.6.0
diff --git a/go.sum b/go.sum
index f71e837..ccf7d3d 100644
--- a/go.sum
+++ b/go.sum
@@ -16,7 +16,11 @@ github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod 
h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/Knetic/govaluate 
v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod 
h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
 github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/Shopify/sarama v1.19.0 
h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
 github.com/Shopify/sarama v1.19.0/go.mod 
h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
+github.com/Shopify/sarama v1.27.2 
h1:1EyY1dsxNDUQEv0O/4TsjosHI2CgB1uo9H/v56xzTxc=
+github.com/Shopify/sarama v1.27.2/go.mod 
h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt/Mc93II=
+github.com/Shopify/toxiproxy v2.1.4+incompatible 
h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod 
h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
 github.com/VividCortex/gohistogram v1.0.0/go.mod 
h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod 
h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
@@ -63,14 +67,20 @@ github.com/coreos/pkg 
v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d 
h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod 
h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/creack/pty v1.1.7/go.mod 
h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
+github.com/creack/pty v1.1.9/go.mod 
h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod 
h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/eapache/go-resiliency v1.1.0 
h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
 github.com/eapache/go-resiliency v1.1.0/go.mod 
h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-resiliency v1.2.0 
h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
+github.com/eapache/go-resiliency v1.2.0/go.mod 
h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 
h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod 
h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod 
h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/edsrzf/mmap-go v1.0.0/go.mod 
h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/envoyproxy/go-control-plane v0.6.9/go.mod 
h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
@@ -80,8 +90,10 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod 
h1:cwu0lG7PUMfa9snN8LXBig5y
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
 github.com/fatih/color v1.7.0/go.mod 
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fortytw2/leaktest v1.3.0/go.mod 
h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod 
h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
 github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod 
h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
+github.com/frankban/quicktest v1.10.2/go.mod 
h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
 github.com/fsnotify/fsnotify v1.4.7 
h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -118,7 +130,10 @@ github.com/golang/protobuf v1.4.1/go.mod 
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
 github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.4.3 
h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
 github.com/golang/protobuf v1.4.3/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db 
h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod 
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod 
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod 
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -127,6 +142,7 @@ github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.2/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
 github.com/google/go-cmp v0.5.4/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -170,6 +186,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod 
h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX
 github.com/hashicorp/go-syslog v1.0.0/go.mod 
h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
 github.com/hashicorp/go-uuid v1.0.0/go.mod 
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
 github.com/hashicorp/go-uuid v1.0.1/go.mod 
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.2 
h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
+github.com/hashicorp/go-uuid v1.0.2/go.mod 
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
 github.com/hashicorp/go-version v1.2.0/go.mod 
h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
 github.com/hashicorp/go.net v0.0.1/go.mod 
h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
 github.com/hashicorp/golang-lru v0.5.0/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@@ -184,6 +202,8 @@ github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
 github.com/hudl/fargo v1.3.0/go.mod 
h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/influxdata/influxdb1-client 
v0.0.0-20191209144304-8bf82d3c094d/go.mod 
h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
+github.com/jcmturner/gofork v1.0.0 
h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
+github.com/jcmturner/gofork v1.0.0/go.mod 
h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod 
h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jonboulle/clockwork v0.1.0/go.mod 
h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/jpillora/backoff v1.0.0/go.mod 
h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -198,14 +218,18 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod 
h1:SYymIcj16QtmaHHD7aYtjjsJG7V
 github.com/julienschmidt/httprouter v1.3.0/go.mod 
h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
 github.com/kisielk/errcheck v1.1.0/go.mod 
h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.11.0 
h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg=
+github.com/klauspost/compress v1.11.0/go.mod 
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod 
h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.1/go.mod 
h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod 
h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/lightstep/lightstep-tracer-common/golang/gogo 
v0.0.0-20190605223551-bc2310a04743/go.mod 
h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
 github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod 
h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
 github.com/lyft/protoc-gen-validate v0.0.13/go.mod 
h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
@@ -241,6 +265,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod 
h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
 github.com/nats-io/nkeys v0.1.0/go.mod 
h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
 github.com/nats-io/nkeys v0.1.3/go.mod 
h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
 github.com/nats-io/nuid v1.0.1/go.mod 
h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod 
h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/oklog/oklog v0.3.2/go.mod 
h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
 github.com/oklog/run v1.0.0/go.mod 
h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
 github.com/oklog/ulid v1.3.1/go.mod 
h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
@@ -264,7 +289,10 @@ github.com/pelletier/go-toml v1.2.0 
h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181
 github.com/pelletier/go-toml v1.2.0/go.mod 
h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/performancecopilot/speed v3.0.0+incompatible/go.mod 
h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
 github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod 
h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
+github.com/pierrec/lz4 v2.0.5+incompatible 
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pierrec/lz4 v2.5.2+incompatible 
h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
+github.com/pierrec/lz4 v2.5.2+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -305,7 +333,10 @@ github.com/prometheus/procfs v0.1.3/go.mod 
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
 github.com/prometheus/procfs v0.2.0 
h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4=
 github.com/prometheus/procfs v0.2.0/go.mod 
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/prometheus/tsdb v0.7.1/go.mod 
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a 
h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 
h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
+github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod 
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/go-internal v1.3.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/russross/blackfriday/v2 v2.0.1 
h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
@@ -352,6 +383,7 @@ github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1 
h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.6.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/subosito/gotenv v1.2.0 
h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod 
h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod 
h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@@ -361,6 +393,8 @@ github.com/urfave/cli v1.22.1 
h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
 github.com/urfave/cli v1.22.1/go.mod 
h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
 github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
 github.com/urfave/cli/v2 v2.3.0/go.mod 
h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod 
h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
+github.com/xdg/stringprep v1.0.0/go.mod 
h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod 
h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
@@ -387,6 +421,8 @@ golang.org/x/crypto 
v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a 
h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM=
+golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod 
h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -428,6 +464,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200625001655-4c5254603344 
h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4=
 golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod 
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20200904194848-62affa334b73 
h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
+golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod 
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 
h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
@@ -469,6 +507,8 @@ golang.org/x/text v0.3.0/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -500,6 +540,7 @@ golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.3.1/go.mod 
h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
 google.golang.org/api v0.4.0/go.mod 
h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod 
h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
@@ -555,12 +596,22 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 
h1:qIbj1fsPNlZgppZ+VLlY7N33
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod 
h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod 
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
 gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/jcmturner/aescts.v1 v1.0.1 
h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
+gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod 
h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1 
h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod 
h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
+gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod 
h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
+gopkg.in/jcmturner/gokrb5.v7 v7.5.0 
h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg=
+gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod 
h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
+gopkg.in/jcmturner/rpc.v1 v1.1.0 
h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
+gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod 
h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
 gopkg.in/resty.v1 v1.12.0/go.mod 
h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod 
h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/warnings.v0 v0.1.2/go.mod 
h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
@@ -573,6 +624,8 @@ gopkg.in/yaml.v2 v2.2.4/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/plugins/client/client_repository.go 
b/plugins/client/client_repository.go
index c766803..1300b37 100644
--- a/plugins/client/client_repository.go
+++ b/plugins/client/client_repository.go
@@ -22,6 +22,7 @@ import (
 
        "github.com/apache/skywalking-satellite/internal/pkg/plugin"
        "github.com/apache/skywalking-satellite/plugins/client/api"
+       "github.com/apache/skywalking-satellite/plugins/client/kafka"
 )
 
 // RegisterClientPlugins register the used client plugins.
@@ -29,6 +30,7 @@ func RegisterClientPlugins() {
        plugin.RegisterPluginCategory(reflect.TypeOf((*api.Client)(nil)).Elem())
        clients := []api.Client{
                // Please register the client plugins at here.
+               new(kafka.Client),
        }
        for _, client := range clients {
                plugin.RegisterPlugin(client)
diff --git a/plugins/client/kafka/client.go b/plugins/client/kafka/client.go
new file mode 100644
index 0000000..4e6f8ea
--- /dev/null
+++ b/plugins/client/kafka/client.go
@@ -0,0 +1,149 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kafka
+
+import (
+       "context"
+       "fmt"
+       "strings"
+
+       "github.com/Shopify/sarama"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+       "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+type Client struct {
+       config.CommonFields
+       // config
+       Brokers            string `mapstructure:"brokers"`              // The 
Kafka broker addresses (default `localhost:9092`).
+       Version            string `mapstructure:"version"`              // The 
version should follow this pattern, which is `major.minor.veryMinor.patch`.
+       EnableTLS          bool   `mapstructure:"enable_TLS"`           // The 
TLS switch (default false).
+       ClientPemPath      string `mapstructure:"client_pem_path"`      // The 
file path of client.pem. The config only works when opening the TLS switch.
+       ClientKeyPath      string `mapstructure:"client_key_path"`      // The 
file path of client.key. The config only works when opening the TLS switch.
+       CaPemPath          string `mapstructure:"ca_pem_path"`          // The 
file path oca.pem. The config only works when opening the TLS switch.
+       RequiredAcks       int16  `mapstructure:"required_acks"`        // 0 
means NoResponse, 1 means WaitForLocal and -1 means WaitForAll (default 1).
+       MaxRetry           int    `mapstructure:"max_retry"`            // The 
producer max retry times (default 3).
+       RetryBackoff       int    `mapstructure:"retry_backoff"`        // How 
long to wait for the cluster to settle between retries (default 100ms).
+       MaxMessageBytes    int    `mapstructure:"max_message_bytes"`    // The 
max message bytes.
+       IdempotentWrites   bool   `mapstructure:"idempotent_writes"`    // 
Ensure that exactly one copy of each message is written when is true.
+       ClientID           string `mapstructure:"client_id"`            // A 
user-provided string sent with every request to the brokers.
+       CompressionCodec   int    `mapstructure:"compression_codec"`    // 
Represents the various compression codecs recognized by Kafka in messages.
+       RefreshPeriod      int    `mapstructure:"refresh_period"`       // How 
frequently to refresh the cluster metadata.
+       InsecureSkipVerify bool   `mapstructure:"insecure_skip_verify"` // 
Controls whether a client verifies the server's certificate chain and host name.
+
+       // components
+       client    sarama.Client // The kafka producer.
+       listeners []chan<- api.ClientStatus
+       status    api.ClientStatus
+       ctx       context.Context    // Parent ctx
+       cancel    context.CancelFunc // Parent ctx cancel function
+}
+
+func (c *Client) Name() string {
+       return plugin.GetPluginName(c)
+}
+
+func (c *Client) Description() string {
+       return "this is a sharing client to delivery the data to Kafka."
+}
+
+func (c *Client) DefaultConfig() string {
+       return `
+# The Kafka broker addresses (default localhost:9092). Multiple values are 
separated by commas.
+brokers: localhost:9092
+
+# The Kakfa version should follow this pattern, which is 
major_minor_veryMinor_patch (default 1.0.0.0).
+version: 1.0.0.0
+
+# The TLS switch (default false).
+enable_TLS: false
+
+# The file path of client.pem. The config only works when opening the TLS 
switch.
+client_pem_path: ""
+
+# The file path of client.key. The config only works when opening the TLS 
switch.
+client_key_path: ""
+
+# The file path oca.pem. The config only works when opening the TLS switch.
+ca_pem_path: ""
+
+# 0 means NoResponse, 1 means WaitForLocal and -1 means WaitForAll (default 1).
+required_acks: 1
+
+# The producer max retry times (default 3).
+max_retry: 3
+
+# How long to wait for the cluster to settle between retries (default 100ms). 
Time unit is ms.
+retry_backoff: 100
+
+# The max message bytes.
+max_message_bytes: 1000000
+
+# If enabled, the producer will ensure that exactly one copy of each message 
is written (default false).
+idempotent_writes: false
+
+# A user-provided string sent with every request to the brokers for logging, 
debugging, and auditing purposes (default Satellite).
+client_id: Satellite
+
+# Compression codec represents the various compression codecs recognized by 
Kafka in messages. 0 : None, 1 : Gzip, 2 : Snappy, 3 : LZ4, 4 : ZSTD
+compression_codec: 0
+
+# How frequently to refresh the cluster metadata in the background. Defaults 
to 10 minutes. The unit is minute.
+# refresh_period: 10
+
+# InsecureSkipVerify controls whether a client verifies the server's 
certificate chain and host name.
+insecure_skip_verify: true
+`
+}
+
+func (c *Client) Prepare() error {
+       cfg, err := c.loadConfig()
+       if err != nil {
+               return fmt.Errorf("cannot init the kafka producer: %v", err)
+       }
+       client, err := sarama.NewClient(strings.Split(c.Brokers, ","), cfg)
+       if err != nil {
+               return fmt.Errorf("cannot init the kafka client: %v", err)
+       }
+       c.client = client
+       c.status = api.Connected
+       c.ctx, c.cancel = context.WithCancel(context.Background())
+       c.listeners = make([]chan<- api.ClientStatus, 0)
+       return nil
+}
+
+func (c *Client) Close() error {
+       c.cancel()
+       return c.client.Close()
+}
+
+func (c *Client) GetConnectedClient() interface{} {
+       return c.client
+}
+
+func (c *Client) RegisterListener(listener chan<- api.ClientStatus) {
+       c.listeners = append(c.listeners, listener)
+}
+
+func (c *Client) Start() error {
+       // start supported processes.
+       go c.snifferBrokerStatus()
+       return nil
+}
diff --git a/plugins/client/kafka/client_config.go 
b/plugins/client/kafka/client_config.go
new file mode 100644
index 0000000..df483d4
--- /dev/null
+++ b/plugins/client/kafka/client_config.go
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kafka
+
+import (
+       "fmt"
+       "os"
+       "time"
+
+       "io/ioutil"
+
+       "crypto/tls"
+       "crypto/x509"
+
+       "github.com/Shopify/sarama"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// loadConfig use the client params to build the kafka producer config.
+func (c *Client) loadConfig() (*sarama.Config, error) {
+       cfg := sarama.NewConfig()
+       cfg.Producer.Return.Successes = true
+       cfg.Producer.Return.Errors = true
+       cfg.Producer.Idempotent = c.IdempotentWrites
+       cfg.Producer.RequiredAcks = sarama.RequiredAcks(c.RequiredAcks)
+       cfg.Producer.Compression = sarama.CompressionCodec(c.CompressionCodec)
+       if c.MaxRetry > 0 {
+               cfg.Producer.Retry.Max = c.MaxRetry
+       }
+       if c.RetryBackoff > 0 {
+               cfg.Producer.Retry.Backoff = time.Millisecond * 
time.Duration(c.RetryBackoff)
+       }
+       if c.RefreshPeriod > 0 {
+               cfg.Metadata.RefreshFrequency = time.Duration(c.RefreshPeriod) 
* time.Minute
+       }
+       if c.MaxMessageBytes > 0 {
+               cfg.Producer.MaxMessageBytes = c.MaxMessageBytes
+       }
+       if c.ClientID != "" {
+               cfg.ClientID = c.ClientID
+       }
+       if c.Version != "" {
+               if version, err := sarama.ParseKafkaVersion(c.Version); err != 
nil {
+                       log.Logger.Errorf("error in parsing the kafka version, 
the kafka version would be set as default value: %v", err)
+               } else {
+                       cfg.Version = version
+               }
+       }
+       cfg.Net.TLS.Enable = c.EnableTLS
+       if c.EnableTLS {
+               configTLS, err := c.configTLS()
+               if err != nil {
+                       return nil, err
+               }
+               cfg.Net.TLS.Config = configTLS
+       }
+       return cfg, nil
+}
+
+// configTLS loads and parse the TLS configs.
+func (c *Client) configTLS() (tc *tls.Config, tlsErr error) {
+       if err := checkTLSFile(c.CaPemPath); err != nil {
+               return nil, err
+       }
+       if err := checkTLSFile(c.ClientKeyPath); err != nil {
+               return nil, err
+       }
+       if err := checkTLSFile(c.ClientPemPath); err != nil {
+               return nil, err
+       }
+       tlsConfig := new(tls.Config)
+       tlsConfig.Renegotiation = tls.RenegotiateNever
+       tlsConfig.InsecureSkipVerify = c.InsecureSkipVerify
+       caPem, err := ioutil.ReadFile(c.CaPemPath)
+       if err != nil {
+               return nil, err
+       }
+       certPool := x509.NewCertPool()
+       certPool.AppendCertsFromPEM(caPem)
+       tlsConfig.RootCAs = certPool
+
+       clientPem, err := tls.LoadX509KeyPair(c.ClientPemPath, c.ClientKeyPath)
+       if err != nil {
+               return nil, err
+       }
+       tlsConfig.Certificates = []tls.Certificate{clientPem}
+       return tlsConfig, nil
+}
+
+// checkTLSFile checks the TLS files.
+func checkTLSFile(path string) error {
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       stat, err := file.Stat()
+       if err != nil {
+               return err
+       }
+       if stat.Size() == 0 {
+               return fmt.Errorf("the TLS file is illegal: %s", path)
+       }
+       return nil
+}
diff --git a/plugins/client/client_repository.go 
b/plugins/client/kafka/client_sniffer.go
similarity index 54%
copy from plugins/client/client_repository.go
copy to plugins/client/kafka/client_sniffer.go
index c766803..07aed35 100644
--- a/plugins/client/client_repository.go
+++ b/plugins/client/kafka/client_sniffer.go
@@ -15,22 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package client
+package kafka
 
 import (
-       "reflect"
+       "context"
+       "time"
 
-       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
        "github.com/apache/skywalking-satellite/plugins/client/api"
 )
 
-// RegisterClientPlugins register the used client plugins.
-func RegisterClientPlugins() {
-       plugin.RegisterPluginCategory(reflect.TypeOf((*api.Client)(nil)).Elem())
-       clients := []api.Client{
-               // Please register the client plugins at here.
+// snifferBrokerStatus would sniffer the broker status to notify the listeners.
+func (c *Client) snifferBrokerStatus() {
+       ctx, cancel := context.WithCancel(c.ctx)
+       defer cancel()
+       timeTicker := time.NewTicker(time.Duration(c.RefreshPeriod) * 
time.Minute)
+       for {
+               select {
+               case <-timeTicker.C:
+                       brokers := c.client.Brokers()
+                       if len(brokers) == 0 && c.status == api.Connected {
+                               c.status = api.Disconnect
+                               c.notify()
+                       } else if len(brokers) > 0 && c.status == 
api.Disconnect {
+                               c.status = api.Connected
+                               c.notify()
+                       }
+               case <-ctx.Done():
+                       timeTicker.Stop()
+                       return
+               }
        }
-       for _, client := range clients {
-               plugin.RegisterPlugin(client)
+}
+
+// notify the current status to the listeners.
+func (c *Client) notify() {
+       for _, listener := range c.listeners {
+               listener <- c.status
        }
 }
diff --git a/plugins/client/kakka/README.md b/plugins/client/kakka/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/client/kakka/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/forwarder/forwarder_repository.go 
b/plugins/forwarder/forwarder_repository.go
index dfaf1eb..5a36db1 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -22,6 +22,7 @@ import (
 
        "github.com/apache/skywalking-satellite/internal/pkg/plugin"
        "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+       "github.com/apache/skywalking-satellite/plugins/forwarder/kafkalog"
 )
 
 // RegisterForwarderPlugins register the used filter plugins.
@@ -29,6 +30,7 @@ func RegisterForwarderPlugins() {
        
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Forwarder)(nil)).Elem())
        forwarders := []api.Forwarder{
                // Please register the forwarder plugins at here.
+               new(kafkalog.Forwarder),
        }
        for _, forwarder := range forwarders {
                plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/kafkalog/sync_forwarder.go 
b/plugins/forwarder/kafkalog/sync_forwarder.go
new file mode 100644
index 0000000..20985d8
--- /dev/null
+++ b/plugins/forwarder/kafkalog/sync_forwarder.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kafkalog
+
+import (
+       "fmt"
+       "reflect"
+
+       "google.golang.org/protobuf/proto"
+
+       "github.com/Shopify/sarama"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+       "github.com/apache/skywalking-satellite/internal/satellite/event"
+       
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+type Forwarder struct {
+       config.CommonFields
+       Topic    string `mapstructure:"topic"` // The forwarder topic.
+       producer sarama.SyncProducer
+}
+
+func (f *Forwarder) Name() string {
+       return plugin.GetPluginName(f)
+}
+
+func (f *Forwarder) Description() string {
+       return "this is a synchronization Kafka log forwarder."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+       return `
+# The remote topic. 
+topic: "log-topic"
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+       client, ok := connection.(sarama.Client)
+       if !ok {
+               return fmt.Errorf("the %s is only accepet the kafka client, but 
receive a %s",
+                       f.Name(), reflect.TypeOf(connection).String())
+       }
+       producer, err := sarama.NewSyncProducerFromClient(client)
+       if err != nil {
+               return err
+       }
+       f.producer = producer
+       return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+       var message []*sarama.ProducerMessage
+       for _, e := range batch {
+               data, ok := e.GetData().(*protocol.Event_Log)
+               if !ok {
+                       continue
+               }
+               bytes, err := proto.Marshal(data.Log)
+               if err != nil {
+                       log.Logger.Errorf("%s serialize the logData fail: %v", 
f.Name(), err)
+                       continue
+               }
+               message = append(message, &sarama.ProducerMessage{
+                       Topic: f.Topic,
+                       Value: sarama.ByteEncoder(bytes),
+               })
+       }
+       return f.producer.SendMessages(message)
+}
+
+func (f *Forwarder) ForwardType() protocol.EventType {
+       return protocol.EventType_Logging
+}

Reply via email to