Repository: incubator-senssoft-distill Updated Branches: refs/heads/master 5efac3b27 -> a22b46c99
Applied linter to code Project: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/commit/a22b46c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/tree/a22b46c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/diff/a22b46c9 Branch: refs/heads/master Commit: a22b46c992104452493f2afd27c4acdfa78a1dae Parents: 5efac3b Author: mooshu1x2 <mbe...@draper.com> Authored: Wed Oct 19 17:30:30 2016 -0400 Committer: mooshu1x2 <mbe...@draper.com> Committed: Wed Oct 19 17:30:30 2016 -0400 ---------------------------------------------------------------------- distill/__init__.py | 40 +- distill/algorithms/graphs/graph.py | 17 +- distill/algorithms/graphs/tests/__init__.py | 2 +- distill/algorithms/stats/hist.py | 334 ++++++++--------- distill/algorithms/stats/tests/__init__.py | 2 +- distill/app.py | 408 +++++++++++---------- distill/models/brew.py | 442 ++++++++++++----------- distill/models/stout.py | 165 ++++----- distill/models/tests/__init__.py | 2 +- distill/models/userale.py | 239 ++++++------ distill/server.py | 19 +- distill/tests/basic_test.py | 1 + distill/tests/distill_test.py | 14 +- distill/utils/exceptions.py | 10 +- distill/utils/query_builder.py | 30 +- distill/utils/tests/__init__.py | 2 +- distill/utils/validation.py | 40 +- distill/version.py | 2 +- setup.py | 108 +++--- 19 files changed, 961 insertions(+), 916 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/__init__.py ---------------------------------------------------------------------- diff --git a/distill/__init__.py b/distill/__init__.py index 2b44372..009d341 100644 --- a/distill/__init__.py +++ b/distill/__init__.py @@ -12,34 +12,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - + from flask import Flask from elasticsearch_dsl.connections import connections # Initialize Flask instance -app = Flask (__name__) +app = Flask(__name__) # Load Configurations app.config.from_pyfile('config.cfg') # Unpack Elasticsearch configuration and create elasticsearch connection -host = app.config ['ES_HOST'] -port = app.config ['ES_PORT'] -http_auth = app.config ['HTTP_AUTH'] -use_ssl = app.config ['USE_SSL'] -verify_certs = app.config ['VERIFY_CERTS'] -ca_certs = app.config ['CA_CERTS'] -client_cert = app.config ['CLIENT_CERT'] -client_key = app.config ['CLIENT_KEY'] -timeout = app.config ['TIMEOUT'] +host = app.config['ES_HOST'] +port = app.config['ES_PORT'] +http_auth = app.config['HTTP_AUTH'] +use_ssl = app.config['USE_SSL'] +verify_certs = app.config['VERIFY_CERTS'] +ca_certs = app.config['CA_CERTS'] +client_cert = app.config['CLIENT_CERT'] +client_key = app.config['CLIENT_KEY'] +timeout = app.config['TIMEOUT'] # Initialize Elasticsearch instance -es = connections.create_connection (hosts = [host], - port = port, - http_auth = http_auth, - use_ssl = use_ssl, - verify_certs = verify_certs, - ca_certs = ca_certs, - client_cert = client_cert, - client_key = client_key, - timeout=timeout) \ No newline at end of file +es = connections.create_connection(hosts=[host], + port=port, + http_auth=http_auth, + use_ssl=use_ssl, + verify_certs=verify_certs, + ca_certs=ca_certs, + client_cert=client_cert, + client_key=client_key, + timeout=timeout) http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/algorithms/graphs/graph.py ---------------------------------------------------------------------- diff --git a/distill/algorithms/graphs/graph.py b/distill/algorithms/graphs/graph.py index 3c44730..cd238f5 100644 --- a/distill/algorithms/graphs/graph.py +++ b/distill/algorithms/graphs/graph.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. + class GraphAnalytics (object): - """ - Distill's graph analytics package. Apply graph algorithms to User Ale log data segmented with - Stout. - """ - - @staticmethod - def foo (): - pass \ No newline at end of file + """ + Distill's graph analytics package. Apply graph algorithms + to User Ale log data segmented with Stout. + """ + + @staticmethod + def foo(): + pass http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/algorithms/graphs/tests/__init__.py ---------------------------------------------------------------------- diff --git a/distill/algorithms/graphs/tests/__init__.py b/distill/algorithms/graphs/tests/__init__.py index f6f6899..329a18d 100644 --- a/distill/algorithms/graphs/tests/__init__.py +++ b/distill/algorithms/graphs/tests/__init__.py @@ -19,4 +19,4 @@ distill: tests module. Meant for use with py.test. Organize tests into files, each named xxx_test.py Read more here: http://pytest.org/ -''' \ No newline at end of file +''' http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/algorithms/stats/hist.py ---------------------------------------------------------------------- diff --git a/distill/algorithms/stats/hist.py b/distill/algorithms/stats/hist.py index b516423..9297501 100644 --- a/distill/algorithms/stats/hist.py +++ b/distill/algorithms/stats/hist.py @@ -14,170 +14,176 @@ # limitations under the License. from distill import es -from distill.utils.query_builder import QueryBuilder +# from distill.utils.query_builder import QueryBuilder from flask import jsonify -from elasticsearch import Elasticsearch, TransportError +from elasticsearch import TransportError + class Hist (object): - """ - Distill's statistics package. Apply statistical algorithms to User Ale log data segmented with - Stout. Need to query/filter by session or user id. - """ - - def __init__ (self): - # parse out query - pass - - # @staticmethod - # def filter (app, app_type=None, q=''): - - # field = q.get ("field") if q.get ("field") else "" - # size = q.get ("size") if q.get ("size") else 10 - - # query = { "aggs" : { - # "count_by_type" : { - # "filter" : { "term" : { field : }} - # "terms" : { - # "field" : field, - # "size" : 100 - # } - # } - # } - # } - - # d = {} - # # try: - # response = es.search (index=app, doc_type=app_type, body=query) - # # for tag in response['aggregations']['count_by_type']['buckets']: - # # d [tag ['key']] = tag ['doc_count'] - # # except TransportError as e: - # # d ['error'] = e.info - # # except Exception as e: - # # d ['error'] = str (e) - # # return jsonify (d) - # return jsonify (response) - - @staticmethod - def terms (app, app_type=None, q=''): - """ - Group by field (find all elements ) - """ - field = q.get ("field") if q.get ("field") else "" - segment = q.get ("seg") if q.get ("seg") else "*" - size = q.get ("size") if q.get ("size") else 10000 - numhits = q.get ("numhits") if q.get ("numhits") else 10 - - query = { "aggs" : { - "count_by_type" : { - "terms" : { - "field" : field, - "size" : size # maximum number of keys (unique fields) - }, - "aggs" : { - "top" : { # arbitrary name - "top_hits" : { - "size" : numhits, # number of logs in subgroup - "_source" : { # segment on fields - return only subgroup based on field - "include" : [ - segment - ] - } - } - } - } - } - } - } - - d = {} - # try: - response = es.search (index=app, doc_type=app_type, body=query) - # for tag in response['aggregations']['count_by_type']['buckets']: - # d [tag ['key']] = tag ['doc_count'] - # except TransportError as e: - # d ['error'] = e.info - # except Exception as e: - # d ['error'] = str (e) - # return jsonify (d) - return jsonify (response) - - @staticmethod - def unique_terms (app, app_type=None, q=""): - """ - Aggregate the number of unique terms in a field. Missing values are counted and marked as "N/A". - - .. todo:: - - Need to incorporate QueryBuilder library instead of manually generating queries. - - :param app: [string] application name - :param app_type: [string] application type - :param field: [string] field to search against for unique values - :param size: [int] the top size terms returned in the result. Default value is 10. - :param min_hits: [int] return tags which have been found in min_hits or more. Default value is 1. - :return: [dict] dictionary of results - """ - - field = q.get ("field") if q.get ("field") else "" - size = q.get ("size") if q.get ("size") else 10000 - min_hits = q.get ("min_hits") if q.get ("min_hits") else 0 - - print field - query = { "aggs" : { - "terms_agg" : { - "terms" : { - "field" : field, - "size" : size, - "min_doc_count" : min_hits, - "missing" : "N/A" - } - } - } - } - - d = {} - try: - response = es.search (index=app, doc_type=app_type, body=query) - for tag in response['aggregations']['terms_agg']['buckets']: - d [tag ['key']] = tag ['doc_count'] - except TransportError as e: - d ['error'] = e.info - except Exception as e: - d ['error'] = str (e) - return jsonify (d) - - @staticmethod - def histogram (app, app_type=None, q=""): - """ - Only works on numerical data. - """ - field = q.get ("field") if q.get ("field") else "" - - interval = 50 - query = { "aggs" : { - "hist_agg" : { - "histogram" : { - "field" : field, - "interval" : interval - } - } - } - } - - d = {} - try: - response = es.search (index=app, doc_type=app_type, body=query) - for tag in response['aggregations']['hist_agg']['buckets']: - d [tag ['key']] = tag ['doc_count'] - except TransportError as e: - d ['error'] = e.info - except Exception as e: - d ['error'] = str (e) - return jsonify (d) - - def get_value (): - return 0 - - def _parse_msg (query): - # should have form ?measure=name&field=f1, f2&event=a,b - pass + """ + Distill's statistics package. Apply statistical algorithms + to UserALE log data segmented with Stout. + Need to query/filter by session or user id. + """ + + def __init__(self): + # parse out query + pass + + # @staticmethod + # def filter (app, app_type=None, q=''): + + # field = q.get ("field") if q.get ("field") else "" + # size = q.get ("size") if q.get ("size") else 10 + + # query = { "aggs" : { + # "count_by_type" : { + # "filter" : { "term" : { field : }} + # "terms" : { + # "field" : field, + # "size" : 100 + # } + # } + # } + # } + + # d = {} + # # try: + # response = es.search (index=app, doc_type=app_type, body=query) + # # for tag in response['aggregations']['count_by_type']['buckets']: + # # d [tag ['key']] = tag ['doc_count'] + # # except TransportError as e: + # # d ['error'] = e.info + # # except Exception as e: + # # d ['error'] = str (e) + # # return jsonify (d) + # return jsonify (response) + + @staticmethod + def terms(app, app_type=None, q=''): + """ + Group by field (find all elements ) + """ + field = q.get("field") if q.get("field") else "" + segment = q.get("seg") if q.get("seg") else "*" + size = q.get("size") if q.get("size") else 10000 + numhits = q.get("numhits") if q.get("numhits") else 10 + + query = {"aggs": { + "count_by_type": { + "terms": { + "field": field, + "size": size # maximum number of keys (unique fields) + }, + "aggs": { + "top": { # arbitrary name + "top_hits": { + "size": numhits, # number of logs in subgroup + "_source": { + "include": [ + segment + ] + } + } + } + } + } + } + } + + # d = {} + # try: + response = es.search(index=app, doc_type=app_type, body=query) + # for tag in response['aggregations']['count_by_type']['buckets']: + # d [tag ['key']] = tag ['doc_count'] + # except TransportError as e: + # d ['error'] = e.info + # except Exception as e: + # d ['error'] = str (e) + # return jsonify (d) + return jsonify(response) + + @staticmethod + def unique_terms(app, app_type=None, q=""): + """ + Aggregate the number of unique terms in a field. + Missing values are counted and marked as "N/A". + + .. todo:: + + Need to incorporate QueryBuilder library instead of + manually generating queries. + + :param app: [string] application name + :param app_type: [string] application type + :param field: [string] field to search against for unique values + :param size: [int] the top size terms returned in the result. + Default value is 10. + :param min_hits: [int] return tags which have been found + in min_hits or more. Default value is 1. + :return: [dict] dictionary of results + """ + + field = q.get("field") if q.get("field") else "" + size = q.get("size") if q.get("size") else 10000 + min_hits = q.get("min_hits") if q.get("min_hits") else 0 + + print field + query = {"aggs": { + "terms_agg": { + "terms": { + "field": field, + "size": size, + "min_doc_count": min_hits, + "missing": "N/A" + } + } + } + } + + d = {} + try: + response = es.search(index=app, doc_type=app_type, body=query) + for tag in response['aggregations']['terms_agg']['buckets']: + d[tag['key']] = tag['doc_count'] + except TransportError as e: + d['error'] = e.info + except Exception as e: + d['error'] = str(e) + return jsonify(d) + + @staticmethod + def histogram(app, app_type=None, q=""): + """ + Only works on numerical data. + """ + field = q.get("field") if q.get("field") else "" + + interval = 50 + query = {"aggs": { + "hist_agg": { + "histogram": { + "field": field, + "interval": interval + } + } + } + } + + d = {} + try: + response = es.search(index=app, doc_type=app_type, body=query) + for tag in response['aggregations']['hist_agg']['buckets']: + d[tag['key']] = tag['doc_count'] + except TransportError as e: + d['error'] = e.info + except Exception as e: + d['error'] = str(e) + return jsonify(d) + + def get_value(): + return 0 + + def _parse_msg(query): + # should have form ?measure=name&field=f1, f2&event=a,b + pass http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/algorithms/stats/tests/__init__.py ---------------------------------------------------------------------- diff --git a/distill/algorithms/stats/tests/__init__.py b/distill/algorithms/stats/tests/__init__.py index f6f6899..329a18d 100644 --- a/distill/algorithms/stats/tests/__init__.py +++ b/distill/algorithms/stats/tests/__init__.py @@ -19,4 +19,4 @@ distill: tests module. Meant for use with py.test. Organize tests into files, each named xxx_test.py Read more here: http://pytest.org/ -''' \ No newline at end of file +''' http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/app.py ---------------------------------------------------------------------- diff --git a/distill/app.py b/distill/app.py index 58434a1..a2bb380 100644 --- a/distill/app.py +++ b/distill/app.py @@ -13,204 +13,226 @@ # See the License for the specific language governing permissions and # limitations under the License. -from flask import Flask, request, jsonify +from flask import request +from flask import jsonify from distill import app from distill.models.brew import Brew from distill.models.userale import UserAle from distill.models.stout import Stout from distill.algorithms.stats.hist import Hist -@app.route ('/', methods=['GET']) -def index (): - """ - Show Distill version information, connection status, and all registered applications. - - .. code-block:: bash - - $ curl -XGET https://localhost:8090 - - { - "author" : "Michelle Beard", - "email" : "mbe...@draper.com", - "name": "Distill", - "status" : true, - "version" : "1.0", - "applications" : { - "xdata_v3" : { - testing: 205, - parsed: 500, - }, - "test_app" : { - logs: 500, - parsed: 100, - } - } - } - - :return: Distill's status information as JSON blob - """ - return jsonify (name="Distill", version="1.0 alpha", author="Michelle Beard", email="mbe...@draper.com", status=Brew.get_status (), applications=Brew.get_applications ()) - -@app.route ('/create/<app_id>', methods=['POST', 'PUT']) -def create (app_id): - """ - Registers an application in Distill. - - .. code-block:: bash - - $ curl -XPOST https://localhost:8090/xdata_v3 - - :param app_id: Application name - :return: Newly created application's status as JSON blob - """ - return Brew.create (app_id) - -@app.route ('/status/<app_id>', defaults={"app_type" : None}, methods=['GET']) -@app.route ('/status/<app_id>/<app_type>', methods=['GET']) -def status (app_id, app_type): - """ - Presents meta information about an registered application, including field names and document types. - - .. code-block:: bash - - $ curl -XGET https://localhost:8090/status/xdata_v3 - - { - "application": "xdata_v3", - "health": "green", - "num_docs": "433", - "status": "open" - } - - :param app_id: Application name - :return: Registered applications meta data as JSON blob - """ - return Brew.read (app_id, app_type=app_type) - -@app.route ('/update/<app_id>', methods=['POST', 'PUT']) -def update (app_id): - """ - Renames a specific application - - .. code-block:: bash - - $ curl -XPOST https://localhost:8090/update/xdata_v3?name="xdata_v4" - - :param app_id: Application name - :return: Boolean response message as JSON blob - """ - return Brew.update (app_id) - -@app.route ('/delete/<app_id>', methods=['DELETE']) -def delete (app_id): - """ - Deletes an application permentantly from Distill - - .. code-block:: bash - - $ curl -XDELETE https://localhost:8090/xdata_v3 - - :param app_id: Application name - :return: Boolean response message as JSON blob - """ - return Brew.delete (app_id) - -@app.route ('/search/<app_id>', defaults={"app_type" : None}, methods=['GET']) -@app.route ('/search/<app_id>/<app_type>', methods=['GET']) -def segment (app_id, app_type): - """ - Search against an application on various fields. - - .. code-block:: bash - - $ curl -XGET https://[hostname]:[port]/search/xdata_v3?q=session_id:A1234&size=100&scroll=false&fl=param1,param2 - - :param app_id: Application name - :param app_type: Optional document type to filter against - :param q: Main search query. To return all documents, pass in q=*:* - :param size: Maximum number of documents to return in request - :param scroll: Scroll id if the number of documents exceeds 10,000 - :param fl: List of fields to restrict the result set - :return: JSON blob of result set - """ - q = request.args - return UserAle.segment (app_id, app_type=app_type, params=q) - -@app.route ('/stat/<app_id>', defaults={"app_type" : None}, methods=['GET']) -@app.route ('/stat/<app_id>/<app_type>', methods=['GET']) -def stat (app_id, app_type): - """ - Generic histogram counts for a single registered application filtered optionally by document type. - View the Statistics document page for method definitions and arguments - - .. code-block:: bash - - $ curl -XGET https://localhost:8090/stat/xdata_v3/testing/?stat=terms&elem=signup&event=click - - :param app_id: Application name - :param app_type: Application type - :return: JSON blob of result set - """ - stat = request.args.get ('stat') - q = request.args - - hist_cls = Hist () - method = None - try: - method = getattr (hist_cls, stat) - return method (app_id, app_type, q=q) - except AttributeError: - msg = "Class `{}` does not implement `{}`".format(hist_cls.__class__.__name__, stat) - return jsonify (error=msg) - -@app.route ('/denoise/<app_id>', methods=['GET']) -def denoise (app_id): - """ - Bootstrap script to cleanup the raw logs. A document type called "parsed" - will be stored with new log created unless specified in the request. Have option to save - parsed results back to data store. These parsed logs can be intergrated with STOUT results - by running the stout bootstrap script. - - .. code-block:: bash - - $ curl -XGET https://localhost:8090/denoise/xdata_v3?save=true&type=parsed - - :param app_id: Application name - :return: [dict] - """ - doc_type = 'parsed' - save = False - q = request.args - if 'save' in q: - save = str2bool (q.get ('save')) - if 'type' in q: - # @TODO: Proper cleanup script needs to happen - doc_type = q.get ('type') - return UserAle.denoise (app_id, doc_type=doc_type, save=save) - -@app.route ('/stout', methods=['GET']) -def merge_stout (): - """ - Bootstrap script to aggregate user ale logs to stout master answer table - This will save the merged results back to ES instance at new index stout - OR denoise data first, then merge with the stout index... - If STOUT is enabled, the select method expects a stout index to exist or otherwise - it will return an error message. - - .. code-block:: bash - - $ curl -XGET https://locahost:8090/stout/xdata_v3 - - :return: Status message - """ - flag = app.config ['ENABLE_STOUT'] - if flag: - return Stout.ingest () - return jsonify (status="STOUT is disabled.") + +@app.route('/', methods=['GET']) +def index(): + """ + Show Distill version information, connection status, + and all registered applications. + + .. code-block:: bash + + $ curl -XGET http://localhost:8090 + + { + "author" : "Michelle Beard", + "email" : "mbe...@draper.com", + "name": "Distill", + "status" : true, + "version" : "1.0", + "applications" : { + "xdata_v3" : { + testing: 205, + parsed: 500, + }, + "test_app" : { + logs: 500, + parsed: 100, + } + } + } + + :return: Distill's status information as JSON blob + """ + return jsonify(name="Distill", + version="1.0 alpha", + author="Michelle Beard", + email="mbe...@draper.com", + status=Brew.get_status(), + applications=Brew.get_applications()) + + +@app.route('/create/<app_id>', methods=['POST', 'PUT']) +def create(app_id): + """ + Registers an application in Distill. + + .. code-block:: bash + + $ curl -XPOST http://localhost:8090/xdata_v3 + + :param app_id: Application name + :return: Newly created application's status as JSON blob + """ + return Brew.create(app_id) + + +@app.route('/status/<app_id>', defaults={"app_type": None}, methods=['GET']) +@app.route('/status/<app_id>/<app_type>', methods=['GET']) +def status(app_id, app_type): + """ + Presents meta information about an registered application, + including field names and document types. + + .. code-block:: bash + + $ curl -XGET http://localhost:8090/status/xdata_v3 + + { + "application": "xdata_v3", + "health": "green", + "num_docs": "433", + "status": "open" + } + + :param app_id: Application name + :return: Registered applications meta data as JSON blob + """ + return Brew.read(app_id, app_type=app_type) + + +@app.route('/update/<app_id>', methods=['POST', 'PUT']) +def update(app_id): + """ + Renames a specific application + + .. code-block:: bash + + $ curl -XPOST http://localhost:8090/update/xdata_v3?name="xdata_v4" + + :param app_id: Application name + :return: Boolean response message as JSON blob + """ + return Brew.update(app_id) + + +@app.route('/delete/<app_id>', methods=['DELETE']) +def delete(app_id): + """ + Deletes an application permentantly from Distill + + .. code-block:: bash + + $ curl -XDELETE http://localhost:8090/xdata_v3 + + :param app_id: Application name + :return: Boolean response message as JSON blob + """ + return Brew.delete(app_id) + + +@app.route('/search/<app_id>', defaults={"app_type": None}, methods=['GET']) +@app.route('/search/<app_id>/<app_type>', methods=['GET']) +def segment(app_id, app_type): + """ + Search against an application on various fields. + + .. code-block:: bash + + $ curl -XGET http://localhost:8090/search/xdata_v3?q=session_id:A1234&size=100&scroll=false&fl=param1,param2 + + :param app_id: Application name + :param app_type: Optional document type to filter against + :param q: Main search query. To return all documents, pass in q=*:* + :param size: Maximum number of documents to return in request + :param scroll: Scroll id if the number of documents exceeds 10,000 + :param fl: List of fields to restrict the result set + :return: JSON blob of result set + """ + q = request.args + return UserAle.segment(app_id, app_type=app_type, params=q) + + +@app.route('/stat/<app_id>', defaults={"app_type": None}, methods=['GET']) +@app.route('/stat/<app_id>/<app_type>', methods=['GET']) +def stat(app_id, app_type): + """ + Generic histogram counts for a single registered + application filtered optionally by document type. + + View the Statistics document page for method definitions and arguments + + .. code-block:: bash + + $ curl -XGET http://localhost:8090/stat/xdata_v3/testing/?stat=terms&elem=signup&event=click + + :param app_id: Application name + :param app_type: Application type + :return: JSON blob of result set + """ + stat = request.args.get('stat') + q = request.args + + hist_cls = Hist() + method = None + try: + method = getattr(hist_cls, stat) + return method(app_id, app_type, q=q) + except AttributeError: + msg = "Class `{}` does not implement `{}`".format( + hist_cls.__class__.__name__, stat) + return jsonify(error=msg) + + +@app.route('/denoise/<app_id>', methods=['GET']) +def denoise(app_id): + """ + Bootstrap script to cleanup the raw logs. A document type called "parsed" + will be stored with new log created unless specified in the request. + Have option to save parsed results back to data store. + These parsed logs can be intergrated with STOUT results + by running the stout bootstrap script. + + .. code-block:: bash + + $ curl -XGET http://localhost:8090/denoise/xdata_v3?save=true&type=parsed + + :param app_id: Application name + :return: [dict] + """ + doc_type = 'parsed' + save = False + # q = request.args + # if 'save' in q: + # save = str2bool(q.get('save')) + # if 'type' in q: + # # @TODO: Proper cleanup script needs to happen + # doc_type = q.get('type') + return UserAle.denoise(app_id, doc_type=doc_type, save=save) + + +@app.route('/stout', methods=['GET']) +def merge_stout(): + """ + Bootstrap script to aggregate user ale logs to stout master answer table + This will save the merged results back to ES instance at new index stout + OR denoise data first, then merge with the stout index... + If STOUT is enabled, the select method expects a stout index + to exist or otherwise it will return an error message. + + .. code-block:: bash + + $ curl -XGET http://locahost:8090/stout/xdata_v3 + + :return: Status message + """ + flag = app.config['ENABLE_STOUT'] + if flag: + return Stout.ingest() + return jsonify(status="STOUT is disabled.") + @app.errorhandler(404) -def page_not_found (error): - """ - Generic Error Message - """ - return "Unable to find Distill." +def page_not_found(error): + """ + Generic Error Message + """ + return "Unable to find Distill." http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/models/brew.py ---------------------------------------------------------------------- diff --git a/distill/models/brew.py b/distill/models/brew.py index 28d16b3..8357de6 100644 --- a/distill/models/brew.py +++ b/distill/models/brew.py @@ -14,222 +14,234 @@ # limitations under the License. -from elasticsearch import Elasticsearch, TransportError +from elasticsearch import TransportError from flask import jsonify from distill import es + class Brew (object): - """ - Distill supports basic CRUD operations and publishes the status - of an persistenct database. Eventually it will support ingesting logs sent from - an registered application. - """ - - @staticmethod - def get_status (): - """ - Fetch the status of the underlying database instance. - - :return: [bool] if connection to database instance has been established - """ - return es.ping (ignore=[400, 404]) - - @staticmethod - def get_applications (): - """ - Fetch all the registered applications in Distill. - - .. note:: Private indexes starting with a period are not included in the result set - - :return: [dict] dictionary of all registered applications and meta information - """ - doc = {} - query = { "aggs" : { - "count_by_type" : { - "terms" : { - "field" : "_type", - "size" : 100 - } - } - } - } - - try: - cluster_status = es.cat.indices (h=["index"], pri=False) - x = cluster_status.splitlines() - - for idx in x: - idx = idx.rstrip () - - # Ignore private indexes (like .kibana or .stout) - if idx [:1] != '.': - response = es.search (index=idx, body=query) - d = {} - for tag in response["aggregations"]["count_by_type"]["buckets"]: - d [tag ['key']] = tag ['doc_count'] - doc [idx] = d - except TransportError as e: - doc ['error'] = e.info - except Exception as e: - doc ['error'] = str (e) - return doc - - @staticmethod - def create (app): - """ - Register a new application in Distill - - .. code-block:: bash - - { - "application" : "xdata_v3", - "health" : "green", - "num_docs" : 0, - "status" : "open" - } - - :param app: [string] application name (e.g. xdata_v3) - :return: [dict] dictionary of application and its meta information - """ - - # ignore 400 cause by IndexAlreadyExistsException when creating an index - res = es.indices.create (index=app, ignore=[400, 404]) - doc = _get_cluster_status (app) - return jsonify (doc) - - @staticmethod - def read (app, app_type=None): - """ - Fetch meta data associated with an application - - .. code-block:: bash - - Example: - { - "application" : "xdata_v3", - "health" : "green", - "num_docs" : "100", - "status" : "open" - "types" : { - "raw_logs" : { - "@timestamp" : "date", - "action" : "string", - "elementId" : "string" - }, - "parsed" : { - "@timestamp" : "date", - "elementId_interval" : "string" - }, - "graph" : { - "uniqueID" : "string", - "transition_count" : "long", - "p_value" : "float" - } - } - } - - :param app: [string] application name (e.g. xdata_v3) - :return: [dict] dictionary of application and its meta information - """ - - return jsonify (_get_cluster_status (app, app_type=app_type)) - - @staticmethod - def update (app): - """ - .. todo:: - Currently not implemented - """ - - return jsonify (status="not implemented") - - @staticmethod - def delete (app): - """ - Technically closes the index so its content is not searchable. - - .. code-block: bash - - Example: - { - status: "Deleted index xdata_v3" - } - - :param app: [string] application name (e.g. xdata_v3) - :return: [dict] status message of the event - """ - - es.indices.close (index=app, ignore=[400, 404]) - return jsonify (status="Deleted index %s" % app) - -def _get_cluster_status (app, app_type=None): - """ - Return cluster status, index health, and document count as string - - @todo figure out how to count individual documents stored at an app_type (currently shows only index count) - :param app: [string] application name (e.g. xdata_v3) - :return: [dict] dictionary of index meta data including field names - """ - - doc = {} - try: - cluster_status = es.cat.indices (index=app, h=["health", "status", "docs.count"], pri=True, ignore=[400, 404]) - v = str (cluster_status).split (" ") - m = ["health", "status", "num_docs"] - doc = dict (zip (m, v)) - # Add back application - doc ["application"] = app - except TransportError as e: - doc ['error'] = e.info - except Exception as e: - doc ['error'] = str (e) - - doc ['fields'] = _get_all_fields (app, app_type) - return doc - -def _parse_mappings (app, app_type=None): - """ - .. todo: - - Need to parse out result set that presents field list and type - """ - - try: - mappings = es.indices.get_mapping (index=app, doc_type=[app_type], ignore=[400, 404]) - # mappings = yaml.safe_load (json.ess (mappings)) - # print json.dumps (mappings [app]["mappings"], indent=4, separators=(',', ': ')) - ignore = ["properties", "format"] - except TransportError as e: - doc ['error'] = e.info - except Exception as e: - doc ['error'] = str (e) - return doc - -def _get_all_fields (app, app_type=None): - """ - Retrieve all possible fields in an application - - :param app: [string] application name (e.g. xdata_v3) - :param app_type: [string] application type (e.g. logs) - :return: [list] list of strings representing the fields names - """ - d = list () - query = { "aggs" : { - "fields" : { - "terms" : { - "field" : "_field_names", - "size" : 100 - } - } - } - } - - try: - response = es.search (index=app, doc_type=app_type, body=query) - for tag in response['aggregations']['fields']['buckets']: - d.append (tag ['key']) - except TransportError as e: - d.append (str (e.info)) - except Exception as e: - d.append (str (e)) - return d + """ + Distill supports basic CRUD operations and publishes the status + of an persistenct database. Eventually it will support ingesting + logs sent from a registered application. + """ + + @staticmethod + def get_status(): + """ + Fetch the status of the underlying database instance. + + :return: [bool] if connection to database instance has been established + """ + return es.ping(ignore=[400, 404]) + + @staticmethod + def get_applications(): + """ + Fetch all the registered applications in Distill. + + .. note:: Private indexes starting with a period are not included + in the result set + + :return: [dict] dictionary of all registered applications and meta info + """ + doc = {} + query = {"aggs": { + "count_by_type": { + "terms": { + "field": "_type", + "size": 100 + } + } + } + } + + try: + cluster_status = es.cat.indices(h=["index"], pri=False) + x = cluster_status.splitlines() + + for idx in x: + idx = idx.rstrip() + + # Ignore private indexes (like .kibana or .stout) + if idx[:1] != '.': + response = es.search(index=idx, body=query) + d = {} + for tag in response["aggregations"]["count_by_type"]["buckets"]: + d[tag['key']] = tag['doc_count'] + doc[idx] = d + except TransportError as e: + doc['error'] = e.info + except Exception as e: + doc['error'] = str(e) + return doc + + @staticmethod + def create(app): + """ + Register a new application in Distill + + .. code-block:: bash + + { + "application" : "xdata_v3", + "health" : "green", + "num_docs" : 0, + "status" : "open" + } + + :param app: [string] application name (e.g. xdata_v3) + :return: [dict] dictionary of application and its meta information + """ + + # ignore 400 cause by IndexAlreadyExistsException when creating index + res = es.indices.create(index=app, ignore=[400, 404]) + doc = _get_cluster_status(app) + return jsonify(doc) + + @staticmethod + def read(app, app_type=None): + """ + Fetch meta data associated with an application + + .. code-block:: bash + + Example: + { + "application" : "xdata_v3", + "health" : "green", + "num_docs" : "100", + "status" : "open" + "types" : { + "raw_logs" : { + "@timestamp" : "date", + "action" : "string", + "elementId" : "string" + }, + "parsed" : { + "@timestamp" : "date", + "elementId_interval" : "string" + }, + "graph" : { + "uniqueID" : "string", + "transition_count" : "long", + "p_value" : "float" + } + } + } + + :param app: [string] application name (e.g. xdata_v3) + :return: [dict] dictionary of application and its meta information + """ + + return jsonify(_get_cluster_status(app, app_type=app_type)) + + @staticmethod + def update(app): + """ + .. todo:: + Currently not implemented + """ + + return jsonify(status="not implemented") + + @staticmethod + def delete(app): + """ + Technically closes the index so its content is not searchable. + + .. code-block: bash + + Example: + { + status: "Deleted index xdata_v3" + } + + :param app: [string] application name (e.g. xdata_v3) + :return: [dict] status message of the event + """ + + es.indices.close(index=app, ignore=[400, 404]) + return jsonify(status="Deleted index %s" % app) + + +def _get_cluster_status(app, app_type=None): + """ + Return cluster status, index health, and document count as string + + @todo figure out how to count individual documents stored + at an app_type (currently shows only index count) + :param app: [string] application name (e.g. xdata_v3) + :return: [dict] dictionary of index meta data including field names + """ + + doc = {} + try: + cluster_status = es.cat.indices(index=app, + h=["health", "status", "docs.count"], + pri=True, + ignore=[400, 404]) + v = str(cluster_status).split(" ") + m = ["health", "status", "num_docs"] + doc = dict(zip(m, v)) + # Add back application + doc["application"] = app + except TransportError as e: + doc['error'] = e.info + except Exception as e: + doc['error'] = str(e) + + doc['fields'] = _get_all_fields(app, app_type) + return doc + + +def _parse_mappings(app, app_type=None): + """ + .. todo: + + Need to parse out result set that presents field list and type + """ + doc = {} + try: + mappings = es.indices.get_mapping(index=app, + doc_type=[app_type], + ignore=[400, 404]) + # mappings = yaml.safe_load (json.ess (mappings)) + # print json.dumps (mappings [app]["mappings"], indent=4, + # separators=(',', ': ')) + ignore = ["properties", "format"] + except TransportError as e: + doc['error'] = e.info + except Exception as e: + doc['error'] = str(e) + return doc + + +def _get_all_fields(app, app_type=None): + """ + Retrieve all possible fields in an application + + :param app: [string] application name (e.g. xdata_v3) + :param app_type: [string] application type (e.g. logs) + :return: [list] list of strings representing the fields names + """ + d = list() + query = {"aggs": { + "fields": { + "terms": { + "field": "_field_names", + "size": 100 + } + } + } + } + + try: + response = es.search(index=app, doc_type=app_type, body=query) + for tag in response['aggregations']['fields']['buckets']: + d.append(tag['key']) + except TransportError as e: + d.append(str(e.info)) + except Exception as e: + d.append(str(e)) + return d http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/models/stout.py ---------------------------------------------------------------------- diff --git a/distill/models/stout.py b/distill/models/stout.py index d6421d8..3517553 100644 --- a/distill/models/stout.py +++ b/distill/models/stout.py @@ -13,31 +13,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -from distill import app, es -from elasticsearch_dsl import DocType, String, Boolean, Date, Nested, Search -from elasticsearch_dsl.query import MultiMatch, Match, Q -from elasticsearch import Elasticsearch, TransportError +from distill import app +from elasticsearch_dsl import DocType, String, Nested from flask import jsonify -import pandas as pd +import pandas as pd + class StoutDoc (DocType): """ Representation of a Stout documentat. """ - sessionID = String (index="not_analyzed") - task1 = Nested () - task2 = Nested () + sessionID = String(index="not_analyzed") + task1 = task2 = Nested() class Meta: index = '.stout' doc_type = 'testing' - def save (self, *args, **kwargs): + def save(self, *args, **kwargs): """ Save data from parsing as a Stout document in Distill """ - return super (StoutDoc, self).save (*args, **kwargs) + return super(StoutDoc, self).save(*args, **kwargs) + class Stout (object): """ @@ -45,105 +44,107 @@ class Stout (object): """ @staticmethod - def ingest (): + def ingest(): """ Ingest data coming from Stout to Distill """ # Create the mappings in elasticsearch - StoutDoc.init () + StoutDoc.init() status = True - data = _parse (); + data = _parse() try: - for k,v in data.items (): - doc = StoutDoc () + for k, v in data.items(): + doc = StoutDoc() if 'sessionID' in v: doc.sessionID = v['sessionID'] if 'task1' in v: doc.task1 = v['task1'] if 'task2' in v: doc.task2 = v['task2'] - doc.save () - except Error as e: + doc.save() + except: status = False - return jsonify (status=status) + return jsonify(status=status) + -def _parse (): +def _parse(): """ Parse master answer table with mapping into an associative array :return: [dict] dictionary of session information """ - master = app.config ['MASTER'] - mappings = app.config ['MAPPINGS'] - - fileContents=pd.read_csv(master, encoding='utf-8') - plainTextMappings=pd.read_csv(mappings, encoding='raw_unicode_escape') - headers=list(fileContents.columns.values) - - #generate the mapping between header and plain text - translationRow={}; - for fieldIndex in range(1,len(headers)): - t=plainTextMappings.ix[fieldIndex] - translationRow[headers[fieldIndex]]=t[9] - - dictBySessionID={} - translationRow['items.text']='foo' - index=0 + master = app.config['MASTER'] + mappings = app.config['MAPPINGS'] + + fileContents = pd.read_csv(master, encoding='utf-8') + plainTextMappings = pd.read_csv(mappings, encoding='raw_unicode_escape') + headers = list(fileContents.columns.values) + + # generate the mapping between header and plain text + translationRow = {} + for fieldIndex in range(1, len(headers)): + t = plainTextMappings.ix[fieldIndex] + translationRow[headers[fieldIndex]] = t[9] + + dictBySessionID = {} + translationRow['items.text'] = 'foo' + index = 0 for row in fileContents.iterrows(): - index=index+1 - - taskMetrics={} - index,data=row - identifier=row[1][0].split("::") - sessionID=identifier[0] - taskID=(identifier[1]) - workingData={} - #is this session id already in the dictionary? + index = index + 1 + + index, data = row + identifier = row[1][0].split("::") + sessionID = identifier[0] + taskID = (identifier[1]) + workingData = {} + # is this session id already in the dictionary? if sessionID in dictBySessionID: - #grab the entry as workingData - workingData=dictBySessionID[sessionID] - - sysData={} - task1Data={} - task2Data={} - metaData={} - d={} - - for fieldIndex in range(1,len(headers)): - if not pd.isnull(row[1][fieldIndex]): #only interested in non-null fields - tempDict={} + # grab the entry as workingData + workingData = dictBySessionID[sessionID] + + sysData = {} + task1Data = {} + task2Data = {} + metaData = {} + d = {} + + for fieldIndex in range(1, len(headers)): + # only interested in non-null fields + if not pd.isnull(row[1][fieldIndex]): + tempDict = {} if headers[fieldIndex] in translationRow: - tempDict['field']=translationRow[headers[fieldIndex]] - #tempDict['field']=translationRow[9] - tempDict['value']=row[1][fieldIndex] - d[headers[fieldIndex]]=row[1][fieldIndex] + tempDict['field'] = translationRow[headers[fieldIndex]] + # tempDict['field']=translationRow[9] + tempDict['value'] = row[1][fieldIndex] + d[headers[fieldIndex]] = row[1][fieldIndex] if "SYS" in headers[fieldIndex]: - sysData[headers[fieldIndex]]=tempDict + sysData[headers[fieldIndex]] = tempDict elif "OT1" in headers[fieldIndex]: - task1Data[headers[fieldIndex]]=tempDict + task1Data[headers[fieldIndex]] = tempDict elif "OT2" in headers[fieldIndex]: - task2Data[headers[fieldIndex]]=tempDict + task2Data[headers[fieldIndex]] = tempDict else: - metaData[headers[fieldIndex]]=tempDict - - if d['TSK_TIME_DIFF_']>0: #block tasks with zero time elapsed - a=int(d['TSK_TIME_DIFF_OT1_']) - b=int(d['TSK_TIME_DIFF_OT2_']) - #figure out which task the values belong to - if ((a>0) & (b<=0)): - task1Data['taskID']=taskID - task1Data['meta']=metaData - task1Data['system']=sysData - workingData['task1']=task1Data - elif ((a<=0) & (b>0)): - task2Data['taskID']=taskID - task2Data['meta']=metaData - task2Data['system']=sysData - workingData['task2']=task2Data + metaData[headers[fieldIndex]] = tempDict + + if d['TSK_TIME_DIFF_'] > 0: # block tasks with zero time elapsed + a = int(d['TSK_TIME_DIFF_OT1_']) + b = int(d['TSK_TIME_DIFF_OT2_']) + # figure out which task the values belong to + if ((a > 0) & (b <= 0)): + task1Data['taskID'] = taskID + task1Data['meta'] = metaData + task1Data['system'] = sysData + workingData['task1'] = task1Data + elif ((a <= 0) & (b > 0)): + task2Data['taskID'] = taskID + task2Data['meta'] = metaData + task2Data['system'] = sysData + workingData['task2'] = task2Data else: - raise ValueError('Encountered an unexpected task time diff state') + raise ValueError( + 'Encountered an unexpected task time diff state') - workingData['sessionID'] = sessionID - dictBySessionID[sessionID]=workingData + workingData['sessionID'] = sessionID + dictBySessionID[sessionID] = workingData return dictBySessionID http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/models/tests/__init__.py ---------------------------------------------------------------------- diff --git a/distill/models/tests/__init__.py b/distill/models/tests/__init__.py index f6f6899..329a18d 100644 --- a/distill/models/tests/__init__.py +++ b/distill/models/tests/__init__.py @@ -19,4 +19,4 @@ distill: tests module. Meant for use with py.test. Organize tests into files, each named xxx_test.py Read more here: http://pytest.org/ -''' \ No newline at end of file +''' http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/models/userale.py ---------------------------------------------------------------------- diff --git a/distill/models/userale.py b/distill/models/userale.py index f63fa51..a512a50 100644 --- a/distill/models/userale.py +++ b/distill/models/userale.py @@ -13,125 +13,126 @@ # See the License for the specific language governing permissions and # limitations under the License. -from elasticsearch import Elasticsearch, TransportError -from elasticsearch_dsl import DocType, String, Boolean, Date, Float, Search -from elasticsearch_dsl.query import MultiMatch, Match, Q -from elasticsearch import Elasticsearch, TransportError -from elasticsearch_dsl.connections import connections -from werkzeug.datastructures import ImmutableMultiDict, MultiDict +from flask import jsonify +from distill import es +from distill import Stout -from flask import jsonify, Markup -from distill import app, es -import datetime class UserAle (object): - """ - Main method of entry to perform segmentation and integration of STOUT's master - answer table (if STOUT is enabled). Advanced and basic analytics is performed in the - distill.algorithms.stats and distill.algorithms.graphs module. - """ - - @staticmethod - def segment (app, app_type=None, params=''): - """ - Just support match all for now. - """ - q = params.get ("q") if params.get ("q") else {} - fields = params.get ("fields") if params.get ("fields") else [] - size = params.get ("size") if params.get ("size") else 10 - scroll = params.get ("scroll") if params.get ("scroll") else False - fl = params.get ("fl") if params.get ("fl") else [] - - # filters = params.get ("filter") if params.get ("filter") else {} - - # 'q': args.get('q', '{}'), - # 'fields': args.get('fl', '{}'), - # 'size': args.get ('size', 100), - # 'scroll': args.get ('scroll', False), - # 'filters': request_args.getlist ('fq') - query = {} - query ['size'] = size - - if q: - res = q.split(":") - key = res [0] - val = res [1] - query ['query'] = {"match" : { key : val } } - else: - query ['query'] = {"match_all" : {}} - - if len (fields) > 0: - ex = { - "include" : fields.split(",") - } - query ['_source'] = ex - - - response = es.search (index=app, doc_type=app_type, body=query) - - return jsonify (response) - - @staticmethod - def search (app, - app_type=None, - filters=list (), - size=100, - include="*", - scroll=None, - sort_field=None): - """ - Perform a search query. - - :param app: [string] application id (e.g. "xdata_v3") - :param app_type: [string] name of the application type. If None all application types are searched. - :param filters: [list of strings] list of filters for a query. - :param size: [int] maximum number of hits that should be returned - :param sort_field: [string] sorting field. Currently supported fields: "timestamp", "date" - :return: [dict] dictionary with processed results. If STOUT is enabled, STOUT data will be merged with final result. - """ - - # Need some query builder... - log_result = es.search (index=app, doc_type=app_type, body=query, fields=filters, size=size) - - stout_result = Stout.getSessions () - - data = merged_results (log_result, stout_result) - return data - - @staticmethod - def denoise (app, app_type='parsed', save=False): - """ - """ - pass - -""" -Combine a list of dictionaries together to form one complete dictionary -""" -def merge_dicts (lst): - dall = {} - for d in lst: - dall.update (d) - return dall - -""" -Get query parameters from the request and preprocess them. -:param [dict-like structure] Any structure supporting get calls -:result [dict] Parsed parameters -""" -def parse_query_parameters (indx, app_type=None, request_args = {}): - args = {key: value[0] for (key, value) in dict (request_args).iteritems ()} - - # print "args = ", args - # Parse out simple filter queries - filters = [] - for filter in get_all_fields (indx, app_type): - if filter in args: - filters.append((filter, args[filter])) - - return { - 'q': args.get('q', '{}'), - 'fields': args.get('fl', []), - 'size': args.get ('size', 100), - 'scroll': args.get ('scroll', False), - 'filters': request_args.getlist ('fq') - } \ No newline at end of file + """ + Main method of entry to perform segmentation and integration of STOUT's + master answer table (if STOUT is enabled). Advanced and basic analytics + is performed in the distill.algorithms.stats and + distill.algorithms.graphs module. + """ + + @staticmethod + def segment(app, app_type=None, params=''): + """ + Just support match all for now. + """ + q = params.get("q") if params.get("q") else {} + fields = params.get("fields") if params.get("fields") else [] + size = params.get("size") if params.get("size") else 10 + scroll = params.get("scroll") if params.get("scroll") else False + fl = params.get("fl") if params.get("fl") else [] + + # filters = params.get ("filter") if params.get ("filter") else {} + + # 'q': args.get('q', '{}'), + # 'fields': args.get('fl', '{}'), + # 'size': args.get ('size', 100), + # 'scroll': args.get ('scroll', False), + # 'filters': request_args.getlist ('fq') + query = {} + query['size'] = size + + if q: + res = q.split(":") + key = res[0] + val = res[1] + query['query'] = {"match": {key: val}} + else: + query['query'] = {"match_all": {}} + + if len(fields) > 0: + ex = { + "include": fields.split(",") + } + query['_source'] = ex + + response = es.search(index=app, doc_type=app_type, body=query) + + return jsonify(response) + + @staticmethod + def search(app, + app_type=None, + filters=list(), + size=100, + include="*", + scroll=None, + sort_field=None): + """ + Perform a search query. + + :param app: [string] application id (e.g. "xdata_v3") + :param app_type: [string] name of the application type. + If None all application types are searched. + :param filters: [list of strings] list of filters for a query. + :param size: [int] maximum number of hits that should be returned + :param sort_field: [string] sorting field. + Currently supported fields: "timestamp", "date" + :return: [dict] dictionary with processed results. + If STOUT is enabled, STOUT data will be merged with final result. + """ + + # Need some query builder... + query = {} + log_result = es.search(index=app, doc_type=app_type, + body=query, fields=filters, size=size) + + stout_result = Stout.getSessions() + + data = merged_results(log_result, stout_result) + return data + + @staticmethod + def denoise(app, app_type='parsed', save=False): + """ + """ + pass + + +def merge_dicts(lst): + """ + Combine a list of dictionaries together to form one complete dictionary + """ + dall = {} + for d in lst: + dall.update(d) + return dall + + +def parse_query_parameters(indx, app_type=None, request_args={}): + """ + Get query parameters from the request and preprocess them. + :param [dict-like structure] Any structure supporting get calls + :result [dict] Parsed parameters + """ + args = {key: value[0] for (key, value) in dict(request_args).iteritems()} + + # print "args = ", args + # Parse out simple filter queries + filters = [] + for filter in get_all_fields(indx, app_type): + if filter in args: + filters.append((filter, args[filter])) + + return { + 'q': args.get('q', '{}'), + 'fields': args.get('fl', []), + 'size': args.get('size', 100), + 'scroll': args.get('scroll', False), + 'filters': request_args.getlist('fq') + } http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/server.py ---------------------------------------------------------------------- diff --git a/distill/server.py b/distill/server.py index 23acd83..9cd3d12 100644 --- a/distill/server.py +++ b/distill/server.py @@ -14,16 +14,15 @@ # limitations under the License. from distill import app -from distill.app import * -""" -Start up a local WSGI server called development -""" -def dev_server (): - host = app.config ['HOST'] - port = app.config ['PORT'] - debug = app.config ['DEBUG'] - app.run (host=host, port=port, debug=debug) + +def dev_server(): + """Start up a local WSGI server called development""" + host = app.config['HOST'] + port = app.config['PORT'] + debug = app.config['DEBUG'] + app.run(host=host, port=port, debug=debug) + if __name__ == '__main__': - dev_server () + dev_server() http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/tests/basic_test.py ---------------------------------------------------------------------- diff --git a/distill/tests/basic_test.py b/distill/tests/basic_test.py index 712d1fe..3f44294 100644 --- a/distill/tests/basic_test.py +++ b/distill/tests/basic_test.py @@ -20,5 +20,6 @@ Write each test as a function named test_<something>. Read more here: http://pytest.org/ ''' + def test_example(): assert True http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/tests/distill_test.py ---------------------------------------------------------------------- diff --git a/distill/tests/distill_test.py b/distill/tests/distill_test.py index 2fb6502..dc64027 100644 --- a/distill/tests/distill_test.py +++ b/distill/tests/distill_test.py @@ -12,15 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from flask import Flask, request -from distill import app as test_app -def test_example (): - assert True - # with test_app.test_client () as c: - # rv = c.get ('/?tequila=42') - # assert request.args ['tequila'] == '42' +def test_example(): + assert True + # with test_app.test_client () as c: + # rv = c.get ('/?tequila=42') + # assert request.args ['tequila'] == '42' # import os # import flaskr @@ -40,4 +38,4 @@ def test_example (): # os.unlink(flaskr.app.config['DATABASE']) # if __name__ == '__main__': -# unittest.main() \ No newline at end of file +# unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/utils/exceptions.py ---------------------------------------------------------------------- diff --git a/distill/utils/exceptions.py b/distill/utils/exceptions.py index a391241..5839bf1 100644 --- a/distill/utils/exceptions.py +++ b/distill/utils/exceptions.py @@ -13,13 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. + class Error (Exception): """Base class for exceptions.""" pass + class ValidationError (Error): - """ Exceptions raised for errors in validated a url.""" + """ Exceptions raised for errors in validated a url.""" - def __init__ (self, url, msg): - self.url = url - self.msg = msg + def __init__(self, url, msg): + self.url = url + self.msg = msg http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/utils/query_builder.py ---------------------------------------------------------------------- diff --git a/distill/utils/query_builder.py b/distill/utils/query_builder.py index 017a08b..111ef19 100644 --- a/distill/utils/query_builder.py +++ b/distill/utils/query_builder.py @@ -13,23 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -class QueryBuilder (object): - - def __init__ (self, query=None): - if query: - self.query = query - else: - self.query = { - "query" : { - "match_all" : {} - } - } +class QueryBuilder (object): - def add_filters (self, filters): - pass + def __init__(self, query=None): + if query: + self.query = query + else: + self.query = { + "query": { + "match_all": {} + } + } - def add_sorting (self, sort_field='', sort_order=''): - pass + def add_filters(self, filters): + pass - \ No newline at end of file + def add_sorting(self, sort_field='', sort_order=''): + pass http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/utils/tests/__init__.py ---------------------------------------------------------------------- diff --git a/distill/utils/tests/__init__.py b/distill/utils/tests/__init__.py index 09c5e2f..29767a2 100644 --- a/distill/utils/tests/__init__.py +++ b/distill/utils/tests/__init__.py @@ -18,4 +18,4 @@ distill: tests module. Meant for use with py.test. Organize tests into files, each named xxx_test.py Read more here: http://pytest.org/ -''' \ No newline at end of file +''' http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/utils/validation.py ---------------------------------------------------------------------- diff --git a/distill/utils/validation.py b/distill/utils/validation.py index 7cd3362..88c9661 100644 --- a/distill/utils/validation.py +++ b/distill/utils/validation.py @@ -15,25 +15,27 @@ from distill.utils.exceptions import ValidationError -def validate_request (q): - """ - Parse out request message and validate inputs - :param q: Url query string - :raises ValidationError: if the query is missing required parameters - """ - if 'q' not in q: - raise ValidationError ("Missing required parameter: %s" % 'q') - else: - # Handle rest of parsing - pass +def validate_request(q): + """ + Parse out request message and validate inputs -def str2bool (v): - """ - Convert string expression to boolean + :param q: Url query string + :raises ValidationError: if the query is missing required parameters + """ + if 'q' not in q: + raise ValidationError("Missing required parameter: %s" % 'q') + else: + # Handle rest of parsing + pass - :param v: Input value - :returns: Converted message as boolean type - :rtype: bool - """ - return v.lower() in ("yes", "true", "t", "1") \ No newline at end of file + +def str2bool(v): + """ + Convert string expression to boolean + + :param v: Input value + :returns: Converted message as boolean type + :rtype: bool + """ + return v.lower() in ("yes", "true", "t", "1") http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/distill/version.py ---------------------------------------------------------------------- diff --git a/distill/version.py b/distill/version.py index 6532ea7..1088c72 100644 --- a/distill/version.py +++ b/distill/version.py @@ -19,4 +19,4 @@ This file is imported by ``Distill.__init__``, and parsed by ``setup.py``. """ -__version__ = "0.1.3" \ No newline at end of file +__version__ = "0.1.4" http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/a22b46c9/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 8ddd32f..dc7f9f3 100644 --- a/setup.py +++ b/setup.py @@ -15,73 +15,75 @@ from __future__ import absolute_import from setuptools import setup, find_packages -import distutils.cmd -import distutils.log -from setuptools.command.test import test as TestCommand -import io, os, sys, subprocess +import io +import os +import sys if sys.version_info[:2] < (2, 7): m = "Python 2.7 or later is required for Distill (%d.%d detected)." - raise ImportError (m % sys.version_info[:2]) + raise ImportError(m % sys.version_info[:2]) if sys.argv[-1] == 'setup.py': print ("To install, run 'python setup.py install'") print () - -def read (*filenames, **kwargs): - encoding = kwargs.get ('encoding', 'utf-8') - sep = kwargs.get ('sep', '\n') + + +def read(*filenames, **kwargs): + encoding = kwargs.get('encoding', 'utf-8') + sep = kwargs.get('sep', '\n') buf = [] for filename in filenames: - with io.open (filename, encoding=encoding) as f: - buf.append (f.read ()) - return sep.join (buf) + with io.open(filename, encoding=encoding) as f: + buf.append(f.read()) + return sep.join(buf) + # Get the version string -def get_version (): - basedir = os.path.dirname (__file__) - with open (os.path.join (basedir, 'distill/version.py')) as f: +def get_version(): + basedir = os.path.dirname(__file__) + with open(os.path.join(basedir, 'distill/version.py')) as f: version = {} - exec (f.read (), version) + exec (f.read(), version) return version['__version__'] - raise RuntimeError ('No version info found.') + raise RuntimeError('No version info found.') -setup ( - name = "Distill", - version = get_version (), - url = "https://github.com/apache/incubator-senssoft-distill", - license = "Apache Software License", - author = "Michelle Beard", - author_email = "msbe...@apache.org", - description = "An analytical framework for UserALE.", - long_description = __doc__, - classifiers = [ - 'Development Status :: 4 - Beta', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2.7', - 'Natural Language :: English', - 'Environment :: Web Environment', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: OS Independent', - 'Private :: Do Not Upload"' - ], - keywords = "stout userale tap distill", # Separate with spaces - packages = find_packages (exclude=['examples', 'tests']), - include_package_data = True, - zip_safe = False, - setup_requires = ['pytest-runner'], - tests_require = ['pytest>=3.0.0', 'pytest-pylint', 'coverage'], - install_requires = ['Flask==0.10.1', - #'networkx==1.11', - 'elasticsearch-dsl==2.0.0', - #'numpy>=1.10.0', - #'scipy>=0.17.0', - 'pandas>=0.18.1' + +setup( + name="Distill", + version=get_version(), + url="https://github.com/apache/incubator-senssoft-distill", + license="Apache Software License 2.0", + author="Michelle Beard", + author_email="msbe...@apache.org", + description="An analytical framework for UserALE.", + long_description=__doc__, + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'Intended Audience :: Science/Research', + 'License :: OSI Approved :: Apache Software License', + 'Natural Language :: English', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.5', + 'Environment :: Web Environment', + 'Framework :: Flask', + 'Framework :: Pytest', + 'Topic :: Internet :: Log Analysis' ], - entry_points = { - 'console_scripts': [ - 'dev = distill.server:dev_server' + keywords="stout userale tap distill", + packages=find_packages(exclude=['examples', 'tests']), + include_package_data=True, + zip_safe=False, + setup_requires=['pytest-runner'], + tests_require=['pytest>=3.0.0', 'pytest-pylint', 'coverage'], + install_requires=['Flask==0.10.1', + 'elasticsearch-dsl==2.0.0', + 'pandas>=0.18.1'], + entry_points={ + 'console_scripts': [ + 'dev = distill.server:dev_server' ] } -) \ No newline at end of file +)