http://www.mediawiki.org/wiki/Special:Code/MediaWiki/82943

Revision: 82943
Author:   diederik
Date:     2011-02-28 18:31:53 +0000 (Mon, 28 Feb 2011)
Log Message:
-----------
Cassandra support (ALPHA)

Modified Paths:
--------------
    trunk/tools/editor_trends/etl/enricher.py

Added Paths:
-----------
    trunk/tools/editor_trends/database/cassandra.py

Added: trunk/tools/editor_trends/database/cassandra.py
===================================================================
--- trunk/tools/editor_trends/database/cassandra.py     (rev 0)
+++ trunk/tools/editor_trends/database/cassandra.py     2011-02-28 18:31:53 UTC (rev 82943)
@@ -0,0 +1,32 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere ([email protected])
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere ([email protected])', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2011-02-25'
+__version__ = '0.1'
+
+def install_schema(keyspace_name, drop_first=False):
+
+    sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
+
+    sm.create_keyspace(keyspace_name, replication_factor=1)  # TODO: Change
+
+    sm.create_column_family(keyspace_name, 'revisions',
+                            comparator_type=pycassa.system_manager.UTF8_TYPE,
+                            default_validation_class=pycassa.system_manager.UTF8_TYPE)
+
+    sm.create_index(keyspace_name, 'revisions', 'article', pycassa.system_manager.UTF8_TYPE)
+    sm.create_index(keyspace_name, 'revisions', 'username', pycassa.system_manager.UTF8_TYPE)
+    sm.create_index(keyspace_name, 'revisions', 'user_id', pycassa.system_manager.LONG_TYPE)


Property changes on: trunk/tools/editor_trends/database/cassandra.py
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py   2011-02-28 18:12:44 UTC (rev 82942)
+++ trunk/tools/editor_trends/etl/enricher.py   2011-02-28 18:31:53 UTC (rev 82943)
@@ -29,14 +29,17 @@
 from xml.etree.cElementTree import fromstring, dump
 from collections import deque
 
+if '..' not in sys.path:
+    sys.path.append('..')
+
 try:
     import pycassa
+    from database import cassandra
 except ImportError:
     print 'I am not going to use Cassandra today, it\'s my off day.'
 
-if '..' not in sys.path:
-    sys.path.append('..')
 
+
 from database import db
 from bots import detector
 from utils import file_utils
@@ -61,7 +64,15 @@
 
     def setup_db(self):
         if self.storage == 'cassandra':
-            pass
+            self.keyspace_name = 'enwiki'
+            try:
+                self.db = pycassa.connect(self.keyspace_name)
+                cassandra.install_schema(self.keyspace_name)
+                self.collection = pycassa.ColumnFamily(self.db, 'Standard1')
+
+            except NameError, e:
+                pass
+
         else:
             self.db = db.init_mongo_db('enwiki')
             self.collection = self.db['kaggle']
@@ -70,14 +81,19 @@
         self.revisions.append(revision)
         if len(self.revisions) == 100:
             self.store()
-            self.revisions = []
+            self.clear()
 
     def empty(self):
         self.store()
+        self.clear()
 
+    def clear(self):
+        self.revisions = []
+
     def store(self):
         if self.storage == 'cassandra':
             print 'insert into cassandra'
+            self.collection.batch_insert(self.revisions)
         else:
             print 'insert into mongo'
 
@@ -230,6 +246,8 @@
                     revision_id = extracter.extract_revision_id(revision_id)
                     row['revision_id'] = revision_id
 
+                    timestamp = revision.find('timestamp').text
+                    row['timestamp'] = timestamp
 
                     hash = create_md5hash(revision)
                     revert = is_revision_reverted(hash['hash'], hashes)
@@ -239,7 +257,7 @@
                     row.update(hash)
                     row.update(size)
                     row.update(revert)
-                    print row
+                    #print row
     #                if row['username'] == None:
     #                    contributor = revision.find('contributor')
     #                    attrs = contributor.getchildren()
@@ -301,20 +319,22 @@
     input_queue = JoinableQueue()
     result_queue = JoinableQueue()
     storage = 'cassandra'
-    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+    #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+    files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+
     for file in files:
         input_queue.put(file)
 
-    for x in xrange(cpu_count):
+    for x in xrange(cpu_count()):
         input_queue.put(None)
 
     extracters = [Process(target=create_article, args=[input_queue, result_queue])
-                  for x in xrange(cpu_count)]
+                  for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
 
     creators = [Process(target=create_variables, args=[result_queue, storage])
-                for x in xrange(cpu_count)]
+                for x in xrange(cpu_count())]
     for creator in creators:
         creator.start()
 


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to