Hey Jaime,

What you say makes sense to me, and I would be up for doing this
development. May I ask whether it would be possible to access your
deployment (since I do not have the PNDA framework deployed anywhere)?
It would make development and subsequent testing easier. If yes, we can
follow up privately.

Paolo
 

On Wed, Mar 21, 2018 at 07:56:42PM -0700, Jaime Botello wrote:
> Hey Paolo,
> 
> I was thinking about this after reading a little bit more on how data is
> deserialized by pnda.io
> 
> For example, if I download the file from PNDA HDFS and read it with
> avro-tools, you can see that PNDA (Kafka) was not able to deserialize
> the data. Since PNDA uses byte-array deserialization, and their Logstash
> integration notes clearly mention byte-array serialization, don't you
> think we could fix this by just adding byte-array serialization to the
> pmacct Kafka plugin? (A rough sketch of what that could look like
> follows the avro-tools output below.)
> 
> Let me know if this makes sense.
> 
> thanks
> 
> ubuntu@ip-10-180-221-47:~/datasets$ java -jar avro-tools-1.8.2.jar tojson
> f0f01acf-5011-42ec-90b0-c18f21e4e2ab.avro
> log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> {"topic":"netflow","timestamp":1521676850049,"reason":{"string":"Unable to deserialize data"},
>  "payload":"{\"event_type\": \"purge\", \"as_src\": 6507, \"as_dst\": 5739,
>    \"peer_ip_src\": \"x.x.x.x\", \"peer_ip_dst\": \"\", \"iface_in\": 654,
>    \"iface_out\": 659, \"ip_src\": \"x.x.x.x\", \"net_src\": \"x.x.x.x\",
>    \"ip_dst\": \"x.x.x.x\", \"net_dst\": \"x.x.x.x\", \"mask_src\": 32,
>    \"mask_dst\": 16, \"port_src\": 2099, \"port_dst\": 55764,
>    \"tcp_flags\": \"24\", \"ip_proto\": \"tcp\", \"tos\": 0, \"sampling_rate\": 1000,
>    \"timestamp_start\": \"2018-03-16 22:17:29.856000\",
>    \"timestamp_end\": \"2018-03-16 22:17:29.856000\",
>    \"timestamp_arrival\": \"2018-03-16 22:18:30.893573\",
>    \"stamp_inserted\": \"2018-03-16 22:17:00\",
>    \"stamp_updated\": \"2018-03-16 22:19:01\",
>    \"packets\": 1, \"bytes\": 667, \"writer_id\": \"default_kafka/2971\"}"}
> 
> 
> 
> On Tue, Mar 20, 2018 at 6:06 PM, Tim Raphael <raphael.timo...@gmail.com>
> wrote:
> 
> > I’ve also been keeping a keen eye on the Pnda.io guide page (still blank)
> > for Pmacct… who is actually working on this one @ Jaime?
> >
> > - Tim
> >
> >
> > On 21 Mar 2018, at 12:00 pm, Jaime Botello <jbote...@riotgames.com> wrote:
> >
> > Thank you, Paolo. I will update the group as well if we have any news.
> >
> > On Tue, Mar 20, 2018 at 4:22 PM, Paolo Lucente <pa...@pmacct.net> wrote:
> >
> >>
> >> Hi Jaime,
> >>
> >> While I can't help you with this directly, I'm trying to see whether it
> >> can be further investigated with the PNDA team (from which I seem to
> >> understand you have also been in touch with them). I'll keep you posted
> >> in case of any news.
> >>
> >> Paolo
> >>
> >> On Mon, Mar 19, 2018 at 12:44:57AM -0700, Jaime Botello wrote:
> >> > Hi,
> >> >
> >> > We are working on getting pmacct to act as a NetFlow producer for the
> >> > pnda.io data pipeline, and we are wondering if anyone else has tried
> >> > to integrate pmacct with pnda.io.
> >> >
> >> > pnda.io uses Kafka and Avro to get data into the pipeline.
> >> >
> >> > After getting all the required components in place on the pmacct
> >> > side, we believe something is still missing.
> >> >
> >> > I'm using the following code just to test and consume the data that
> >> > is in the pipeline:
> >> >
> >> > https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py
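> >> >
> >> > (Per message, that consumer does roughly the sketch below; the
> >> > envelope schema file name here is an assumption:)
> >> >
> >> > import io
> >> > import avro.schema
> >> > from avro.io import DatumReader, BinaryDecoder
> >> >
> >> > # PNDA envelope schema the consumer expects every message to match
> >> > schema = avro.schema.parse(open('pnda.avsc').read())
> >> > reader = DatumReader(schema)
> >> >
> >> > def consume_message(message):
> >> >     decoder = BinaryDecoder(io.BytesIO(message.value))
> >> >     print(reader.read(decoder))  # fails if the value is not an envelope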
> >> >
> >> > In theory, when running the above code, we should get output like the
> >> > following:
> >> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example
> >> >
> >> > Instead, the script fails with Unicode decoding errors:
> >> >
> >> > ------------------------------------------------------------
> >> > ConsumerRecord(topic=u'netflow', partition=0, offset=1591318,
> >> > timestamp=1521442141690, timestamp_type=0, key=None,
> >> > value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\x1c104.160.128.16@8\xea\x8e\x06\x8e \x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19 06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19 06:36:00\x02&2018-03-19 06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884',
> >> > checksum=None, serialized_key_size=-1, serialized_value_size=246)
> >> > error
> >> > ------------------------------------------------------------
> >> > Traceback (most recent call last):
> >> >   File "consumer.py", line 115, in run
> >> >     consume_message(message)
> >> >   File "consumer.py", line 78, in consume_message
> >> >     msg = reader.read(decoder)
> >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
> >> >     return self.read_data(self.writers_schema, self.readers_schema, decoder)
> >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
> >> >     return self.read_record(writers_schema, readers_schema, decoder)
> >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
> >> >     field_val = self.read_data(field.type, readers_field.type, decoder)
> >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
> >> >     return decoder.read_utf8()
> >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
> >> >     return unicode(self.read_bytes(), "utf-8")
> >> > UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
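> >> >
> >> > (The read_utf8 failure is what one would expect if the message bytes
> >> > were written with a schema other than the PNDA envelope the consumer
> >> > reads with. A quick, hypothetical check would be to decode the raw
> >> > value against the schema pmacct itself writes out; the pmacct.avsc
> >> > path below is an assumption:)
> >> >
> >> > import io
> >> > import avro.schema
> >> > from avro.io import DatumReader, BinaryDecoder
> >> >
> >> > # the schema pmacct dumps via avro_schema_output_file
> >> > pmacct_schema = avro.schema.parse(open('pmacct.avsc').read())
> >> > pmacct_reader = DatumReader(pmacct_schema)
> >> >
> >> > def decode_as_pmacct(raw_value):
> >> >     # if this parses cleanly, the topic holds plain pmacct Avro
> >> >     # records rather than PNDA-enveloped ones
> >> >     return pmacct_reader.read(BinaryDecoder(io.BytesIO(raw_value)))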
> >> >
> >> > pnda.io currently uses an Avro plugin when integrating with Logstash;
> >> > in our case we would like to use pmacct instead. We believe that, in
> >> > the case of pmacct, something else may be required in order to push
> >> > data correctly into the pnda.io Kafka pipeline.
> >> >
> >> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md
> >> >
> >> > One thing I noticed that pnda.io and Logstash define as part of the
> >> > Avro/Kafka plugin is:
> >> >
> >> > value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
> >> >
> >> >
> >> > However, I was not able to find out whether this is something I can
> >> > set up with the pmacct/Kafka integration.
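> >> >
> >> > (For what it's worth, ByteArraySerializer is a pass-through: the
> >> > producer hands Kafka the value bytes verbatim. The kafka-python
> >> > equivalent would be something like the snippet below, so the
> >> > serializer alone does not build the PNDA envelope; the bytes must
> >> > already be an Avro-encoded envelope record:)
> >> >
> >> > from kafka import KafkaProducer
> >> >
> >> > # identity serializer: send value bytes as-is, the equivalent of
> >> > # org.apache.kafka.common.serialization.ByteArraySerializer
> >> > producer = KafkaProducer(bootstrap_servers='10.180.221.130:9092',
> >> >                          value_serializer=lambda v: v)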
> >> >
> >> > I'm attaching some relevant configuration.
> >> >
> >> > ubuntu@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
> >> > ! ..
> >> > plugins: kafka
> >> > !
> >> > aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as,
> >> > dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net,
> >> > src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start,
> >> > timestamp_end, timestamp_arrival
> >> > !
> >> > nfacctd_port: 2055
> >> > nfacctd_ip: 10.180.222.10
> >> > !
> >> > !
> >> > kafka_output: avro
> >> > avro_schema_output_file: ~/pmacct/pnda.avsc
> >> > kafka_topic: netflow
> >> > kafka_refresh_time: 60
> >> > kafka_history: 1m
> >> > kafka_history_roundoff: m
> >> > kafka_broker_host: 10.180.221.130
> >> > kafka_broker_port: 9092
> >> >
> >> > ubuntu@ip-10-180-221-190:~/pmacct$ more pnda.avsc
> >> > {
> >> >   "namespace": "pnda.entity",
> >> >   "type": "record",
> >> >   "name": "event",
> >> >   "fields": [
> >> >      {"name": "timestamp", "type": "long"},
> >> >      {"name": "src",       "type": "string"},
> >> >      {"name": "host_ip",   "type": "string"},
> >> >      {"name": "rawdata",   "type": "bytes"}
> >> >   ]
> >> > }
> >> > ubuntu@ip-10-180-221-190:~/pmacct$
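> >> >
> >> > (A small, hypothetical sanity check that a record matches this
> >> > envelope schema, using the avro package's validate helper; the
> >> > sample values are placeholders:)
> >> >
> >> > import avro.schema
> >> > from avro.io import validate
> >> >
> >> > schema = avro.schema.parse(open('pnda.avsc').read())
> >> > sample = {'timestamp': 1521676850049,
> >> >           'src': 'pmacct',             # placeholder source label
> >> >           'host_ip': '10.180.222.10',
> >> >           'rawdata': '{"event_type": "purge"}'}
> >> > print(validate(schema, sample))  # True if it matches the envelope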
> >> >
> >> > thanks for any light here
> >> >
> >> > --
> >> > Jaime Botello (aka Jimbo) // Riot Games
> >>
> >>
> >>
> >>
> >
> >
> >
> > --
> > Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c:
> > 310-210-9772 // summoner: Riot R3lick
> >
> >
> >
> 
> 
> -- 
> Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c:
> 310-210-9772 // summoner: Riot R3lick

_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists
