EBernhardson has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/273298

Change subject: Repoint transfer_to_es job at specific servers
......................................................................

Repoint transfer_to_es job at specific servers

The firewall hole that was opened up between analytics and
elasticsearch does not include the LVS boxes. To work around
this, point the job at a specific server in each cluster. It
might be better if we provided a list of servers, in case one
is down, but for now this is probably good enough.

Change-Id: Ic212cb1461de5ac9c1f012bed2f58490aee42d07
---
M oozie/transfer_to_es/bundle.xml
A oozie/transfer_to_es/transferToESTest.py
A oozie/transfer_to_es/transferToESTest2.py
3 files changed, 183 insertions(+), 2 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/discovery/analytics refs/changes/98/273298/1

diff --git a/oozie/transfer_to_es/bundle.xml b/oozie/transfer_to_es/bundle.xml
index 49c5f3b..f7f5182 100644
--- a/oozie/transfer_to_es/bundle.xml
+++ b/oozie/transfer_to_es/bundle.xml
@@ -19,7 +19,7 @@
         <configuration>
             <property>
                 <name>elasticsearch_url</name>
-                <value>http://search.svc.eqiad.wmnet:9200</value>
+                <value>http://elastic1017.eqiad.wmnet:9200</value>
             </property>
         </configuration>
     </coordinator>
@@ -29,7 +29,7 @@
         <configuration>
             <property>
                 <name>elasticsearch_url</name>
-                <value>http://search.svc.codfw.wmnet:9200</value>
+                <value>http://elastic2010.codfw.wmnet:9200</value>
             </property>
         </configuration>
     </coordinator>
diff --git a/oozie/transfer_to_es/transferToESTest.py b/oozie/transfer_to_es/transferToESTest.py
new file mode 100644
index 0000000..22d162f
--- /dev/null
+++ b/oozie/transfer_to_es/transferToESTest.py
@@ -0,0 +1,31 @@
+from multiprocessing import Pool
+import requests
+
+def sendDataToES(data, url):
+    """
+    Send data to ES server
+    """
+    try:
+        r = requests.put(url, data=data)
+    except requests.exceptions.RequestException, e:
+        logging.error("Failed to send update: " + str(e))
+        return False
+    if r.status_code != 200:
+        r.close()
+        return False
+    return True
+
+def send(i):
+    return sendDataToES("", "http://localhost:9876/%d"; % (i))
+
+def handlePartition(iterable):
+
+    pool = Pool(processes=4)
+    for i in pool.imap_unordered(send, iterable):
+        yield i
+    pool.close()
+    pool.join()
+
+if __name__ == "__main__":
+    for i in handlePartition(xrange(1000)):
+        print i
diff --git a/oozie/transfer_to_es/transferToESTest2.py b/oozie/transfer_to_es/transferToESTest2.py
new file mode 100644
index 0000000..93e3556
--- /dev/null
+++ b/oozie/transfer_to_es/transferToESTest2.py
@@ -0,0 +1,150 @@
+# A very primitive script to transfer from hive data files
+# to ElasticSearch HTTP endpoint
+from pyspark import SparkContext
+from pyspark.sql import SQLContext
+from optparse import OptionParser
+import requests
+import json
+import logging
+import subprocess
+from multiprocessing import Pool
+
+oparser = OptionParser()
+oparser.add_option("-s", "--source", dest="source", help="source for the 
data", metavar="SOURCE")
+oparser.add_option("-u", "--url", dest="url", help="URL to send the data to", 
metavar="URL")
+oparser.add_option("-m", "--hostmap", dest="hostmap", help="Hostnames map in 
JSON",
+    metavar="FILE", default="hostmap.json")
+oparser.add_option("-b", "--batch", dest="batch", help="Items per batch to 
load into ES", metavar="NUM", default="10")
+oparser.add_option("-n", "--noop-within", dest="noop",
+    help="Only perform update if value has changed by more than this 
percentage",
+    metavar="NOOP_WITHIN")
+oparser.add_option("-f", "--field-name", dest="field", help="Name of 
elasticsearch field to populate",
+    metavar="FIELD", default="score")
+
+(options, args) = oparser.parse_args()
+
+ITEMS_PER_BATCH = int(options.batch)
+SOURCE = options.source
+TARGET = options.url
+NOOP_WITHIN = options.noop
+FIELD = options.field
+if options.hostmap[0:24] == 'hdfs://analytics-hadoop/':
+    hostMap = json.loads(subprocess.check_output(["hdfs", "dfs", "-cat", 
options.hostmap[23:]]))
+else:
+    hostMap = json.load(open(options.hostmap))
+
+print "Transferring from %s to %s" % (SOURCE, TARGET)
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="Send To ES: %s" % (TARGET))
+    sqlContext = SQLContext(sc)
+    broardcastMap = sc.broadcast(hostMap)
+    documentCounter = sc.accumulator(0)
+    updateCounter = sc.accumulator(0)
+    errorCounter = sc.accumulator(0)
+    failedDocumentCounter = sc.accumulator(0)
+
+    def documentData(document):
+        """
+        Create textual representation of the document data for one document
+        """
+        updateData = {"update": {"_id": document.page_id}}
+        if NOOP_WITHIN:
+            updateDoc = {"script": {
+                "script": "super_detect_noop",
+                "lang": "native",
+                "params": {
+                    "detectors": {FIELD: "within " + NOOP_WITHIN + "%"},
+                    "source": {FIELD: document.score},
+                },
+            }}
+        else:
+            updateDoc = {"doc": {FIELD: document.score}}
+
+        return json.dumps(updateData) + "\n" + json.dumps(updateDoc) + "\n"
+
+    def getTargetURL(wikihost):
+        """
+        Get ES URL from wiki hostname
+        """
+        if wikihost in broardcastMap.value:
+            wiki = broardcastMap.value[wikihost]
+            return TARGET + "/" + wiki + "_content" + "/page/_bulk"
+        return None
+
+    def sendDataToES(data, url):
+        """
+        Send data to ES server
+        """
+        if len(data) < 1:
+            return
+        try:
+            r = requests.put(url, data=data)
+        except requests.exceptions.RequestException, e:
+            errorCounter.add(1)
+            logging.error("Failed to send update: " + str(e))
+            return False
+        if r.status_code != 200:
+            errorCounter.add(1)
+            r.close()
+            return False
+        parseResponse(r)
+        updateCounter.add(1)
+        return True
+
+    def parseResponse(resp):
+        respData = resp.json()
+        for item in respData.get('items', {}):
+            if 'update' not in item:
+                continue
+            if 'status' not in item['update']:
+                continue
+            if item['update']['status'] != 200:
+                if 'error' in item['update'] and item['update']['error'][0:24] 
== 'DocumentMissingException':
+                    continue
+                failedDocumentCounter.add(1)
+
+    def sendDocumentsToES(documents):
+        """
+        Send a set of documents to ES
+        """
+        if len(documents) < 1:
+            return
+        url = getTargetURL(documents[0].project)
+        if not url:
+            return
+        data = ""
+        for document in documents:
+            data += documentData(document)
+        documentCounter.add(len(documents))
+        if not sendDataToES(data, url):
+            failedDocumentCounter.add(len(documents))
+
+    def groupAndSend(rows):
+        """
+        Group together documents from the same project and batch them
+        to elasticsearch
+        """
+        def genGroups():
+            group = []
+            for row in rows:
+                if len(group) > 0 and group[0].project != row.project:
+                    yield group
+                    group = []
+                group.append(row)
+                if len(group) >= ITEMS_PER_BATCH:
+                    yield group
+                    group = []
+            if len(group) > 0:
+                yield group
+
+        pool = Pool(processes=4)
+        pool.imap_unordered(sendDocumentsToES, genGroups())
+        pool.close()
+        pool.join()
+
+    data = sqlContext.load(SOURCE)
+    # print "Count: %d\n" % data.count()
+    data.sort(data.project).foreachPartition(groupAndSend)
+    print "%d documents processed, %d failed." % (documentCounter.value, 
failedDocumentCounter.value,)
+    print "%d requests successful, %d requests failed." % 
(updateCounter.value, errorCounter.value)

-- 
To view, visit https://gerrit.wikimedia.org/r/273298
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic212cb1461de5ac9c1f012bed2f58490aee42d07
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/discovery/analytics
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to