Fix bluecoat_parse()
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/6b79abbb Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/6b79abbb Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/6b79abbb Branch: refs/heads/master Commit: 6b79abbb079d99d283664382fba131864049f1fa Parents: 2ea6b4e Author: tpltnt <tpltnt.git...@dropcut.net> Authored: Thu Jan 25 12:40:38 2018 +0100 Committer: tpltnt <tpltnt.git...@dropcut.net> Committed: Thu Jan 25 12:42:22 2018 +0100 ---------------------------------------------------------------------- spot-ingest/pipelines/proxy/bluecoat.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/6b79abbb/spot-ingest/pipelines/proxy/bluecoat.py ---------------------------------------------------------------------- diff --git a/spot-ingest/pipelines/proxy/bluecoat.py b/spot-ingest/pipelines/proxy/bluecoat.py index 597c13c..2f5da0d 100644 --- a/spot-ingest/pipelines/proxy/bluecoat.py +++ b/spot-ingest/pipelines/proxy/bluecoat.py @@ -170,21 +170,30 @@ def save_data(rdd, sqc, db, db_table, topic): print("------------------------LISTENING KAFKA TOPIC:{0}------------------------".format(topic)) -def bluecoat_parse(zk,topic,db,db_table,num_of_workers,batch_size): - +def bluecoat_parse(zk, topic, db, db_table, num_of_workers, batch_size): + """ + Parse and save bluecoat logs. 
+ + :param zk: Apache ZooKeeper quorum + :param topic: Apache Kafka topic (application name) + :param db: Apache Hive database to save into + :param db_table: table of `db` to save into + :param num_of_workers: number of Apache Kafka workers + :param batch_size: batch size for Apache Spark streaming context + """ app_name = topic wrks = int(num_of_workers) # create spark context sc = SparkContext(appName=app_name) - ssc = StreamingContext(sc,int(batch_size)) + ssc = StreamingContext(sc, int(batch_size)) sqc = HiveContext(sc) tp_stream = KafkaUtils.createStream(ssc, zk, app_name, {topic: wrks}, keyDecoder=spot_decoder, valueDecoder=spot_decoder) - proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace(" ", " ")).map(lambda row: split_log_entry(row)).map(lambda row: proxy_parser(row)) - saved_data = proxy_data.foreachRDD(lambda row: save_data(row,sqc,db,db_table,topic)) - ssc.start(); + proxy_data = tp_stream.map(lambda row: row[1]).flatMap(lambda row: row.split("\n")).filter(lambda row: rex_date.match(row)).map(lambda row: row.strip("\n").strip("\r").replace("\t", " ").replace(" ", " ")).map(lambda row: split_log_entry(row)).map(lambda row: proxy_parser(row)) + saved_data = proxy_data.foreachRDD(lambda row: save_data(row, sqc, db, db_table, topic)) + ssc.start() ssc.awaitTermination()