http://www.mediawiki.org/wiki/Special:Code/MediaWiki/89233
Revision: 89233
Author: rfaulk
Date: 2011-05-31 22:47:26 +0000 (Tue, 31 May 2011)
Log Message:
-----------
added log copying functionality as copy_logs method in DataMapper class
Modified Paths:
--------------
trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py
Modified: trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py
===================================================================
--- trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py
2011-05-31 22:44:59 UTC (rev 89232)
+++ trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py
2011-05-31 22:47:26 UTC (rev 89233)
@@ -19,17 +19,19 @@
import math
import commands
-import cgi # web queries
-import re # regular expression matching
-import gzip # process gzipped logs
+import cgi
+import re
+import gzip
+import os
-import MySQLdb # db access
+import MySQLdb
import datetime
import Fundraiser_Tools.classes.DataLoader as DL
import Fundraiser_Tools.classes.Helper as Hlp
import Fundraiser_Tools.settings as projSet
import Fundraiser_Tools.classes.TimestampProcessor as TP
+import Fundraiser_Tools.classes.FundraiserDataHandler as FDH
"""
@@ -38,35 +40,120 @@
Base class for interacting with DataSource. Methods that are intended to
be extended in derived classes include:
METHODS:
-
+
+ copy_logs - copies logs from source to
destination for processing
"""
class DataMapper(object):
- _squid_log_directory_ = projSet.__project_home__ + 'logs/'
-
"""
Copies mining logs from remote site
"""
- def copy_logs(self):
- return
+ def copy_logs(self, type):
+
+ # '/archive/udplogs'
+
+ now = datetime.datetime.now()
+
+ if type == FDH._TESTTYPE_BANNER_:
+ prefix = 'bannerImpressions-'
+ elif type == FDH._TESTTYPE_LP_:
+ prefix = 'landingpages-'
+
+ if now.hour > 12:
+ filename = prefix + str(now.year) + '-' + str(now.month) + '-' +
str(now.day) + '-' + str(now.hour - 12) + 'PM*'
+ else:
+ filename = prefix + str(now.year) + '-' + str(now.month) + '-' +
str(now.day) + '-' + str(now.hour) + 'AM*'
+
+ filename = 'bannerImpressions-2011-05-27-11PM--25*'
+
+ cmd = 'sftp ' + projSet.__user__ + '@' + projSet.__squid_log_server__
+ ':' + projSet.__squid_log_home__ + filename + ' ' +
projSet.__squid_log_local_home__
+
+ os.system(cmd)
+
+ return filename
+
+"""
+
+ CLASS :: FundraiserDataMapper
+
+ Data mapper specific to the Wikimedia Fundraiser
+
+ METHODS:
+
+ mine_squid_impression_requests - mining banner impressions from
squid logs
+ mine_squid_landing_page_requests - mining landing page views from
squid logs
+
+"""
+class FundraiserDataMapper(DataMapper):
+
+ _db = None
+ _cur = None
+
+ _squid_log_directory_ = projSet.__project_home__ + 'logs/'
+ _impression_table_name_ = 'banner_impressions'
+ _landing_page_table_name_ = 'landing_page_requests'
+
+ _BANNER_REQUEST_ = 0
+ _LP_REQUEST_ = 1
+
+ _BANNER_FIELDS_ = ' (start_timestamp, utm_source, referrer, country,
lang, counts, on_minute) '
+ _LP_FIELDS_ = ' (start_timestamp, utm_source, utm_campaign, utm_medium,
landing_page, page_url, referrer_url, browser, lang, country, project, ip,
request_time) '
+
+
+ """ !! MODIFY -- use dataloaders! """
+ def _init_db(self):
+ self._db = MySQLdb.connect(host='127.0.0.1', user='rfaulk',
db='faulkner', port=3307)
+ self._cur = self._db.cursor()
+
+ """ !! MODIFY -- use dataloaders! """
+ def _close_db(self):
+ self._cur.close()
+ self._db.close()
+
+
+
+ def _clear_squid_records(self, start, request_type):
+
+
+ """ Ensure that the range is correct; otherwise abort - critical that
outside records are not deleted """
+ timestamp = TP.timestamp_convert_format(start,1,2)
+
+ if request_type == self._BANNER_REQUEST_:
+ deleteStmnt = 'delete from ' + self._impression_table_name_ + '
where start_timestamp = \'' + timestamp + '\';'
+ elif request_type == self._LP_REQUEST_:
+ deleteStmnt = 'delete from ' + self._landing_page_table_name_ + '
where start_timestamp = \'' + timestamp + '\';'
+
+ try:
+ self._cur.execute(deleteStmnt)
+ print >> sys.stdout, "Executed delete from impression: " +
deleteStmnt
+ except:
+ print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt +
"\nResuming insert ..."
+ pass
+
+
+
"""
"""
def mine_squid_impression_requests(self, logFileName):
- """ !! MODIFY -- use dataloaders! """
- db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner',
port=3307)
- cur = db.cursor()
+ self._init_db()
sltl = DL.SquidLogTableLoader()
itl = DL.ImpressionTableLoader()
-
+
+ """ Retrieve the log timestamp from the filename """
+ time_stamps = Hlp.get_timestamps(logFileName)
+
+ start = time_stamps[0]
+ end = time_stamps[1]
+ start_timestamp_in = "convert(\'" + start + "\', datetime)"
curr_time = TP.timestamp_from_obj(datetime.datetime.now(),1,3)
-
- # Initialization - open the file
- # logFileName = sys.argv[1];
+
+ """ Initialization - open the file
+ logFileName = sys.argv[1]; """
if (re.search('\.gz', logFileName)):
logFile = gzip.open(self._squid_log_directory_ + logFileName, 'r')
total_lines_in_file = float(commands.getstatusoutput('zgrep -c ""
' + self._squid_log_directory_ + logFileName)[1])
@@ -77,34 +164,15 @@
queryIndex = 4;
counts = Hlp.AutoVivification()
- insertStmt = 'INSERT INTO impression (utm_source, referrer, country,
lang, counts, on_minute) values '
+ insertStmt = 'INSERT INTO ' + self._impression_table_name_ +
self._BANNER_FIELDS_ + ' values '
min_log = -1
hr_change = 0
clamp = 0
- """ Clear the records for hour ahead of adding """
- time_stamps = Hlp.get_timestamps(logFileName)
+ """ Clear the old records """
+ self._clear_squid_records(start, self._BANNER_REQUEST_)
- start = time_stamps[0]
- end = time_stamps[1]
-
- # Ensure that the range is correct; otherwise abort - critical that
outside records are not deleted
- time_diff = Hlp.get_timestamps_diff(start, end)
-
- if math.fabs(time_diff) <= 1.0:
- deleteStmnt = 'delete from impression where on_minute >= \'' +
start + '\' and on_minute < \'' + end + '\';'
-
- try:
- # cur.execute(deleteStmnt)
- print >> sys.stdout, "Executed delete from impression: " +
deleteStmnt
- except:
- print >> sys.stderr, "Could not execute delete:\n" +
deleteStmnt + "\nResuming insert ..."
- pass
- else:
- print >> sys.stdout, "Could not execute delete statement, DIFF too
large\ndiff = " + str(time_diff) + "\ntime_start = " + start + "\ntime_end = "
+ end + "\nResuming insert ..."
-
-
""" Add a row to the SquidLogTable """
sltl.insert_row(type='banner_impression',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0')
@@ -116,11 +184,6 @@
while (line != ''):
lineArgs = line.split()
-
- # Filter out time data by minute -- if the time is not properly
formatted skip the record
- # 2010-11-12T20:56:43.237
- if line_count > 88364:
- print line
try:
time_stamp = lineArgs[2]
@@ -227,12 +290,12 @@
count = langCounts[lang]
try:
- val = '(\'' + banner + '\',\'' + project +
'\',\'' + country + '\',\'' + lang + '\',' \
+ val = '(' + start_timestamp_in + ',\'' +
banner + '\',\'' + project + '\',\'' + country + '\',\'' + lang + '\',' \
+ str(count) + ',' + time_stamp_in + ');'
- cur.execute(insertStmt + val)
+ self._cur.execute(insertStmt + val)
except:
- db.rollback()
+ self._db.rollback()
sys.exit("Database Interface Exception -
Could not execute statement:\n" + insertStmt + val)
# Re-initialize counts
@@ -246,21 +309,25 @@
sltl.update_table_row(type='banner_impression',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct=completion.__str__(),total_rows=line_count.__str__())
- cur.close()
- db.close()
+ self._close_db()
"""
"""
def mine_squid_landing_page_requests(self, logFileName):
- """ !! MODIFY -- use dataloaders! """
- db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner',
port=3307)
- cur = db.cursor()
+ self._init_db()
sltl = DL.SquidLogTableLoader()
lptl = DL.LandingPageTableLoader()
+
+ """ Retrieve the log timestamp from the filename """
+ time_stamps = Hlp.get_timestamps(logFileName)
+
+ start = time_stamps[0]
+ end = time_stamps[1]
+ start_timestamp_in = "convert(\'" + start + "\', datetime)"
curr_time = TP.timestamp_from_obj(datetime.datetime.now(),1,3)
count_parse = 0
@@ -281,39 +348,18 @@
""" SQL Statements """
- insertStmt_lp = 'INSERT INTO landing_page (utm_source, utm_campaign,
utm_medium, landing_page,' + \
- 'page_url, referrer_url, browser, lang, country, project, ip,
request_time) values '
+ insertStmt_lp = 'INSERT INTO ' + self._landing_page_table_name_ +
self._LP_FIELDS_ + ' values '
- """ Clear the records for hour ahead of adding """
- time_stamps = Hlp.get_timestamps(logFileName)
+ """ Clear the old records """
+ self._clear_squid_records(start, self._LP_REQUEST_)
- start = time_stamps[0]
- end = time_stamps[1]
-
- # Ensure that the range is correct; otherwise abort - critical that
outside records are not deleted
- time_diff = Hlp.get_timestamps_diff(start, end)
-
- if math.fabs(time_diff) <= 1.0:
- deleteStmnt = 'delete from landing_page where request_time >= \''
+ start + '\' and request_time < \'' + end + '\';'
-
- try:
- # cur.execute(deleteStmnt)
- print >> sys.stdout, "Executed delete from landing page: " +
deleteStmnt
- except:
- print >> sys.stderr, "Could not execute delete:\n" +
deleteStmnt + "\nResuming insert ..."
- pass
- else:
- print >> sys.stdout, "Could not execute delete statement, DIFF too
large\ndiff = " + str(time_diff) + "\ntime_start = " + start + "\ntime_end = "
+ end + "\nResuming insert ..."
-
-
+ """ Add a row to the SquidLogTable """
+
sltl.insert_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0')
+
count_correct = 0
count_total = 0
line_count = 0
- """ Add a row to the SquidLogTable """
-
sltl.insert_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0')
-
-
# PROCESS LOG FILE
# ================
line = logFile.readline()
@@ -462,7 +508,7 @@
except:
landing_page = 'NONE'
- country = Hlp.localize_IP(cur, ip_add)
+ country = Hlp.localize_IP(self._cur, ip_add)
else: # ...wikimediafoundation.org/wiki/...
@@ -483,11 +529,11 @@
country = landing_path[3]
except:
- country = Hlp.localize_IP(cur, ip_add)
+ country = Hlp.localize_IP(self._cur, ip_add)
# If country is confused with the language use the ip
if country == country.lower():
- country = Hlp.localize_IP(cur, ip_add)
+ country = Hlp.localize_IP(self._cur, ip_add)
# ensure fields exist
try:
@@ -502,12 +548,12 @@
# INSERT INTO landing_page ('utm_source', 'utm_campaign',
'utm_medium', 'landing_page', 'page_url', 'lang', 'project', 'ip') values ()
try:
- val = '(\'' + utm_source + '\',\'' + utm_campaign +
'\',\'' + utm_medium + '\',\'' + landing_page + \
+ val = '(' + start_timestamp_in + ',\'' + utm_source +
'\',\'' + utm_campaign + '\',\'' + utm_medium + '\',\'' + landing_page + \
'\',\'' + landing_url + '\',\'' + referrer_url + '\',\'' +
browser + '\',\'' + source_lang + '\',\'' + country + '\',\'' \
+ project + '\',\'' + ip_add + '\',' + 'convert(\'' +
timestamp_string + '\', datetime)' + ');'
#print insertStmt + val
- cur.execute(insertStmt_lp + val)
+ self._cur.execute(insertStmt_lp + val)
except:
print "Could not insert:\n" + insertStmt_lp + val
@@ -520,7 +566,5 @@
completion = float(line_count / total_lines_in_file) * 100.0
sltl.update_table_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct=completion.__str__(),total_rows=line_count.__str__())
- cur.close()
- db.close()
-
+ self._close_db()
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs