Hi,

We are working on getting pmacct to act as a NetFlow producer for the pnda.io data pipeline, and we are wondering if anyone else has tried to integrate pmacct with pnda.io.
pnda.io uses Kafka and Avro to get data into the pipeline. After getting all the required components in place on the pmacct side, we believe something is still missing.

I'm using the following code just to test and consume the data that is in the pipeline:

https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py

In theory, when running the above code, we should get output like the following:

https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example

Instead, the script fails with Unicode decoding errors:

------------------------------------------------------------
ConsumerRecord(topic=u'netflow', partition=0, offset=1591318, timestamp=1521442141690, timestamp_type=0, key=None, value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\x1c104.160.128.16@8\xea\x8e\x06\x8e \x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19 06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19 06:36:00\x02&2018-03-19 06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None, serialized_key_size=-1, serialized_value_size=246)

error
------------------------------------------------------------
Traceback (most recent call last):
  File "consumer.py", line 115, in run
    consume_message(message)
  File "consumer.py", line 78, in consume_message
    msg = reader.read(decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
    return self.read_data(self.writers_schema, self.readers_schema, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
    return self.read_record(writers_schema, readers_schema, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
    field_val = self.read_data(field.type, readers_field.type, decoder)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
    return decoder.read_utf8()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
    return unicode(self.read_bytes(), "utf-8")
UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
------------------------------------------------------------

The current pnda.io guide uses an Avro plugin when integrating with Logstash; in our case we would like to use pmacct instead. We believe that with pmacct something else may be required in order to push data correctly into the pnda.io Kafka:

https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md

One thing I noticed that pnda.io and Logstash define as part of the Avro/Kafka plugin is:

value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'

However, I was not able to find out whether this is something I can set up with the pmacct/Kafka integration.

I'm attaching the relevant configuration:

ubuntu@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
!
plugins: kafka
!
aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as, dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net, src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start, timestamp_end, timestamp_arrival
!
nfacctd_port: 2055
nfacctd_ip: 10.180.222.10
!
kafka_output: avro
avro_schema_output_file: ~/pmacct/pnda.avsc
kafka_topic: netflow
kafka_refresh_time: 60
kafka_history: 1m
kafka_history_roundoff: m
kafka_broker_host: 10.180.221.130
kafka_broker_port: 9092

ubuntu@ip-10-180-221-190:~/pmacct$ more pnda.avsc
{
  "namespace": "pnda.entity",
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "src", "type": "string"},
    {"name": "host_ip", "type": "string"},
    {"name": "rawdata", "type": "bytes"}
  ]
}
ubuntu@ip-10-180-221-190:~/pmacct$

Thanks for any light here.

--
*Jaime Botello* (aka Jimbo) // *Riot Games*
_______________________________________________ pmacct-discussion mailing list http://www.pmacct.net/#mailinglists
