Ori.livneh has submitted this change and it was merged.

Change subject: Add Kafka writer; move writer-specific imports to writers
......................................................................


Add Kafka writer; move writer-specific imports to writers

* Add a writer that uses kafka-python to write to Kafka.
* Import writer-specific dependencies in the writers themselves, so that
  EventLogging as a whole doesn't accumulate dependencies.

Change-Id: Iced31fcfbaf5a4b92e5ff5a432307b2c9aabca0b
---
M server/eventlogging/handlers.py
M server/requirements.txt
M server/setup.py
M server/tests/test_compat.py
4 files changed, 19 insertions(+), 5 deletions(-)

Approvals:
  Ori.livneh: Looks good to me, approved



diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index cd52255..9806d46 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -20,7 +20,6 @@
 import socket
 import sys
 
-import pymongo
 import sqlalchemy
 
 from .factory import writes, reads
@@ -51,6 +50,8 @@
 
 @writes('mongodb')
 def mongodb_writer(uri, database='events'):
+    import pymongo
+
     client = pymongo.MongoClient(uri)
     db = client[database]
     datetime_from_timestamp = datetime.datetime.fromtimestamp
@@ -63,6 +64,21 @@
         db[collection].insert(event)
 
 
+@writes('kafka')
+def kafka_writer(brokers, topic='eventlogging', **kwargs):
+    """Write events to Kafka, keyed by SCID."""
+    from kafka.client import KafkaClient
+    from kafka.producer import KeyedProducer
+
+    kafka = KafkaClient(brokers)
+    producer = KeyedProducer(kafka, topic, **kwargs)
+
+    while 1:
+        event = (yield)
+        key = '%(schema)s_%(revision)s' % event  # e.g. 'EchoMail_5467650'
+        producer.send(key, json.dumps(event, sort_keys=True))
+
+
 @writes('mysql', 'sqlite')
 def sql_writer(uri):
     engine = sqlalchemy.create_engine(uri)
diff --git a/server/requirements.txt b/server/requirements.txt
index 14523f1..3dc0456 100644
--- a/server/requirements.txt
+++ b/server/requirements.txt
@@ -1,5 +1,4 @@
 jsonschema>=0.7
 pygments>=1.5
-pymongo>=2.1
 pyzmq>=2.1
 sqlalchemy>=0.7
diff --git a/server/setup.py b/server/setup.py
index 439fd22..8e24dbc 100644
--- a/server/setup.py
+++ b/server/setup.py
@@ -55,7 +55,6 @@
     install_requires=(
         "jsonschema>=0.7",
         "pygments>=1.5",
-        "pymongo>=2.1",
         "pyzmq>=2.1",
         "sqlalchemy>=0.7",
     )
diff --git a/server/tests/test_compat.py b/server/tests/test_compat.py
index e414862..f6bd4d3 100644
--- a/server/tests/test_compat.py
+++ b/server/tests/test_compat.py
@@ -16,7 +16,7 @@
 import eventlogging
 
 
-TRAVIS = os.getenv('TRAVIS', False)
+CI = 'TRAVIS' in os.environ or 'JENKINS_URL' in os.environ
 
 
 class SingleServingHttpd(multiprocessing.Process):
@@ -45,7 +45,7 @@
 class HttpGetTestCase(unittest.TestCase):
     """Test cases for ``http_get``."""
 
-    @unittest.skipIf(TRAVIS, 'Running in Travis')
+    @unittest.skipIf(CI, 'Running in a CI environment')
     def test_http_get(self):
         """``http_get`` can pull content via HTTP."""
         server = SingleServingHttpd('secret')

-- 
To view, visit https://gerrit.wikimedia.org/r/124502
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Iced31fcfbaf5a4b92e5ff5a432307b2c9aabca0b
Gerrit-PatchSet: 3
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to