I have a rsyslog -> kafka -> splunk stack working pretty well, I could probably 
answer a few of your questions - 

You can list topics (and a lot of other stuff) on the kafka brokers themselves 
using the kafka-topics.sh script included with kafka.  e.g.:

bin/kafka-topics.sh —zookeeper=localhost:2181 —list
bin/kafka-topics.sh —zookeeper=locahost:2181 —topic “topic” —describe

I’d recommend using kafka-manager to manage your cluster.  It’ll give you a 
much quicker look in to your topics, your brokers, consumers, and throughput.  
It also makes creating and deleting topics easy.

If you’re not seeing your topics get created the first place I’d look is in the 
kafka broker logs themselves - server.log and kafkaServer.out - then work your 
way back from there.  As you’ve found, omkafka isn’t terribly verbose when it 
comes to error reporting.  For your timeout issues, the first place I’d look to 
is the local firewall configuration.  Also in your "broker=["cluster_kafka”]” 
portion are you including the port number for the broker (I’m assuming 9092)?

Andrew Griffin
  ETS / Integration Services
☏ 408-783-8348

> On Jan 30, 2017, at 6:38 AM, mostolog--- via rsyslog 
> <[email protected]> wrote:
> 
> Hi
> 
> *Do any of you have a working example of rsyslog (omkafka) -> kafka <- 
> losgstash ?*
> 
> So far I have been able to deploy a zookeper cluster using docker, deploy a 
> kafka cluster (to be honest, I don't know how to test if it's working as a 
> cluster or if I failed miserably), rsyslog as producer and logstash as 
> consumer.
> 
> But rsyslog doesn't show anything about kafka in logs, I'm not able to list 
> topics within kafka (although they are supposed to be automatically created) 
> and logstash, on the other side, doesn't show any events on debug.
> 
> Stats dump:
> 
>   Mon Jan 30 15:04:49 2017: global: origin=dynstats
>   Mon Jan 30 15:04:49 2017: omkafka: submitted=7016 maxoutqsize=3010
>   failures=0 topicdynacache.skipped=7014 topicdynacache.miss=2
>   topicdynacache.evicted=0
>   Mon Jan 30 15:04:49 2017: kafka: origin=core.action processed=7016
>   failed=0 suspended=0 suspended.duration=0 resumed=0
>   Mon Jan 30 15:04:49 2017: error: origin=core.action processed=0
>   failed=0 suspended=0 suspended.duration=0 resumed=0
>   Mon Jan 30 15:04:49 2017: unk: origin=core.action processed=0
>   failed=0 suspended=0 suspended.duration=0 resumed=0
>   Mon Jan 30 15:04:49 2017: imrelp[20514]: origin=imrelp submitted=7016
>   Mon Jan 30 15:04:49 2017: resource-usage: origin=impstats
>   utime=2116000 stime=1580000 maxrss=12596 minflt=2582 majflt=0
>   inblock=10 oublock=5201 nvcsw=38958 nivcsw=1102
>   Mon Jan 30 15:04:49 2017: relp[DA]: origin=core.queue size=0
>   enqueued=0 full=0 discarded.full=0 discarded.nf=0 maxqsize=0
>   Mon Jan 30 15:04:49 2017: relp: origin=core.queue size=0
>   enqueued=7016 full=0 discarded.full=0 discarded.nf=0 maxqsize=504
>   Mon Jan 30 15:04:49 2017: main Q: origin=core.queue size=13
>   enqueued=111 full=0 discarded.full=0 discarded.nf=0 maxqsize=14
>   Mon Jan 30 15:06:30 2017: global: origin=dynstats
>   Mon Jan 30 15:06:30 2017: omkafka: submitted=7884 maxoutqsize=3010
>   failures=0 topicdynacache.skipped=7882 topicdynacache.miss=2
>   topicdynacache.evicted=0
>   Mon Jan 30 15:06:30 2017: kafka: origin=core.action processed=7884
>   failed=0 suspended=0 suspended.duration=0 resumed=0
>   Mon Jan 30 15:06:30 2017: error: origin=core.action processed=0
>   failed=0 suspended=0 suspended.duration=0 resumed=0
>   Mon Jan 30 15:06:30 2017: unk: origin=core.action processed=0
>   failed=0 suspended=0 suspended.duration=0 resumed=0
>   Mon Jan 30 15:06:30 2017: imrelp[20514]: origin=imrelp submitted=7884
>   Mon Jan 30 15:06:30 2017: resource-usage: origin=impstats
>   utime=2356000 stime=1824000 maxrss=12860 minflt=2649 majflt=0
>   inblock=10 oublock=6208 nvcsw=43871 nivcsw=1231
>   Mon Jan 30 15:06:30 2017: relp[DA]: origin=core.queue size=0
>   enqueued=0 full=0 discarded.full=0 discarded.nf=0 maxqsize=0
>   Mon Jan 30 15:06:30 2017: relp: origin=core.queue size=0
>   enqueued=7884 full=0 discarded.full=0 discarded.nf=0 maxqsize=504
>   Mon Jan 30 15:06:30 2017: main Q: origin=core.queue size=13
>   enqueued=125 full=0 discarded.full=0 discarded.nf=0 maxqsize=14
> 
> rsyslog.conf
>    template(name="topic" type="string" string="foo")
>    ...
>    action(
>        name="kafka"
>        type="omkafka"
>        broker=["cluster_kafka"]
>        dynatopic="on"
>        topic="topic"
>        dynatopic.cachesize="300"
>        template="json"
>        errorFile="/data/kafka-error.json"
>        partitions.auto="on"
>    )
> 
> kafka-error.json
> 
>   { "errcode": -192, "errmsg": "Local: Message timed out", "data": "{
>   \"app\": \"myapp\",\"originalmsg\":
>   \"<167>2017-01-30T15:31:22.927536+01:00 host app[pid]: conn=4311996
>   fd=20 closed\" }\n" }
> 
> kafka.properties
> 
>   broker.id=9
>   zookeeper.connect=my_zookeeper_cluster:2181
>   listeners=PLAINTEXT://10.0.0.9:9092
> 
> Regards
> 
> _______________________________________________
> rsyslog mailing list
> http://lists.adiscon.net/mailman/listinfo/rsyslog
> http://www.rsyslog.com/professional-services/
> What's up with rsyslog? Follow https://twitter.com/rgerhards
> NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
> sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T 
> LIKE THAT.

Attachment: smime.p7s
Description: S/MIME cryptographic signature

_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.

Reply via email to