Hey Paolo,

I was thinking about this after reading a bit more on how data is
deserialized by pnda.io.

For example, if I download the file from PNDA HDFS and read it with
avro-tools, you can see that PNDA (Kafka) was not able to deserialize the
data. Since PNDA uses byte array deserialization, and their Logstash
integration notes clearly mention byte array serialization, don't you
think we could fix this by just adding byte array serialization to the
pmacct Kafka plugin?
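
To make this concrete, here is a rough sketch of what I have in mind,
using kafka-python and the avro package (the encode_pnda_event helper and
the field values are mine, just for illustration): the pmacct JSON stays
opaque in the rawdata field and the encoded record is handed to Kafka as
a plain byte array.

import io
import json
import time

import avro.io
import avro.schema
from kafka import KafkaProducer  # kafka-python

# the PNDA envelope schema (the pnda.avsc from my earlier mail)
schema = avro.schema.parse(open("pnda.avsc").read())
writer = avro.io.DatumWriter(schema)

def encode_pnda_event(raw_json):
    # binary-encode one pnda.entity.event record; no schema header is
    # embedded, the consumer decodes with the same pnda.avsc
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    writer.write({"timestamp": int(time.time() * 1000),
                  "src": "pmacct",             # arbitrary source tag
                  "host_ip": "10.180.222.10",  # our nfacctd_ip
                  "rawdata": raw_json},        # pmacct JSON, kept opaque
                 encoder)
    return buf.getvalue()

# byte array serialization: Kafka gets the raw bytes untouched
producer = KafkaProducer(bootstrap_servers="10.180.221.130:9092")
producer.send("netflow", encode_pnda_event(json.dumps({"event_type": "purge"})))
producer.flush()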

Let me know if this makes sense.

thanks

ubuntu@ip-10-180-221-47:~/datasets$ java -jar avro-tools-1.8.2.jar tojson f0f01acf-5011-42ec-90b0-c18f21e4e2ab.avro
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
{"topic":"netflow","timestamp":1521676850049,"reason":{"string":"*Unable to
deserialize data*"},"payload":"{\"event_type\": \"purge\", \"as_src\":
6507, \"as_dst\": 5739, \"peer_ip_src\": \"x.x.x.x\", \"peer_ip_dst\":
\"\", \"iface_in\": 654, \"iface_out\": 659, \"ip_src\": \"x.x.x.x\",
\"net_src\": \"x.x.x.x\", \"ip_dst\": \"x.x.x.x\", \"net_dst\":
\"x.x.x.x\", \"mask_src\": 32, \"mask_dst\": 16, \"port_src\": 2099,
\"port_dst\": 55764, \"tcp_flags\": \"24\", \"ip_proto\": \"tcp\", \"tos\":
0, \"sampling_rate\": 1000, \"timestamp_start\": \"2018-03-16
22:17:29.856000\", \"timestamp_end\": \"2018-03-16 22:17:29.856000\",
\"timestamp_arrival\": \"2018-03-16 22:18:30.893573\", \"stamp_inserted\":
\"2018-03-16 22:17:00\", \"stamp_updated\": \"2018-03-16 22:19:01\",
\"packets\": 1, \"bytes\": 667, \"writer_id\": \"default_kafka/2971\"}"}



On Tue, Mar 20, 2018 at 6:06 PM, Tim Raphael <raphael.timo...@gmail.com>
wrote:

> I’ve also been keeping a keen eye on the Pnda.io guide page (still blank)
> for Pmacct… who is actually working on this one @ Jaime?
>
> - Tim
>
>
> On 21 Mar 2018, at 12:00 pm, Jaime Botello <jbote...@riotgames.com> wrote:
>
> Thank you, Paolo. I will update the group as well if we have any news.
>
> On Tue, Mar 20, 2018 at 4:22 PM, Paolo Lucente <pa...@pmacct.net> wrote:
>
>>
>> Hi Jaime,
>>
>> While I can't help you with this directly, I'm trying to see whether it
>> can be further investigated with the PNDA team (as a result of that, I
>> seem to understand you were also in touch with them). I'll keep you
>> posted in case of any news.
>>
>> Paolo
>>
>> On Mon, Mar 19, 2018 at 12:44:57AM -0700, Jaime Botello wrote:
>> > Hi,
>> >
>> > We are working on getting pmacct to act as a NetFlow producer for the
>> > pnda.io data pipeline, and we are wondering if anyone else has tried to
>> > integrate pmacct with pnda.io.
>> >
>> > pnda.io uses Kafka and Avro to get data into the pipeline.
>> >
>> > After setting up all the required components on the pmacct side, we
>> > believe something is still missing.
>> >
>> > I'm using the following code just to test consuming the data that is in
>> > the pipeline:
>> >
>> > https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py
>> >
>> > In theory, when running the above code, we should get output like the
>> > following:
>> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example
>> >
>> > Instead, the script fails with unicode decoding errors
>> >
>> > ------------------------------------------------------------
>> > ConsumerRecord(topic=u'netflow', partition=0, offset=1591318,
>> > timestamp=1521442141690, timestamp_type=0, key=None,
>> > value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.16
>> 0.138.1\x1a104.160.138.1\x1c104.160.128.20\x1c104.160.128.16@8
>> \xea\x8e\x06\x8e
>> > \x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19
>> > 06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19
>> > 06:36:00\x02&2018-03-19
>> > 06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None,
>> > serialized_key_size=-1, serialized_value_size=246)
>> > error
>> > ------------------------------------------------------------
>> > Traceback (most recent call last):
>> >   File "consumer.py", line 115, in run
>> >     consume_message(message)
>> >   File "consumer.py", line 78, in consume_message
>> >     msg = reader.read(decoder)
>> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
>> >     return self.read_data(self.writers_schema, self.readers_schema, decoder)
>> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
>> >     return self.read_record(writers_schema, readers_schema, decoder)
>> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
>> >     field_val = self.read_data(field.type, readers_field.type, decoder)
>> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
>> >     return decoder.read_utf8()
>> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
>> >     return unicode(self.read_bytes(), "utf-8")
>> > UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
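>> >
>> > For reference, the decode step in that example consumer boils down to
>> > something like the sketch below (it assumes pnda.avsc is used as both
>> > the writer's and the reader's schema, which is where the read_utf8()
>> > call above blows up on pmacct's data):
>> >
>> > import io
>> >
>> > import avro.io
>> > import avro.schema
>> >
>> > # PNDA envelope schema (the pnda.avsc attached below)
>> > schema = avro.schema.parse(open("pnda.avsc").read())
>> > reader = avro.io.DatumReader(schema)
>> >
>> > def consume_message(message):
>> >     # message.value is the raw Kafka byte array shown above
>> >     decoder = avro.io.BinaryDecoder(io.BytesIO(message.value))
>> >     return reader.read(decoder)  # raises UnicodeDecodeError on our data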
>> >
>> > Currently, pnda.io uses an Avro plugin when integrating with Logstash;
>> > in our case we would like to use pmacct instead. We believe that, in
>> > the case of pmacct, something else may be required in order to push
>> > data correctly to the pnda.io Kafka.
>> >
>> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md
>> >
>> > One thing I noticed that pnda.io and Logstash define as part of the
>> > avro/kafka plugin is:
>> >
>> > value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
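>> >
>> > As far as I understand, that setting just means the Logstash producer
>> > hands Kafka the already Avro-encoded bytes untouched. A rough Python
>> > equivalent with kafka-python, just to illustrate the semantics (not
>> > pmacct's code):
>> >
>> > from kafka import KafkaProducer
>> >
>> > # kafka-python's default value serializer already passes bytes through,
>> > # i.e. the byte-array behaviour the PNDA guide asks Logstash for
>> > producer = KafkaProducer(bootstrap_servers="10.180.221.130:9092")
>> >
>> > def send_raw(avro_encoded_bytes):
>> >     # avro_encoded_bytes: one binary-encoded Avro record, sent untouched
>> >     producer.send("netflow", avro_encoded_bytes)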
>> >
>> >
>> > However, I was not able to find out if this is something I can set up
>> > with the pmacct/Kafka integration.
>> >
>> > I'm attaching some relevant configuration.
>> >
>> > ubuntu@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
>> > ! ..
>> > plugins: kafka
>> > !
>> > aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as,
>> > dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net,
>> > src_mask, dst_mask, tcp_flags, sampling_rate, timestamp_start,
>> > timestamp_end, timestamp_arrival
>> > !
>> > nfacctd_port: 2055
>> > nfacctd_ip: 10.180.222.10
>> > !
>> > !
>> > kafka_output: avro
>> > avro_schema_output_file: ~/pmacct/pnda.avsc
>> > kafka_topic: netflow
>> > kafka_refresh_time: 60
>> > kafka_history: 1m
>> > kafka_history_roundoff: m
>> > kafka_broker_host: 10.180.221.130
>> > kafka_broker_port: 9092
>> >
>> > ubuntu@ip-10-180-221-190:~/pmacct$ more pnda.avsc
>> > {
>> >   "namespace": "pnda.entity",
>> >   "type": "record",
>> >   "name": "event",
>> >   "fields": [
>> >      {"name": "timestamp", "type": "long"},
>> >      {"name": "src",       "type": "string"},
>> >      {"name": "host_ip",   "type": "string"},
>> >      {"name": "rawdata",   "type": "bytes"}
>> >   ]
>> > }
>> > ubuntu@ip-10-180-221-190:~/pmacct$
>> >
>> > Thanks for any light here.
>> >
>> > --
>> > Jaime Botello (aka Jimbo) // Riot Games
>>
>>
>>
>>
>
>
>
> --
> Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c:
> 310-210-9772 // summoner: Riot R3lick
>
>
>


-- 
Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c:
310-210-9772 // summoner: Riot R3lick
_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists
