http://www.mediawiki.org/wiki/Special:Code/MediaWiki/82943
Revision: 82943
Author: diederik
Date: 2011-02-28 18:31:53 +0000 (Mon, 28 Feb 2011)
Log Message:
-----------
Cassandra support (ALPHA)
Modified Paths:
--------------
trunk/tools/editor_trends/etl/enricher.py
Added Paths:
-----------
trunk/tools/editor_trends/database/cassandra.py
Added: trunk/tools/editor_trends/database/cassandra.py
===================================================================
--- trunk/tools/editor_trends/database/cassandra.py (rev 0)
+++ trunk/tools/editor_trends/database/cassandra.py 2011-02-28 18:31:53 UTC (rev 82943)
@@ -0,0 +1,32 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere ([email protected])
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere ([email protected])', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2011-02-25'
+__version__ = '0.1'
+
+def install_schema(keyspace_name, drop_first=False):
+    '''Create the keyspace, its 'revisions' column family and indexes (drop_first is unused).'''
+    import pycassa.system_manager  # this module has no top-level imports, so bind pycassa here
+    sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
+    sm.create_keyspace(keyspace_name, replication_factor=1) # TODO: Change
+    # Column names compare as UTF-8; column values default to UTF-8 validation.
+    sm.create_column_family(keyspace_name, 'revisions',
+                            comparator_type=pycassa.system_manager.UTF8_TYPE,
+                            default_validation_class=pycassa.system_manager.UTF8_TYPE)
+    # Index the article, username and user_id columns of 'revisions'.
+    sm.create_index(keyspace_name, 'revisions', 'article', pycassa.system_manager.UTF8_TYPE)
+    sm.create_index(keyspace_name, 'revisions', 'username', pycassa.system_manager.UTF8_TYPE)
+    sm.create_index(keyspace_name, 'revisions', 'user_id', pycassa.system_manager.LONG_TYPE)
Property changes on: trunk/tools/editor_trends/database/cassandra.py
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: trunk/tools/editor_trends/etl/enricher.py
===================================================================
--- trunk/tools/editor_trends/etl/enricher.py 2011-02-28 18:12:44 UTC (rev 82942)
+++ trunk/tools/editor_trends/etl/enricher.py 2011-02-28 18:31:53 UTC (rev 82943)
@@ -29,14 +29,17 @@
from xml.etree.cElementTree import fromstring, dump
from collections import deque
+if '..' not in sys.path:
+ sys.path.append('..')
+
try:
import pycassa
+ from database import cassandra
except ImportError:
print 'I am not going to use Cassandra today, it\'s my off day.'
-if '..' not in sys.path:
- sys.path.append('..')
+
from database import db
from bots import detector
from utils import file_utils
@@ -61,7 +64,15 @@
def setup_db(self):
if self.storage == 'cassandra':
- pass
+ self.keyspace_name = 'enwiki'
+ try:
+ self.db = pycassa.connect(self.keyspace_name)
+ cassandra.install_schema(self.keyspace_name)
+ self.collection = pycassa.ColumnFamily(self.db, 'Standard1')
+
+ except NameError, e:
+ pass
+
else:
self.db = db.init_mongo_db('enwiki')
self.collection = self.db['kaggle']
@@ -70,14 +81,19 @@
self.revisions.append(revision)
if len(self.revisions) == 100:
self.store()
- self.revisions = []
+ self.clear()
def empty(self):
self.store()
+ self.clear()
+ def clear(self):
+ self.revisions = []
+
def store(self):
if self.storage == 'cassandra':
print 'insert into cassandra'
+ self.collection.batch_insert(self.revisions)
else:
print 'insert into mongo'
@@ -230,6 +246,8 @@
revision_id = extracter.extract_revision_id(revision_id)
row['revision_id'] = revision_id
+ timestamp = revision.find('timestamp').text
+ row['timestamp'] = timestamp
hash = create_md5hash(revision)
revert = is_revision_reverted(hash['hash'], hashes)
@@ -239,7 +257,7 @@
row.update(hash)
row.update(size)
row.update(revert)
- print row
+ #print row
# if row['username'] == None:
# contributor = revision.find('contributor')
# attrs = contributor.getchildren()
@@ -301,20 +319,22 @@
input_queue = JoinableQueue()
result_queue = JoinableQueue()
storage = 'cassandra'
- files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+ #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+ files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+
for file in files:
input_queue.put(file)
- for x in xrange(cpu_count):
+ for x in xrange(cpu_count()):
input_queue.put(None)
extracters = [Process(target=create_article, args=[input_queue, result_queue])
- for x in xrange(cpu_count)]
+ for x in xrange(cpu_count())]
for extracter in extracters:
extracter.start()
creators = [Process(target=create_variables, args=[result_queue, storage])
- for x in xrange(cpu_count)]
+ for x in xrange(cpu_count())]
for creator in creators:
creator.start()
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs