back out spot-oa changes for this PR

Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/1241398a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/1241398a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/1241398a

Branch: refs/heads/SPOT-181_ODM
Commit: 1241398ae4d13b6c745dc59b3e89e2ba91c2d960
Parents: 579be08
Author: Curtis Howard <curtis@curtis-MBP.local>
Authored: Thu Jan 25 10:05:54 2018 -0500
Committer: Curtis Howard <curtis@curtis-MBP.local>
Committed: Thu Jan 25 10:05:54 2018 -0500

----------------------------------------------------------------------
 spot-oa/oa/dns/dns_oa.py     | 320 ++++++++++++++++++++------------------
 spot-oa/oa/flow/flow_oa.py   | 272 ++++++++++++++++----------------
 spot-oa/oa/proxy/proxy_oa.py | 251 ++++++++++++++----------------
 3 files changed, 424 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1241398a/spot-oa/oa/dns/dns_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py
index a923cc2..5982e8b 100644
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@ -23,7 +23,8 @@ import sys
 import datetime
 import csv, math
 from tld import get_tld
-
+import api.resources.impala_engine as impala
+import api.resources.hdfs_client as HDFSClient
 from collections import OrderedDict
 from utils import Util
 from components.data.data import Data
@@ -47,7 +48,7 @@ class OA(object):
         # initialize required parameters.
         self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
         self._date = date
-        #self._table_name = "dns"
+        self._table_name = "dns"
         self._dns_results = []
         self._limit = limit
         self._data_path = None
@@ -67,8 +68,6 @@ class OA(object):
 
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", 
"").replace('"', '')
-        self._table_name = self._spot_conf.get('conf', 'DNS_TABLE')
-        self._engine = Data(self._db,self._table_name ,self._logger) 
 
 
     def start(self):
@@ -77,15 +76,16 @@ class OA(object):
         start = time.time()
         ####################
 
+        self._clear_previous_executions()
         self._create_folder_structure()
         self._add_ipynb()
         self._get_dns_results()
         self._add_tld_column()
         self._add_reputation()
-        self._add_hh_and_severity()
+        self._add_hh_column()
         self._add_iana()
         self._add_network_context()
-        self._create_dns_scores_csv()
+        self._create_dns_scores()
         self._get_oa_details()
         self._ingest_summary()
 
@@ -94,18 +94,40 @@ class OA(object):
         print(end - start)
         ##################
 
+
+    def _clear_previous_executions(self):
+
+        self._logger.info("Cleaning data from previous executions for the day")
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:]
+        table_schema = []
+        HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", "").replace('"', '')
+        table_schema=['suspicious', 'edge', 'dendro', 'threat_dendro', 'threat_investigation', 'storyboard', 'summary' ]
+
+        for path in table_schema:
+            HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,int(mn),int(dy)),user="impala")
+        impala.execute_query("invalidate metadata")
+
+        #removes Feedback file
+        HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+        #removes json files from the storyboard
+        HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+
     def _create_folder_structure(self):
 
         # create date folder structure if it does not exist.
         self._logger.info("Creating folder structure for OA (data and ipynb)") 
      
         self._data_path,self._ingest_summary_path,self._ipynb_path = 
Util.create_oa_folders("dns",self._date)
     
+
     def _add_ipynb(self):
 
         if os.path.isdir(self._ipynb_path):
 
-            self._logger.info("Adding edge investigation IPython Notebook")
-            shutil.copy("{0}/ipynb_templates/Edge_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Edge_Investigation.ipynb".format(self._ipynb_path))
+            self._logger.info("Adding advanced mode IPython Notebook")
+            shutil.copy("{0}/ipynb_templates/Advanced_Mode_master.ipynb".format(self._scrtip_path),"{0}/Advanced_Mode.ipynb".format(self._ipynb_path))
 
             self._logger.info("Adding threat investigation IPython Notebook")
             shutil.copy("{0}/ipynb_templates/Threat_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Threat_Investigation.ipynb".format(self._ipynb_path))
@@ -127,58 +149,59 @@ class OA(object):
         get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
         self._logger.info("{0}".format(get_command))
 
-         # validate files exists
         if os.path.isfile(dns_results):
 
             # read number of results based in the limit specified.
             self._logger.info("Reading {0} dns results file: 
{1}".format(self._date,dns_results))
             self._dns_results = 
Util.read_results(dns_results,self._limit,self._results_delimiter)[:]
-            if len(self._dns_results) == 0: self._logger.error("There are not 
flow results.");sys.exit(1)
+            if len(self._dns_results) == 0: self._logger.error("There are not 
dns results.");sys.exit(1)
 
         else:
             self._logger.error("There was an error getting ML results from 
HDFS")
             sys.exit(1)
 
-        # add headers.        
-        self._logger.info("Adding headers")
-        self._dns_scores_headers = [  str(key) for (key,value) in 
self._conf['dns_score_fields'].items() ]
-
         # add dns content.
-        self._dns_scores = [ conn[:]  for conn in self._dns_results][:]       
+        self._dns_scores = [ conn[:]  for conn in self._dns_results][:]
+
 
     def _move_time_stamp(self,dns_data):
         
-        for dns in dns_data:
-            time_stamp = dns[1]
-            dns.remove(time_stamp)
-            dns.append(time_stamp)
-        
+        # return dns_data_ordered
         return dns_data        
 
-    def _create_dns_scores_csv(self):
+
+    def _create_dns_scores(self):
         
-        dns_scores_csv = "{0}/dns_scores.csv".format(self._data_path)
-        dns_scores_final =  self._move_time_stamp(self._dns_scores)
-        dns_scores_final.insert(0,self._dns_scores_headers)
-        Util.create_csv_file(dns_scores_csv,dns_scores_final)   
+        # get date parameters.
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:]
+        value_string = ""
+
+        dns_scores_final = self._move_time_stamp(self._dns_scores)
+        self._dns_scores = dns_scores_final
 
-        # create bk file
-        dns_scores_bu_csv = "{0}/dns_scores_bu.csv".format(self._data_path)
-        Util.create_csv_file(dns_scores_bu_csv,dns_scores_final)     
+        for row in dns_scores_final:
+            value_string += str(tuple(Util.cast_val(item) for item in row)) + ","
+
+        load_into_impala = ("""
+             INSERT INTO {0}.dns_scores partition(y={2}, m={3}, d={4}) VALUES {1}
+        """).format(self._db, value_string[:-1], yr, mn, dy)
+        impala.execute_query(load_into_impala)
 
 
     def _add_tld_column(self):
         qry_name_col = self._conf['dns_results_fields']['dns_qry_name'] 
-        self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ] 
-  
+        self._dns_scores = [conn + [ get_tld("http://" + str(conn[qry_name_col]), fail_silently=True) if "http://" not in str(conn[qry_name_col]) else get_tld(str(conn[qry_name_col]), fail_silently=True)] for conn in self._dns_scores ]
+
+
     def _add_reputation(self):
 
         # read configuration.
         reputation_conf_file = 
"{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         self._logger.info("Reading reputation configuration file: 
{0}".format(reputation_conf_file))
         rep_conf = json.loads(open(reputation_conf_file).read())
-        
-        
+
         # initialize reputation services.
         self._rep_services = []
         self._logger.info("Initializing reputation services.")
@@ -200,7 +223,6 @@ class OA(object):
         # get reputation per column.
         self._logger.info("Getting reputation for each service in config")     
   
         rep_services_results = []
-
  
         if self._rep_services :
             for key,value in rep_cols.items():
@@ -209,21 +231,22 @@ class OA(object):
                 for result in rep_services_results:            
                     rep_results = {k: "{0}::{1}".format(rep_results.get(k, 
""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)}
 
-                self._dns_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._dns_scores  ]
+                if rep_results:
+                    self._dns_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._dns_scores  ]
+                else:
+                    self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
         else:
             self._dns_scores = [ conn + [""]   for conn in self._dns_scores  ]
 
 
+    def _add_hh_column(self):
 
-    def _add_hh_and_severity(self):
-
-        # add hh value and sev columns.
+        # add hh value column.
         dns_date_index = self._conf["dns_results_fields"]["frame_time"]
-        self._dns_scores = [conn + [ filter(None,conn[dns_date_index].split(" "))[3].split(":")[0]] + [0] + [0] for conn in self._dns_scores  ]
+        self._dns_scores = [conn + [ filter(None,conn[dns_date_index].split(" "))[3].split(":")[0]] for conn in self._dns_scores  ]
 
 
     def _add_iana(self):
-
         iana_conf_file = 
"{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         if os.path.isfile(iana_conf_file):
             iana_config  = json.loads(open(iana_conf_file).read())
@@ -232,13 +255,13 @@ class OA(object):
             dns_qry_class_index = self._conf["dns_results_fields"]["dns_qry_class"]
             dns_qry_type_index = self._conf["dns_results_fields"]["dns_qry_type"]
             dns_qry_rcode_index = self._conf["dns_results_fields"]["dns_qry_rcode"]
-            self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [ dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode") ] for conn in self._dns_scores ]
+            self._dns_scores = [ conn + [ dns_iana.get_name(conn[dns_qry_class_index],"dns_qry_class")] + [dns_iana.get_name(conn[dns_qry_type_index],"dns_qry_type")] + [dns_iana.get_name(conn[dns_qry_rcode_index],"dns_qry_rcode")] for conn in self._dns_scores ]
             
         else:            
             self._dns_scores = [ conn + ["","",""] for conn in 
self._dns_scores ] 
 
-    def _add_network_context(self):
 
+    def _add_network_context(self):
         nc_conf_file = 
"{0}/components/nc/nc_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         if os.path.isfile(nc_conf_file):
             nc_conf = json.loads(open(nc_conf_file).read())["NC"]
@@ -246,15 +269,15 @@ class OA(object):
             ip_dst_index = self._conf["dns_results_fields"]["ip_dst"]
             self._dns_scores = [ conn + [dns_nc.get_nc(conn[ip_dst_index])] 
for conn in self._dns_scores ]
         else:
-            self._dns_scores = [ conn + [""] for conn in self._dns_scores ]
+            self._dns_scores = [ conn + [0] for conn in self._dns_scores ]
 
 
     def _get_oa_details(self):
         
-        self._logger.info("Getting OA DNS suspicious details/chord diagram")   
    
+        self._logger.info("Getting OA DNS suspicious details/dendro diagram")
         # start suspicious connects details process.
         p_sp = Process(target=self._get_suspicious_details)
-        p_sp.start()        
+        p_sp.start()
 
         # start chord diagram process.            
         p_dn = Process(target=self._get_dns_dendrogram)
@@ -263,6 +286,7 @@ class OA(object):
         p_sp.join()
         p_dn.join()
 
+
     def _get_suspicious_details(self):
 
         iana_conf_file = 
"{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -271,85 +295,87 @@ class OA(object):
             dns_iana = IanaTransform(iana_config["IANA"])
         
         for conn in self._dns_scores:
-            # get data to query
-            date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
-            date = filter(None,date)
-
-            if len(date) == 5:
-                year=date[2]
-                month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
-                day=date[1]                
-                hh=conn[self._conf["dns_score_fields"]["hh"]]
-                dns_qry_name = 
conn[self._conf["dns_score_fields"]["dns_qry_name"]]
-                self._get_dns_details(dns_qry_name,year,month,day,hh,dns_iana)
+
+            timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
+            full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+
+            date = full_date.split(" ")[0].split("-")
+            # get date parameters.
+            yr = date[0]
+            mn = date[1]
+            dy = date[2]
+            time = full_date.split(" ")[1].split(":")
+            hh = int(time[0])
+
+            dns_qry_name = conn[self._conf["dns_score_fields"]["dns_qry_name"]]
+            self._get_dns_details(dns_qry_name,yr,mn,dy,hh,dns_iana)
+
 
     def _get_dns_details(self,dns_qry_name,year,month,day,hh,dns_iana):
-                    
-        limit = self._details_limit
-        edge_file ="{0}/edge-{1}_{2}_00.csv".format(self._data_path,dns_qry_name.replace("/","-"),hh)
-        edge_tmp  ="{0}/edge-{1}_{2}_00.tmp".format(self._data_path,dns_qry_name.replace("/","-"),hh)
+        value_string = ""
+        query_to_load =("""
+            SELECT unix_tstamp,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a,h as hh
+            FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND h={6} LIMIT {7};
+        """).format(self._db,self._table_name,year,month,day,dns_qry_name,hh,self._details_limit)
+
+        try:
+             dns_details = impala.execute_query(query_to_load)
+        except:
+            self._logger.info("WARNING. Details couldn't be retreived for {0}, 
skipping this step".format(dns_qry_name))
+        else:
+        # add IANA to results.
+            update_rows = []
+            if dns_iana:
+                self._logger.info("Adding IANA translation to details results")
 
-        if not os.path.isfile(edge_file):
-    
-            dns_qry = ("SELECT 
frame_time,frame_len,ip_dst,ip_src,dns_qry_name,dns_qry_class,dns_qry_type,dns_qry_rcode,dns_a
 FROM {0}.{1} WHERE y={2} AND m={3} AND d={4} AND dns_qry_name LIKE '%{5}%' AND 
h={6} LIMIT 
{7};").format(self._db,self._table_name,year,month,day,dns_qry_name,hh,limit)
-            
-            # execute query
-           try:
-                self._engine.query(dns_qry,edge_tmp)
-            except:
-               self._logger.error("ERROR. Edge file couldn't be created for 
{0}, skipping this step".format(dns_qry_name))
+                dns_details = [ conn + 
(dns_iana.get_name(str(conn[5]),"dns_qry_class"),dns_iana.get_name(str(conn[6]),"dns_qry_type"),dns_iana.get_name(str(conn[7]),"dns_qry_rcode"))
 for conn in dns_details ]
+            else:
+                self._logger.info("WARNING: NO IANA configured.")
+                dns_details = [ conn + ("","","") for conn in dns_details ]
+
+            nc_conf_file = "{0}/components/nc/nc_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+            if os.path.isfile(nc_conf_file):
+                nc_conf = json.loads(open(nc_conf_file).read())["NC"]
+                dns_nc = NetworkContext(nc_conf,self._logger)
+                dns_details = [ conn + (dns_nc.get_nc(conn[2]),) for conn in dns_details ]
             else:
-            # add IANA to results.
-                if dns_iana:
-                    update_rows = []
-                    self._logger.info("Adding IANA translation to details 
results")
-                    with open(edge_tmp) as dns_details_csv:
-                        rows = csv.reader(dns_details_csv, delimiter=',', 
quotechar='|')
-                        try:
-                            next(rows)
-                            update_rows = [[conn[0]] + [conn[1]] + [conn[2]] + 
[conn[3]] + [conn[4]] + [dns_iana.get_name(conn[5],"dns_qry_class")] + 
[dns_iana.get_name(conn[6],"dns_qry_type")] + 
[dns_iana.get_name(conn[7],"dns_qry_rcode")] + [conn[8]] for conn in rows]
-                            update_rows = filter(None, update_rows)
-                            header = [ "frame_time", "frame_len", 
"ip_dst","ip_src","dns_qry_name","dns_qry_class_name","dns_qry_type_name","dns_qry_rcode_name","dns_a"
 ]
-                            update_rows.insert(0,header)
-                        except IndexError:
-                            pass
+                dns_details = [ conn + (0,) for conn in dns_details ]
 
-                else:
-                    self._logger.info("WARNING: NO IANA configured.")
-
-                    # create edge file.
-                self._logger.info("Creating edge file:{0}".format(edge_file))
-                with open(edge_file,'wb') as dns_details_edge:
-                    writer = csv.writer(dns_details_edge, 
quoting=csv.QUOTE_ALL)
-                    if update_rows:
-                        writer.writerows(update_rows)
-                    else:            
-                        shutil.copy(edge_tmp,edge_file)           
+            for row in dns_details:
+                value_string += str(tuple(item for item in row)) + ","
+
+            if value_string != "":
                 
-                os.remove(edge_tmp)
+                query_to_insert=("""
+                    INSERT INTO {0}.dns_edge PARTITION (y={1}, m={2}, d={3}) VALUES ({4});
+                """).format(self._db,year, month, day,  value_string[:-1])
+
+                impala.execute_query(query_to_insert)
 
 
     def _get_dns_dendrogram(self):
-        limit = self._details_limit
-        for conn in self._dns_scores:            
-            date=conn[self._conf["dns_score_fields"]["frame_time"]].split(" ")
-            date = filter(None,date)
 
-            if len(date) == 5:
-                year=date[2]
-                month=datetime.datetime.strptime(date[0], '%b').strftime('%m')
-                day=date[1]
-                ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
-                
self._get_dendro(self._db,self._table_name,ip_dst,year,month,day, limit)
+        for conn in self._dns_scores:
+            timestamp = conn[self._conf["dns_score_fields"]["unix_tstamp"]]
+            full_date = datetime.datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+
+            date = full_date.split(" ")[0].split("-")
+            # get date parameters.
 
+            yr = date[0]
+            mn = date[1]
+            dy = date[2]
+            ip_dst=conn[self._conf["dns_score_fields"]["ip_dst"]]
 
-    def _get_dendro(self,db,table,ip_dst,year,month,day,limit):
+            query_to_load = ("""
+                INSERT INTO TABLE {0}.dns_dendro PARTITION (y={2}, m={3},d={4})
+                SELECT unix_tstamp, dns_a, dns_qry_name, ip_dst
+                FROM (SELECT unix_tstamp, susp.ip_dst, susp.dns_qry_name, susp.dns_a
+                    FROM {0}.{1} as susp WHERE susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}'
+                LIMIT {6}) AS tmp GROUP BY dns_a, dns_qry_name, ip_dst, unix_tstamp
+            """).format(self._db,self._table_name,yr,mn,dy,ip_dst,self._details_limit)
 
-        dendro_file = "{0}/dendro-{1}.csv".format(self._data_path,ip_dst)
-        if not os.path.isfile(dendro_file):
-            dndro_qry = ("SELECT dns_a, dns_qry_name, ip_dst FROM (SELECT 
susp.ip_dst, susp.dns_qry_name, susp.dns_a FROM {0}.{1} as susp WHERE 
susp.y={2} AND susp.m={3} AND susp.d={4} AND susp.ip_dst='{5}' LIMIT {6}) AS 
tmp GROUP BY dns_a, dns_qry_name, 
ip_dst").format(db,table,year,month,day,ip_dst,limit)
-            # execute query
-            self._engine.query(dndro_qry,dendro_file)
+            impala.execute_query(query_to_load)
 
         
     def _ingest_summary(self):
@@ -364,48 +390,32 @@ class OA(object):
         result_rows = []        
         df_filtered =  pd.DataFrame()
 
-        ingest_summary_file = 
"{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)                     
 
-        ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
-
-        if os.path.isfile(ingest_summary_file):
-               df = pd.read_csv(ingest_summary_file, delimiter=',')
-            #discards previous rows from the same date
-               df_filtered = 
df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False] 
-        else:
-               df = pd.DataFrame()
-            
-        # get ingest summary.
-        ingest_summary_qry = ("SELECT frame_time, COUNT(*) as total "
-                                    " FROM {0}.{1}"
-                                    " WHERE y={2} AND m={3} AND d={4} "
-                                    " AND unix_tstamp IS NOT NULL AND 
frame_time IS NOT NULL"
-                                    " AND frame_len IS NOT NULL AND 
dns_qry_name IS NOT NULL"
-                                    " AND ip_src IS NOT NULL " 
-                                    " AND (dns_qry_class IS NOT NULL AND 
dns_qry_type IS NOT NULL AND dns_qry_rcode IS NOT NULL ) "
-                                    " GROUP BY frame_time;") 
-
-        ingest_summary_qry = 
ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
-        
-        results_file = 
"{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
-        
self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
-
-
-        if os.path.isfile(results_file):        
-            df_results = pd.read_csv(results_file, delimiter=',') 
-
-            # Forms a new dataframe splitting the minutes from the time column
-            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, 
dy,val['frame_time'].split(" 
")[3].split(":")[0].zfill(2),val['frame_time'].split(" 
")[3].split(":")[1].zfill(2)), int(val['total']) if not 
math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns 
= ingest_summary_cols)
-    
-            #Groups the data by minute 
-            sf = df_new.groupby(by=['date'])['total'].sum()
-        
-            df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
-            
-            df_final = df_filtered.append(df_per_min, ignore_index=True)
-            df_final.to_csv(ingest_summary_tmp,sep=',', index=False)
-
-            os.remove(results_file)
-            os.rename(ingest_summary_tmp,ingest_summary_file)
-        else:
-            self._logger.info("No data found for the ingest summary")
-        
+        query_to_load = ("""
+            SELECT frame_time, COUNT(*) as total FROM {0}.{1}
+            WHERE y={2} AND m={3} AND d={4} AND unix_tstamp IS NOT NULL
+            AND frame_time IS NOT NULL AND frame_len IS NOT NULL
+            AND dns_qry_name IS NOT NULL AND ip_src IS NOT NULL
+            AND (dns_qry_class IS NOT NULL AND dns_qry_type IS NOT NULL
+            AND dns_qry_rcode IS NOT NULL ) GROUP BY frame_time;
+        """).format(self._db,self._table_name, yr, mn, dy)
+
+        results = impala.execute_query_as_list(query_to_load)
+        df = pd.DataFrame(results)
+
+        # Forms a new dataframe splitting the minutes from the time column
+        df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy,\
+            val['frame_time'].replace("  "," ").split(" ")[3].split(":")[0].zfill(2),\
+            val['frame_time'].replace("  "," ").split(" ")[3].split(":")[1].zfill(2)),\
+            int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df.iterrows()],columns = ingest_summary_cols)
+
+        #Groups the data by minute
+        sf = df_new.groupby(by=['date'])['total'].sum()
+        df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
+
+        df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
+
+        if len(df_final) > 0:
+            query_to_insert=("""
+                INSERT INTO {0}.dns_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
+            """).format(self._db, yr, mn, dy, tuple(df_final))
+            impala.execute_query(query_to_insert)
\ No newline at end of file

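The change above replaces the dns_scores.csv output with direct loads into Impala: _create_dns_scores(), _get_dns_details() and _ingest_summary() each build a "VALUES (...),(...)" string and INSERT it into a day partition (y, m, d) of the matching table through api.resources.impala_engine. A minimal standalone sketch of that pattern follows; it talks to Impala through the impyla client directly rather than spot-oa's wrapper, and the host name, default table name and pre-cast rows are illustrative assumptions, not part of this commit.

    from impala.dbapi import connect

    def insert_day_scores(rows, db, date, table="dns_scores"):
        # date arrives as YYYYMMDD, the same shape as self._date above.
        yr, mn, dy = date[:4], date[4:6], date[6:]
        # Build the "(v1, v2, ...),( ... )" VALUES string, as _create_dns_scores does
        # with Util.cast_val; rows are assumed to be already cast to SQL-safe values.
        values = ",".join(str(tuple(row)) for row in rows)
        sql = ("INSERT INTO {0}.{1} PARTITION (y={2}, m={3}, d={4}) VALUES {5}"
               .format(db, table, yr, mn, dy, values))
        conn = connect(host="impala-daemon.example.com", port=21050)  # hypothetical host
        cur = conn.cursor()
        cur.execute(sql)  # one statement loads the whole day's batch of scored rows

Batching the day's rows into a single statement keeps the load to one INSERT per OA run instead of one per record, which is presumably why the value string is accumulated first.
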
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1241398a/spot-oa/oa/flow/flow_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index cc2318b..53cec6b 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -22,34 +22,37 @@ import sys
 import json
 import numpy as np
 import linecache, bisect
-import csv
+import csv, math
 import pandas as pd
+import subprocess
+import numbers
+import api.resources.hdfs_client as HDFSClient
+import api.resources.impala_engine as impala
 
 from collections import OrderedDict
 from multiprocessing import Process
-from utils import Util,ProgressBar
+from utils import Util, ProgressBar
 from components.data.data import Data
 from components.geoloc.geoloc import GeoLocalization
 from components.reputation.gti import gti
-
+from impala.util import as_pandas
 import time
 
 
 class OA(object):
 
-    def __init__(self,date,limit=500,logger=None):       
-       
-       self._initialize_members(date,limit,logger)
-       
-    def _initialize_members(self,date,limit,logger):
-        
+    def __init__(self,date,limit=500,logger=None):
+        self._initialize_members(date,limit,logger)
+
+    def _initialize_members(self,date,limit,logger): 
+
         # get logger if exists. if not, create new instance.
         self._logger = logging.getLogger('OA.Flow') if logger else 
Util.get_logger('OA.Flow',create_file=False)
 
         # initialize required parameters.
         self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
         self._date = date
-        #self._table_name = "flow"
+        self._table_name = "flow"
         self._flow_results = []
         self._limit = limit
         self._data_path = None
@@ -57,32 +60,32 @@ class OA(object):
         self._ingest_summary_path = None
         self._flow_scores = []
         self._results_delimiter = '\t'
+        
 
         # get app configuration.
         self._spot_conf = Util.get_spot_conf()
 
-        # get scores fields conf
+        # # get scores fields conf
         conf_file = "{0}/flow_conf.json".format(self._scrtip_path)
-        self._conf = json.loads(open 
(conf_file).read(),object_pairs_hook=OrderedDict)     
- 
+        self._conf = json.loads(open 
(conf_file).read(),object_pairs_hook=OrderedDict)
+
         # initialize data engine
-        self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", 
"").replace('"', '')
-        self._table_name = self._spot_conf.get('conf', 'FLOW_TABLE')
-        self._engine = Data(self._db, self._table_name,self._logger)
-                      
+        self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", 
"").replace('"', '')        
+                
     def start(self):       
         
         ####################
         start = time.time()
-        ####################
+        ####################         
 
         self._create_folder_structure()
+        self._clear_previous_executions()        
         self._add_ipynb()  
         self._get_flow_results()
         self._add_network_context()
         self._add_geo_localization()
         self._add_reputation()        
-        self._create_flow_scores_csv()
+        self._create_flow_scores()
         self._get_oa_details()
         self._ingest_summary()
 
@@ -90,19 +93,38 @@ class OA(object):
         end = time.time()
         print(end - start)
         ##################
-       
-    def _create_folder_structure(self):
+        
+
+    def _clear_previous_executions(self):
+        
+        self._logger.info("Cleaning data from previous executions for the 
day")       
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:]  
+        table_schema = []
+        HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", 
"").replace('"', '')
+        table_schema=['suspicious', 'edge','chords','threat_investigation', 
'timeline', 'storyboard', 'summary' ] 
+
+        for path in table_schema:
+            
HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,int(mn),int(dy)),user="impala")
        
+        impala.execute_query("invalidate metadata")
+        #removes Feedback file
+        
HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+        #removes json files from the storyboard
+        
HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+    def _create_folder_structure(self):   
 
-        # create date folder structure if it does not exist.
         self._logger.info("Creating folder structure for OA (data and ipynb)") 
      
         self._data_path,self._ingest_summary_path,self._ipynb_path = 
Util.create_oa_folders("flow",self._date)
+ 
 
     def _add_ipynb(self):     
 
         if os.path.isdir(self._ipynb_path):
 
-            self._logger.info("Adding edge investigation IPython Notebook")
-            shutil.copy("{0}/ipynb_templates/Edge_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Edge_Investigation.ipynb".format(self._ipynb_path))
+            self._logger.info("Adding the advanced mode IPython Notebook")
+            shutil.copy("{0}/ipynb_templates/Advanced_Mode_master.ipynb".format(self._scrtip_path),"{0}/Advanced_Mode.ipynb".format(self._ipynb_path))
 
             self._logger.info("Adding threat investigation IPython Notebook")
             shutil.copy("{0}/ipynb_templates/Threat_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Threat_Investigation.ipynb".format(self._ipynb_path))
@@ -110,6 +132,7 @@ class OA(object):
         else:
             self._logger.error("There was a problem adding the IPython 
Notebooks, please check the directory exists.")
             
+            
     def _get_flow_results(self):
                
         self._logger.info("Getting {0} Machine Learning Results from 
HDFS".format(self._date))
@@ -118,8 +141,8 @@ class OA(object):
         # get hdfs path from conf file 
         HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", 
"").replace('"', '')
         hdfs_path = 
"{0}/flow/scored_results/{1}/scores/flow_results.csv".format(HUSER,self._date)
-               
-        # get results file from hdfs
+        
+         # get results file from hdfs
         get_command = Util.get_ml_results_form_hdfs(hdfs_path,self._data_path)
         self._logger.info("{0}".format(get_command))
 
@@ -135,36 +158,36 @@ class OA(object):
             self._logger.error("There was an error getting ML results from 
HDFS")
             sys.exit(1)
 
-        # add headers.        
-        self._logger.info("Adding headers based on configuration file: 
score_fields.json")
-        self._flow_scores = [ [ str(key) for (key,value) in 
self._conf['flow_score_fields'].items()] ]
-
-        # filter results add sev and rank.
+        # filter results add rank.
         self._logger.info("Filtering required columns based on configuration")
-        self._flow_scores.extend([ [0] +  [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ])
+
+        self._flow_scores.extend([ [ conn[i] for i in self._conf['column_indexes_filter'] ] + [n] for n, conn in enumerate(self._flow_results) ])
      
-    def _create_flow_scores_csv(self):
 
-        flow_scores_csv = "{0}/flow_scores.csv".format(self._data_path)
-        Util.create_csv_file(flow_scores_csv,self._flow_scores)
+    def _create_flow_scores(self):
 
-        # create bk file
-        flow_scores_bu_csv = "{0}/flow_scores_bu.csv".format(self._data_path)
-        Util.create_csv_file(flow_scores_bu_csv,self._flow_scores)  
+        # get date parameters.
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:] 
+        value_string = ""
+
+        for row in self._flow_scores:
+            value_string += str(tuple(Util.cast_val(item) for item in row)) + ","              
+    
+        load_into_impala = ("""
+             INSERT INTO {0}.flow_scores partition(y={2}, m={3}, d={4}) VALUES {1}
+        """).format(self._db, value_string[:-1], yr, mn, dy) 
+        impala.execute_query(load_into_impala)
+ 
 
     def _add_network_context(self):
 
         # use ipranges to see if the IPs are internals.         
         ip_ranges_file = 
"{0}/context/ipranges.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 
-        # add new headers (srcIpInternal/destIpInternal).
-        self._logger.info("Adding network context headers")
-        flow_headers = self._flow_scores[0]
-        flow_headers.extend(["srcIpInternal","destIpInternal"])
-
         # add values to srcIpInternal and destIpInternal.
         flow_scores = iter(self._flow_scores)
-        next(flow_scores)
 
         if os.path.isfile(ip_ranges_file):
 
@@ -185,11 +208,9 @@ class OA(object):
             self._flow_scores = [ conn + [ 
self._is_ip_internal(conn[src_ip_index],ip_internal_ranges)]+[ 
self._is_ip_internal(conn[dst_ip_index],ip_internal_ranges)] for conn in 
flow_scores]
            
         else:
-
-            self._flow_scores = [ conn + ["",""] for conn in flow_scores ]     
       
+            self._flow_scores = [ conn + [0,0] for conn in flow_scores ]       
     
             self._logger.info("WARNING: Network context was not added because 
the file ipranges.csv does not exist.")
         
-        self._flow_scores.insert(0,flow_headers)
 
     def _is_ip_internal(self,ip, ranges):
         result = 0
@@ -205,14 +226,10 @@ class OA(object):
         # use ipranges to see if the IPs are internals.         
         iploc_file = 
"{0}/context/iploc.csv".format(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 
-        # add new headers (srcIpInternal/destIpInternal).     
         self._logger.info("Adding geo localization headers")
-        flow_headers = self._flow_scores[0]
-        flow_headers.extend(["srcGeo","dstGeo","srcDomain","dstDomain"]) 
 
         # add values to srcIpInternal and destIpInternal.
         flow_scores = iter(self._flow_scores)
-        next(flow_scores)
 
         if os.path.isfile(iploc_file):
 
@@ -242,17 +259,11 @@ class OA(object):
             self._flow_scores = [ conn + ["","","",""] for conn in flow_scores 
]   
             self._logger.info("WARNING: IP location was not added because the 
file {0} does not exist.".format(iploc_file))
 
-        self._flow_scores.insert(0,flow_headers)       
-
+        
     def _add_reputation(self):
         
         reputation_conf_file = 
"{0}/components/reputation/reputation_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         
-        # add new headers (gtiSrcRep/gtiDstRep).
-        self._logger.info("Adding reputation headers")
-        flow_headers_rep = self._flow_scores[0]
-        flow_headers_rep.extend(["srcIP_rep","dstIP_rep"])
-        
         # read configuration.
         self._logger.info("Reading reputation configuration file: 
{0}".format(reputation_conf_file))
         rep_conf = json.loads(open(reputation_conf_file).read())
@@ -269,7 +280,6 @@ class OA(object):
 
             self._logger.info("Getting GTI reputation for src IPs")
             flow_scores_src = iter(self._flow_scores)
-            next(flow_scores_src)
 
             # getting reputation for src IPs
             src_ips = [ conn[src_ip_index] for conn in flow_scores_src ]       
     
@@ -277,30 +287,25 @@ class OA(object):
 
             self._logger.info("Getting GTI reputation for dst IPs")
             flow_scores_dst = iter(self._flow_scores)
-            next(flow_scores_dst)
 
             # getting reputation for dst IPs            
             dst_ips = [  conn[dst_ip_index] for conn in flow_scores_dst ]
             dst_rep_results = flow_gti.check(dst_ips)
 
             flow_scores_final = iter(self._flow_scores)
-            next(flow_scores_final)
 
             self._flow_scores = []
-            flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + 
[dst_rep_results[conn[dst_ip_index]]]  for conn in  flow_scores_final ]
+            flow_scores = [conn + [src_rep_results[conn[src_ip_index]]] + 
[dst_rep_results[conn[dst_ip_index]]] for conn in flow_scores_final ]
             self._flow_scores = flow_scores           
             
         else:
             # add values to gtiSrcRep and gtiDstRep.
             flow_scores = iter(self._flow_scores)
-            next(flow_scores)
 
             self._flow_scores = [ conn + ["",""] for conn in flow_scores ]   
             self._logger.info("WARNING: IP reputation was not added. No 
refclient configured")  
 
 
-        self._flow_scores.insert(0,flow_headers_rep)       
-
     def _get_oa_details(self):
 
         self._logger.info("Getting OA Flow suspicious details/chord diagram")
@@ -320,8 +325,6 @@ class OA(object):
         
         # skip header
         sp_connections = iter(self._flow_scores)
-        next(sp_connections)
-      
         # loop connections.
         connections_added = [] 
         for conn in sp_connections:
@@ -331,7 +334,7 @@ class OA(object):
                 continue
             else:
                 connections_added.append(conn)
-           
+            
             src_ip_index = self._conf["flow_score_fields"]["srcIP"]
             dst_ip_index = self._conf["flow_score_fields"]["dstIP"]
 
@@ -341,34 +344,32 @@ class OA(object):
             dip = conn[dst_ip_index]
 
             # get hour and date  (i.e. 2014-07-08 10:10:40)
-            date_array = conn[1].split(' ')
+            
+            date_array = conn[0].split(' ')
             date_array_1 = date_array[0].split('-')
             date_array_2 = date_array[1].split(':')
-
+           
             yr = date_array_1[0]                   
             dy = date_array_1[2]
             mh = date_array_1[1]
 
             hr = date_array_2[0]
             mm = date_array_2[1]
-        
-            # connection details query.
-            sp_query = ("SELECT treceived as tstart,sip as srcip,dip as 
dstip,sport as sport,dport as dport,proto as proto,flag as flags,stos as 
TOS,ibyt as ibytes,ipkt as ipkts,input as input, output as output,rip as rip, 
obyt as obytes, opkt as opkts from {0}.{1} where ((sip='{2}' AND dip='{3}') or 
(sip='{3}' AND dip='{2}')) AND y={8} AND m={4} AND d={5} AND h={6} AND 
trminute={7} order by tstart limit 100")
-                 
-            # sp query.
-            sp_query = 
sp_query.format(self._db,self._table_name,sip,dip,mh,dy,hr,mm,yr)
-
-            # output file.
-            edge_file = 
"{0}/edge-{1}-{2}-{3}-{4}.tsv".format(self._data_path,sip.replace(".","_"),dip.replace(".","_"),hr,mm)
+            
+            query_to_load = ("""
+                INSERT INTO TABLE {0}.flow_edge PARTITION (y={2}, m={3}, d={4})
+                SELECT treceived as tstart,sip as srcip,dip as dstip,sport as sport,dport as dport,proto as proto,flag as flags,
+                stos as tos,ibyt as ibyt,ipkt as ipkt, input as input, output as output,rip as rip, obyt as obyt, 
+                opkt as opkt, h as hh, trminute as mn from {0}.{1} where ((sip='{7}' AND dip='{8}') or (sip='{8}' AND dip='{7}')) 
+                AND y={2} AND m={3} AND d={4} AND h={5} AND trminute={6};
+                """).format(self._db,self._table_name,yr, mh, dy, hr, mm, sip,dip)
+            impala.execute_query(query_to_load)
+            
 
-            # execute query
-            self._engine.query(sp_query,output_file=edge_file,delimiter="\\t")
-    
     def _get_chord_details(self,bar=None):
 
          # skip header
         sp_connections = iter(self._flow_scores)
-        next(sp_connections) 
 
         src_ip_index = self._conf["flow_score_fields"]["srcIP"]
         dst_ip_index = self._conf["flow_score_fields"]["dstIP"] 
@@ -390,21 +391,24 @@ class OA(object):
             if n > 1:
                 ip_list = []                
                 sp_connections = iter(self._flow_scores)
-                next(sp_connections)
                 for row in sp_connections:                    
-                    if ip == row[2] : ip_list.append(row[3])
-                    if ip == row[3] :ip_list.append(row[2])    
+                    if ip == row[1] : ip_list.append(row[2])
+                    if ip == row[2] :ip_list.append(row[1])    
                 ips = list(set(ip_list))
              
                 if len(ips) > 1:
                     ips_filter = (",".join(str("'{0}'".format(ip)) for ip in 
ips))
-                    chord_file = 
"{0}/chord-{1}.tsv".format(self._data_path,ip.replace(".","_"))                 
    
-                    ch_query = ("SELECT sip as srcip, dip as dstip, SUM(ibyt) 
as ibytes, SUM(ipkt) as ipkts from {0}.{1} where y={2} and m={3} \
-                        and d={4} and ( (sip='{5}' and dip IN({6})) or (sip 
IN({6}) and dip='{5}') ) group by sip,dip")
-                    
self._engine.query(ch_query.format(self._db,self._table_name,yr,mn,dy,ip,ips_filter),chord_file,delimiter="\\t")
+ 
+                    query_to_load = ("""
+                        INSERT INTO TABLE {0}.flow_chords PARTITION (y={2}, m={3}, d={4})
+                        SELECT '{5}' as ip_threat, sip as srcip, dip as dstip, SUM(ibyt) as ibyt, SUM(ipkt) as ipkt from {0}.{1} where y={2} and m={3}
+                        and d={4} and ((sip='{5}' and dip IN({6})) or (sip IN({6}) and dip='{5}')) group by sip,dip,m,d;
+                        """).format(self._db,self._table_name,yr,mn,dy,ip,ips_filter)
 
-     
-    def _ingest_summary(self): 
+                    impala.execute_query(query_to_load)
+ 
+ 
+    def _ingest_summary(self):
         # get date parameters.
         yr = self._date[:4]
         mn = self._date[4:6]
@@ -413,46 +417,52 @@ class OA(object):
         self._logger.info("Getting ingest summary data for the day")
         
         ingest_summary_cols = ["date","total"]         
-        result_rows = []       
-        df_filtered =  pd.DataFrame() 
-
-        ingest_summary_file = 
"{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)                     
 
-        ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
-        if os.path.isfile(ingest_summary_file):
-            df = pd.read_csv(ingest_summary_file, 
delimiter=',',names=ingest_summary_cols, skiprows=1)
-            df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, 
mn, dy)) == False] 
-        else:
-            df = pd.DataFrame()
+        result_rows = []        
+        df_filtered =  pd.DataFrame()
+
+        # get ingest summary.
+
+        query_to_load=("""
+                SELECT tryear, trmonth, trday, trhour, trminute, COUNT(*) as total
+                FROM {0}.{1} WHERE y={2} AND m={3} AND d={4}
+                AND unix_tstamp IS NOT NULL
+                AND sip IS NOT NULL
+                AND sport IS NOT NULL
+                AND dip IS NOT NULL
+                AND dport IS NOT NULL
+                AND ibyt IS NOT NULL
+                AND ipkt IS NOT NULL
+                AND tryear={2}
+                AND cast(treceived as timestamp) IS NOT NULL
+                GROUP BY tryear, trmonth, trday, trhour, trminute;
+        """).format(self._db,self._table_name, yr, mn, dy)
         
-        # get ingest summary.           
-        ingest_summary_qry = ("SELECT tryear, trmonth, trday, trhour, 
trminute, COUNT(*) total"
-                            " FROM {0}.{1} "
-                            " WHERE "
-                            " y={2} "
-                            " AND m={3} "
-                            " AND d={4} "
-                            " AND unix_tstamp IS NOT NULL AND sip IS NOT NULL "
-                            " AND sport IS NOT NULL AND dip IS NOT NULL "
-                            " AND dport IS NOT NULL AND ibyt IS NOT NULL "
-                            " AND ipkt IS NOT NULL "
-                            " GROUP BY tryear, trmonth, trday, trhour, 
trminute;")
-
-
-        ingest_summary_qry = 
ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
-        results_file = 
"{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
-        
self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
-
-        if os.path.isfile(results_file):
-            result_rows = pd.read_csv(results_file, delimiter=',') 
-
-            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(yr, mn, dy, 
str(val['trhour']).zfill(2), str(val['trminute']).zfill(2)), int(val[5])] for 
key,val in result_rows.iterrows()],columns = ingest_summary_cols)               
                            
-
-            df_filtered = df_filtered.append(df_new, ignore_index=True)
-            df_filtered.to_csv(ingest_summary_tmp,sep=',', index=False)
-
-            os.remove(results_file)
-            os.rename(ingest_summary_tmp,ingest_summary_file)
+        results = impala.execute_query(query_to_load) 
+ 
+        if results:
+            df_results = as_pandas(results) 
+            
+            #Forms a new dataframe splitting the minutes from the time column
+            df_new = pd.DataFrame([["{0}-{1}-{2} {3}:{4}".format(val['tryear'],val['trmonth'],val['trday'], val['trhour'], val['trminute']), int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in df_results.iterrows()],columns = ingest_summary_cols)
+            value_string = ''
+            #Groups the data by minute 
+
+            sf = df_new.groupby(by=['date'])['total'].sum()
+            df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
+            
+            df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False) 
+            if len(df_final) > 0:
+                query_to_insert=("""
+                    INSERT INTO {0}.flow_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
+                """).format(self._db, yr, mn, dy, tuple(df_final))
+
+                impala.execute_query(query_to_insert)
+                
         else:
             self._logger.info("No data found for the ingest summary")
 
-        
\ No newline at end of file
+
+
+ 
+
+        

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1241398a/spot-oa/oa/proxy/proxy_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/proxy/proxy_oa.py b/spot-oa/oa/proxy/proxy_oa.py
index d16769c..1fcef19 100644
--- a/spot-oa/oa/proxy/proxy_oa.py
+++ b/spot-oa/oa/proxy/proxy_oa.py
@@ -21,14 +21,18 @@ import json
 import shutil
 import sys
 import datetime
-import csv, math
+import csv, math 
 from collections import OrderedDict
 from utils import Util
 from components.data.data import Data
 from components.iana.iana_transform import IanaTransform
 from components.nc.network_context import NetworkContext
+
+import api.resources.hdfs_client as HDFSClient
+import api.resources.impala_engine as impala
 from multiprocessing import Process
 import pandas as pd 
+from impala.util import as_pandas
 
 import time
 import md5
@@ -48,7 +52,7 @@ class OA(object):
         # initialize required parameters.
         self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
         self._date = date
-        #self._table_name = "proxy"
+        self._table_name = "proxy"
         self._proxy_results = []
         self._limit = limit
         self._data_path = None
@@ -68,8 +72,6 @@ class OA(object):
 
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", 
"").replace('"', '')
-        self._table_name = self._spot_conf.get('conf', 'PROXY_TABLE')
-        self._engine = Data(self._db, self._table_name,self._logger)
 
 
     def start(self):
@@ -79,13 +81,12 @@ class OA(object):
         ####################
 
         self._create_folder_structure()
+        self._clear_previous_executions()   
         self._add_ipynb()
         self._get_proxy_results()
-        self._add_reputation()
-        self._add_severity()
+        self._add_reputation() 
         self._add_iana()
-        self._add_network_context()
-        self._add_hash()
+        self._add_network_context() 
         self._create_proxy_scores_csv()
         self._get_oa_details()
         self._ingest_summary()
@@ -103,12 +104,33 @@ class OA(object):
         self._data_path,self._ingest_summary_path,self._ipynb_path = 
Util.create_oa_folders("proxy",self._date)
 
 
+    def _clear_previous_executions(self):
+        
+        self._logger.info("Cleaning data from previous executions for the 
day")       
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:]  
+        table_schema = []
+        HUSER = self._spot_conf.get('conf', 'HUSER').replace("'", 
"").replace('"', '')
+        table_schema=['suspicious', 'edge','threat_investigation', 'timeline', 
'storyboard', 'summary' ] 
+
+        for path in table_schema:
+            
HDFSClient.delete_folder("{0}/{1}/hive/oa/{2}/y={3}/m={4}/d={5}".format(HUSER,self._table_name,path,yr,int(mn),int(dy)),user="impala")
        
+        impala.execute_query("invalidate metadata")
+
+        #removes Feedback file
+        
HDFSClient.delete_folder("{0}/{1}/scored_results/{2}{3}{4}/feedback/ml_feedback.csv".format(HUSER,self._table_name,yr,mn,dy))
+        #removes json files from the storyboard
+        
HDFSClient.delete_folder("{0}/{1}/oa/{2}/{3}/{4}/{5}".format(HUSER,self._table_name,"storyboard",yr,mn,dy))
+
+
+
     def _add_ipynb(self):
 
         if os.path.isdir(self._ipynb_path):
 
-            self._logger.info("Adding edge investigation IPython Notebook")
-            shutil.copy("{0}/ipynb_templates/Edge_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Edge_Investigation.ipynb".format(self._ipynb_path))
+            self._logger.info("Adding advanced mode IPython Notebook")
+            shutil.copy("{0}/ipynb_templates/Advanced_Mode_master.ipynb".format(self._scrtip_path),"{0}/Advanced_Mode.ipynb".format(self._ipynb_path))
 
             self._logger.info("Adding threat investigation IPython Notebook")
             shutil.copy("{0}/ipynb_templates/Threat_Investigation_master.ipynb".format(self._scrtip_path),"{0}/Threat_Investigation.ipynb".format(self._ipynb_path))
@@ -141,23 +163,23 @@ class OA(object):
             self._logger.error("There was an error getting ML results from 
HDFS")
             sys.exit(1)
 
-        # add headers.
-        self._logger.info("Adding headers")
-        self._proxy_scores_headers = [  str(key) for (key,value) in 
self._conf['proxy_score_fields'].items() ]
-
         self._proxy_scores = self._proxy_results[:]
 
 
     def _create_proxy_scores_csv(self):
-
-        proxy_scores_csv = "{0}/proxy_scores.tsv".format(self._data_path)
-        proxy_scores_final = self._proxy_scores[:];
-        proxy_scores_final.insert(0,self._proxy_scores_headers)
-        Util.create_csv_file(proxy_scores_csv,proxy_scores_final, 
self._results_delimiter)
-
-        # create bk file
-        proxy_scores_bu_csv = "{0}/proxy_scores_bu.tsv".format(self._data_path)
-        Util.create_csv_file(proxy_scores_bu_csv,proxy_scores_final, 
self._results_delimiter)
+        # get date parameters.
+        yr = self._date[:4]
+        mn = self._date[4:6]
+        dy = self._date[6:] 
+        value_string = ""
+ 
+        for row in self._proxy_scores:
+            value_string += str(tuple(Util.cast_val(item) for item in row)) + ","              
+    
+        load_into_impala = ("""
+             INSERT INTO {0}.proxy_scores partition(y={2}, m={3}, d={4}) VALUES {1}
+        """).format(self._db, value_string[:-1], yr, mn, dy) 
+        impala.execute_query(load_into_impala)
 
 
     def _add_reputation(self):
@@ -196,25 +218,22 @@ class OA(object):
                 for result in rep_services_results:
                     rep_results = {k: "{0}::{1}".format(rep_results.get(k, 
""), result.get(k, "")).strip('::') for k in set(rep_results) | set(result)}
 
-                self._proxy_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._proxy_scores  ]
+                if rep_results:
+                    self._proxy_scores = [ conn + [ rep_results[conn[key]] ]   for conn in self._proxy_scores  ]
+                else:
+                    self._proxy_scores = [ conn + [""] for conn in self._proxy_scores  ]
         else:
             self._proxy_scores = [ conn + [""] for conn in self._proxy_scores  
]
 
 
-
-    def _add_severity(self):
-        # Add severity column
-        self._proxy_scores = [conn + [0] for conn in self._proxy_scores]
-
-
     def _add_iana(self):
 
         iana_conf_file = 
"{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         if os.path.isfile(iana_conf_file):
             iana_config  = json.loads(open(iana_conf_file).read())
             proxy_iana = IanaTransform(iana_config["IANA"])
-            proxy_rcode_index = self._conf["proxy_score_fields"]["respcode"]
-            self._proxy_scores = [ conn + [ proxy_iana.get_name(conn[proxy_rcode_index],"proxy_http_rcode")] for conn in self._proxy_scores ]
+            proxy_rcode_index = self._conf["proxy_score_fields"]["respcode"]            
+            self._proxy_scores = [ conn + [proxy_iana.get_name(conn[proxy_rcode_index],"proxy_http_rcode")] for conn in self._proxy_scores ]
         else:
             self._proxy_scores = [ conn + [""] for conn in self._proxy_scores ]
 
@@ -226,96 +245,70 @@ class OA(object):
             nc_conf = json.loads(open(nc_conf_file).read())["NC"]
             proxy_nc = NetworkContext(nc_conf,self._logger)
             ip_dst_index = self._conf["proxy_score_fields"]["clientip"]
-            self._proxy_scores = [ conn + [proxy_nc.get_nc(conn[ip_dst_index])] for conn in self._proxy_scores ]
-
+            self._proxy_scores = [ conn + [proxy_nc.get_nc(conn[ip_dst_index])] for conn in self._proxy_scores ] 
         else:
             self._proxy_scores = [ conn + [""] for conn in self._proxy_scores ]
 
 
-    def _add_hash(self):
-        #A hash string is generated to be used as the file name for the edge 
files.
-        #These fields are used for the hash creation, so this combination of 
values is treated as
-        #a 'unique' connection
-        cip_index = self._conf["proxy_score_fields"]["clientip"]
-        uri_index = self._conf["proxy_score_fields"]["fulluri"]
-        tme_index = self._conf["proxy_score_fields"]["p_time"]
-
-        self._proxy_scores = [conn + [str( md5.new(str(conn[cip_index]) + 
str(conn[uri_index])).hexdigest() + str((conn[tme_index].split(":"))[0]) )] for 
conn in self._proxy_scores]
-
-
     def _get_oa_details(self):
 
         self._logger.info("Getting OA Proxy suspicious details")
         # start suspicious connects details process.
         p_sp = Process(target=self._get_suspicious_details)
         p_sp.start()
-
-        # p_sp.join()
+ 
 
     def _get_suspicious_details(self):
-        hash_list = []
+        uri_list = []
         iana_conf_file = "{0}/components/iana/iana_config.json".format(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
         if os.path.isfile(iana_conf_file):
             iana_config  = json.loads(open(iana_conf_file).read())
             proxy_iana = IanaTransform(iana_config["IANA"])
 
         for conn in self._proxy_scores:
-            conn_hash = conn[self._conf["proxy_score_fields"]["hash"]]
-            if conn_hash not in hash_list:
-                hash_list.append(conn_hash)
-                clientip = conn[self._conf["proxy_score_fields"]["clientip"]]
-                fulluri = conn[self._conf["proxy_score_fields"]["fulluri"]]
-                date=conn[self._conf["proxy_score_fields"]["p_date"]].split('-')
-                if len(date) == 3:
-                    year=date[0]
-                    month=date[1].zfill(2)
-                    day=date[2].zfill(2)
-                    hh=(conn[self._conf["proxy_score_fields"]["p_time"]].split(":"))[0]
-                    self._get_proxy_details(fulluri,clientip,conn_hash,year,month,day,hh,proxy_iana)
-
-
-    def _get_proxy_details(self,fulluri,clientip,conn_hash,year,month,day,hh,proxy_iana):
-
-        limit = 250
-        output_delimiter = '\t'
-        edge_file ="{0}/edge-{1}-{2}.tsv".format(self._data_path,clientip,conn_hash)
-        edge_tmp  ="{0}/edge-{1}-{2}.tmp".format(self._data_path,clientip,conn_hash)
-
-        if not os.path.isfile(edge_file):
-            proxy_qry = ("SELECT p_date, p_time, clientip, host, webcat, 
respcode, reqmethod, useragent, resconttype, \
-                referer, uriport, serverip, scbytes, csbytes, fulluri FROM {0}.{1} WHERE y=\'{2}\' AND m=\'{3}\' AND d=\'{4}\' AND \
-                h=\'{5}\' AND fulluri =\'{6}\' AND clientip = \'{7}\' LIMIT {8};").format(self._db,self._table_name, year,month,day,hh,fulluri,clientip,limit)
-
-            # execute query
-            self._engine.query(proxy_qry,edge_tmp,output_delimiter)
-            # add IANA to results.
+            clientip = conn[self._conf["proxy_score_fields"]["clientip"]]
+            fulluri = conn[self._conf["proxy_score_fields"]["fulluri"]]
+            date=conn[self._conf["proxy_score_fields"]["p_date"]].split('-')
+            if len(date) == 3:
+                year=date[0]
+                month=date[1].zfill(2)
+                day=date[2].zfill(2)
+                hh=(conn[self._conf["proxy_score_fields"]["p_time"]].split(":"))[0]
+                self._get_proxy_details(fulluri,clientip,year,month,day,hh,proxy_iana)
+
+
+    def _get_proxy_details(self,fulluri,clientip,year,month,day,hh,proxy_iana):
+        limit = 250 
+        value_string = ""
+        
+        query_to_load =("""
+            SELECT p_date, p_time, clientip, host, webcat, respcode, reqmethod, useragent, resconttype,
+            referer, uriport, serverip, scbytes, csbytes, fulluri, {5} as hh
+            FROM {0}.{1} WHERE y='{2}' AND m='{3}' AND d='{4}' AND
+            h='{5}' AND fulluri='{6}' AND clientip='{7}' LIMIT {8};
+        """).format(self._db,self._table_name, 
year,month,day,hh,fulluri.replace("'","\\'"),clientip,limit)
+
+        detail_results = impala.execute_query(query_to_load)
+ 
+        if proxy_iana:
+             # add IANA to results.
             self._logger.info("Adding IANA translation to details results")
-            with open(edge_tmp) as proxy_details_csv:
-                rows = csv.reader(proxy_details_csv, delimiter=output_delimiter,quotechar='"')
-                next(proxy_details_csv)
-                update_rows = [[conn[0]] + [conn[1]] + [conn[2]] + [conn[3]] + [conn[4]] + [proxy_iana.get_name(conn[5],"proxy_http_rcode") if proxy_iana else conn[5]] + [conn[6]] + [conn[7]] + [conn[8]] + [conn[9]] + [conn[10]] + [conn[11]] + [conn[12]] + [conn[13]] + [conn[14]] if len(conn) > 0 else [] for conn in rows]
-                update_rows = filter(None, update_rows)
-                header = ["p_date","p_time","clientip","host","webcat","respcode","reqmethod","useragent","resconttype","referer","uriport","serverip","scbytes","csbytes","fulluri"]
-                update_rows.insert(0,header)
-
-               # due an issue with the output of the query.
-               update_rows = [ [ w.replace('"','') for w in l ] for l in update_rows ]
-       
-
-            # create edge file.
-            self._logger.info("Creating edge file:{0}".format(edge_file))
-            with open(edge_file,'wb') as proxy_details_edge:
-                writer = csv.writer(proxy_details_edge, quoting=csv.QUOTE_NONE, delimiter=output_delimiter)
-                if update_rows:
-                    writer.writerows(update_rows)
-                else:
-                    shutil.copy(edge_tmp,edge_file)
-
-            try:
-                os.remove(edge_tmp)
-            except OSError:
-                pass
+ 
+            updated_rows = [conn + (proxy_iana.get_name(conn[5],"proxy_http_rcode"),) for conn in detail_results]
+            updated_rows = filter(None, updated_rows)            
+        else:
+            updated_rows = [conn + ("") for conn in detail_results ]
+ 
+        for row in updated_rows:
+            value_string += str(tuple(item for item in row)) + ","     
+        
+        if value_string != "":  
+            query_to_insert=("""
+                INSERT INTO {0}.proxy_edge PARTITION (y={1}, m={2}, d={3}) VALUES ({4});
+            """).format(self._db,year, month, day, value_string[:-1])
 
+            impala.execute_query(query_to_insert) 
+ 
 
     def _ingest_summary(self): 
         # get date parameters.
@@ -329,44 +322,36 @@ class OA(object):
         result_rows = []        
         df_filtered =  pd.DataFrame()
 
-        ingest_summary_file = "{0}/is_{1}{2}.csv".format(self._ingest_summary_path,yr,mn)
 
-        ingest_summary_tmp = "{0}.tmp".format(ingest_summary_file)
-
-        if os.path.isfile(ingest_summary_file):
-               df = pd.read_csv(ingest_summary_file, delimiter=',')
-            #discards previous rows from the same date
-               df_filtered = df[df['date'].str.contains("{0}-{1}-{2}".format(yr, mn, dy)) == False]
-        else:
-               df = pd.DataFrame()
-            
         # get ingest summary.
-        ingest_summary_qry = ("SELECT p_date, p_time, COUNT(*) as total "
-                                    " FROM {0}.{1}"
-                                    " WHERE y='{2}' AND m='{3}' AND d='{4}' "
-                                    " AND p_date IS NOT NULL AND p_time IS NOT 
NULL " 
-                                    " AND clientip IS NOT NULL AND p_time != 
'' "
-                                    " AND host IS NOT NULL AND fulluri IS NOT 
NULL "
-                                    " GROUP BY p_date, p_time;") 
-
-        ingest_summary_qry = ingest_summary_qry.format(self._db,self._table_name, yr, mn, dy)
-        results_file = "{0}/results_{1}.csv".format(self._ingest_summary_path,self._date)
-        self._engine.query(ingest_summary_qry,output_file=results_file,delimiter=",")
+
+        query_to_load=("""
+                SELECT p_date, p_time, COUNT(*) as total
+                FROM {0}.{1} WHERE y='{2}' AND m='{3}' AND d='{4}'
+                AND p_date IS NOT NULL AND p_time IS NOT NULL
+                AND clientip IS NOT NULL AND p_time != ''
+                AND host IS NOT NULL AND fulluri IS NOT NULL
+                GROUP BY p_date, p_time;
+        """).format(self._db,self._table_name, yr, mn, dy)
         
-        if os.path.isfile(results_file):
-            df_results = pd.read_csv(results_file, delimiter=',')  
-            
+        results = impala.execute_query(query_to_load) 
+ 
+        if results:
+            df_results = as_pandas(results)
             #Forms a new dataframe splitting the minutes from the time column/
             df_new = pd.DataFrame([["{0} {1}:{2}".format(val['p_date'], 
val['p_time'].split(":")[0].zfill(2), val['p_time'].split(":")[1].zfill(2)), 
int(val['total']) if not math.isnan(val['total']) else 0 ] for key,val in 
df_results.iterrows()],columns = ingest_summary_cols)
-            
+            value_string = ''
             #Groups the data by minute 
             sf = df_new.groupby(by=['date'])['total'].sum()
             df_per_min = pd.DataFrame({'date':sf.index, 'total':sf.values})
             
-            df_final = df_filtered.append(df_per_min, ignore_index=True)
-            df_final.to_csv(ingest_summary_tmp,sep=',', index=False)
-
-            os.remove(results_file)
-            os.rename(ingest_summary_tmp,ingest_summary_file)
+            df_final = df_filtered.append(df_per_min, ignore_index=True).to_records(False,False)
+            if len(df_final) > 0:
+                query_to_insert=("""
+                    INSERT INTO {0}.proxy_ingest_summary PARTITION (y={1}, m={2}, d={3}) VALUES {4};
+                """).format(self._db, yr, mn, dy, tuple(df_final))
+
+                impala.execute_query(query_to_insert) 
+                
         else:
             self._logger.info("No data found for the ingest summary")
-        
\ No newline at end of file
+        
