Hi Jaime,

While I can't help you with this directly, I'm trying to see whether it
can be further investigated with the PNDA team (from that, I seem to
understand you have been in touch with them as well). I'll keep you
posted in case of any news.

Paolo

On Mon, Mar 19, 2018 at 12:44:57AM -0700, Jaime Botello wrote:
> Hi,
> 
> We are working on using pmacct as a NetFlow producer for the pnda.io
> data pipeline, and we are wondering whether anyone else has tried to
> integrate pmacct with pnda.io.
> 
> pnda.io uses Kafka and Avro to get data into the pipeline.
> 
> After getting all the required components in place on the pmacct side,
> we believe something is still missing.
> 
> I'm using the following code just to consume and inspect the data in
> the pipeline:
> 
> https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py
> 
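> In essence, the linked consumer boils down to something like this (a
> minimal sketch, assuming the kafka-python and avro packages the PNDA
> example uses; broker and topic taken from my configuration below):
> 
> import io
> import avro.io
> import avro.schema
> from kafka import KafkaConsumer
> 
> # the PNDA envelope schema (pnda.avsc, shown further below)
> schema = avro.schema.parse(open("pnda.avsc").read())
> reader = avro.io.DatumReader(schema)
> 
> consumer = KafkaConsumer("netflow",
>                          bootstrap_servers="10.180.221.130:9092")
> for message in consumer:
>     decoder = avro.io.BinaryDecoder(io.BytesIO(message.value))
>     print(reader.read(decoder))  # this is where it fails for us
> 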
> In theory, when running the above code, we should get output like the
> following:
> https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example
> 
> Instead, the script fails with Unicode decoding errors (see also the
> sanity-check sketch after the traceback):
> 
> ------------------------------------------------------------
> ConsumerRecord(topic=u'netflow', partition=0, offset=1591318,
> timestamp=1521442141690, timestamp_type=0, key=None,
> value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\x1c104.160.128.16@8\xea\x8e\x06\x8e
> \x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19
> 06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19
> 06:36:00\x02&2018-03-19
> 06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None,
> serialized_key_size=-1, serialized_value_size=246)
> error
> ------------------------------------------------------------
> Traceback (most recent call last):
>   File "consumer.py", line 115, in run
>     consume_message(message)
>   File "consumer.py", line 78, in consume_message
>     msg = reader.read(decoder)
>   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
>     return self.read_data(self.writers_schema, self.readers_schema, decoder)
>   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
>     return self.read_record(writers_schema, readers_schema, decoder)
>   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
>     field_val = self.read_data(field.type, readers_field.type, decoder)
>   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
>     return decoder.read_utf8()
>   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
>     return unicode(self.read_bytes(), "utf-8")
> UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
> 
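> One sanity check I can think of (my own debugging sketch, not from the
> PNDA guide): decode the same messages with the schema pmacct itself
> dumps via avro_schema_output_file, instead of the PNDA envelope. If
> this decodes cleanly, pmacct is writing plain Avro datums in its own
> schema rather than PNDA-enveloped ones ("pmacct_generated.avsc" is a
> hypothetical copy of the dumped schema):
> 
> import io
> import avro.io
> import avro.schema
> from kafka import KafkaConsumer
> 
> # decode with the writer's (pmacct-generated) schema, not the envelope
> schema = avro.schema.parse(open("pmacct_generated.avsc").read())
> reader = avro.io.DatumReader(schema)
> 
> consumer = KafkaConsumer("netflow",
>                          bootstrap_servers="10.180.221.130:9092")
> for message in consumer:
>     print(reader.read(avro.io.BinaryDecoder(io.BytesIO(message.value))))
> 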
> Currently pnda.io uses an Avro codec when integrating with Logstash;
> in our case we would like to use pmacct instead. We believe that, in
> the case of pmacct, something else may be required in order to push
> data correctly into the pnda.io Kafka.
> 
> https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md
> 
> One thing I noticed that pnda.io and Logstash define as part of the
> Avro/Kafka plugin is:
> 
> value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
> 
> 
> However, I was not able to find out whether this is something I can
> set up in the pmacct/Kafka integration.
> 
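> My reading of the guide above is that in the Logstash setup the Avro
> encoding happens in the codec and the Kafka producer then just ships
> raw bytes, which is what ByteArraySerializer amounts to. In Python,
> what the Logstash side effectively does would look roughly like this
> (a sketch under those assumptions; the field values are made up):
> 
> import io
> import time
> import avro.io
> import avro.schema
> from kafka import KafkaProducer
> 
> envelope = avro.schema.parse(open("pnda.avsc").read())
> writer = avro.io.DatumWriter(envelope)
> 
> def encode(raw_payload):
>     # wrap a raw payload in the PNDA envelope and Avro-encode it
>     buf = io.BytesIO()
>     writer.write({"timestamp": int(time.time() * 1000),
>                   "src": "pmacct",             # made-up source label
>                   "host_ip": "10.180.222.10",  # collector IP from below
>                   "rawdata": raw_payload},
>                  avro.io.BinaryEncoder(buf))
>     return buf.getvalue()
> 
> producer = KafkaProducer(bootstrap_servers="10.180.221.130:9092")
> producer.send("netflow", encode(b"one pmacct-produced record"))
> producer.flush()
> 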
> I'm attaching some relevant configuration.
> 
> ubuntu@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
> ! ..
> plugins: kafka
> !
> aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as,
> dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net,
> src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start,
> timestamp_end, timestamp_arrival
> !
> nfacctd_port: 2055
> nfacctd_ip: 10.180.222.10
> !
> !
> kafka_output: avro
> avro_schema_output_file: ~/pmacct/pnda.avsc
> kafka_topic: netflow
> kafka_refresh_time: 60
> kafka_history: 1m
> kafka_history_roundoff: m
> kafka_broker_host: 10.180.221.130
> kafka_broker_port: 9092
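> 
> (A note on avro_schema_output_file: as far as I understand from the
> pmacct docs, this is where pmacct writes the schema it generates from
> the aggregate list, so it may not match pnda.avsc. A quick field-name
> comparison, with "dumped.avsc" a hypothetical copy of what pmacct
> wrote:)
> 
> import json
> 
> def field_names(path):
>     # both files are plain JSON Avro schemas
>     return [f["name"] for f in json.load(open(path))["fields"]]
> 
> print(field_names("dumped.avsc"))  # schema pmacct generated
> print(field_names("pnda.avsc"))    # PNDA envelope the consumer expects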
> 
> ubuntu@ip-10-180-221-190:~/pmacct$ more pnda.avsc
> {
>   "namespace": "pnda.entity",
>   "type": "record",
>   "name": "event",
>   "fields": [
>      {"name": "timestamp", "type": "long"},
>      {"name": "src",       "type": "string"},
>      {"name": "host_ip",   "type": "string"},
>      {"name": "rawdata",   "type": "bytes"}
>   ]
> }
> ubuntu@ip-10-180-221-190:~/pmacct$
> 
> Thanks for any light here.
> 
> -- 
> Jaime Botello (aka Jimbo) // Riot Games


_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists
