Repository: incubator-senssoft-distill Updated Branches: refs/heads/master 17e89fe99 -> c76439939
Fixed src and target id generation bug and folded in path length calculation Project: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/commit/43a9f20d Tree: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/tree/43a9f20d Diff: http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/diff/43a9f20d Branch: refs/heads/master Commit: 43a9f20d689be841beb6ab84b3592c5da0f4b6c2 Parents: c82fe23 Author: msbeard <msbe...@apache.org> Authored: Tue Aug 29 17:05:17 2017 -0400 Committer: msbeard <msbe...@apache.org> Committed: Tue Aug 29 17:05:17 2017 -0400 ---------------------------------------------------------------------- distill/algorithms/graphs/graph.py | 286 ++++++++++++++++++++++---------- 1 file changed, 202 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-senssoft-distill/blob/43a9f20d/distill/algorithms/graphs/graph.py ---------------------------------------------------------------------- diff --git a/distill/algorithms/graphs/graph.py b/distill/algorithms/graphs/graph.py index e5b34bd..73f1d34 100644 --- a/distill/algorithms/graphs/graph.py +++ b/distill/algorithms/graphs/graph.py @@ -31,7 +31,6 @@ class GraphAnalytics (object): def generate_graph(app, app_type='logs', log_type='raw', - # version='0.2.0', target_events=[], time_range=['now-1h', 'now'], size=20): @@ -69,13 +68,9 @@ class GraphAnalytics (object): { "term": { "logType": log_type - } + }, + }, - # { - # "term": { - # "useraleVersion": version - # } - # } ] # Sort By Time @@ -110,7 +105,7 @@ class GraphAnalytics (object): agg_query['sessions'] = session_query - # Generating all top targets + # Generating all top targets and breakdowns by type, including path_length target_query = { "terms": { "field": "target.keyword", @@ -124,6 +119,19 @@ class GraphAnalytics (object): "min_doc_count": 1, "size": size } + }, + "top_target": { + "top_hits": { + "script_fields": { + "path_length": { + "script": { + "lang": "painless", + "inline": "doc['path.keyword'].length;" + } + } + }, + "size": 1 + } } } } @@ -135,12 +143,24 @@ class GraphAnalytics (object): "sort": sort_query, "query": { "bool": { + # "must": must_match, "should": should_query, "filter": filter_query, "must_not": must_not_query, } }, - "aggs": agg_query + "_source": { + "includes": ['*'], + }, + "script_fields": { + "path_length": { + "script": { + "lang": "painless", + "inline": "doc['path.keyword'].length;" + } + } + }, + "aggregations": agg_query } # Process Aggregate Results @@ -148,8 +168,8 @@ class GraphAnalytics (object): # Only want to look at aggregations sessions = response['aggregations']['sessions']['buckets'] - allSessions = { x['key']: [] for x in sessions } - intervalSessions = { x['key']: [] for x in sessions } + # allSessions = { x['key']: [] for x in sessions } + # intervalSessions = { x['key']: [] for x in sessions } # Deal with bar chart allTargets = response['aggregations']['targets']['buckets'] @@ -161,28 +181,34 @@ class GraphAnalytics (object): doc_type=app_type, preserve_order=True) + allSessions = dict() # Store all hits in the user's bucket. for elem in iter: data = elem['_source'] + data['pathLength'] = elem['fields']['path_length'][0] if 'sessionID' in data: sessionID = data['sessionID'] if sessionID in allSessions: allSessions[sessionID].append(data) + else: + allSessions[sessionID] = [data] - # Remove all duplicates (and only leave behind interval) - # More than likely will need to create new list + # This fixed sequence/interval logging that what was produced in + # UserALE.js v 0.2.0 + # Possible to remove self-loops here as well (html->html->html->window) := (html->window) + intervalSessions = dict() for sessionID in allSessions: data = allSessions[sessionID] - newData = [] intervalLog = [] - for curr, next in zip(data, data[1:]): + pairs = zip(data, data[1:]) + + for curr, next in pairs: target1 = curr['target'] event1 = curr['type'] target2 = next['target'] event2 = next['type'] - - if target1 != target2: + if target1 != target2: # ignore self-loops targetChange = int(True) eventChange = int(False) if event1 != event2: @@ -193,38 +219,38 @@ class GraphAnalytics (object): curr['targetChange'] = targetChange curr['typeChange'] = eventChange curr['intervalCount'] = len(intervalLog) # some number maybe 0 - # if len(intervalLog) >= 2: - # # Calculate duration - # curr['duration'] = intervalLog[-1:]['clientTime'] - \ - # intervalLog[0]['clientTime'] - # else: - # curr['duration'] = 0 + if len(intervalLog) >= 2: + # Calculate duration + curr['duration'] = intervalLog[-1:]['clientTime'] - \ + intervalLog[0]['clientTime'] + else: + curr['duration'] = 0 newData.append(curr) intervalLog = [] - else: - # They are the same - targetChange = int(False) - eventChange = int(False) - if event1 != event2: - eventChange = int(True) - # starting over - curr['targetChange'] = targetChange - curr['typeChange'] = eventChange - curr['intervalCount'] = len(intervalLog) - # if len(intervalLog) >= 2: - # # Calculate duration - # curr['duration'] = intervalLog[-1:]['clientTime'] - \ - # intervalLog[0]['clientTime'] - # else: - # curr['duration'] = 0 - newData.append(curr) - intervalLog = [] - else: - # increase counter - intervalLog.append(curr) + # else: + # # They are the same + # targetChange = int(False) + # eventChange = int(False) + # if event1 != event2: + # eventChange = int(True) + # # starting over + # curr['targetChange'] = targetChange + # curr['typeChange'] = eventChange + # curr['intervalCount'] = len(intervalLog) + # # if len(intervalLog) >= 2: + # # # Calculate duration + # # curr['duration'] = intervalLog[-1:]['clientTime'] - \ + # # intervalLog[0]['clientTime'] + # # else: + # # curr['duration'] = 0 + # newData.append(curr) + # intervalLog = [] + # else: + # # increase counter + # intervalLog.append(curr) intervalSessions[sessionID] = newData - + # return intervalSessions newSessions = [] # Generate all edges tied to a user @@ -240,70 +266,152 @@ class GraphAnalytics (object): # Align the sequences alignment = itertools.izip_longest(*newSessions) + src_ids = {} + target_ids = {} - for step in alignment: - # step through every users sequence + for i, step in enumerate(alignment): + # print(i) c = collections.Counter() visitedLinks = [] + # visitedLinksUnique = set([]) nodenames = set([]) - # Process all the edges - for edge in step: + for edge in step: # for a single step look at all links if edge: node1 = edge[0] node2 = edge[1] - + session = node1['sessionID'] nodename1 = node1['target'] nodename2 = node2['target'] - # Add src and targetids - nodenames.add(nodename1) - nodenames.add(nodename2) - - # Generate sequence ID seqID = '%s->%s' % (nodename1, nodename2) + print(seqID) - # @todo Ensure src and target are not the same (self-loop) - if nodename1 != nodename2: + if nodename1 != nodename2: #double check again for self-loops + print(node1) link = { 'sequenceID': seqID, 'sourceName': nodename1, 'targetName': nodename2, 'type': node1['type'], - # 'duration': node1['duration'], + 'duration': node1['duration'], 'pathLength': len(node1['path']), 'targetChange': node1['targetChange'], 'typeChange': node1['typeChange'] } visitedLinks.append(link) - # How many users visited a sequence at this step + # Done with visits in a step. Now calculate counts counts = collections.Counter(k['sequenceID'] for k in visitedLinks if k.get('sequenceID')) # print(counts) - # Append into growing node_list - map(lambda x: node_list.append(x), nodenames) - - map(lambda x: node_map.append({ "name": x, - "id": len(node_list) - 1 - node_list[::-1].index(x)}), nodenames) - - for v in visitedLinks: - # Pass through and update count, also generate src and target id - v['value'] = counts[v['sequenceID']] - # Last occurence is the src and target id - v['source'] = len(node_list) -1 - node_list[::-1].index(v['sourceName']) - v['target'] = len(node_list) -1 - node_list[::-1].index(v['targetName']) - links.append(v) + visitedLinksUnique = { v['sequenceID']:v for v in visitedLinks}.values() + # print(visitedLinksUnique) + + # Visit unique links and generate src/targetid + if len(node_map) == 0: + for link in visitedLinksUnique: + # Add all sources + if link['sourceName'] not in src_ids: + node_map.append({"name": link['sourceName']}) + src_ids[link['sourceName']] = len(node_map)-1 + + # Add all targets + if link['targetName'] not in target_ids: + node_map.append({"name": link['targetName']}) + target_ids[link['targetName']] = len(node_map)-1 + + else: + src_ids = target_ids # sources were previous targets + target_ids = {} + for link in visitedLinksUnique: + # Add all sources + # if link['sourceName'] not in src_ids.values(): + # node_map.append(link['sourceName']) + # src_ids[len(node_map)-1] = link['sourceName'] + + # Add all targets + if link['targetName'] not in target_ids: + node_map.append({"name": link['targetName']}) + target_ids[link['targetName']] = len(node_map)-1 + + for link in visitedLinksUnique: + # Perform lookup for ids + # Perform lookup for counts + link['source'] = src_ids[link['sourceName']] + link['target'] = target_ids[link['targetName']] + link['value'] = counts[link['sequenceID']] + + links.append(link) + + # for step in alignment: + # # step through every users sequence + # c = collections.Counter() + # visitedLinks = [] + # nodenames = set([]) + # + # # Process all the edges + # for edge in step: + # if edge: + # node1 = edge[0] + # node2 = edge[1] + # + # nodename1 = node1['target'] + # nodename2 = node2['target'] + # + # # Add src and targetids + # nodenames.add(nodename1) + # nodenames.add(nodename2) + # + # # Generate sequence ID + # seqID = '%s->%s' % (nodename1, nodename2) + # + # # @todo Ensure src and target are not the same (self-loop) + # if nodename1 != nodename2: + # link = { + # 'sequenceID': seqID, + # 'sourceName': nodename1, + # 'targetName': nodename2, + # 'type': node1['type'], + # # 'duration': node1['duration'], + # 'pathLength': len(node1['path']), + # 'targetChange': node1['targetChange'], + # 'typeChange': node1['typeChange'] + # } + # visitedLinks.append(link) + # + # # How many users visited a sequence at this step + # counts = collections.Counter(k['sequenceID'] for k in visitedLinks if k.get('sequenceID')) + # # print(counts) + # # Append into growing node_list + # map(lambda x: node_list.append(x), nodenames) + # + # # map(lambda x: node_map.append({ "name": x} + # # "id": len(node_list) - 1 - node_list[::-1].index(x)}), nodenames) + # + # map(lambda x: node_map.append({ "name": x}), nodenames) + # # "id": len(node_list) - 1 - node_list[::-1].index(x)}), nodenames) + # for v in visitedLinks: + # # Pass through and update count, also generate src and target id + # v['value'] = counts[v['sequenceID']] + # # Last occurence is the src and target id + # v['source'] = len(node_list) -1 - node_list[::-1].index(v['sourceName']) + # v['target'] = len(node_list) -1 - node_list[::-1].index(v['targetName']) + # links.append(v) # Save everything - res = dict() - res['bargraph'] = generate_bargraph(allTargets) - res['sankey'] = { - 'links': links, - 'nodes': node_map - } + res = dict() + res['histogram'] = generate_bargraph(allTargets) + # res['sankey'] = { + # # 'sessions': sessions, + # 'links': links, + # 'nodes': node_map + # } + + res['nodes'] = node_map + res['links'] = links with open('sankey.json', 'w') as outfile: - json.dump(res, outfile, sort_keys=False) + json.dump(res, outfile, sort_keys=False, indent=4) # with open('data.txt', 'w') as outfile: # json.dump(intervalSessions, outfile, indent=4, sort_keys=False) @@ -388,12 +496,22 @@ def generate_bargraph(data, filename='bargraph.json'): results = [] for target in data: target_name = target['key'] + types = [] + counts = [] type_bucket = target['events']['buckets'] for t in type_bucket: - event = t['key'] - event_count = t['doc_count'] - res = {"target": target_name, "count": event_count, "type": event} - results.append(res) + types.append(t['key']) + counts.append(t['doc_count']) + + top_bucket = target['top_target']['hits']['hits'][0] + path_length = top_bucket['fields']['path_length'][0] + res = { + "target": target_name, + "counts": counts, + "types": types, + "pathLength": path_length + } + results.append(res) return results @@ -401,4 +519,4 @@ def generate_bargraph(data, filename='bargraph.json'): # json.dump(results, outfile, indent=4, sort_keys=False) def generate_sankey(data, filename='sankey.json'): - pass \ No newline at end of file + pass