Yurik has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/168980

Change subject: log2dfs script
......................................................................

log2dfs script

Change-Id: Iec7779d7392e338410755ba19358810d02d668a0
---
M scripts/clone-xcs.hql
A scripts/log2dfs.py
M scripts/run-hivezero.sh
3 files changed, 273 insertions(+), 29 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/zero-sms 
refs/changes/80/168980/1

diff --git a/scripts/clone-xcs.hql b/scripts/clone-xcs.hql
index 5e7f500..39dc43c 100644
--- a/scripts/clone-xcs.hql
+++ b/scripts/clone-xcs.hql
@@ -5,35 +5,41 @@
 -- Clone one day worth of data to a temp table
 --
 -- Usage:
---     hive -f clone-xcs.hql -d year=2014 -d month=9 -d day=15 -d xcs=515-05 
-d table=tmp_clone
+--     hive -f clone-xcs.hql -d year=2014 -d month=10 -d day=11 -d hour=12 -d 
xcs=515-05 -d table=tmp_clone
 --
 -- set hivevar:year=2014;
 -- set hivevar:month=10;
--- set hivevar:day=21;
+-- set hivevar:day=11;
+-- set hivevar:hour=12;
 -- set hivevar:xcs=515-05;
 -- set hivevar:table=tmp_clone;
 
 use yurik;
 
 CREATE TABLE IF NOT EXISTS ${table} (
-  hostname string,
-  sequence bigint,
-  dt string,
-  time_firstbyte float,
-  ip string,
-  cache_status string,
-  http_status string,
-  response_size bigint,
-  http_method string,
-  uri_host string,
-  uri_path string,
-  uri_query string,
-  content_type string,
-  referer string,
-  x_forwarded_for string,
-  user_agent string,
-  accept_language string,
-  x_analytics string)
+  `hostname` string,
+  `sequence` bigint,
+  `dt` string,
+  `time_firstbyte` float,
+  `ip` string,
+  `cache_status` string,
+  `http_status` string,
+  `response_size` bigint,
+  `http_method` string,
+  `uri_host` string,
+  `uri_path` string,
+  `uri_query` string,
+  `content_type` string,
+  `referer` string,
+  `x_forwarded_for` string,
+  `user_agent` string,
+  `accept_language` string,
+  `x_analytics` string,
+  `webrequest_source` string,
+  `year` int,
+  `month` int,
+  `day` int,
+  `hour` int)
 ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t';
 
@@ -47,6 +53,7 @@
             AND year=${year}
             AND month=${month}
             AND day=${day}
+            AND hour=${hour}
             AND x_analytics LIKE '%zero=%'
             AND SUBSTR(uri_path, 1, 6) = '/wiki/'
             AND (
@@ -65,5 +72,5 @@
             AND http_status NOT IN ( '301', '302', '303' )
             AND uri_host RLIKE '^[A-Za-z0-9-]+(\\.(zero|m))?\\.[a-z]*\\.org$'
             AND NOT (SPLIT(TRANSLATE(SUBSTR(uri_path, 7), ' ', '_'), '#')[0] 
RLIKE '^[Uu]ndefined$')
-            AND regexp_extract(x_analytics, 'zero=([^\;]+)') = ${xcs}
-
+            AND regexp_extract(x_analytics, 'zero=([^\;]+)') = '${xcs}'
+;
diff --git a/scripts/log2dfs.py b/scripts/log2dfs.py
new file mode 100644
index 0000000..4050b46
--- /dev/null
+++ b/scripts/log2dfs.py
@@ -0,0 +1,236 @@
+# coding=utf-8
+import gzip
+import re
+
+try:
+    from urllib.parse import unquote
+except ImportError:
+    from urllib import unquote
+
+from logprocessor import *
+
+
# Column headers for the intermediate per-carrier cache files.
columnHdrCache = [u'date', u'type', u'xcs', u'via', u'ipset', u'https',
                  u'lang', u'subdomain', u'site', u'count']
# Column headers for the final result files — same as the cache headers
# with iszero/ison inserted before the trailing count.
columnHdrResult = [u'date', u'type', u'xcs', u'via', u'ipset', u'https',
                   u'lang', u'subdomain', u'site', u'iszero', u'ison', u'count']
# Sub-domains that identify mobile/zero traffic (e.g. en.m.wikipedia.org).
validSubDomains = set(['m', 'zero', 'mobile', 'wap'])
# HTTP status codes that count as a served page (OK / Not Modified).
validHttpCode = set(['200', '304'])
+
+
def isValidInt(val):
    """Return True if *val* parses as a base-10 integer, False otherwise."""
    try:
        int(val)
    except ValueError:
        return False
    return True
+
+
# Maps a carrier name, as it appears in a log file's name, to the carrier's
# X-CS code (MCC-MNC).  Used to supply a default "zero=<code>" tag when a
# log line's x_analytics field lacks one.  The "Also ..." comments flag
# carriers whose logs were observed to contain a second code.
xcsFromName = {
    'celcom-malaysia': '502-13',
    'dialog-sri-lanka': '413-02',
    'digi-malaysia': '502-16',
    'dtac-thailand': '520-18',
    'grameenphone-bangladesh': '470-01',
    'hello-cambodia': '456-02',
    'orange-botswana': '652-02',
    'orange-cameroon': '624-02',
    'orange-congo': '630-86',
    'orange-ivory-coast': '612-03',
    'orange-kenya': '639-07',  # Also 639-02 (Safaricom Kenya)
    'orange-morocco': '604-00',
    'orange-niger': '614-04',
    'orange-tunesia': '605-01',  # Also 639-02 (Safaricom Kenya) -- NOTE(review): looks copy-pasted from orange-kenya; verify
    'orange-uganda': '641-14',
    'saudi-telecom': '420-01',  # Also 652-02 (orange-botswana)
    'tata-india': '405-029',
    'telenor-montenegro': '297-01',  # Also 250-99 (beeline ru)
    'tim-brasil': '724-02',
    'vodaphone-india': '404-01',
    'xl-axiata-indonesia': '510-11',
}
+
# Normalizes the raw cache-status token (the first half of a "hit/200"
# status field) to 'hit', 'miss', 'pass', or '' (unknown / not counted).
# NOTE(review): despite the name, these keys are varnish/squid cache
# statuses, not HTTP status codes.
httpStatuses = {
    '-': '',
    'hit': 'hit',
    'miss': 'miss',
    'pass': 'pass',
    'TCP_CLIENT_REFRESH_MISS': 'miss',
    'TCP_MEM_HIT': 'hit',
    'TCP_MISS': 'miss',
    'TCP_REFRESH_HIT': 'hit',
    'FAKE_CACHE_STATUS': '',
    'TCP_HIT': 'hit',
    'TCP_IMS_HIT': 'hit',
    'TCP_DENIED': '',
    'TCP_REFRESH_MISS': 'miss',
    'TCP_NEGATIVE_HIT': 'hit',
}
+
+
class LogConverter(LogProcessor):
    """Converts raw varnish/squid log files into tab-separated files whose
    columns match the Hive webrequest table, extended with the
    webrequest_source column and year/month/day/hour partition columns,
    ready for upload to HDFS.
    """

    def __init__(self, settingsFile='settings/log2dfs.json'):
        # 'w2h' is the settings section/mode key -- TODO confirm against
        # LogProcessor (defined in logprocessor, not visible here).
        super(LogConverter, self).__init__(settingsFile, 'w2h')

        # Log files to process are named by date, e.g. "20140807...".
        self.logFileRe = re.compile(r'\d\d\d\d\d\d\d\d')
        # group(1) captures 'YYYY-MM-DDTHH'; fractional seconds optional.
        self.dateRe = re.compile(r'(201\d-\d\d-\d\dT\d\d):\d\d:\d\d(\.\d+)?')
        # Splits a URL into scheme, host, path (up to ? or #), and remainder.
        self.urlRe = re.compile(r'^(https?)://([^/]+)([^?#]*)(.*)', re.IGNORECASE)

    def processLogFiles(self):
        """Convert every date-named file under pathLogs into pathCache."""
        safePrint('Processing log files')
        for f in os.listdir(self.pathLogs):

            if not self.logFileRe.match(f):
                continue
            logFile = os.path.join(self.pathLogs, f)
            statFile = os.path.join(self.pathCache, f)
            if statFile.endswith('.gz'):
                statFile = statFile[:-3]

            # NOTE(review): the "skip if already converted" check is disabled
            # (debug leftover?), so every file is reprocessed on each run.
            # Behavior kept as-is; re-enable the os.path.exists() test to skip.
            # if not os.path.exists(statFile):
            if True:
                self.processLogFile(logFile, statFile)

    def processLogFile(self, logFile, statFile):
        """Convert one log file into a 23-column TSV at *statFile*.

        Expected input fields (space-separated format):
            0  cp1046.eqiad.wmnet
            1  13866141087
            2  2014-08-07T06:30:46
            3  0.000130653
            4  <ip>
            5  hit/301
            6  0
            7  GET
            8  http://en.m.wikipedia.org/wiki/Royal_Challenge
            9  -
            10 text/html; charset=UTF-8
            11 http://en.m.wikipedia.org/wiki/Royal_Challenge
            12 -
            13 Mozilla/5.0 (Linux; U; Android 2.3.5; en-us; ...
            .. Version/4.0 Mobile Safari/534.30
            -2 en-US
            -1 zero=410-01
        """

        safePrint('Processing %s' % logFile)
        count = 0
        isTab = '.tsv' in logFile or '.tab' in logFile
        # NOTE(review): isNoOpera is never used below -- remove or use.
        isNoOpera = 'no_opera' in logFile
        webrequest_source = 'mobile'
        lastDate = year = month = day = hour = None
        xcsWarns = set()

        # Derive the expected "zero=<xcs>" tag from the file name, if known.
        defaultXcs = None
        for k, v in xcsFromName.items():
            if k in logFile:
                defaultXcs = u'zero=' + v
                break

        if logFile.endswith('.gz'):
            streamData = io.TextIOWrapper(io.BufferedReader(gzip.open(logFile)), encoding='utf8', errors='ignore')
        else:
            streamData = io.open(logFile, 'r', encoding='utf8', errors='ignore')

        with io.open(statFile, 'w', encoding='utf8') as out:
            for line in streamData:
                count += 1
                if count % 1000000 == 0:
                    safePrint('%d lines processed' % count)

                strip = line.strip('\n\r')
                if isTab:
                    l = strip.split('\t')
                    if l[2].startswith('201'):
                        # user_agent (field 13) may itself contain tabs;
                        # fold the extra parts back into it.
                        # BUG FIX: was "del l[11]", which deleted the wrong
                        # field and shifted every later column.
                        while len(l) > 16:
                            l[13] += ' ' + l[14]
                            del l[14]
                else:
                    l = strip.split(' ')
                    # fix "text/html; charset=UTF-8" split into one field.
                    # BUG FIX: was "len(l) >= 10", which could raise
                    # IndexError on l[11] for 10-11 field lines.
                    while len(l) > 11 and l[10].endswith(';') and l[11] != '-' and not l[11].startswith('http'):
                        l[10] += ' ' + l[11]
                        del l[11]
                    # user_agent is percent-encoded in this format.
                    # BUG FIX: guard the index so malformed short lines fall
                    # through to the parts-count report instead of crashing.
                    if len(l) > 13:
                        l[13] = unquote(l[13])
                    if len(l) == 14:
                        # accept_language and x_analytics may be absent
                        l.append(u'')
                        l.append(u'')

                partsCount = len(l)
                if partsCount != 16:
                    safePrint(u'Wrong parts count - %d parts\n%s' % (partsCount, line))
                    continue

                # Normalize '-' placeholders to empty strings.
                l = ['' if v == '-' else v for v in l]
                (hostname, sequence, dt, time_firstbyte, ip, status, response_size, http_method, uri, unknown1,
                 content_type, referer, x_forwarded_for, user_agent, accept_language, x_analytics) = l
                # status -> cache_status, http_status
                # uri -> uri_host, uri_path, uri_query
                # new:  webrequest_source, year, month, day, hour

                m = self.dateRe.match(dt)
                if not m:
                    safePrint(u'Invalid date\n%s' % line)
                    continue
                # Re-parse the timestamp only when the hour changes --
                # log lines are mostly in chronological order.
                if lastDate != m.group(1):
                    lastDate = m.group(1)
                    d = datetime.strptime(lastDate, r'%Y-%m-%dT%H')
                    year = unicode(d.year)
                    month = unicode(d.month)
                    day = unicode(d.day)
                    hour = unicode(d.hour)

                if 'zero=' not in x_analytics:
                    if defaultXcs:
                        if x_analytics:
                            x_analytics += ';'
                        x_analytics += defaultXcs
                    else:
                        # BUG FIX: message said "String too short - %d parts",
                        # copy-pasted from the parts-count check above.
                        safePrint(u'Missing zero= tag in x_analytics\n%s' % line)
                        continue
                elif defaultXcs and x_analytics not in xcsWarns:
                    if defaultXcs not in x_analytics:
                        safePrint(u'Warning: XCS mismatch, expecting "%s", found "%s"' % (defaultXcs, x_analytics))
                    else:
                        safePrint(u'Warning: XCS confirmed, found expected "%s"' % defaultXcs)
                    xcsWarns.add(x_analytics)

                # expand "hit/200" into "hit", "200"
                tmp = status.split(u'/')
                if len(tmp) != 2 or not isValidInt(tmp[1]):
                    safePrint(u'Invalid status - "%s"\n%s' % (status, line))
                    continue
                (cache_status, http_status) = tmp
                if cache_status not in httpStatuses:
                    safePrint(u'Unknown cache_status - "%s"\n%s' % (cache_status, line))
                    continue
                cache_status = httpStatuses[cache_status]

                m = self.urlRe.match(uri)
                if not m:
                    safePrint(u'URL parsing failed: "%s"\n%s' % (uri, line))
                    continue
                if m.group(1).lower() == u'https' and u'https=' not in x_analytics:
                    # BUG FIX: the ';' separator was missing, producing
                    # values like "zero=410-01https=1".
                    if x_analytics:
                        x_analytics += u';'
                    x_analytics += u'https=1'
                uri_host = m.group(2)
                if uri_host.endswith(':80'):
                    uri_host = uri_host[:-3]
                if uri_host.endswith('.'):
                    uri_host = uri_host[:-1]
                uri_path = m.group(3)
                uri_query = m.group(4)

                result = '\t'.join(
                    [hostname, sequence, dt, time_firstbyte, ip, cache_status, http_status, response_size, http_method,
                     uri_host, uri_path, uri_query, content_type, referer, x_forwarded_for, user_agent, accept_language,
                     x_analytics, webrequest_source, year, month, day, hour])
                out.write(result + '\n')

    def run(self):
        # Entry point used by LogProcessor.safeRun().
        self.processLogFiles()

    def manualRun(self):
        # Same as run(); kept for interactive/debug invocation.
        self.processLogFiles()
+
+
if __name__ == '__main__':
    # manualRun() bypasses the safeRun() wrapper; handy for local debugging.
    # LogConverter().manualRun()
    LogConverter().safeRun()
diff --git a/scripts/run-hivezero.sh b/scripts/run-hivezero.sh
index 0df0a65..ab81156 100755
--- a/scripts/run-hivezero.sh
+++ b/scripts/run-hivezero.sh
@@ -1,13 +1,14 @@
 #!/bin/bash
+# ./run-clone.sh 515-05 2014 10 11 0 23
 
-if [[ -z "$4" ]]; then
-       last=$3
+if [[ -z "$6" ]]; then
+       last=$5
 else
-       last=$4
+       last=$6
 fi
 
-for ((day = $3; day <= $last; day++)); do
-       printf -v p "%04d-%02d-%02d" $1 $2 $day
-       echo hive -f zero-counts.hql -d "year="$1 -d "month="$2 -d "day="$day 
-d "date="$p
-       export HADOOP_HEAPSIZE=1024 && hive -f zero-counts.hql -d "year="$1 -d 
"month="$2 -d "day="$day -d "date="$p
+for ((hour = $5; hour <= $last; hour++)); do
+       printf -v t "tmp_%04d_%02d_%02d_%02d" $2 $3 $4 $hour
+       echo hive -f clone-xcs.hql -d "xcs="$1 -d "year="$2 -d "month="$3 -d 
"day="$4 -d "hour="$hour -d "table="$t
+       export HADOOP_HEAPSIZE=1024 && hive -f clone-xcs.hql -d "xcs="$1 -d 
"year="$2 -d "month="$3 -d "day="$4 -d "hour="$hour -d "table="$t
 done

-- 
To view, visit https://gerrit.wikimedia.org/r/168980
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iec7779d7392e338410755ba19358810d02d668a0
Gerrit-PatchSet: 1
Gerrit-Project: analytics/zero-sms
Gerrit-Branch: master
Gerrit-Owner: Yurik <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to