EBernhardson has uploaded a new change for review.
https://gerrit.wikimedia.org/r/273298
Change subject: Repoint transfer_to_es job at specific servers
......................................................................
Repoint transfer_to_es job at specific servers
The firewall hole that was opened up between analytics and
elasticsearch does not include the LVS boxes. To work around
To work around this, point the job at a specific server in each
cluster. It might be better if we provided a list of servers, in case
one is down, but for now this is probably good enough.
Change-Id: Ic212cb1461de5ac9c1f012bed2f58490aee42d07
---
M oozie/transfer_to_es/bundle.xml
A oozie/transfer_to_es/transferToESTest.py
A oozie/transfer_to_es/transferToESTest2.py
3 files changed, 183 insertions(+), 2 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/discovery/analytics
refs/changes/98/273298/1
diff --git a/oozie/transfer_to_es/bundle.xml b/oozie/transfer_to_es/bundle.xml
index 49c5f3b..f7f5182 100644
--- a/oozie/transfer_to_es/bundle.xml
+++ b/oozie/transfer_to_es/bundle.xml
@@ -19,7 +19,7 @@
<configuration>
<property>
<name>elasticsearch_url</name>
- <value>http://search.svc.eqiad.wmnet:9200</value>
+ <value>http://elastic1017.eqiad.wmnet:9200</value>
</property>
</configuration>
</coordinator>
@@ -29,7 +29,7 @@
<configuration>
<property>
<name>elasticsearch_url</name>
- <value>http://search.svc.codfw.wmnet:9200</value>
+ <value>http://elastic2010.codfw.wmnet:9200</value>
</property>
</configuration>
</coordinator>
diff --git a/oozie/transfer_to_es/transferToESTest.py
b/oozie/transfer_to_es/transferToESTest.py
new file mode 100644
index 0000000..22d162f
--- /dev/null
+++ b/oozie/transfer_to_es/transferToESTest.py
@@ -0,0 +1,31 @@
import logging
from multiprocessing import Pool

import requests
+
def sendDataToES(data, url):
    """
    PUT the given payload to an Elasticsearch server.

    Returns True when the server answered HTTP 200, False on a
    transport-level error or any non-200 status code.
    """
    try:
        r = requests.put(url, data=data)
    except requests.exceptions.RequestException as e:
        # Transport failure (connection refused, timeout, ...).
        # NOTE: requires `import logging` at the top of this file —
        # the original never imported it and would NameError here.
        logging.error("Failed to send update: " + str(e))
        return False
    if r.status_code != 200:
        # Release the connection back to the pool before bailing out.
        r.close()
        return False
    return True
+
def send(i):
    """Fire one empty test update at the local stub server for id *i*."""
    target = "http://localhost:9876/%d" % (i)
    return sendDataToES("", target)
+
def handlePartition(iterable, func=None):
    """
    Apply ``func`` to every item of ``iterable`` on a small worker pool
    and yield results as they complete (completion order, not input order).

    ``func`` defaults to :func:`send`, preserving the original behaviour.

    The original only closed/joined the pool after full exhaustion, so a
    consumer that abandoned the generator early leaked the pool; the
    try/finally guarantees cleanup in all cases.
    """
    if func is None:
        func = send
    pool = Pool(processes=4)
    try:
        for result in pool.imap_unordered(func, iterable):
            yield result
    finally:
        pool.close()
        pool.join()
+
if __name__ == "__main__":
    # Smoke test: push 1000 empty updates at a local stub server and
    # print each request's boolean outcome as it completes.
    # NOTE(review): Python 2 only (`xrange`, print statement).
    for i in handlePartition(xrange(1000)):
        print i
diff --git a/oozie/transfer_to_es/transferToESTest2.py
b/oozie/transfer_to_es/transferToESTest2.py
new file mode 100644
index 0000000..93e3556
--- /dev/null
+++ b/oozie/transfer_to_es/transferToESTest2.py
@@ -0,0 +1,150 @@
+# A very primitive script to transfer from hive data files
+# to ElasticSearch HTTP endpoint
+from pyspark import SparkContext
+from pyspark.sql import SQLContext
+from optparse import OptionParser
+import requests
+import json
+import logging
+import subprocess
+from multiprocessing import Pool
+
# Command-line interface; defaults match the original behaviour.
oparser = OptionParser()
oparser.add_option("-s", "--source", dest="source",
                   help="source for the data", metavar="SOURCE")
oparser.add_option("-u", "--url", dest="url",
                   help="URL to send the data to", metavar="URL")
oparser.add_option("-m", "--hostmap", dest="hostmap",
                   help="Hostnames map in JSON",
                   metavar="FILE", default="hostmap.json")
oparser.add_option("-b", "--batch", dest="batch",
                   help="Items per batch to load into ES",
                   metavar="NUM", default="10")
oparser.add_option("-n", "--noop-within", dest="noop",
                   help="Only perform update if value has changed by more than this percentage",
                   metavar="NOOP_WITHIN")
oparser.add_option("-f", "--field-name", dest="field",
                   help="Name of elasticsearch field to populate",
                   metavar="FIELD", default="score")

(options, args) = oparser.parse_args()

ITEMS_PER_BATCH = int(options.batch)
SOURCE = options.source
TARGET = options.url
NOOP_WITHIN = options.noop
FIELD = options.field

# Host maps stored in HDFS are fetched via `hdfs dfs -cat`; anything
# else is treated as a local file path. startswith() replaces the
# original's brittle 24-character slice comparison.
HDFS_PREFIX = 'hdfs://analytics-hadoop'
if options.hostmap.startswith(HDFS_PREFIX + '/'):
    # Strip scheme + authority but keep the leading '/' of the path.
    hostMap = json.loads(subprocess.check_output(
        ["hdfs", "dfs", "-cat", options.hostmap[len(HDFS_PREFIX):]]))
else:
    hostMap = json.load(open(options.hostmap))

print("Transferring from %s to %s" % (SOURCE, TARGET))
+
if __name__ == "__main__":
    # Driver setup: one Spark application per target cluster URL.
    sc = SparkContext(appName="Send To ES: %s" % (TARGET))
    sqlContext = SQLContext(sc)
    # Ship the wiki-host -> index-name map to every executor.
    # NOTE(review): "broardcast" is a typo for "broadcast"; kept as-is
    # because the name is referenced throughout this job.
    broardcastMap = sc.broadcast(hostMap)
    # Accumulators gather stats from executors for the final report:
    # documents attempted, successful/failed HTTP requests, and
    # individual documents that failed to update.
    documentCounter = sc.accumulator(0)
    updateCounter = sc.accumulator(0)
    errorCounter = sc.accumulator(0)
    failedDocumentCounter = sc.accumulator(0)
+
+ def documentData(document):
+ """
+ Create textual representation of the document data for one document
+ """
+ updateData = {"update": {"_id": document.page_id}}
+ if NOOP_WITHIN:
+ updateDoc = {"script": {
+ "script": "super_detect_noop",
+ "lang": "native",
+ "params": {
+ "detectors": {FIELD: "within " + NOOP_WITHIN + "%"},
+ "source": {FIELD: document.score},
+ },
+ }}
+ else:
+ updateDoc = {"doc": {FIELD: document.score}}
+
+ return json.dumps(updateData) + "\n" + json.dumps(updateDoc) + "\n"
+
+ def getTargetURL(wikihost):
+ """
+ Get ES URL from wiki hostname
+ """
+ if wikihost in broardcastMap.value:
+ wiki = broardcastMap.value[wikihost]
+ return TARGET + "/" + wiki + "_content" + "/page/_bulk"
+ return None
+
+ def sendDataToES(data, url):
+ """
+ Send data to ES server
+ """
+ if len(data) < 1:
+ return
+ try:
+ r = requests.put(url, data=data)
+ except requests.exceptions.RequestException, e:
+ errorCounter.add(1)
+ logging.error("Failed to send update: " + str(e))
+ return False
+ if r.status_code != 200:
+ errorCounter.add(1)
+ r.close()
+ return False
+ parseResponse(r)
+ updateCounter.add(1)
+ return True
+
+ def parseResponse(resp):
+ respData = resp.json()
+ for item in respData.get('items', {}):
+ if 'update' not in item:
+ continue
+ if 'status' not in item['update']:
+ continue
+ if item['update']['status'] != 200:
+ if 'error' in item['update'] and item['update']['error'][0:24]
== 'DocumentMissingException':
+ continue
+ failedDocumentCounter.add(1)
+
+ def sendDocumentsToES(documents):
+ """
+ Send a set of documents to ES
+ """
+ if len(documents) < 1:
+ return
+ url = getTargetURL(documents[0].project)
+ if not url:
+ return
+ data = ""
+ for document in documents:
+ data += documentData(document)
+ documentCounter.add(len(documents))
+ if not sendDataToES(data, url):
+ failedDocumentCounter.add(len(documents))
+
+ def groupAndSend(rows):
+ """
+ Group together documents from the same project and batch them
+ to elasticsearch
+ """
+ def genGroups():
+ group = []
+ for row in rows:
+ if len(group) > 0 and group[0].project != row.project:
+ yield group
+ group = []
+ group.append(row)
+ if len(group) >= ITEMS_PER_BATCH:
+ yield group
+ group = []
+ if len(group) > 0:
+ yield group
+
+ pool = Pool(processes=4)
+ pool.imap_unordered(sendDocumentsToES, genGroups())
+ pool.close()
+ pool.join()
+
+ data = sqlContext.load(SOURCE)
+ # print "Count: %d\n" % data.count()
+ data.sort(data.project).foreachPartition(groupAndSend)
+ print "%d documents processed, %d failed." % (documentCounter.value,
failedDocumentCounter.value,)
+ print "%d requests successful, %d requests failed." %
(updateCounter.value, errorCounter.value)
--
To view, visit https://gerrit.wikimedia.org/r/273298
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic212cb1461de5ac9c1f012bed2f58490aee42d07
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/discovery/analytics
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits