Yurik has uploaded a new change for review.
https://gerrit.wikimedia.org/r/168980
Change subject: log2dfs script
......................................................................
log2dfs script
Change-Id: Iec7779d7392e338410755ba19358810d02d668a0
---
M scripts/clone-xcs.hql
A scripts/log2dfs.py
M scripts/run-hivezero.sh
3 files changed, 273 insertions(+), 29 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/zero-sms
refs/changes/80/168980/1
diff --git a/scripts/clone-xcs.hql b/scripts/clone-xcs.hql
index 5e7f500..39dc43c 100644
--- a/scripts/clone-xcs.hql
+++ b/scripts/clone-xcs.hql
@@ -5,35 +5,41 @@
-- Clone one day worth of data to a temp table
--
-- Usage:
--- hive -f clone-xcs.hql -d year=2014 -d month=9 -d day=15 -d xcs=515-05
-d table=tmp_clone
+-- hive -f clone-xcs.hql -d year=2014 -d month=10 -d day=11 -d hour=12 -d
xcs=515-05 -d table=tmp_clone
--
-- set hivevar:year=2014;
-- set hivevar:month=10;
--- set hivevar:day=21;
+-- set hivevar:day=11;
+-- set hivevar:hour=12;
-- set hivevar:xcs=515-05;
-- set hivevar:table=tmp_clone;
use yurik;
CREATE TABLE IF NOT EXISTS ${table} (
- hostname string,
- sequence bigint,
- dt string,
- time_firstbyte float,
- ip string,
- cache_status string,
- http_status string,
- response_size bigint,
- http_method string,
- uri_host string,
- uri_path string,
- uri_query string,
- content_type string,
- referer string,
- x_forwarded_for string,
- user_agent string,
- accept_language string,
- x_analytics string)
+ `hostname` string,
+ `sequence` bigint,
+ `dt` string,
+ `time_firstbyte` float,
+ `ip` string,
+ `cache_status` string,
+ `http_status` string,
+ `response_size` bigint,
+ `http_method` string,
+ `uri_host` string,
+ `uri_path` string,
+ `uri_query` string,
+ `content_type` string,
+ `referer` string,
+ `x_forwarded_for` string,
+ `user_agent` string,
+ `accept_language` string,
+ `x_analytics` string,
+ `webrequest_source` string,
+ `year` int,
+ `month` int,
+ `day` int,
+ `hour` int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
@@ -47,6 +53,7 @@
AND year=${year}
AND month=${month}
AND day=${day}
+ AND hour=${hour}
AND x_analytics LIKE '%zero=%'
AND SUBSTR(uri_path, 1, 6) = '/wiki/'
AND (
@@ -65,5 +72,5 @@
AND http_status NOT IN ( '301', '302', '303' )
AND uri_host RLIKE '^[A-Za-z0-9-]+(\\.(zero|m))?\\.[a-z]*\\.org$'
AND NOT (SPLIT(TRANSLATE(SUBSTR(uri_path, 7), ' ', '_'), '#')[0]
RLIKE '^[Uu]ndefined$')
- AND regexp_extract(x_analytics, 'zero=([^\;]+)') = ${xcs}
-
+ AND regexp_extract(x_analytics, 'zero=([^\;]+)') = '${xcs}'
+;
diff --git a/scripts/log2dfs.py b/scripts/log2dfs.py
new file mode 100644
index 0000000..4050b46
--- /dev/null
+++ b/scripts/log2dfs.py
@@ -0,0 +1,236 @@
+# coding=utf-8
+import gzip
+import re
+
+try:
+ from urllib.parse import unquote
+except ImportError:
+ from urllib import unquote
+
+from logprocessor import *
+
+
# Column layouts: the cached per-day stats, and the final result set,
# which inserts the iszero/ison flags ahead of the count.
columnHdrCache = [
    u'date', u'type', u'xcs', u'via', u'ipset',
    u'https', u'lang', u'subdomain', u'site', u'count',
]
columnHdrResult = [
    u'date', u'type', u'xcs', u'via', u'ipset', u'https',
    u'lang', u'subdomain', u'site', u'iszero', u'ison', u'count',
]
# Subdomains recognized as mobile/zero variants, and the HTTP status
# codes that count as a successful page view.
validSubDomains = {'m', 'zero', 'mobile', 'wap'}
validHttpCode = {'200', '304'}
+
+
def isValidInt(val):
    """Return True when *val* parses as a base-10 integer, else False."""
    try:
        int(val)
    except ValueError:
        return False
    return True
+
+
# Carrier name (as it appears in the log file name) -> expected
# MCC-MNC "X-CS" code.  Used to back-fill the zero= tag when a log
# line is missing x_analytics data, and to sanity-check lines that
# do carry one.
xcsFromName = {
    'celcom-malaysia': '502-13',
    'dialog-sri-lanka': '413-02',
    'digi-malaysia': '502-16',
    'dtac-thailand': '520-18',
    'grameenphone-bangladesh': '470-01',
    'hello-cambodia': '456-02',
    'orange-botswana': '652-02',
    'orange-cameroon': '624-02',
    'orange-congo': '630-86',
    'orange-ivory-coast': '612-03',
    'orange-kenya': '639-07',  # Also 639-02 (Safaricom Kenya)
    'orange-morocco': '604-00',
    'orange-niger': '614-04',
    'orange-tunesia': '605-01',  # NOTE(review): "Also 639-02 (Safaricom Kenya)" looked copy-pasted from orange-kenya — verify the real alternate code
    'orange-uganda': '641-14',
    'saudi-telecom': '420-01',  # Also 652-02 (orange-botswana)
    'tata-india': '405-029',
    'telenor-montenegro': '297-01',  # Also 250-99 (beeline ru)
    'tim-brasil': '724-02',
    'vodaphone-india': '404-01',
    'xl-axiata-indonesia': '510-11',
}
+
# Normalizes a raw cache-status token (varnish or squid style) to one of
# 'hit', 'miss', 'pass', or '' (unknown / not applicable).  A status not
# present here causes the whole log line to be skipped.
httpStatuses = {
    '-': '',
    'hit': 'hit',
    'miss': 'miss',
    'pass': 'pass',
    'TCP_CLIENT_REFRESH_MISS': 'miss',
    'TCP_MEM_HIT': 'hit',
    'TCP_MISS': 'miss',
    'TCP_REFRESH_HIT': 'hit',
    'FAKE_CACHE_STATUS': '',
    'TCP_HIT': 'hit',
    'TCP_IMS_HIT': 'hit',
    'TCP_DENIED': '',
    'TCP_REFRESH_MISS': 'miss',
    'TCP_NEGATIVE_HIT': 'hit',
}
+
+
class LogConverter(LogProcessor):
    """Converts raw webrequest log files into the TSV layout used by Hive.

    Every file in self.pathLogs whose name starts with a YYYYMMDD date is
    parsed, normalized to the clone-xcs.hql column order (with the extra
    webrequest_source/year/month/day/hour columns appended), and written
    to a same-named file in self.pathCache.
    """

    def __init__(self, settingsFile='settings/log2dfs.json'):
        super(LogConverter, self).__init__(settingsFile, 'w2h')

        # Convertible log files are named by date, e.g. "20140807...".
        self.logFileRe = re.compile(r'\d\d\d\d\d\d\d\d')
        # group(1) captures the ISO timestamp truncated to the hour.
        self.dateRe = re.compile(r'(201\d-\d\d-\d\dT\d\d):\d\d:\d\d(\.\d+)?')
        # scheme / host / path / query+fragment
        self.urlRe = re.compile(r'^(https?)://([^/]+)([^?#]*)(.*)', re.IGNORECASE)

    def processLogFiles(self):
        """Convert every date-named log file found in self.pathLogs."""
        safePrint('Processing log files')
        for f in os.listdir(self.pathLogs):
            if not self.logFileRe.match(f):
                continue
            logFile = os.path.join(self.pathLogs, f)
            statFile = os.path.join(self.pathCache, f)
            if statFile.endswith('.gz'):
                statFile = statFile[:-3]
            # TODO(review): re-enable skipping of already-converted files
            # once reprocessing is no longer needed:
            #   if not os.path.exists(statFile): ...
            # (The original wrapped this call in a dead "if True:".)
            self.processLogFile(logFile, statFile)

    def processLogFile(self, logFile, statFile):
        """Convert one log file into a 23-column TSV written to statFile.

        Expected input fields (space- or tab-separated):
          0  cp1046.eqiad.wmnet        hostname
          1  13866141087               sequence
          2  2014-08-07T06:30:46       dt
          3  0.000130653               time_firstbyte
          4  <ip>                      ip
          5  hit/301                   cache_status/http_status
          6  0                         response_size
          7  GET                       http_method
          8  http://en.m.wikipedia.org/wiki/Royal_Challenge   uri
          9  -                         (unknown)
          10 text/html; charset=UTF-8  content_type
          11 http://...                referer
          12 -                         x_forwarded_for
          13 Mozilla/5.0 ...           user_agent (may contain separators)
          -2 en-US                     accept_language
          -1 zero=410-01               x_analytics

        Malformed lines are reported via safePrint and skipped.
        """
        safePrint('Processing %s' % logFile)
        count = 0
        isTab = '.tsv' in logFile or '.tab' in logFile
        webrequest_source = 'mobile'
        lastDate = year = month = day = hour = None
        xcsWarns = set()

        # Derive the expected zero= tag from the carrier named in the file.
        defaultXcs = None
        for name, xcs in xcsFromName.items():  # items(): works on py2 and py3 (was iteritems(), py2-only)
            if name in logFile:
                defaultXcs = u'zero=' + xcs
                break

        if logFile.endswith('.gz'):
            streamData = io.TextIOWrapper(io.BufferedReader(gzip.open(logFile)),
                                          encoding='utf8', errors='ignore')
        else:
            streamData = io.open(logFile, 'r', encoding='utf8', errors='ignore')

        # "with streamData" fixes an input-stream leak in the original code.
        with streamData, io.open(statFile, 'w', encoding='utf8') as out:
            for line in streamData:
                count += 1
                if count % 1000000 == 0:
                    safePrint('%d lines processed' % count)

                strip = line.strip('\n\r')
                if isTab:
                    l = strip.split('\t')
                    # user_agent (field 13) may itself contain tabs; merge the
                    # overflow back in.  BUG FIX: was "del l[11]", which
                    # scrambled the remaining fields instead of dropping the
                    # just-merged one (compare the space-separated branch).
                    if len(l) > 2 and l[2].startswith('201'):
                        while len(l) > 16:
                            l[13] += ' ' + l[14]
                            del l[14]
                else:
                    l = strip.split(' ')
                    # Re-join "text/html; charset=UTF-8" split by its space.
                    # BUG FIX: guard was "len(l) >= 10", which allowed an
                    # IndexError on l[10]/l[11] for 10- and 11-field lines.
                    while len(l) >= 12 and l[10].endswith(';') and l[11] != '-' and not l[11].startswith('http'):
                        l[10] += ' ' + l[11]
                        del l[11]
                    if len(l) > 13:  # guard: let the parts-count check report short lines
                        l[13] = unquote(l[13])
                    if len(l) == 14:
                        # accept_language and x_analytics are missing; pad.
                        l.append(u'')
                        l.append(u'')

                partsCount = len(l)
                if partsCount != 16:
                    safePrint(u'Wrong parts count - %d parts\n%s' % (partsCount, line))
                    continue

                # '-' is the log convention for "no value".
                l = ['' if v == '-' else v for v in l]
                (hostname, sequence, dt, time_firstbyte, ip, status, response_size, http_method, uri, unknown1,
                 content_type, referer, x_forwarded_for, user_agent, accept_language, x_analytics) = l
                # status -> cache_status, http_status
                # uri -> uri_host, uri_path, uri_query
                # new: webrequest_source, year, month, day, hour

                m = self.dateRe.match(dt)
                if not m:
                    safePrint(u'Invalid date\n%s' % line)
                    continue
                if lastDate != m.group(1):
                    # Re-parse the timestamp only when the hour changes.
                    lastDate = m.group(1)
                    d = datetime.strptime(lastDate, r'%Y-%m-%dT%H')
                    # '%d' works on py2 and py3 (was unicode(), py2-only).
                    year = u'%d' % d.year
                    month = u'%d' % d.month
                    day = u'%d' % d.day
                    hour = u'%d' % d.hour

                if 'zero=' not in x_analytics:
                    if defaultXcs:
                        # Back-fill the zero= tag implied by the file name.
                        if x_analytics:
                            x_analytics += ';'
                        x_analytics += defaultXcs
                    else:
                        safePrint(u'String too short - %d parts\n%s' % (partsCount, line))
                        continue
                elif defaultXcs and x_analytics not in xcsWarns:
                    # Warn only once per distinct x_analytics value.
                    if defaultXcs not in x_analytics:
                        safePrint(u'Warning: XCS mismatch, expecting "%s", found "%s"' % (defaultXcs, x_analytics))
                    else:
                        safePrint(u'Warning: XCS confirmed, found expected "%s"' % defaultXcs)
                    xcsWarns.add(x_analytics)

                # Expand "hit/200" into "hit", "200".
                tmp = status.split(u'/')
                if len(tmp) != 2 or not isValidInt(tmp[1]):
                    safePrint(u'Invalid status - "%s"\n%s' % (status, line))
                    continue
                (cache_status, http_status) = tmp
                if cache_status not in httpStatuses:
                    safePrint(u'Unknown cache_status - "%s"\n%s' % (cache_status, line))
                    continue
                cache_status = httpStatuses[cache_status]

                m = self.urlRe.match(uri)
                if not m:
                    safePrint(u'URL parsing failed: "%s"\n%s' % (uri, line))
                    continue
                if m.group(1).lower() == u'https' and u'https=' not in x_analytics:
                    # BUG FIX: x_analytics is never empty here (it always
                    # carries zero=...), so the new pair needs its ';'
                    # separator; it used to be appended bare.
                    x_analytics += u';https=1'
                uri_host = m.group(2)
                if uri_host.endswith(':80'):
                    uri_host = uri_host[:-3]
                if uri_host.endswith('.'):
                    uri_host = uri_host[:-1]
                uri_path = m.group(3)
                uri_query = m.group(4)

                result = '\t'.join(
                    [hostname, sequence, dt, time_firstbyte, ip, cache_status, http_status, response_size, http_method,
                     uri_host, uri_path, uri_query, content_type, referer, x_forwarded_for, user_agent, accept_language,
                     x_analytics, webrequest_source, year, month, day, hour])
                out.write(result + '\n')

    def run(self):
        # Scheduled entry point (invoked by LogProcessor.safeRun).
        self.processLogFiles()

    def manualRun(self):
        # Interactive/debug entry point; same work, no safety wrapper.
        self.processLogFiles()
+
+
if __name__ == '__main__':
    # safeRun() wraps the conversion in the shared LogProcessor error
    # handling; call manualRun() instead when debugging interactively.
    LogConverter().safeRun()
diff --git a/scripts/run-hivezero.sh b/scripts/run-hivezero.sh
index 0df0a65..ab81156 100755
--- a/scripts/run-hivezero.sh
+++ b/scripts/run-hivezero.sh
@@ -1,13 +1,14 @@
#!/bin/bash
+# ./run-hivezero.sh 515-05 2014 10 11 0 23
-if [[ -z "$4" ]]; then
- last=$3
+if [[ -z "$6" ]]; then
+ last=$5
else
- last=$4
+ last=$6
fi
-for ((day = $3; day <= $last; day++)); do
- printf -v p "%04d-%02d-%02d" $1 $2 $day
- echo hive -f zero-counts.hql -d "year="$1 -d "month="$2 -d "day="$day
-d "date="$p
- export HADOOP_HEAPSIZE=1024 && hive -f zero-counts.hql -d "year="$1 -d
"month="$2 -d "day="$day -d "date="$p
+for ((hour = $5; hour <= $last; hour++)); do
+ printf -v t "tmp_%04d_%02d_%02d_%02d" $2 $3 $4 $hour
+ echo hive -f clone-xcs.hql -d "xcs="$1 -d "year="$2 -d "month="$3 -d
"day="$4 -d "hour="$hour -d "table="$t
+ export HADOOP_HEAPSIZE=1024 && hive -f clone-xcs.hql -d "xcs="$1 -d
"year="$2 -d "month="$3 -d "day="$4 -d "hour="$hour -d "table="$t
done
--
To view, visit https://gerrit.wikimedia.org/r/168980
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iec7779d7392e338410755ba19358810d02d668a0
Gerrit-PatchSet: 1
Gerrit-Project: analytics/zero-sms
Gerrit-Branch: master
Gerrit-Owner: Yurik <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits