This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kibble-scanners.git
commit 36dd76133c00674d6b08f568e872ec690e2683c2 Author: Daniel Gruno <humbed...@apache.org> AuthorDate: Fri Mar 2 15:44:01 2018 +0100 Rewrite broker class, inherit kibble UI wrapper Instead of doing checks constantly, we'll inherit the wrapper class from the UI repo. There are a few cases where we still have to manually do if/else (bulk and api checks), but the rest can be aliases. Bump accepted DB version to 2. --- src/plugins/brokers/kibbleES.py | 91 ++++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/src/plugins/brokers/kibbleES.py b/src/plugins/brokers/kibbleES.py index 79d1caa..120d64b 100644 --- a/src/plugins/brokers/kibbleES.py +++ b/src/plugins/brokers/kibbleES.py @@ -21,7 +21,50 @@ import elasticsearch.helpers import threading import sys -KIBBLE_DB_VERSION = 1 # Current DB struct version +KIBBLE_DB_VERSION = 2 # Current DB struct version + +class KibbleESWrapper(object): + """ + Class for rewriting old-style queries to the new ones, + where doc_type is an integral part of the DB name + """ + def __init__(self, ES): + self.ES = ES + self.indices = self.indicesClass(ES) + + def get(self, index, doc_type, id): + return self.ES.get(index = index+'_'+doc_type, doc_type = '_doc', id = id) + def exists(self, index, doc_type, id): + return self.ES.exists(index = index+'_'+doc_type, doc_type = '_doc', id = id) + def delete(self, index, doc_type, id): + return self.ES.delete(index = index+'_'+doc_type, doc_type = '_doc', id = id) + def index(self, index, doc_type, id, body): + return self.ES.index(index = index+'_'+doc_type, doc_type = '_doc', id = id, body = body) + def update(self, index, doc_type, id, body): + return self.ES.update(index = index+'_'+doc_type, doc_type = '_doc', id = id, body = body) + def search(self, index, doc_type, size = 100, _source_include = None, body = None): + return self.ES.search( + index = index+'_'+doc_type, + doc_type = '_doc', + size = size, + _source_include = _source_include, + 
body = body + ) + def count(self, index, doc_type, body = None): + return self.ES.count( + index = index+'_'+doc_type, + doc_type = '_doc', + body = body + ) + + class indicesClass(object): + """ Indices helper class """ + def __init__(self, ES): + self.ES = ES + + def exists(self, index): + return self.ES.indices.exists(index = index) + # This is redundant, refactor later? def pprint(string, err = False): @@ -59,43 +102,27 @@ class KibbleBit: def updateSource(self, source): """ Updates a source document, usually with a status update """ - if self.broker.noTypes: - self.broker.DB.index(index=self.broker.config['elasticsearch']['database'] + "_source", - doc_type = '_doc', - id=source['sourceID'], - body = source - ) - else: - self.broker.DB.index(index=self.broker.config['elasticsearch']['database'], - doc_type="source", - id=source['sourceID'], - body = source - ) + self.broker.DB.index(index=self.broker.config['elasticsearch']['database'], + doc_type="source", + id=source['sourceID'], + body = source + ) def get(self, doctype, docid): """ Fetches a document from the DB """ - if self.broker.noTypes: - doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'] + "_" + doctype, id = docid) - else: - doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) + doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) if doc: return doc['_source'] return None def exists(self, doctype, docid): """ Checks whether a document already exists or not """ - if self.broker.noTypes: - return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'] + "_" + doctype, id = docid) - else: - return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) + return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) def index(self, doctype, 
docid, document): """ Adds a new document to the index """ dbname = self.broker.config['elasticsearch']['database'] - if self.broker.noTypes: - self.broker.DB.index(index=dbname + "_" + doctype, doc_type = '_doc', id = docid, body = document) - else: - self.broker.DB.index(index=dbname, doc_type = doctype, id = docid, body = document) + self.broker.DB.index(index=dbname, doc_type = doctype, id = docid, body = document) def append(self, t, doc): """ Append a document to the bulk push queue """ @@ -141,7 +168,7 @@ class KibbleBit: 'doc_as_upsert': True, }) try: - elasticsearch.helpers.bulk(self.broker.DB, js_arr) + elasticsearch.helpers.bulk(self.broker.oDB, js_arr) except Exception as err: pprint("Warning: Could not bulk insert: %s" % err) @@ -227,17 +254,23 @@ class Broker: es_info = es.info() pprint("Connected!") self.DB = es + self.oDB = es # Original ES class, always. the .DB may change self.config = config self.bitClass = KibbleBit # This bit is required since ES 6.x and above don't like document types self.noTypes = True if int(es_info['version']['number'].split('.')[0]) >= 6 else False if self.noTypes: pprint("This is a type-less DB, expanding database names instead.") + es = KibbleESWrapper(es) + self.DB = es + if not es.indices.exists(index = es_config['database'] + "_api"): + sys.stderr.write("Could not find database group %s_* in ElasticSearch!\n" % es_config['database']) + sys.exit(-1) else: pprint("This DB supports types, utilizing..") - if not es.indices.exists(index = es_config['database']): - sys.stderr.write("Could not find database %s in ElasticSearch!\n" % es_config['database']) - sys.exit(-1) + if not es.indices.exists(index = es_config['database']): + sys.stderr.write("Could not find database %s in ElasticSearch!\n" % es_config['database']) + sys.exit(-1) try: apidoc = es.get(index=es_config['database'], doc_type='api', id = 'current')['_source'] if apidoc['dbversion'] > KIBBLE_DB_VERSION: -- To stop receiving notification emails like this one, 
please contact humbed...@apache.org.