Hi Anthony,

I'm not familiar with it, thank you.  I will bring this to the team that is
working on figuring out how to remove logstash from the workflow.

--Jaime



On Mon, Mar 26, 2018 at 6:27 AM, Anthony Caiafa <2600...@gmail.com> wrote:

> Just adding my 2 cents here. It seems like quite a few steps, with data going
> back and forth to Kafka. You should look at Apache NiFi, which can take the
> data from pmacct and cut down on all of those steps.
>
> On Mon, Mar 26, 2018 at 9:08 AM Jaime Botello <jbote...@riotgames.com> wrote:
>
>> Hi Paolo,
>>
>> Outside access to the environment is going to be difficult since it is part
>> of our production environment; however, we may be able to arrange some
>> remote sessions if that would work for you.
>>
>> Having said that, we were able to find a workaround that works as follows:
>>
>> Since pmacct can't serialize the data to something that pnda.io would
>> understand, we set up a logstash instance that serves as a message
>> translation layer between pmacct and pnda.io.
>>
>>
>>
>> Logstash is configured to use the pnda-avro codec plugin, which supports
>> byte-array serialization.  This is working for now and buys us some time
>> to figure out how to integrate pmacct directly with pnda.io so we can
>> increase the overall throughput and efficiency of the system.
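>>
>> Roughly, the idea is to wrap each pmacct JSON record in the PNDA envelope
>> and Avro-encode it before it hits Kafka. Here is a minimal Python sketch of
>> that idea (assuming the pnda.avsc schema quoted further down in this thread;
>> the "pmacct" source name and the host IP are just placeholders):
>>
>>   import io
>>   import time
>>
>>   import avro.schema
>>   from avro.io import BinaryEncoder, DatumWriter
>>
>>   # PNDA envelope schema (pnda.avsc), quoted later in this thread
>>   schema = avro.schema.parse(open("pnda.avsc").read())  # Parse() on newer avro releases
>>   writer = DatumWriter(schema)
>>
>>   def wrap(pmacct_json_line):
>>       # Wrap one pmacct JSON record in the PNDA envelope and Avro-encode it.
>>       envelope = {
>>           "timestamp": int(time.time() * 1000),      # ms since epoch
>>           "src": "pmacct",                           # placeholder source name
>>           "host_ip": "10.180.222.10",                # placeholder: the nfacctd host
>>           "rawdata": pmacct_json_line.encode("utf-8"),
>>       }
>>       buf = io.BytesIO()
>>       writer.write(envelope, BinaryEncoder(buf))
>>       return buf.getvalue()                          # bytes ready for Kafka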
>>
>> If there's any interest in the details, I can share some of the
>> documentation we are working on right now.
>>
>> thank you
>>
>> --Jaime
>>
>> On Sat, Mar 24, 2018 at 1:28 PM, Paolo Lucente <pa...@pmacct.net> wrote:
>>
>>>
>>> Hey Jaime,
>>>
>>> What you say does make sense to me and I would be up for this development.
>>> Can I ask whether it would be possible to access your deployment (since I
>>> do not have the PNDA framework deployed anywhere)? It would make development
>>> and subsequent testing easier. If yes, we can follow up privately.
>>>
>>> Paolo
>>>
>>>
>>> On Wed, Mar 21, 2018 at 07:56:42PM -0700, Jaime Botello wrote:
>>> > Hey Paolo,
>>> >
>>> > I was thinking about this after reading a little bit more on how data
>>> > is deserialized by pnda.io.
>>> >
>>> > For example, if I download the file from PNDA HDFS and read it using
>>> > avro-tools, you can see that PNDA (Kafka) was not able to deserialize
>>> > the data. Since PNDA uses byte-array deserialization, and their logstash
>>> > integration notes clearly mention that they use byte-array serialization,
>>> > don't you think we could fix this by just adding byte-array serialization
>>> > to the pmacct Kafka plugin?
>>> >
>>> > Let me know if this makes sense.
>>> >
>>> > thanks
>>> >
>>> > ubuntu@ip-10-180-221-47:~/datasets$ java -jar avro-tools-1.8.2.jar tojson f0f01acf-5011-42ec-90b0-c18f21e4e2ab.avro
>>> > log4j:WARN No appenders could be found for logger
>>> > (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
>>> > log4j:WARN Please initialize the log4j system properly.
>>> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>>> > {"topic":"netflow","timestamp":1521676850049,"reason":{"string":"Unable to
>>> > deserialize data"},"payload":"{\"event_type\": \"purge\", \"as_src\": 6507,
>>> > \"as_dst\": 5739, \"peer_ip_src\": \"x.x.x.x\", \"peer_ip_dst\": \"\",
>>> > \"iface_in\": 654, \"iface_out\": 659, \"ip_src\": \"x.x.x.x\",
>>> > \"net_src\": \"x.x.x.x\", \"ip_dst\": \"x.x.x.x\", \"net_dst\": \"x.x.x.x\",
>>> > \"mask_src\": 32, \"mask_dst\": 16, \"port_src\": 2099, \"port_dst\": 55764,
>>> > \"tcp_flags\": \"24\", \"ip_proto\": \"tcp\", \"tos\": 0,
>>> > \"sampling_rate\": 1000, \"timestamp_start\": \"2018-03-16 22:17:29.856000\",
>>> > \"timestamp_end\": \"2018-03-16 22:17:29.856000\",
>>> > \"timestamp_arrival\": \"2018-03-16 22:18:30.893573\",
>>> > \"stamp_inserted\": \"2018-03-16 22:17:00\",
>>> > \"stamp_updated\": \"2018-03-16 22:19:01\", \"packets\": 1, \"bytes\": 667,
>>> > \"writer_id\": \"default_kafka/2971\"}"}
>>> >
>>> >
>>> >
>>> > On Tue, Mar 20, 2018 at 6:06 PM, Tim Raphael <raphael.timo...@gmail.com> wrote:
>>> >
>>> > > I’ve also been keeping a keen eye on the Pnda.io guide page (still blank)
>>> > > for Pmacct… who is actually working on this one @ Jaime?
>>> > >
>>> > > - Tim
>>> > >
>>> > >
>>> > > On 21 Mar 2018, at 12:00 pm, Jaime Botello <jbote...@riotgames.com> wrote:
>>> > >
>>> > > thank you Paolo, I will update the group as well if we have an update.
>>> > >
>>> > > On Tue, Mar 20, 2018 at 4:22 PM, Paolo Lucente <pa...@pmacct.net> wrote:
>>> > >
>>> > >>
>>> > >> Hi Jaime,
>>> > >>
>>> > >> While I can't help you with this directly, I'm trying to see whether it
>>> > >> can be further investigated with the PNDA team (as a result of that, I
>>> > >> understand you have also been in touch with them). I'll keep you posted
>>> > >> in case of any news.
>>> > >>
>>> > >> Paolo
>>> > >>
>>> > >> On Mon, Mar 19, 2018 at 12:44:57AM -0700, Jaime Botello wrote:
>>> > >> > Hi,
>>> > >> >
>>> > >> > We are working on getting pmacct to act as a NetFlow producer for the
>>> > >> > pnda.io data pipeline, and we are wondering if anyone else has tried
>>> > >> > to integrate pmacct with pnda.io.
>>> > >> >
>>> > >> > pnda.io uses Kafka and Avro to get data into the pipeline.
>>> > >> >
>>> > >> > After setting up all the components required on the pmacct side, we
>>> > >> > believe there's still something missing.
>>> > >> >
>>> > >> > I'm using the following code just to test and consume the data that
>>> > >> > is in the pipeline:
>>> > >> >
>>> > >> > https://github.com/pndaproject/example-kafka-clients/blob/develop/python/consumer.py
>>> > >> >
>>> > >> > In theory, when running the above code, we should get an output like
>>> > >> > the following:
>>> > >> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/data-preparation.md#consumer-example
>>> > >> >
>>> > >> > Instead, the script fails with unicode decoding errors:
>>> > >> >
>>> > >> > ------------------------------------------------------------
>>> > >> > ConsumerRecord(topic=u'netflow', partition=0, offset=1591318,
>>> > >> > timestamp=1521442141690, timestamp_type=0, key=None,
>>> > >> > value='\xd6e\xd6e\x1a104.160.128.2\x00\x9c\n\xf0\n\x1a104.160.138.1\x1a104.160.138.1\x1c104.160.128.20\x1c104.160.128.16@8\xea\x8e\x06\x8e
>>> > >> > \x020\x06udp\x00\xd0\x0f42018-03-19 06:36:58.75200042018-03-19
>>> > >> > 06:48:52.73600042018-03-19 06:48:59.727396\x02&2018-03-19
>>> > >> > 06:36:00\x02&2018-03-19
>>> > >> > 06:49:01\x02\x06\x00\x02\x9e\x06&default_kafka/22884', checksum=None,
>>> > >> > serialized_key_size=-1, serialized_value_size=246)
>>> > >> > error
>>> > >> > ------------------------------------------------------------
>>> > >> > Traceback (most recent call last):
>>> > >> >   File "consumer.py", line 115, in run
>>> > >> >     consume_message(message)
>>> > >> >   File "consumer.py", line 78, in consume_message
>>> > >> >     msg = reader.read(decoder)
>>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py",
>>> > >> > line 445, in read
>>> > >> >     return self.read_data(self.writers_schema, self.readers_schema, decoder)
>>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py",
>>> > >> > line 490, in read_data
>>> > >> >     return self.read_record(writers_schema, readers_schema, decoder)
>>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py",
>>> > >> > line 690, in read_record
>>> > >> >     field_val = self.read_data(field.type, readers_field.type, decoder)
>>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py",
>>> > >> > line 468, in read_data
>>> > >> >     return decoder.read_utf8()
>>> > >> >   File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/avro/io.py",
>>> > >> > line 233, in read_utf8
>>> > >> >     return unicode(self.read_bytes(), "utf-8")
>>> > >> > UnicodeDecodeError: 'utf8' codec can't decode byte 0x9c in position 15:
>>> > >> > invalid start byte
>>> > >> >
>>> > >> > Currently pnda.io uses an Avro plugin when integrating with logstash;
>>> > >> > in our case we would like to use pmacct instead.  We believe that, in
>>> > >> > the case of pmacct, something else may be required in order to push
>>> > >> > data correctly into the pnda.io Kafka.
>>> > >> >
>>> > >> > https://github.com/pndaproject/pnda-guide/blob/develop/producer/logstash.md
>>> > >> >
>>> > >> > One thing I noticed that pnda.io and logstash define as part of the
>>> > >> > avro/kafka plugin is
>>> > >> >
>>> > >> > value_serializer => 'org.apache.kafka.common.serialization.ByteArraySerializer'
>>> > >> >
>>> > >> >
>>> > >> > However, I was not able to find out whether this is something I can
>>> > >> > set up with the pmacct/kafka integration.
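>>> > >> >
>>> > >> > As a rough sketch of what that setting means in kafka-python terms (the
>>> > >> > library used by the example consumer above; the broker address is the
>>> > >> > one from my config below and the payload is just a placeholder), a
>>> > >> > byte-array serializer simply passes already-encoded bytes through:
>>> > >> >
>>> > >> >   from kafka import KafkaProducer  # kafka-python
>>> > >> >
>>> > >> >   # placeholder for an already Avro-encoded PNDA envelope record
>>> > >> >   envelope_bytes = b"..."
>>> > >> >
>>> > >> >   # the identity value_serializer mirrors ByteArraySerializer: the bytes
>>> > >> >   # go onto the topic as-is, with no re-encoding on the producer side
>>> > >> >   producer = KafkaProducer(bootstrap_servers="10.180.221.130:9092",
>>> > >> >                            value_serializer=lambda v: v)
>>> > >> >   producer.send("netflow", envelope_bytes)
>>> > >> >   producer.flush()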
>>> > >> >
>>> > >> > I'm attaching some relevant configuration.
>>> > >> >
>>> > >> > ubuntu@ip-10-180-221-190:~/pmacct$ more netflow_kafka.conf
>>> > >> > ! ..
>>> > >> > plugins: kafka
>>> > >> > !
>>> > >> > aggregate: src_host, dst_host, src_port, dst_port, proto, tos, src_as, dst_as, peer_src_ip, peer_dst_ip, in_iface, out_iface, src_net, dst_net, src_mask, dst_mask, tcpflags, sampling_rate, timestamp_start, timestamp_end, timestamp_arrival
>>> > >> > !
>>> > >> > nfacctd_port: 2055
>>> > >> > nfacctd_ip: 10.180.222.10
>>> > >> > !
>>> > >> > !
>>> > >> > kafka_output: avro
>>> > >> > avro_schema_output_file: ~/pmacct/pnda.avsc
>>> > >> > kafka_topic: netflow
>>> > >> > kafka_refresh_time: 60
>>> > >> > kafka_history: 1m
>>> > >> > kafka_history_roundoff: m
>>> > >> > kafka_broker_host: 10.180.221.130
>>> > >> > kafka_broker_port: 9092
>>> > >> >
>>> > >> > ubuntu@ip-10-180-221-190:~/pmacct$ more pnda.avsc
>>> > >> > {
>>> > >> >   "namespace": "pnda.entity",
>>> > >> >   "type": "record",
>>> > >> >   "name": "event",
>>> > >> >   "fields": [
>>> > >> >      {"name": "timestamp", "type": "long"},
>>> > >> >      {"name": "src",       "type": "string"},
>>> > >> >      {"name": "host_ip",   "type": "string"},
>>> > >> >      {"name": "rawdata",   "type": "bytes"}
>>> > >> >   ]
>>> > >> > }
>>> > >> > ubuntu@ip-10-180-221-190:~/pmacct$
>>> > >> >
>>> > >> > thanks for any light here
>>> > >> >
>>> > >> > --
>>> > >> > *Jaime Botello** (aka Jimbo) *// *Riot Games*
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >
>>> > >
>>> > >
>>> > > --
>>> > > *Jaime Botello** (aka Jimbo) *// *Riot Games* : Riot Direct // c:
>>> > > 310-210-9772 // summoner: Riot R3lick
>>> > >
>>> > >
>>> > >
>>> >
>>> >
>>> > --
>>> > *Jaime Botello** (aka Jimbo) *// *Riot Games* : Riot Direct // c:
>>> > 310-210-9772 // summoner: Riot R3lick
>>>
>>
>>
>>
>> --
>> *Jaime Botello** (aka Jimbo) *// *Riot Games* : Riot Direct // c:
>> 310-210-9772 // summoner: Riot R3lick
>
>
>



-- 
*Jaime Botello** (aka Jimbo) *// *Riot Games* : Riot Direct // c:
310-210-9772 // summoner: Riot R3lick
_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists
