Just adding my 2 cents here: that seems like quite a few steps, with a lot of
going back and forth through Kafka. You should look at Apache NiFi, which can
take the data from pmacct directly and cut down on all of those steps.
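
For example, a rough sketch of such a flow (processor names are stock NiFi
ones; the exact flow depends on how pmacct emits its records, e.g. the print
plugin writing JSON to a file):

  TailFile (pmacct JSON output)
    -> ConvertRecord (JsonTreeReader -> AvroRecordSetWriter)
    -> PublishKafka (raw Avro bytes to the PNDA ingest topic)

You would still need a record writer matching PNDA's envelope schema, but the
translation then lives in one flow instead of extra hops through Kafka.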

On Mon, Mar 26, 2018 at 9:08 AM Jaime Botello <jbote...@riotgames.com> wrote:

> Hi Paolo,
>
> Outside access to the environment is going to be difficult since it is part
> of our production environment; however, we may be able to arrange some
> remote sessions if that would work for you.
>
> Having said that, we were able to find a workaround, which works as follows:
>
> Since pmacct can't serialize the data into something that pnda.io would
> understand, we set up a logstash instance that acts as a message
> translator between pmacct and pnda.io.
>
>
>
> Logstash is configured to use the pnda-avro codec plugin, which supports
> byte-array serialization. This seems to be working for now, and it buys us
> some time to figure out how to integrate pmacct directly with pnda.io so we
> can increase the overall throughput and efficiency of the system.
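>
> For reference, a minimal sketch of that pipeline (untested here; the
> intermediate topic name, addresses and schema path are placeholders, and it
> assumes pmacct's kafka plugin publishes JSON to the intermediate topic and
> that the pnda-avro codec takes a schema_uri like the stock avro codec):
>
> input {
>   kafka {
>     bootstrap_servers => "10.180.221.130:9092"
>     topics => ["netflow-json"]   # intermediate topic fed by pmacct
>     codec => json
>   }
> }
>
> output {
>   kafka {
>     bootstrap_servers => "10.180.221.130:9092"
>     topic_id => "netflow"
>     compression_type => "none"
>     value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
>     codec => pnda-avro { schema_uri => "/path/to/pnda.avsc" }
>   }
> }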
>
> If there's any interest in the details, I can share some of the
> documentation we are working on right now.
>
> thank you
>
> --Jaime
>
> On Sat, Mar 24, 2018 at 1:28 PM, Paolo Lucente <pa...@pmacct.net> wrote:
>
>>
>> Hey Jaime,
>>
>> What you say does make sense to me, and I would be up for this dev work.
>> Can I ask whether it would be possible to access your deployment (since I
>> do not have the PNDA framework deployed anywhere)? It would make
>> development and subsequent testing easier. If yes, we can follow up privately.
>>
>> Paolo
>>
>>
>> On Wed, Mar 21, 2018 at 07:56:42PM -0700, Jaime Botello wrote:
>> > Hey Paolo,
>> >
>> > I was thinking about this after reading a little bit more on how data is
>> > deserialized by pnda.io.
>> >
>> > For example, if I download the file from the PNDA HDFS and read it using
>> > avro-tools, you can see PNDA (Kafka) was not able to deserialize the data.
>> > Since PNDA uses byte-array deserialization, and their logstash
>> > integration notes clearly mention they use byte-array serialization,
>> > don't you think we could fix this by just adding byte-array serialization
>> > to the pmacct Kafka plugin?
>> >
>> > Let me know if this makes sense.
>> >
>> > thanks
>> >
>> > ubuntu@ip-10-180-221-47:~/datasets$ java -jar avro-tools-1.8.2.jar tojson f0f01acf-5011-42ec-90b0-c18f21e4e2ab.avro
>> > log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
>> > log4j:WARN Please initialize the log4j system properly.
>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>> >
>> > {"topic":"netflow","timestamp":1521676850049,"reason":{"string":"*Unable to deserialize data*"},"payload":"{\"event_type\": \"purge\", \"as_src\": 6507, \"as_dst\": 5739, \"peer_ip_src\": \"x.x.x.x\", \"peer_ip_dst\": \"\", \"iface_in\": 654, \"iface_out\": 659, \"ip_src\": \"x.x.x.x\", \"net_src\": \"x.x.x.x\", \"ip_dst\": \"x.x.x.x\", \"net_dst\": \"x.x.x.x\", \"mask_src\": 32, \"mask_dst\": 16, \"port_src\": 2099, \"port_dst\": 55764, \"tcp_flags\": \"24\", \"ip_proto\": \"tcp\", \"tos\": 0, \"sampling_rate\": 1000, \"timestamp_start\": \"2018-03-16 22:17:29.856000\", \"timestamp_end\": \"2018-03-16 22:17:29.856000\", \"timestamp_arrival\": \"2018-03-16 22:18:30.893573\", \"stamp_inserted\": \"2018-03-16 22:17:00\", \"stamp_updated\": \"2018-03-16 22:19:01\", \"packets\": 1, \"bytes\": 667, \"writer_id\": \"default_kafka/2971\"}"}
>> >
>> >
>> >
>> > On Tue, Mar 20, 2018 at 6:06 PM, Tim Raphael <raphael.timo...@gmail.com> wrote:
>> >
>> > > I’ve also been keeping a keen eye on the Pnda.io guide page (still
>> > > blank) for Pmacct… who is actually working on this one, @Jaime?
>> > >
>> > > - Tim
>> > >
>> > >
>> > > On 21 Mar 2018, at 12:00 pm, Jaime Botello <jbote...@riotgames.com> wrote:
>> > >
>> > > Thank you Paolo, I will update the group as well if we have any news.
>> > >
>> > > On Tue, Mar 20, 2018 at 4:22 PM, Paolo Lucente <pa...@pmacct.net> wrote:
>> > >
>> > >>
>> > >> Hi Jaime,
>> > >>
>> > >> While I can't help you with this, I'm trying to see whether this can
>> > >> be further investigated with the PNDA team (as a result of that, I
>> > >> seem to understand you were also in touch with them). I'll keep you
>> > >> posted in case of any news.
>> > >>
>> > >> Paolo
>> > >>
>> > >> On Mon, Mar 19, 2018 at 12:44:57AM -0700, Jaime Botello wrote:
>> > >> > Hi,
>> > >> >
>> > >> > We are working on getting pmacct set up as a NetFlow producer for the
>> > >> > pnda.io data pipeline, and we are wondering if anyone else has tried
>> > >> > to integrate pmacct with pnda.io.
>> > >> >
>> > >> > pnda.io uses Kafka and Avro to get data into the pipeline.
>> > >> >
>> > >> > After getting all the required components built into pmacct, we
>> > >> > believe something is still missing.
>> > >> >
>> > >> > I'm using the following code just to test and consume the data that
>> > >> > is in the pipeline:
>> > >> >
>> > >> > https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py
>> > >> >
>> > >> > In theory, when running the above code, we should get output like the following:
>> > >> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example
>> > >> >
>> > >> > Instead, the script fails with unicode decoding errors:
>> > >> >
>> > >> > ------------------------------------------------------------
>> > >> > ConsumerRecord(topic=u'netflow', partition=0, offset=1591318, timestamp=1521442141690, timestamp_type=0, key=None, value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\x1c104.160.128.16@8\xea\x8e\x06\x8e \x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19 06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19 06:36:00\x02&2018-03-19 06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None, serialized_key_size=-1, serialized_value_size=246)
>> > >> > error
>> > >> > ------------------------------------------------------------
>> > >> > Traceback (most recent call last):
>> > >> >   File "consumer.py", line 115, in run
>> > >> >     consume_message(message)
>> > >> >   File "consumer.py", line 78, in consume_message
>> > >> >     msg = reader.read(decoder)
>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 445, in read
>> > >> >     return self.read_data(self.writers_schema, self.readers_schema, decoder)
>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 490, in read_data
>> > >> >     return self.read_record(writers_schema, readers_schema, decoder)
>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 690, in read_record
>> > >> >     field_val = self.read_data(field.type, readers_field.type, decoder)
>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 468, in read_data
>> > >> >     return decoder.read_utf8()
>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py", line 233, in read_utf8
>> > >> >     return unicode(self.read_bytes(), "utf-8")
>> > >> > UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15: invalid start byte
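>> > >> >
>> > >> > As a quick way to check for a schema mismatch (a sketch, untested;
>> > >> > it assumes the schema pmacct dumps via avro_schema_output_file was
>> > >> > saved as pmacct.avsc, and that `message` is the ConsumerRecord from
>> > >> > the loop above), the same value should decode against pmacct's own
>> > >> > schema if the Kafka plugin wrote plain Avro:
>> > >> >
>> > >> > import io
>> > >> > import avro.schema
>> > >> > from avro.io import BinaryDecoder, DatumReader
>> > >> >
>> > >> > # schema file written out by pmacct (avro_schema_output_file)
>> > >> > pmacct_schema = avro.schema.parse(open("pmacct.avsc").read())
>> > >> > reader = DatumReader(pmacct_schema)
>> > >> >
>> > >> > # message.value is the raw Kafka value from the ConsumerRecord above
>> > >> > print(reader.read(BinaryDecoder(io.BytesIO(message.value))))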
>> > >> >
>> > >> > Currently, pnda.io uses an Avro codec plugin when integrating with
>> > >> > logstash; in our case we would like to use pmacct instead. We believe
>> > >> > that, in the case of pmacct, something else may be required in order
>> > >> > to push data correctly to the pnda.io Kafka.
>> > >> >
>> > >> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md
>> > >> >
>> > >> > One thing I noticed that pnda.io and logstash define as part of the
>> > >> > avro/kafka plugin is:
>> > >> >
>> > >> > value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
>> > >> >
>> > >> > However, I was not able to find out whether this is something I can
>> > >> > set up in the pmacct/Kafka integration.
>> > >> >
>> > >> > I'm attaching some relevant configuration.
>> > >> >
>> > >> > ubuntu@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
>> > >> > ! ..
>> > >> > plugins: kafka
>> > >> > !
>> > >> > aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as, dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net, src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start, timestamp_end, timestamp_arrival
>> > >> > !
>> > >> > nfacctd_port: 2055
>> > >> > nfacctd_ip: 10.180.222.10
>> > >> > !
>> > >> > !
>> > >> > kafka_output: avro
>> > >> > avro_schema_output_file: ~/pmacct/pnda.avsc
>> > >> > kafka_topic: netflow
>> > >> > kafka_refresh_time: 60
>> > >> > kafka_history: 1m
>> > >> > kafka_history_roundoff: m
>> > >> > kafka_broker_host: 10.180.221.130
>> > >> > kafka_broker_port: 9092
>> > >> >
>> > >> > ubuntu@ip-10-180-221-190:~/pmacct$ more pnda.avsc
>> > >> > {
>> > >> >   "namespace": "pnda.entity",
>> > >> >   "type": "record",
>> > >> >   "name": "event",
>> > >> >   "fields": [
>> > >> >      {"name": "timestamp", "type": "long"},
>> > >> >      {"name": "src",       "type": "string"},
>> > >> >      {"name": "host_ip",   "type": "string"},
>> > >> >      {"name": "rawdata",   "type": "bytes"}
>> > >> >   ]
>> > >> > }
>> > >> > ubuntu@ip-10-180-221-190:~/pmacct$
>> > >> >
>> > >> > thanks for any light here
>> > >> >
>> > >> > --
>> > >> > Jaime Botello (aka Jimbo) // Riot Games
>> > >>
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
>> > >
>> > >
>> > >
>> >
>> >
>> > --
>> > Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
>>
>
>
>
> --
> Jaime Botello (aka Jimbo) // Riot Games : Riot Direct // c: 310-210-9772 // summoner: Riot R3lick
_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists
