Ori.livneh has submitted this change and it was merged.
Change subject: Add Kafka writer; move writer-specific imports to writers
......................................................................
Add Kafka writer; move writer-specific imports to writers
* Add a writer which uses kafka-python to write to Kafka.
* Import writer-specific dependencies in the writers themselves, so that
EventLogging as a whole doesn't accumulate dependencies.
Change-Id: Iced31fcfbaf5a4b92e5ff5a432307b2c9aabca0b
---
M server/eventlogging/handlers.py
M server/requirements.txt
M server/setup.py
M server/tests/test_compat.py
4 files changed, 19 insertions(+), 5 deletions(-)
Approvals:
Ori.livneh: Looks good to me, approved
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index cd52255..9806d46 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -20,7 +20,6 @@
import socket
import sys
-import pymongo
import sqlalchemy
from .factory import writes, reads
@@ -51,6 +50,8 @@
@writes('mongodb')
def mongodb_writer(uri, database='events'):
+ import pymongo
+
client = pymongo.MongoClient(uri)
db = client[database]
datetime_from_timestamp = datetime.datetime.fromtimestamp
@@ -63,6 +64,21 @@
db[collection].insert(event)
+@writes('kafka')
+def kafka_writer(brokers, topic='eventlogging', **kwargs):
+ """Write events to Kafka, keyed by SCID."""
+ from kafka.client import KafkaClient
+ from kafka.producer import KeyedProducer
+
+ kafka = KafkaClient(brokers)
+ producer = KeyedProducer(kafka, topic, **kwargs)
+
+ while 1:
+ event = (yield)
+ key = '%(schema)s_%(revision)s' % event # e.g. 'EchoMail_5467650'
+ producer.send(key, json.dumps(event, sort_keys=True))
+
+
@writes('mysql', 'sqlite')
def sql_writer(uri):
engine = sqlalchemy.create_engine(uri)
diff --git a/server/requirements.txt b/server/requirements.txt
index 14523f1..3dc0456 100644
--- a/server/requirements.txt
+++ b/server/requirements.txt
@@ -1,5 +1,4 @@
jsonschema>=0.7
pygments>=1.5
-pymongo>=2.1
pyzmq>=2.1
sqlalchemy>=0.7
diff --git a/server/setup.py b/server/setup.py
index 439fd22..8e24dbc 100644
--- a/server/setup.py
+++ b/server/setup.py
@@ -55,7 +55,6 @@
install_requires=(
"jsonschema>=0.7",
"pygments>=1.5",
- "pymongo>=2.1",
"pyzmq>=2.1",
"sqlalchemy>=0.7",
)
diff --git a/server/tests/test_compat.py b/server/tests/test_compat.py
index e414862..f6bd4d3 100644
--- a/server/tests/test_compat.py
+++ b/server/tests/test_compat.py
@@ -16,7 +16,7 @@
import eventlogging
-TRAVIS = os.getenv('TRAVIS', False)
+CI = 'TRAVIS' in os.environ or 'JENKINS_URL' in os.environ
class SingleServingHttpd(multiprocessing.Process):
@@ -45,7 +45,7 @@
class HttpGetTestCase(unittest.TestCase):
"""Test cases for ``http_get``."""
- @unittest.skipIf(TRAVIS, 'Running in Travis')
+ @unittest.skipIf(CI, 'Running in a CI environment')
def test_http_get(self):
"""``http_get`` can pull content via HTTP."""
server = SingleServingHttpd('secret')
--
To view, visit https://gerrit.wikimedia.org/r/124502
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Iced31fcfbaf5a4b92e5ff5a432307b2c9aabca0b
Gerrit-PatchSet: 3
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits