This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kibble-scanners.git

commit 36dd76133c00674d6b08f568e872ec690e2683c2
Author: Daniel Gruno <humbed...@apache.org>
AuthorDate: Fri Mar 2 15:44:01 2018 +0100

    Rewrite broker class, inherit kibble UI wrapper
    
    Instead of doing checks constantly, we'll inherit the
    wrapper class from the UI repo. There are a few cases
    where we still have to manually do if/else (bulk and
    api checks), but the rest can be aliases.
    
    Bump accepted DB version to 2.
---
 src/plugins/brokers/kibbleES.py | 91 ++++++++++++++++++++++++++++-------------
 1 file changed, 62 insertions(+), 29 deletions(-)

diff --git a/src/plugins/brokers/kibbleES.py b/src/plugins/brokers/kibbleES.py
index 79d1caa..120d64b 100644
--- a/src/plugins/brokers/kibbleES.py
+++ b/src/plugins/brokers/kibbleES.py
@@ -21,7 +21,50 @@ import elasticsearch.helpers
 import threading
 import sys
 
-KIBBLE_DB_VERSION = 1  # Current DB struct version
+KIBBLE_DB_VERSION = 2  # Current DB struct version
+
+class KibbleESWrapper(object):
+    """
+       Class for rewriting old-style queries to the new ones,
+       where doc_type is an integral part of the DB name
+    """
+    def __init__(self, ES):
+        self.ES = ES
+        self.indices = self.indicesClass(ES)
+    
+    def get(self, index, doc_type, id):
+        return self.ES.get(index = index+'_'+doc_type, doc_type = '_doc', id = 
id)
+    def exists(self, index, doc_type, id):
+        return self.ES.exists(index = index+'_'+doc_type, doc_type = '_doc', 
id = id)
+    def delete(self, index, doc_type, id):
+        return self.ES.delete(index = index+'_'+doc_type, doc_type = '_doc', 
id = id)
+    def index(self, index, doc_type, id, body):
+        return self.ES.index(index = index+'_'+doc_type, doc_type = '_doc', id 
= id, body = body)
+    def update(self, index, doc_type, id, body):
+        return self.ES.update(index = index+'_'+doc_type, doc_type = '_doc', 
id = id, body = body)
+    def search(self, index, doc_type, size = 100, _source_include = None, body 
= None):
+        return self.ES.search(
+            index = index+'_'+doc_type,
+            doc_type = '_doc',
+            size = size,
+            _source_include = _source_include,
+            body = body
+            )
+    def count(self, index, doc_type, body = None):
+        return self.ES.count(
+            index = index+'_'+doc_type,
+            doc_type = '_doc',
+            body = body
+            )
+    
+    class indicesClass(object):
+        """ Indices helper class """
+        def __init__(self, ES):
+            self.ES = ES
+            
+        def exists(self, index):
+            return self.ES.indices.exists(index = index)
+
 
 # This is redundant, refactor later?
 def pprint(string, err = False):
@@ -59,43 +102,27 @@ class KibbleBit:
         
     def updateSource(self, source):
         """ Updates a source document, usually with a status update """
-        if self.broker.noTypes:
-            
self.broker.DB.index(index=self.broker.config['elasticsearch']['database'] + 
"_source",
-                    doc_type = '_doc',
-                    id=source['sourceID'],
-                    body = source
-            )
-        else:
-            
self.broker.DB.index(index=self.broker.config['elasticsearch']['database'],
-                    doc_type="source",
-                    id=source['sourceID'],
-                    body = source
-            )
+        
self.broker.DB.index(index=self.broker.config['elasticsearch']['database'],
+                doc_type="source",
+                id=source['sourceID'],
+                body = source
+        )
         
     def get(self, doctype, docid):
         """ Fetches a document from the DB """
-        if self.broker.noTypes:
-            doc = 
self.broker.DB.get(index=self.broker.config['elasticsearch']['database'] + "_" 
+ doctype, id = docid)
-        else:
-            doc = 
self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], 
doc_type=doctype, id = docid)
+        doc = 
self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], 
doc_type=doctype, id = docid)
         if doc:
             return doc['_source']
         return None
     
     def exists(self, doctype, docid):
         """ Checks whether a document already exists or not """
-        if self.broker.noTypes:
-            return 
self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'] + 
"_" + doctype, id = docid)
-        else:
-            return 
self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], 
doc_type=doctype, id = docid)
+        return 
self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], 
doc_type=doctype, id = docid)
     
     def index(self, doctype, docid, document):
         """ Adds a new document to the index """
         dbname = self.broker.config['elasticsearch']['database']
-        if self.broker.noTypes:
-            self.broker.DB.index(index=dbname + "_" + doctype, doc_type = 
'_doc', id = docid, body = document)
-        else:
-            self.broker.DB.index(index=dbname, doc_type = doctype, id = docid, 
body = document)        
+        self.broker.DB.index(index=dbname, doc_type = doctype, id = docid, 
body = document)        
         
     def append(self, t, doc):
         """ Append a document to the bulk push queue """
@@ -141,7 +168,7 @@ class KibbleBit:
                     'doc_as_upsert': True,
                 })
         try:
-            elasticsearch.helpers.bulk(self.broker.DB, js_arr)
+            elasticsearch.helpers.bulk(self.broker.oDB, js_arr)
         except Exception as err:
             pprint("Warning: Could not bulk insert: %s" % err)
         
@@ -227,17 +254,23 @@ class Broker:
         es_info = es.info()
         pprint("Connected!")
         self.DB = es
+        self.oDB = es # Original ES class, always. the .DB may change
         self.config = config
         self.bitClass = KibbleBit
         # This bit is required since ES 6.x and above don't like document types
         self.noTypes = True if int(es_info['version']['number'].split('.')[0]) 
>= 6 else False
         if self.noTypes:
             pprint("This is a type-less DB, expanding database names instead.")
+            es = KibbleESWrapper(es)
+            self.DB = es
+            if not es.indices.exists(index = es_config['database'] + "_api"):
+                sys.stderr.write("Could not find database group %s_* in 
ElasticSearch!\n" % es_config['database'])
+                sys.exit(-1)
         else:
             pprint("This DB supports types, utilizing..")
-        if not es.indices.exists(index = es_config['database']):
-            sys.stderr.write("Could not find database %s in ElasticSearch!\n" 
% es_config['database'])
-            sys.exit(-1)
+            if not es.indices.exists(index = es_config['database']):
+                sys.stderr.write("Could not find database %s in 
ElasticSearch!\n" % es_config['database'])
+                sys.exit(-1)
         try:
             apidoc = es.get(index=es_config['database'], doc_type='api', id = 
'current')['_source']
             if apidoc['dbversion'] > KIBBLE_DB_VERSION:

-- 
To stop receiving notification emails like this one, please contact
humbed...@apache.org.

Reply via email to